Merge branch 'stable-2.14'
* stable-2.14: Index: Extract common parts into AbstractVersionManager ES: Adds configuration for the JEST client Change-Id: Idcd918450d6ab51fb71f68d949f77e35494043ee
This commit is contained in:
		| @@ -83,7 +83,7 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> { | ||||
|     this.indexName = | ||||
|         String.format( | ||||
|             "%s%s%04d", | ||||
|             Strings.nullToEmpty(cfg.getString("index", null, "prefix")), | ||||
|             Strings.nullToEmpty(cfg.getString("elasticsearch", null, "prefix")), | ||||
|             indexName, | ||||
|             schema.getVersion()); | ||||
|     this.client = clientBuilder.build(); | ||||
|   | ||||
| @@ -24,6 +24,7 @@ import java.util.ArrayList; | ||||
| import java.util.Arrays; | ||||
| import java.util.List; | ||||
| import java.util.Set; | ||||
| import java.util.concurrent.TimeUnit; | ||||
| import org.eclipse.jgit.lib.Config; | ||||
|  | ||||
| @Singleton | ||||
| @@ -33,9 +34,29 @@ class ElasticConfiguration { | ||||
|   private static final String DEFAULT_PROTOCOL = "http"; | ||||
|  | ||||
|   final List<String> urls; | ||||
|   final String username; | ||||
|   final String password; | ||||
|   final boolean requestCompression; | ||||
|   final long connectionTimeout; | ||||
|   final long maxConnectionIdleTime; | ||||
|   final TimeUnit maxConnectionIdleUnit = TimeUnit.MILLISECONDS; | ||||
|   final int maxTotalConnection; | ||||
|   final int readTimeout; | ||||
|  | ||||
|   @Inject | ||||
|   ElasticConfiguration(@GerritServerConfig Config cfg) { | ||||
|     this.username = cfg.getString("elasticsearch", null, "username"); | ||||
|     this.password = cfg.getString("elasticsearch", null, "password"); | ||||
|     this.requestCompression = cfg.getBoolean("elasticsearch", null, "requestCompression", false); | ||||
|     this.connectionTimeout = | ||||
|         cfg.getTimeUnit("elasticsearch", null, "connectionTimeout", 3000, TimeUnit.MILLISECONDS); | ||||
|     this.maxConnectionIdleTime = | ||||
|         cfg.getTimeUnit( | ||||
|             "elasticsearch", null, "maxConnectionIdleTime", 3000, TimeUnit.MILLISECONDS); | ||||
|     this.maxTotalConnection = cfg.getInt("elasticsearch", null, "maxTotalConnection", 1); | ||||
|     this.readTimeout = | ||||
|         (int) cfg.getTimeUnit("elasticsearch", null, "readTimeout", 3000, TimeUnit.MICROSECONDS); | ||||
|  | ||||
|     Set<String> subsections = cfg.getSubsections("elasticsearch"); | ||||
|     if (subsections.isEmpty()) { | ||||
|       this.urls = Arrays.asList(buildUrl(DEFAULT_PROTOCOL, DEFAULT_HOST, DEFAULT_PORT)); | ||||
|   | ||||
| @@ -14,59 +14,31 @@ | ||||
|  | ||||
| package com.google.gerrit.elasticsearch; | ||||
|  | ||||
| import static com.google.common.base.Preconditions.checkArgument; | ||||
|  | ||||
| import com.google.common.base.MoreObjects; | ||||
| import com.google.common.collect.Lists; | ||||
| import com.google.common.collect.Maps; | ||||
| import com.google.common.primitives.Ints; | ||||
| import com.google.gerrit.extensions.events.LifecycleListener; | ||||
| import com.google.gerrit.server.config.GerritServerConfig; | ||||
| import com.google.gerrit.server.config.SitePaths; | ||||
| import com.google.gerrit.server.index.AbstractVersionManager; | ||||
| import com.google.gerrit.server.index.GerritIndexStatus; | ||||
| import com.google.gerrit.server.index.Index; | ||||
| import com.google.gerrit.server.index.IndexCollection; | ||||
| import com.google.gerrit.server.index.IndexDefinition; | ||||
| import com.google.gerrit.server.index.IndexDefinition.IndexFactory; | ||||
| import com.google.gerrit.server.index.IndexUtils; | ||||
| import com.google.gerrit.server.index.OnlineReindexer; | ||||
| import com.google.gerrit.server.index.ReindexerAlreadyRunningException; | ||||
| import com.google.gerrit.server.index.Schema; | ||||
| import com.google.inject.Inject; | ||||
| import com.google.inject.ProvisionException; | ||||
| import com.google.inject.Singleton; | ||||
| import java.io.IOException; | ||||
| import java.util.Collection; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.util.TreeMap; | ||||
| import org.eclipse.jgit.lib.Config; | ||||
| import org.slf4j.Logger; | ||||
| import org.slf4j.LoggerFactory; | ||||
|  | ||||
| @Singleton | ||||
| public class ElasticVersionManager implements LifecycleListener { | ||||
| public class ElasticVersionManager extends AbstractVersionManager implements LifecycleListener { | ||||
|   private static final Logger log = LoggerFactory.getLogger(ElasticVersionManager.class); | ||||
|  | ||||
|   private static class Version<V> { | ||||
|     private final Schema<V> schema; | ||||
|     private final int version; | ||||
|     private final boolean ready; | ||||
|  | ||||
|     private Version(Schema<V> schema, int version, boolean ready) { | ||||
|       checkArgument(schema == null || schema.getVersion() == version); | ||||
|       this.schema = schema; | ||||
|       this.version = version; | ||||
|       this.ready = ready; | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   private final Map<String, IndexDefinition<?, ?, ?>> defs; | ||||
|   private final Map<String, OnlineReindexer<?, ?, ?>> reindexers; | ||||
|   private final ElasticIndexVersionDiscovery versionDiscovery; | ||||
|   private final SitePaths sitePaths; | ||||
|   private final boolean onlineUpgrade; | ||||
|   private final String runReindexMsg; | ||||
|   private final String prefix; | ||||
|   private final ElasticIndexVersionDiscovery versionDiscovery; | ||||
|  | ||||
|   @Inject | ||||
|   ElasticVersionManager( | ||||
| @@ -74,147 +46,23 @@ public class ElasticVersionManager implements LifecycleListener { | ||||
|       SitePaths sitePaths, | ||||
|       Collection<IndexDefinition<?, ?, ?>> defs, | ||||
|       ElasticIndexVersionDiscovery versionDiscovery) { | ||||
|     this.sitePaths = sitePaths; | ||||
|     super(cfg, sitePaths, defs); | ||||
|     this.versionDiscovery = versionDiscovery; | ||||
|     this.defs = Maps.newHashMapWithExpectedSize(defs.size()); | ||||
|     for (IndexDefinition<?, ?, ?> def : defs) { | ||||
|       this.defs.put(def.getName(), def); | ||||
|     } | ||||
|  | ||||
|     prefix = MoreObjects.firstNonNull(cfg.getString("index", null, "prefix"), "gerrit"); | ||||
|     reindexers = Maps.newHashMapWithExpectedSize(defs.size()); | ||||
|     onlineUpgrade = cfg.getBoolean("index", null, "onlineUpgrade", true); | ||||
|     runReindexMsg = | ||||
|         "No index versions ready; run java -jar " | ||||
|             + sitePaths.gerrit_war.toAbsolutePath() | ||||
|             + " reindex"; | ||||
|   } | ||||
|  | ||||
|   @Override | ||||
|   public void start() { | ||||
|     try { | ||||
|       for (IndexDefinition<?, ?, ?> def : defs.values()) { | ||||
|         initIndex(def); | ||||
|       } | ||||
|     } catch (IOException e) { | ||||
|       ProvisionException ex = new ProvisionException("Error scanning indexes"); | ||||
|       ex.initCause(e); | ||||
|       throw ex; | ||||
|     } | ||||
|   protected <V> boolean isDirty(Collection<Version<V>> inUse, Version<V> v) { | ||||
|     return !inUse.contains(v); | ||||
|   } | ||||
|  | ||||
|   private <K, V, I extends Index<K, V>> void initIndex(IndexDefinition<K, V, I> def) | ||||
|       throws IOException { | ||||
|     TreeMap<Integer, Version<V>> versions = scanVersions(def); | ||||
|     // Search from the most recent ready version. | ||||
|     // Write to the most recent ready version and the most recent version. | ||||
|     Version<V> search = null; | ||||
|     List<Version<V>> write = Lists.newArrayListWithCapacity(2); | ||||
|     for (Version<V> v : versions.descendingMap().values()) { | ||||
|       if (v.schema == null) { | ||||
|         continue; | ||||
|       } | ||||
|       if (write.isEmpty() && onlineUpgrade) { | ||||
|         write.add(v); | ||||
|       } | ||||
|       if (v.ready) { | ||||
|         search = v; | ||||
|         if (!write.contains(v)) { | ||||
|           write.add(v); | ||||
|         } | ||||
|         break; | ||||
|       } | ||||
|     } | ||||
|     if (search == null) { | ||||
|       throw new ProvisionException(runReindexMsg); | ||||
|     } | ||||
|  | ||||
|     IndexFactory<K, V, I> factory = def.getIndexFactory(); | ||||
|     I searchIndex = factory.create(search.schema); | ||||
|     IndexCollection<K, V, I> indexes = def.getIndexCollection(); | ||||
|     indexes.setSearchIndex(searchIndex); | ||||
|     for (Version<V> v : write) { | ||||
|       if (v.schema != null) { | ||||
|         if (v.version != search.version) { | ||||
|           indexes.addWriteIndex(factory.create(v.schema)); | ||||
|         } else { | ||||
|           indexes.addWriteIndex(searchIndex); | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|  | ||||
|     markNotReady(def.getName(), versions.values(), write); | ||||
|  | ||||
|     synchronized (this) { | ||||
|       if (!reindexers.containsKey(def.getName())) { | ||||
|         int latest = write.get(0).version; | ||||
|         OnlineReindexer<K, V, I> reindexer = new OnlineReindexer<>(def, latest); | ||||
|         reindexers.put(def.getName(), reindexer); | ||||
|         if (onlineUpgrade && latest != search.version) { | ||||
|           reindexer.start(); | ||||
|         } | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Start the online reindexer if the current index is not already the latest. | ||||
|    * | ||||
|    * @param name index name | ||||
|    * @param force start re-index | ||||
|    * @return true if started, otherwise false. | ||||
|    * @throws ReindexerAlreadyRunningException | ||||
|    */ | ||||
|   public synchronized boolean startReindexer(String name, boolean force) | ||||
|       throws ReindexerAlreadyRunningException { | ||||
|     OnlineReindexer<?, ?, ?> reindexer = reindexers.get(name); | ||||
|     validateReindexerNotRunning(reindexer); | ||||
|     if (force || !isLatestIndexVersion(name, reindexer)) { | ||||
|       reindexer.start(); | ||||
|       return true; | ||||
|     } | ||||
|     return false; | ||||
|   } | ||||
|  | ||||
|   /** | ||||
|    * Activate the latest index if the current index is not already the latest. | ||||
|    * | ||||
|    * @param name index name | ||||
|    * @return true if index was activated, otherwise false. | ||||
|    * @throws ReindexerAlreadyRunningException | ||||
|    */ | ||||
|   public synchronized boolean activateLatestIndex(String name) | ||||
|       throws ReindexerAlreadyRunningException { | ||||
|     OnlineReindexer<?, ?, ?> reindexer = reindexers.get(name); | ||||
|     validateReindexerNotRunning(reindexer); | ||||
|     if (!isLatestIndexVersion(name, reindexer)) { | ||||
|       reindexer.activateIndex(); | ||||
|       return true; | ||||
|     } | ||||
|     return false; | ||||
|   } | ||||
|  | ||||
|   private boolean isLatestIndexVersion(String name, OnlineReindexer<?, ?, ?> reindexer) { | ||||
|     int readVersion = defs.get(name).getIndexCollection().getSearchIndex().getSchema().getVersion(); | ||||
|     return reindexer == null || reindexer.getVersion() == readVersion; | ||||
|   } | ||||
|  | ||||
|   private static void validateReindexerNotRunning(OnlineReindexer<?, ?, ?> reindexer) | ||||
|       throws ReindexerAlreadyRunningException { | ||||
|     if (reindexer != null && reindexer.isRunning()) { | ||||
|       throw new ReindexerAlreadyRunningException(); | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   private <K, V, I extends Index<K, V>> TreeMap<Integer, Version<V>> scanVersions( | ||||
|       IndexDefinition<K, V, I> def) throws IOException { | ||||
|   @Override | ||||
|   protected <K, V, I extends Index<K, V>> TreeMap<Integer, Version<V>> scanVersions( | ||||
|       IndexDefinition<K, V, I> def, GerritIndexStatus cfg) { | ||||
|     TreeMap<Integer, Version<V>> versions = new TreeMap<>(); | ||||
|     for (Schema<V> schema : def.getSchemas().values()) { | ||||
|       int v = schema.getVersion(); | ||||
|       versions.put( | ||||
|           v, | ||||
|           new Version<>( | ||||
|               schema, v, IndexUtils.getReady(sitePaths, def.getName(), schema.getVersion()))); | ||||
|       versions.put(v, new Version<>(schema, v, cfg.getReady(def.getName(), v))); | ||||
|     } | ||||
|  | ||||
|     try { | ||||
| @@ -225,8 +73,7 @@ public class ElasticVersionManager implements LifecycleListener { | ||||
|           continue; | ||||
|         } | ||||
|         if (!versions.containsKey(v)) { | ||||
|           versions.put( | ||||
|               v, new Version<V>(null, v, IndexUtils.getReady(sitePaths, def.getName(), v))); | ||||
|           versions.put(v, new Version<V>(null, v, cfg.getReady(def.getName(), v))); | ||||
|         } | ||||
|       } | ||||
|     } catch (IOException e) { | ||||
| @@ -234,18 +81,4 @@ public class ElasticVersionManager implements LifecycleListener { | ||||
|     } | ||||
|     return versions; | ||||
|   } | ||||
|  | ||||
|   private <V> void markNotReady( | ||||
|       String name, Iterable<Version<V>> versions, Collection<Version<V>> inUse) throws IOException { | ||||
|     for (Version<V> v : versions) { | ||||
|       if (!inUse.contains(v)) { | ||||
|         IndexUtils.getReady(sitePaths, name, v.version); | ||||
|       } | ||||
|     } | ||||
|   } | ||||
|  | ||||
|   @Override | ||||
|   public void stop() { | ||||
|     // Do nothing; indexes are closed on demand by IndexCollection. | ||||
|   } | ||||
| } | ||||
|   | ||||
| @@ -18,6 +18,7 @@ import com.google.inject.Inject; | ||||
| import com.google.inject.Singleton; | ||||
| import io.searchbox.client.JestClientFactory; | ||||
| import io.searchbox.client.config.HttpClientConfig; | ||||
| import io.searchbox.client.config.HttpClientConfig.Builder; | ||||
| import io.searchbox.client.http.JestHttpClient; | ||||
| import java.util.concurrent.TimeUnit; | ||||
|  | ||||
| @@ -32,12 +33,22 @@ class JestClientBuilder { | ||||
|  | ||||
|   JestHttpClient build() { | ||||
|     JestClientFactory factory = new JestClientFactory(); | ||||
|     factory.setHttpClientConfig( | ||||
|     Builder builder = | ||||
|         new HttpClientConfig.Builder(cfg.urls) | ||||
|             .multiThreaded(true) | ||||
|             .discoveryEnabled(false) | ||||
|             .discoveryFrequency(1L, TimeUnit.MINUTES) | ||||
|             .build()); | ||||
|             .connTimeout((int) cfg.connectionTimeout) | ||||
|             .maxConnectionIdleTime(cfg.maxConnectionIdleTime, cfg.maxConnectionIdleUnit) | ||||
|             .maxTotalConnection(cfg.maxTotalConnection) | ||||
|             .readTimeout(cfg.readTimeout) | ||||
|             .requestCompressionEnabled(cfg.requestCompression) | ||||
|             .discoveryFrequency(1L, TimeUnit.MINUTES); | ||||
|  | ||||
|     if (cfg.username != null && cfg.password != null) { | ||||
|       builder.defaultCredentials(cfg.username, cfg.password); | ||||
|     } | ||||
|  | ||||
|     factory.setHttpClientConfig(builder.build()); | ||||
|     return (JestHttpClient) factory.getObject(); | ||||
|   } | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 David Pursehouse
					David Pursehouse