Merge "Reindex: Support generic indexes"

This commit is contained in:
Saša Živkov
2016-03-30 12:00:42 +00:00
committed by Gerrit Code Review
3 changed files with 80 additions and 82 deletions

View File

@@ -14,45 +14,38 @@
package com.google.gerrit.pgm;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gerrit.common.Die;
import com.google.gerrit.lifecycle.LifecycleManager;
import com.google.gerrit.lucene.LuceneIndexModule;
import com.google.gerrit.pgm.util.BatchProgramModule;
import com.google.gerrit.pgm.util.SiteProgram;
import com.google.gerrit.pgm.util.ThreadLimiter;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.ScanningChangeCacheImpl;
import com.google.gerrit.server.index.Index;
import com.google.gerrit.server.index.IndexDefinition;
import com.google.gerrit.server.index.IndexModule;
import com.google.gerrit.server.index.IndexModule.IndexType;
import com.google.gerrit.server.index.SiteIndexer;
import com.google.gerrit.server.index.change.AllChangesIndexer;
import com.google.gerrit.server.index.change.ChangeIndex;
import com.google.gerrit.server.index.change.ChangeIndexCollection;
import com.google.gerrit.server.index.change.ChangeSchemaDefinitions;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.project.ProjectCache;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.TextProgressMonitor;
import org.eclipse.jgit.util.io.NullOutputStream;
import org.kohsuke.args4j.Option;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
public class Reindex extends SiteProgram {
@@ -74,16 +67,7 @@ public class Reindex extends SiteProgram {
private Config globalConfig;
@Inject
private AllChangesIndexer batchIndexer;
@Inject
private ChangeIndexCollection changeIndexes;
@Inject
private GitRepositoryManager repoManager;
@Inject
private ProjectCache projectCache;
private Collection<IndexDefinition<?, ?, ?>> indexDefs;
@Override
public int run() throws Exception {
@@ -105,19 +89,18 @@ public class Reindex extends SiteProgram {
sysManager.start();
sysInjector.injectMembers(this);
ChangeIndex index = changeIndexes.getSearchIndex();
int result = 0;
try {
index.markReady(false);
index.deleteAll();
result = indexAll(index);
index.markReady(true);
boolean ok = true;
for (IndexDefinition<?, ?, ?> def : indexDefs) {
ok &= reindex(def);
}
return ok ? 0 : 1;
} catch (Exception e) {
throw die(e.getMessage(), e);
} finally {
sysManager.stop();
dbManager.stop();
}
sysManager.stop();
dbManager.stop();
return result;
}
private void checkNotSlaveMode() throws Die {
@@ -132,16 +115,16 @@ public class Reindex extends SiteProgram {
versions.put(ChangeSchemaDefinitions.INSTANCE.getName(), changesVersion);
}
List<Module> modules = Lists.newArrayList();
Module changeIndexModule;
Module indexModule;
switch (IndexModule.getIndexType(dbInjector)) {
case LUCENE:
changeIndexModule = LuceneIndexModule.singleVersionWithExplicitVersions(
indexModule = LuceneIndexModule.singleVersionWithExplicitVersions(
versions, threads);
break;
default:
throw new IllegalStateException("unsupported index.type");
}
modules.add(changeIndexModule);
modules.add(indexModule);
// Scan changes from git instead of relying on the secondary index, as we
// will have just deleted the old (possibly corrupt) index.
modules.add(ScanningChangeCacheImpl.module());
@@ -161,28 +144,25 @@ public class Reindex extends SiteProgram {
globalConfig.setLong("cache", "changes", "maximumWeight", 0);
}
private int indexAll(ChangeIndex index) throws Exception {
ProgressMonitor pm = new TextProgressMonitor();
pm.start(1);
pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
Set<Project.NameKey> projects = Sets.newTreeSet();
int changeCount = 0;
for (Project.NameKey project : projectCache.all()) {
try (Repository repo = repoManager.openRepository(project)) {
changeCount += ChangeNotes.Factory.scan(repo).size();
}
projects.add(project);
pm.update(1);
}
pm.endTask();
private <K, V, I extends Index<K, V>> boolean reindex(
IndexDefinition<K, V, I> def) throws IOException {
I index = def.getIndexCollection().getSearchIndex();
checkNotNull(index,
"no active search index configured for %s", def.getName());
index.markReady(false);
index.deleteAll();
SiteIndexer.Result result = batchIndexer.setTotalWork(changeCount)
.setProgressOut(System.err)
.setVerboseOut(verbose ? System.out : NullOutputStream.INSTANCE)
.indexAll(index, projects);
SiteIndexer<K, V, I> siteIndexer = def.getSiteIndexer();
siteIndexer.setProgressOut(System.err);
siteIndexer.setVerboseOut(verbose ? System.out : NullOutputStream.INSTANCE);
SiteIndexer.Result result = siteIndexer.indexAll(index);
int n = result.doneCount() + result.failedCount();
double t = result.elapsed(TimeUnit.MILLISECONDS) / 1000d;
System.out.format("Reindexed %d changes in %.01fs (%.01f/s)\n", n, t, n/t);
return result.success() ? 0 : 1;
System.out.format("Reindexed %d documents in %s index in %.01fs (%.01f/s)\n",
n, def.getName(), t, n/t);
if (result.success()) {
index.markReady(true);
}
return result.success();
}
}

View File

@@ -14,12 +14,18 @@
package com.google.gerrit.server.index;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Stopwatch;
import org.eclipse.jgit.util.io.NullOutputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.concurrent.TimeUnit;
public interface SiteIndexer<K, V, I extends Index<K, V>> {
public class Result {
public abstract class SiteIndexer<K, V, I extends Index<K, V>> {
public static class Result {
private final long elapsedNanos;
private final boolean success;
private final int done;
@@ -49,5 +55,22 @@ public interface SiteIndexer<K, V, I extends Index<K, V>> {
}
}
Result indexAll(I index);
protected int totalWork = -1;
protected OutputStream progressOut = NullOutputStream.INSTANCE;
protected PrintWriter verboseWriter =
new PrintWriter(NullOutputStream.INSTANCE);
public void setTotalWork(int num) {
totalWork = num;
}
public void setProgressOut(OutputStream out) {
progressOut = checkNotNull(out);
}
public void setVerboseOut(OutputStream out) {
verboseWriter = new PrintWriter(checkNotNull(out));
}
public abstract Result indexAll(I index);
}

View File

@@ -14,7 +14,6 @@
package com.google.gerrit.server.index.change;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
import static org.eclipse.jgit.lib.RefDatabase.ALL;
@@ -56,18 +55,17 @@ import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.TextProgressMonitor;
import org.eclipse.jgit.merge.ThreeWayMergeStrategy;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevObject;
import org.eclipse.jgit.revwalk.RevTree;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.util.io.DisabledOutputStream;
import org.eclipse.jgit.util.io.NullOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.Collection;
import java.util.Collections;
@@ -80,7 +78,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
public class AllChangesIndexer
implements SiteIndexer<Change.Id, ChangeData, ChangeIndex> {
extends SiteIndexer<Change.Id, ChangeData, ChangeIndex> {
private static final Logger log =
LoggerFactory.getLogger(AllChangesIndexer.class);
@@ -93,11 +91,6 @@ public class AllChangesIndexer
private final ProjectCache projectCache;
private final ThreeWayMergeStrategy mergeStrategy;
private int totalWork = -1;
private OutputStream progressOut = NullOutputStream.INSTANCE;
private PrintWriter verboseWriter =
new PrintWriter(NullOutputStream.INSTANCE);
@Inject
AllChangesIndexer(SchemaFactory<ReviewDb> schemaFactory,
ChangeData.Factory changeDataFactory,
@@ -117,24 +110,26 @@ public class AllChangesIndexer
this.mergeStrategy = MergeUtil.getMergeStrategy(config);
}
public AllChangesIndexer setTotalWork(int num) {
totalWork = num;
return this;
}
public AllChangesIndexer setProgressOut(OutputStream out) {
progressOut = checkNotNull(out);
return this;
}
public AllChangesIndexer setVerboseOut(OutputStream out) {
verboseWriter = new PrintWriter(checkNotNull(out));
return this;
}
@Override
public Result indexAll(ChangeIndex index) {
return indexAll(index, projectCache.all());
ProgressMonitor pm = new TextProgressMonitor();
pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
Set<Project.NameKey> projects = Sets.newTreeSet();
int changeCount = 0;
Stopwatch sw = Stopwatch.createStarted();
for (Project.NameKey project : projectCache.all()) {
try (Repository repo = repoManager.openRepository(project)) {
changeCount += ChangeNotes.Factory.scan(repo).size();
} catch (IOException e) {
log.error("Error collecting projects", e);
return new Result(sw, false, 0, 0);
}
projects.add(project);
pm.update(1);
}
pm.endTask();
setTotalWork(changeCount);
return indexAll(index, projects);
}
public SiteIndexer.Result indexAll(ChangeIndex index,