ES: Implement online reindex for ElasticSearch

Implement online reindexing for ElasticSearch based on the code for
Lucene online reindex.

Testing scenario:
 1. Start fresh Gerrit site with this patch
 2. Create account
 3. Verify data in ElasticSearch:
   curl http://localhost:9200/gerritaccounts_0004/
   curl http://localhost:9200/gerritaccounts_0004/_search
 4. Stop Gerrit
 5. Cherry pick change I77e1643cd1a7fbef9f4d2fa214823759188e9592
 6. Start Gerrit
 6. Wait for log message:
     Starting online reindex from schema version 4 to 5
 7. Verify state in ElasticSearch:
   curl http://localhost:9200/gerritaccounts_0005/
   curl http://localhost:9200/gerritaccounts_0005/_search

Entry for user account created in step 2 should have "elastic_online"
property with value "reindex work".

Change-Id: I9efcf5735e65b4f2dc2a97914d398f81656fc12a
This commit is contained in:
Dariusz Luksza
2017-04-24 16:34:52 +02:00
parent 23a2ee2eff
commit 74bb6d6184
9 changed files with 323 additions and 5 deletions

View File

@@ -62,8 +62,12 @@ public class ElasticIndexModule extends LifecycleModule {
.build(GroupIndex.Factory.class)); .build(GroupIndex.Factory.class));
install(new IndexModule(threads)); install(new IndexModule(threads));
if (singleVersions == null) {
listener().to(ElasticVersionManager.class);
} else {
install(new SingleVersionModule(singleVersions)); install(new SingleVersionModule(singleVersions));
} }
}
@Provides @Provides
@Singleton @Singleton

View File

@@ -0,0 +1,52 @@
// Copyright (C) 2017 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.searchbox.client.JestResult;
import io.searchbox.client.http.JestHttpClient;
import io.searchbox.indices.aliases.GetAliases;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
@Singleton
class ElasticIndexVersionDiscovery {
private final JestHttpClient client;
@Inject
ElasticIndexVersionDiscovery(JestClientBuilder clientBuilder) {
this.client = clientBuilder.build();
}
List<String> discover(String prefix, String indexName) throws IOException {
String name = prefix + indexName + "_";
JestResult result = client.execute(new GetAliases.Builder().addIndex(name + "*").build());
if (result.isSucceeded()) {
JsonObject object = result.getJsonObject().getAsJsonObject();
List<String> versions = new ArrayList<>(object.size());
for (Entry<String, JsonElement> entry : object.entrySet()) {
versions.add(entry.getKey().replace(name, ""));
}
return versions;
}
return Collections.emptyList();
}
}

View File

@@ -0,0 +1,251 @@
// Copyright (C) 2017 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.elasticsearch;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.Index;
import com.google.gerrit.server.index.IndexCollection;
import com.google.gerrit.server.index.IndexDefinition;
import com.google.gerrit.server.index.IndexDefinition.IndexFactory;
import com.google.gerrit.server.index.IndexUtils;
import com.google.gerrit.server.index.OnlineReindexer;
import com.google.gerrit.server.index.ReindexerAlreadyRunningException;
import com.google.gerrit.server.index.Schema;
import com.google.inject.Inject;
import com.google.inject.ProvisionException;
import com.google.inject.Singleton;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.eclipse.jgit.lib.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Singleton
public class ElasticVersionManager implements LifecycleListener {
private static final Logger log = LoggerFactory.getLogger(ElasticVersionManager.class);
private static class Version<V> {
private final Schema<V> schema;
private final int version;
private final boolean ready;
private Version(Schema<V> schema, int version, boolean ready) {
checkArgument(schema == null || schema.getVersion() == version);
this.schema = schema;
this.version = version;
this.ready = ready;
}
}
private final Map<String, IndexDefinition<?, ?, ?>> defs;
private final Map<String, OnlineReindexer<?, ?, ?>> reindexers;
private final ElasticIndexVersionDiscovery versionDiscovery;
private final SitePaths sitePaths;
private final boolean onlineUpgrade;
private final String runReindexMsg;
private final String prefix;
@Inject
ElasticVersionManager(
@GerritServerConfig Config cfg,
SitePaths sitePaths,
Collection<IndexDefinition<?, ?, ?>> defs,
ElasticIndexVersionDiscovery versionDiscovery) {
this.sitePaths = sitePaths;
this.versionDiscovery = versionDiscovery;
this.defs = Maps.newHashMapWithExpectedSize(defs.size());
for (IndexDefinition<?, ?, ?> def : defs) {
this.defs.put(def.getName(), def);
}
prefix = MoreObjects.firstNonNull(cfg.getString("index", null, "prefix"), "gerrit");
reindexers = Maps.newHashMapWithExpectedSize(defs.size());
onlineUpgrade = cfg.getBoolean("index", null, "onlineUpgrade", true);
runReindexMsg =
"No index versions ready; run java -jar "
+ sitePaths.gerrit_war.toAbsolutePath()
+ " reindex";
}
@Override
public void start() {
try {
for (IndexDefinition<?, ?, ?> def : defs.values()) {
initIndex(def);
}
} catch (IOException e) {
ProvisionException ex = new ProvisionException("Error scanning indexes");
ex.initCause(e);
throw ex;
}
}
private <K, V, I extends Index<K, V>> void initIndex(IndexDefinition<K, V, I> def)
throws IOException {
TreeMap<Integer, Version<V>> versions = scanVersions(def);
// Search from the most recent ready version.
// Write to the most recent ready version and the most recent version.
Version<V> search = null;
List<Version<V>> write = Lists.newArrayListWithCapacity(2);
for (Version<V> v : versions.descendingMap().values()) {
if (v.schema == null) {
continue;
}
if (write.isEmpty() && onlineUpgrade) {
write.add(v);
}
if (v.ready) {
search = v;
if (!write.contains(v)) {
write.add(v);
}
break;
}
}
if (search == null) {
throw new ProvisionException(runReindexMsg);
}
IndexFactory<K, V, I> factory = def.getIndexFactory();
I searchIndex = factory.create(search.schema);
IndexCollection<K, V, I> indexes = def.getIndexCollection();
indexes.setSearchIndex(searchIndex);
for (Version<V> v : write) {
if (v.schema != null) {
if (v.version != search.version) {
indexes.addWriteIndex(factory.create(v.schema));
} else {
indexes.addWriteIndex(searchIndex);
}
}
}
markNotReady(def.getName(), versions.values(), write);
synchronized (this) {
if (!reindexers.containsKey(def.getName())) {
int latest = write.get(0).version;
OnlineReindexer<K, V, I> reindexer = new OnlineReindexer<>(def, latest);
reindexers.put(def.getName(), reindexer);
if (onlineUpgrade && latest != search.version) {
reindexer.start();
}
}
}
}
/**
* Start the online reindexer if the current index is not already the latest.
*
* @param name index name
* @param force start re-index
* @return true if started, otherwise false.
* @throws ReindexerAlreadyRunningException
*/
public synchronized boolean startReindexer(String name, boolean force)
throws ReindexerAlreadyRunningException {
OnlineReindexer<?, ?, ?> reindexer = reindexers.get(name);
validateReindexerNotRunning(reindexer);
if (force || !isLatestIndexVersion(name, reindexer)) {
reindexer.start();
return true;
}
return false;
}
/**
* Activate the latest index if the current index is not already the latest.
*
* @param name index name
* @return true if index was activated, otherwise false.
* @throws ReindexerAlreadyRunningException
*/
public synchronized boolean activateLatestIndex(String name)
throws ReindexerAlreadyRunningException {
OnlineReindexer<?, ?, ?> reindexer = reindexers.get(name);
validateReindexerNotRunning(reindexer);
if (!isLatestIndexVersion(name, reindexer)) {
reindexer.activateIndex();
return true;
}
return false;
}
private boolean isLatestIndexVersion(String name, OnlineReindexer<?, ?, ?> reindexer) {
int readVersion = defs.get(name).getIndexCollection().getSearchIndex().getSchema().getVersion();
return reindexer == null || reindexer.getVersion() == readVersion;
}
private static void validateReindexerNotRunning(OnlineReindexer<?, ?, ?> reindexer)
throws ReindexerAlreadyRunningException {
if (reindexer != null && reindexer.isRunning()) {
throw new ReindexerAlreadyRunningException();
}
}
private <K, V, I extends Index<K, V>> TreeMap<Integer, Version<V>> scanVersions(
IndexDefinition<K, V, I> def) throws IOException {
TreeMap<Integer, Version<V>> versions = new TreeMap<>();
for (Schema<V> schema : def.getSchemas().values()) {
int v = schema.getVersion();
versions.put(
v,
new Version<>(
schema, v, IndexUtils.getReady(sitePaths, def.getName(), schema.getVersion())));
}
try {
for (String version : versionDiscovery.discover(prefix, def.getName())) {
Integer v = Ints.tryParse(version);
if (v == null || version.length() != 4) {
log.warn("Unrecognized version in index {}: {}", def.getName(), version);
continue;
}
if (!versions.containsKey(v)) {
versions.put(
v, new Version<V>(null, v, IndexUtils.getReady(sitePaths, def.getName(), v)));
}
}
} catch (IOException e) {
log.error("Error scanning index: " + def.getName(), e);
}
return versions;
}
private <V> void markNotReady(
String name, Iterable<Version<V>> versions, Collection<Version<V>> inUse) throws IOException {
for (Version<V> v : versions) {
if (!inUse.contains(v)) {
IndexUtils.getReady(sitePaths, name, v.version);
}
}
}
@Override
public void stop() {
// Do nothing; indexes are closed on demand by IndexCollection.
}
}

View File

@@ -28,6 +28,7 @@ import com.google.gerrit.server.index.IndexCollection;
import com.google.gerrit.server.index.IndexDefinition; import com.google.gerrit.server.index.IndexDefinition;
import com.google.gerrit.server.index.IndexDefinition.IndexFactory; import com.google.gerrit.server.index.IndexDefinition.IndexFactory;
import com.google.gerrit.server.index.OnlineReindexer; import com.google.gerrit.server.index.OnlineReindexer;
import com.google.gerrit.server.index.ReindexerAlreadyRunningException;
import com.google.gerrit.server.index.Schema; import com.google.gerrit.server.index.Schema;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.ProvisionException; import com.google.inject.ProvisionException;

View File

@@ -43,6 +43,15 @@ public final class IndexUtils {
} }
} }
public static boolean getReady(SitePaths sitePaths, String name, int version) throws IOException {
try {
GerritIndexStatus cfg = new GerritIndexStatus(sitePaths);
return cfg.getReady(name, version);
} catch (ConfigInvalidException e) {
throw new IOException(e);
}
}
public static Set<String> accountFields(QueryOptions opts) { public static Set<String> accountFields(QueryOptions opts) {
Set<String> fs = opts.fields(); Set<String> fs = opts.fields();
return fs.contains(AccountField.ID.getName()) return fs.contains(AccountField.ID.getName())

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package com.google.gerrit.lucene; package com.google.gerrit.server.index;
public class ReindexerAlreadyRunningException extends Exception { public class ReindexerAlreadyRunningException extends Exception {

View File

@@ -24,6 +24,7 @@ import com.google.inject.Singleton;
import com.google.inject.TypeLiteral; import com.google.inject.TypeLiteral;
import com.google.inject.name.Named; import com.google.inject.name.Named;
import com.google.inject.name.Names; import com.google.inject.name.Names;
import com.google.inject.util.Providers;
import java.util.Collection; import java.util.Collection;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@@ -44,7 +45,7 @@ public class SingleVersionModule extends LifecycleModule {
listener().to(SingleVersionListener.class); listener().to(SingleVersionListener.class);
bind(new TypeLiteral<Map<String, Integer>>() {}) bind(new TypeLiteral<Map<String, Integer>>() {})
.annotatedWith(Names.named(SINGLE_VERSIONS)) .annotatedWith(Names.named(SINGLE_VERSIONS))
.toInstance(singleVersions); .toProvider(Providers.of(singleVersions));
} }
@Singleton @Singleton

View File

@@ -17,7 +17,7 @@ package com.google.gerrit.sshd.commands;
import com.google.gerrit.common.data.GlobalCapability; import com.google.gerrit.common.data.GlobalCapability;
import com.google.gerrit.extensions.annotations.RequiresCapability; import com.google.gerrit.extensions.annotations.RequiresCapability;
import com.google.gerrit.lucene.LuceneVersionManager; import com.google.gerrit.lucene.LuceneVersionManager;
import com.google.gerrit.lucene.ReindexerAlreadyRunningException; import com.google.gerrit.server.index.ReindexerAlreadyRunningException;
import com.google.gerrit.sshd.CommandMetaData; import com.google.gerrit.sshd.CommandMetaData;
import com.google.gerrit.sshd.SshCommand; import com.google.gerrit.sshd.SshCommand;
import com.google.inject.Inject; import com.google.inject.Inject;

View File

@@ -17,7 +17,7 @@ package com.google.gerrit.sshd.commands;
import com.google.gerrit.common.data.GlobalCapability; import com.google.gerrit.common.data.GlobalCapability;
import com.google.gerrit.extensions.annotations.RequiresCapability; import com.google.gerrit.extensions.annotations.RequiresCapability;
import com.google.gerrit.lucene.LuceneVersionManager; import com.google.gerrit.lucene.LuceneVersionManager;
import com.google.gerrit.lucene.ReindexerAlreadyRunningException; import com.google.gerrit.server.index.ReindexerAlreadyRunningException;
import com.google.gerrit.sshd.CommandMetaData; import com.google.gerrit.sshd.CommandMetaData;
import com.google.gerrit.sshd.SshCommand; import com.google.gerrit.sshd.SshCommand;
import com.google.inject.Inject; import com.google.inject.Inject;