Support generic indexes in LuceneVersionManager
We now loop over all IndexDefinitions and support online reindexing for each of them independently. They share the single configured batch indexing threadpool, so running an arbitrary (small) number of them shouldn't affect performance more than running one of them. The markReady implementation still does not correctly respect multiple indexes; this obviously needs to be fixed before adding any new index types. Change-Id: Id6c58a595086e7dc22cbc7302169d96c4ccf1aa4
This commit is contained in:

committed by
David Ostrovsky

parent
e6eb52771b
commit
1762bacf0c
@@ -5,7 +5,7 @@ gerrit index activate - Activate the latest index version available
|
||||
|
||||
== SYNOPSIS
|
||||
--
|
||||
'ssh' -p @SSH_PORT@ @SSH_HOST@ 'gerrit index activate'
|
||||
'ssh' -p @SSH_PORT@ @SSH_HOST@ 'gerrit index activate <index>'
|
||||
--
|
||||
|
||||
== DESCRIPTION
|
||||
@@ -18,6 +18,9 @@ number of successfully/failed indexed changes.
|
||||
This command allows to activate the latest index even if there were some
|
||||
failures.
|
||||
|
||||
The <index> argument controls which secondary index is activated. Currently, the
|
||||
only supported value is "changes".
|
||||
|
||||
== ACCESS
|
||||
Caller must be a member of the privileged 'Administrators' group.
|
||||
|
||||
|
@@ -5,7 +5,7 @@ gerrit index start - Start the online indexer
|
||||
|
||||
== SYNOPSIS
|
||||
--
|
||||
'ssh' -p @SSH_PORT@ @SSH_HOST@ 'gerrit index start'
|
||||
'ssh' -p @SSH_PORT@ @SSH_HOST@ 'gerrit index start <index>'
|
||||
--
|
||||
|
||||
== DESCRIPTION
|
||||
@@ -19,6 +19,9 @@ This command allows restarting the online indexer without having to restart
|
||||
Gerrit. This command will not start the indexer if it is already running or if
|
||||
the active index is the latest.
|
||||
|
||||
The <index> argument controls which secondary index is started. Currently, the
|
||||
only supported value is "changes".
|
||||
|
||||
== ACCESS
|
||||
Caller must be a member of the privileged 'Administrators' group.
|
||||
|
||||
|
@@ -20,15 +20,14 @@ import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.gerrit.extensions.events.LifecycleListener;
|
||||
import com.google.gerrit.reviewdb.client.Change;
|
||||
import com.google.gerrit.server.config.GerritServerConfig;
|
||||
import com.google.gerrit.server.config.SitePaths;
|
||||
import com.google.gerrit.server.index.Index;
|
||||
import com.google.gerrit.server.index.IndexCollection;
|
||||
import com.google.gerrit.server.index.IndexDefinition;
|
||||
import com.google.gerrit.server.index.IndexDefinition.IndexFactory;
|
||||
import com.google.gerrit.server.index.OnlineReindexer;
|
||||
import com.google.gerrit.server.index.Schema;
|
||||
import com.google.gerrit.server.index.change.ChangeIndex;
|
||||
import com.google.gerrit.server.index.change.ChangeIndexCollection;
|
||||
import com.google.gerrit.server.index.change.ChangeIndexDefinition;
|
||||
import com.google.gerrit.server.query.change.ChangeData;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.ProvisionException;
|
||||
import com.google.inject.Singleton;
|
||||
@@ -46,6 +45,7 @@ import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
|
||||
@Singleton
|
||||
@@ -55,13 +55,13 @@ public class LuceneVersionManager implements LifecycleListener {
|
||||
|
||||
static final String CHANGES_PREFIX = "changes_";
|
||||
|
||||
private static class Version {
|
||||
private final Schema<ChangeData> schema;
|
||||
private static class Version<V> {
|
||||
private final Schema<V> schema;
|
||||
private final int version;
|
||||
private final boolean exists;
|
||||
private final boolean ready;
|
||||
|
||||
private Version(Schema<ChangeData> schema, int version, boolean exists,
|
||||
private Version(Schema<V> schema, int version, boolean exists,
|
||||
boolean ready) {
|
||||
checkArgument(schema == null || schema.getVersion() == version);
|
||||
this.schema = schema;
|
||||
@@ -94,33 +94,32 @@ public class LuceneVersionManager implements LifecycleListener {
|
||||
}
|
||||
|
||||
private final SitePaths sitePaths;
|
||||
private final LuceneChangeIndex.Factory indexFactory;
|
||||
private final ChangeIndexCollection indexes;
|
||||
private final ChangeIndexDefinition changeDef;
|
||||
private final Map<String, IndexDefinition<?, ?, ?>> defs;
|
||||
private final Map<String, OnlineReindexer<?, ?, ?>> reindexers;
|
||||
private final boolean onlineUpgrade;
|
||||
private OnlineReindexer<Change.Id, ChangeData, ChangeIndex> reindexer;
|
||||
private final String runReindexMsg;
|
||||
|
||||
@Inject
|
||||
LuceneVersionManager(
|
||||
@GerritServerConfig Config cfg,
|
||||
SitePaths sitePaths,
|
||||
LuceneChangeIndex.Factory indexFactory,
|
||||
ChangeIndexCollection indexes,
|
||||
ChangeIndexDefinition changeDef) {
|
||||
Collection<IndexDefinition<?, ?, ?>> defs) {
|
||||
this.sitePaths = sitePaths;
|
||||
this.indexFactory = indexFactory;
|
||||
this.indexes = indexes;
|
||||
this.changeDef = changeDef;
|
||||
this.onlineUpgrade = cfg.getBoolean("index", null, "onlineUpgrade", true);
|
||||
this.defs = Maps.newHashMapWithExpectedSize(defs.size());
|
||||
for (IndexDefinition<?, ?, ?> def : defs) {
|
||||
this.defs.put(def.getName(), def);
|
||||
}
|
||||
|
||||
reindexers = Maps.newHashMapWithExpectedSize(defs.size());
|
||||
onlineUpgrade = cfg.getBoolean("index", null, "onlineUpgrade", true);
|
||||
runReindexMsg =
|
||||
"No index versions ready; run java -jar " +
|
||||
sitePaths.gerrit_war.toAbsolutePath() +
|
||||
" reindex";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
String runReindex =
|
||||
"No index versions ready; run java -jar " +
|
||||
sitePaths.gerrit_war.toAbsolutePath() +
|
||||
" reindex";
|
||||
|
||||
FileBasedConfig cfg;
|
||||
try {
|
||||
cfg = loadGerritIndexConfig(sitePaths);
|
||||
@@ -129,18 +128,25 @@ public class LuceneVersionManager implements LifecycleListener {
|
||||
}
|
||||
|
||||
if (!Files.exists(sitePaths.index_dir)) {
|
||||
throw new ProvisionException(runReindex);
|
||||
throw new ProvisionException(runReindexMsg);
|
||||
} else if (!Files.exists(sitePaths.index_dir)) {
|
||||
log.warn("Not a directory: %s", sitePaths.index_dir.toAbsolutePath());
|
||||
throw new ProvisionException(runReindex);
|
||||
throw new ProvisionException(runReindexMsg);
|
||||
}
|
||||
|
||||
TreeMap<Integer, Version> versions = scanVersions(cfg);
|
||||
for (IndexDefinition<?, ?, ?> def : defs.values()) {
|
||||
initIndex(def, cfg);
|
||||
}
|
||||
}
|
||||
|
||||
private <K, V, I extends Index<K, V>> void initIndex(
|
||||
IndexDefinition<K, V, I> def, FileBasedConfig cfg) {
|
||||
TreeMap<Integer, Version<V>> versions = scanVersions(def, cfg);
|
||||
// Search from the most recent ready version.
|
||||
// Write to the most recent ready version and the most recent version.
|
||||
Version search = null;
|
||||
List<Version> write = Lists.newArrayListWithCapacity(2);
|
||||
for (Version v : versions.descendingMap().values()) {
|
||||
Version<V> search = null;
|
||||
List<Version<V>> write = Lists.newArrayListWithCapacity(2);
|
||||
for (Version<V> v : versions.descendingMap().values()) {
|
||||
if (v.schema == null) {
|
||||
continue;
|
||||
}
|
||||
@@ -156,27 +162,35 @@ public class LuceneVersionManager implements LifecycleListener {
|
||||
}
|
||||
}
|
||||
if (search == null) {
|
||||
throw new ProvisionException(runReindex);
|
||||
throw new ProvisionException(runReindexMsg);
|
||||
}
|
||||
|
||||
markNotReady(cfg, versions.values(), write);
|
||||
LuceneChangeIndex searchIndex =
|
||||
(LuceneChangeIndex) indexFactory.create(search.schema);
|
||||
IndexFactory<K, V, I> factory = def.getIndexFactory();
|
||||
I searchIndex = factory.create(search.schema);
|
||||
IndexCollection<K, V, I> indexes = def.getIndexCollection();
|
||||
indexes.setSearchIndex(searchIndex);
|
||||
for (Version v : write) {
|
||||
for (Version<V> v : write) {
|
||||
if (v.schema != null) {
|
||||
if (v.version != search.version) {
|
||||
indexes.addWriteIndex(indexFactory.create(v.schema));
|
||||
} else {
|
||||
indexes.addWriteIndex(searchIndex);
|
||||
indexes.addWriteIndex(factory.create(v.schema));
|
||||
}
|
||||
} else {
|
||||
indexes.addWriteIndex(searchIndex);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: include index name.
|
||||
markNotReady(cfg, versions.values(), write);
|
||||
|
||||
int latest = write.get(0).version;
|
||||
if (onlineUpgrade && latest != search.version) {
|
||||
reindexer = new OnlineReindexer<>(changeDef, latest);
|
||||
reindexer.start();
|
||||
OnlineReindexer<K, V, I> reindexer = new OnlineReindexer<>(def, latest);
|
||||
synchronized (this) {
|
||||
if (!reindexers.containsKey(def.getName())) {
|
||||
reindexers.put(def.getName(), reindexer);
|
||||
reindexer.start();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -186,10 +200,11 @@ public class LuceneVersionManager implements LifecycleListener {
|
||||
* @return true if started, otherwise false.
|
||||
* @throws ReindexerAlreadyRunningException
|
||||
*/
|
||||
public synchronized boolean startReindexer()
|
||||
public synchronized boolean startReindexer(String name)
|
||||
throws ReindexerAlreadyRunningException {
|
||||
validateReindexerNotRunning();
|
||||
if (!isCurrentIndexVersionLatest()) {
|
||||
OnlineReindexer<?, ?, ?> reindexer = reindexers.get(name);
|
||||
validateReindexerNotRunning(reindexer);
|
||||
if (!isCurrentIndexVersionLatest(name, reindexer)) {
|
||||
reindexer.start();
|
||||
return true;
|
||||
}
|
||||
@@ -202,49 +217,56 @@ public class LuceneVersionManager implements LifecycleListener {
|
||||
* @return true if index was activate, otherwise false.
|
||||
* @throws ReindexerAlreadyRunningException
|
||||
*/
|
||||
public synchronized boolean activateLatestIndex()
|
||||
public synchronized boolean activateLatestIndex(String name)
|
||||
throws ReindexerAlreadyRunningException {
|
||||
validateReindexerNotRunning();
|
||||
if (!isCurrentIndexVersionLatest()) {
|
||||
OnlineReindexer<?, ?, ?> reindexer = reindexers.get(name);
|
||||
validateReindexerNotRunning(reindexer);
|
||||
if (!isCurrentIndexVersionLatest(name, reindexer)) {
|
||||
reindexer.activateIndex();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean isCurrentIndexVersionLatest() {
|
||||
private boolean isCurrentIndexVersionLatest(
|
||||
String name, OnlineReindexer<?, ?, ?> reindexer) {
|
||||
int readVersion = defs.get(name).getIndexCollection().getSearchIndex()
|
||||
.getSchema().getVersion();
|
||||
return reindexer == null
|
||||
|| reindexer.getVersion() == indexes.getSearchIndex().getSchema()
|
||||
.getVersion();
|
||||
|| reindexer.getVersion() == readVersion;
|
||||
}
|
||||
|
||||
private void validateReindexerNotRunning()
|
||||
private static void validateReindexerNotRunning(
|
||||
OnlineReindexer<?, ?, ?> reindexer)
|
||||
throws ReindexerAlreadyRunningException {
|
||||
if (reindexer != null && reindexer.isRunning()) {
|
||||
throw new ReindexerAlreadyRunningException();
|
||||
}
|
||||
}
|
||||
|
||||
private TreeMap<Integer, Version> scanVersions(Config cfg) {
|
||||
TreeMap<Integer, Version> versions = Maps.newTreeMap();
|
||||
for (Schema<ChangeData> schema : changeDef.getSchemas().values()) {
|
||||
Path p = getDir(sitePaths, CHANGES_PREFIX, schema);
|
||||
private <K, V, I extends Index<K, V>> TreeMap<Integer, Version<V>>
|
||||
scanVersions(IndexDefinition<K, V, I> def, Config cfg) {
|
||||
TreeMap<Integer, Version<V>> versions = Maps.newTreeMap();
|
||||
for (Schema<V> schema : def.getSchemas().values()) {
|
||||
// This part is Lucene-specific.
|
||||
Path p = getDir(sitePaths, def.getName(), schema);
|
||||
boolean isDir = Files.isDirectory(p);
|
||||
if (Files.exists(p) && !isDir) {
|
||||
log.warn("Not a directory: %s", p.toAbsolutePath());
|
||||
}
|
||||
int v = schema.getVersion();
|
||||
versions.put(v, new Version(schema, v, isDir, getReady(cfg, v)));
|
||||
versions.put(v, new Version<>(schema, v, isDir, getReady(cfg, v)));
|
||||
}
|
||||
|
||||
String prefix = def.getName() + "_";
|
||||
try (DirectoryStream<Path> paths =
|
||||
Files.newDirectoryStream(sitePaths.index_dir)) {
|
||||
for (Path p : paths) {
|
||||
String n = p.getFileName().toString();
|
||||
if (!n.startsWith(CHANGES_PREFIX)) {
|
||||
if (!n.startsWith(prefix)) {
|
||||
continue;
|
||||
}
|
||||
String versionStr = n.substring(CHANGES_PREFIX.length());
|
||||
String versionStr = n.substring(prefix.length());
|
||||
Integer v = Ints.tryParse(versionStr);
|
||||
if (v == null || versionStr.length() != 4) {
|
||||
log.warn("Unrecognized version in index directory: {}",
|
||||
@@ -252,7 +274,7 @@ public class LuceneVersionManager implements LifecycleListener {
|
||||
continue;
|
||||
}
|
||||
if (!versions.containsKey(v)) {
|
||||
versions.put(v, new Version(null, v, true, getReady(cfg, v)));
|
||||
versions.put(v, new Version<V>(null, v, true, getReady(cfg, v)));
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
@@ -261,10 +283,10 @@ public class LuceneVersionManager implements LifecycleListener {
|
||||
return versions;
|
||||
}
|
||||
|
||||
private void markNotReady(FileBasedConfig cfg, Iterable<Version> versions,
|
||||
Collection<Version> inUse) {
|
||||
private <V> void markNotReady(FileBasedConfig cfg, Iterable<Version<V>> versions,
|
||||
Collection<Version<V>> inUse) {
|
||||
boolean dirty = false;
|
||||
for (Version v : versions) {
|
||||
for (Version<V> v : versions) {
|
||||
if (!inUse.contains(v) && v.exists) {
|
||||
setReady(cfg, v.version, false);
|
||||
dirty = true;
|
||||
|
@@ -24,19 +24,25 @@ import com.google.gerrit.sshd.CommandMetaData;
|
||||
import com.google.gerrit.sshd.SshCommand;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import org.kohsuke.args4j.Argument;
|
||||
|
||||
@RequiresCapability(GlobalCapability.ADMINISTRATE_SERVER)
|
||||
@CommandMetaData(name = "activate",
|
||||
description = "Activate the latest index version available",
|
||||
runsAt = MASTER)
|
||||
public class IndexActivateCommand extends SshCommand {
|
||||
|
||||
@Argument(index = 0, required = true, metaVar = "INDEX",
|
||||
usage = "index name to activate")
|
||||
private String name;
|
||||
|
||||
@Inject
|
||||
private LuceneVersionManager luceneVersionManager;
|
||||
|
||||
@Override
|
||||
protected void run() throws UnloggedFailure {
|
||||
try {
|
||||
if (luceneVersionManager.activateLatestIndex()) {
|
||||
if (luceneVersionManager.activateLatestIndex(name)) {
|
||||
stdout.println("Activated latest index version");
|
||||
} else {
|
||||
stdout.println("Not activating index, already using latest version");
|
||||
|
@@ -24,18 +24,24 @@ import com.google.gerrit.sshd.CommandMetaData;
|
||||
import com.google.gerrit.sshd.SshCommand;
|
||||
import com.google.inject.Inject;
|
||||
|
||||
import org.kohsuke.args4j.Argument;
|
||||
|
||||
@RequiresCapability(GlobalCapability.ADMINISTRATE_SERVER)
|
||||
@CommandMetaData(name = "start", description = "Start the online reindexer",
|
||||
runsAt = MASTER)
|
||||
public class IndexStartCommand extends SshCommand {
|
||||
|
||||
@Argument(index = 0, required = true, metaVar = "INDEX",
|
||||
usage = "index name to activate")
|
||||
private String name;
|
||||
|
||||
@Inject
|
||||
private LuceneVersionManager luceneVersionManager;
|
||||
|
||||
@Override
|
||||
protected void run() throws UnloggedFailure {
|
||||
try {
|
||||
if (luceneVersionManager.startReindexer()) {
|
||||
if (luceneVersionManager.startReindexer(name)) {
|
||||
stdout.println("Reindexer started");
|
||||
} else {
|
||||
stdout.println("Nothing to reindex, index is already the latest version");
|
||||
|
Reference in New Issue
Block a user