Rewrite Reindex around projects

Opening and closing repositories and packs and constantly re-parsing
commits and trees thrashes even beefy machines, even when the
PatchListCache is hot. Instead, use a single RevWalk per Repository (i.e.
project), so we can reuse parsed trees when doing all the tree diffs.

Also take advantage of the fact that the set of changed paths without
rename detection matches that with rename detection (since we aren't
doing find-copies-harder), so skip rename detection.

Change-Id: I49b85cc93796332c63d8be8e0ce7b918c98a21f7
This commit is contained in:
Dave Borowitz
2013-06-25 14:57:54 -06:00
parent b2eaee406a
commit 6093720753
4 changed files with 272 additions and 61 deletions

View File

@@ -17,11 +17,18 @@ package com.google.gerrit.pgm;
import static com.google.gerrit.lucene.IndexVersionCheck.SCHEMA_VERSIONS; import static com.google.gerrit.lucene.IndexVersionCheck.SCHEMA_VERSIONS;
import static com.google.gerrit.lucene.IndexVersionCheck.gerritIndexConfig; import static com.google.gerrit.lucene.IndexVersionCheck.gerritIndexConfig;
import static com.google.gerrit.lucene.LuceneChangeIndex.LUCENE_VERSION; import static com.google.gerrit.lucene.LuceneChangeIndex.LUCENE_VERSION;
import static com.google.gerrit.server.schema.DataSourceProvider.Context.SINGLE_USER; import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet; import com.google.gerrit.extensions.registration.DynamicSet;
@@ -30,16 +37,20 @@ import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.lucene.LuceneIndexModule; import com.google.gerrit.lucene.LuceneIndexModule;
import com.google.gerrit.pgm.util.SiteProgram; import com.google.gerrit.pgm.util.SiteProgram;
import com.google.gerrit.reviewdb.client.Change; import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.server.ReviewDb; import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.cache.CacheRemovalListener; import com.google.gerrit.server.cache.CacheRemovalListener;
import com.google.gerrit.server.cache.h2.DefaultCacheFactory; import com.google.gerrit.server.cache.h2.DefaultCacheFactory;
import com.google.gerrit.server.config.SitePaths; import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.MultiProgressMonitor; import com.google.gerrit.server.git.MultiProgressMonitor;
import com.google.gerrit.server.git.MultiProgressMonitor.Task; import com.google.gerrit.server.git.MultiProgressMonitor.Task;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.index.ChangeIndexer; import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.index.IndexExecutor;
import com.google.gerrit.server.index.IndexModule; import com.google.gerrit.server.index.IndexModule;
import com.google.gerrit.server.patch.PatchListCacheImpl; import com.google.gerrit.server.patch.PatchListCacheImpl;
import com.google.gerrit.server.patch.PatchListLoader;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gwtorm.server.OrmException; import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory; import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
@@ -52,22 +63,43 @@ import com.google.inject.TypeLiteral;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FSDirectory;
import org.eclipse.jgit.diff.DiffEntry;
import org.eclipse.jgit.diff.DiffFormatter;
import org.eclipse.jgit.errors.ConfigInvalidException; import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.RepositoryCache;
import org.eclipse.jgit.lib.TextProgressMonitor;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevObject;
import org.eclipse.jgit.revwalk.RevTree;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.storage.file.FileBasedConfig; import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.util.FS; import org.eclipse.jgit.util.FS;
import org.eclipse.jgit.util.io.DisabledOutputStream;
import org.kohsuke.args4j.Option; import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public class Reindex extends SiteProgram { public class Reindex extends SiteProgram {
private static final Logger log = LoggerFactory.getLogger(Reindex.class);
@Option(name = "--threads", usage = "Number of threads to use for indexing") @Option(name = "--threads", usage = "Number of threads to use for indexing")
private int threads = Runtime.getRuntime().availableProcessors(); private int threads = Runtime.getRuntime().availableProcessors();
@@ -81,7 +113,7 @@ public class Reindex extends SiteProgram {
@Override @Override
public int run() throws Exception { public int run() throws Exception {
mustHaveValidSite(); mustHaveValidSite();
dbInjector = createDbInjector(SINGLE_USER); dbInjector = createDbInjector(MULTI_USER);
if (!IndexModule.isEnabled(dbInjector)) { if (!IndexModule.isEnabled(dbInjector)) {
throw die("Secondary index not enabled"); throw die("Secondary index not enabled");
} }
@@ -90,7 +122,7 @@ public class Reindex extends SiteProgram {
dbManager.start(); dbManager.start();
sitePaths = dbInjector.getInstance(SitePaths.class); sitePaths = dbInjector.getInstance(SitePaths.class);
// Delete before any LuceneChangeIndex may be created. // Delete before any index may be created depending on this data.
deleteAll(); deleteAll();
sysInjector = createSysInjector(); sysInjector = createSysInjector();
@@ -187,77 +219,225 @@ public class Reindex extends SiteProgram {
private int indexAll() throws Exception { private int indexAll() throws Exception {
ReviewDb db = sysInjector.getInstance(ReviewDb.class); ReviewDb db = sysInjector.getInstance(ReviewDb.class);
ChangeIndexer indexer = sysInjector.getInstance(ChangeIndexer.class); ListeningScheduledExecutorService executor = sysInjector.getInstance(
Key.get(ListeningScheduledExecutorService.class, IndexExecutor.class));
ProgressMonitor pm = new TextProgressMonitor(new PrintWriter(System.out));
pm.start(1);
pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
Set<Project.NameKey> projects = Sets.newTreeSet();
try {
for (Change change : db.changes().all()) {
if (projects.add(change.getProject())) {
pm.update(1);
}
}
} finally {
db.close();
}
pm.endTask();
final MultiProgressMonitor mpm =
new MultiProgressMonitor(System.out, "Reindexing changes");
final Task projTask = mpm.beginSubTask("projects", projects.size());
final Task doneTask = mpm.beginSubTask(null, MultiProgressMonitor.UNKNOWN);
final Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
Stopwatch sw = new Stopwatch().start(); Stopwatch sw = new Stopwatch().start();
final int queueLen = 2 * threads; final List<ListenableFuture<?>> futures =
final Semaphore sem = new Semaphore(queueLen); Lists.newArrayListWithCapacity(projects.size());
final AtomicBoolean ok = new AtomicBoolean(true); final AtomicBoolean ok = new AtomicBoolean(true);
final MultiProgressMonitor pm = for (final Project.NameKey project : projects) {
new MultiProgressMonitor(System.out, "Reindexing changes"); final ListenableFuture<?> future = executor.submit(
final Task done = pm.beginSubTask(null, MultiProgressMonitor.UNKNOWN); new ReindexProject(project, doneTask, failedTask));
final Task failed = pm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN); futures.add(future);
int i = 0;
for (final Change change : db.changes().all()) {
sem.acquire();
final ListenableFuture<?> future = indexer.index(change);
future.addListener(new Runnable() { future.addListener(new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
future.get(); future.get();
done.update(1);
} catch (InterruptedException e) { } catch (InterruptedException e) {
fail(change, e); fail(project, e);
} catch (ExecutionException e) { } catch (ExecutionException e) {
fail(change, e); ok.set(false); // Logged by indexer.
} catch (RuntimeException e) { } catch (RuntimeException e) {
failAndThrow(change, e); failAndThrow(project, e);
} catch (Error e) { } catch (Error e) {
failAndThrow(change, e); failAndThrow(project, e);
} finally { } finally {
sem.release(); projTask.update(1);
} }
} }
private void fail(Change change, Throwable t) { private void fail(Project.NameKey project, Throwable t) {
log.error("Failed to index project " + project, t);
ok.set(false); ok.set(false);
failed.update(1);
} }
private void failAndThrow(Change change, RuntimeException e) { private void failAndThrow(Project.NameKey project, RuntimeException e) {
fail(change, e); fail(project, e);
throw e; throw e;
} }
private void failAndThrow(Change change, Error e) { private void failAndThrow(Project.NameKey project, Error e) {
fail(change, e); fail(project, e);
throw e; throw e;
} }
}, MoreExecutors.sameThreadExecutor()); }, MoreExecutors.sameThreadExecutor());
i++;
} }
pm.waitFor(sysInjector.getInstance(WorkQueue.class).getDefaultQueue() mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
.submit(new Runnable() { new AsyncFunction<List<?>, Void>() {
@Override @Override
public void run() { public ListenableFuture<Void> apply(List<?> input) throws Exception {
try { mpm.end();
sem.acquire(queueLen); return Futures.immediateFuture(null);
} catch (InterruptedException e) {
e.printStackTrace();
}
pm.end();
} }
})); }));
double elapsed = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d; double elapsed = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
System.out.format("Reindexed %d changes in %.02fs (%.01f/s)\n", int n = doneTask.getCount() + failedTask.getCount();
i, elapsed, i/elapsed); System.out.format("Reindexed %d changes in %.01fs (%.01f/s)\n",
n, elapsed, n/elapsed);
return ok.get() ? 0 : 1; return ok.get() ? 0 : 1;
} }
/**
 * Reindexes all changes of one project whose current patch set ref exists in
 * the project's repository.
 * <p>
 * A single {@link Repository} and a single {@link RevWalk} are used for the
 * whole project, so commits and trees parsed for one change can be reused
 * when diffing others.
 */
private class ReindexProject implements Callable<Void> {
  private final ChangeIndexer indexer;
  private final Project.NameKey project;

  /** Changes keyed by the commit their current patch set ref points at. */
  private final ListMultimap<ObjectId, ChangeData> byId;

  /** Progress counter bumped once per successfully indexed change. */
  private final Task done;

  /** Progress counter bumped once per change that could not be indexed. */
  private final Task failed;

  // Both are only valid while call() is running.
  private Repository repo;
  private RevWalk walk;

  /**
   * @param project project whose changes should be reindexed.
   * @param done task updated for each change indexed successfully.
   * @param failed task updated for each change that failed to index.
   */
  private ReindexProject(Project.NameKey project, Task done, Task failed) {
    this.indexer = sysInjector.getInstance(ChangeIndexer.class);
    this.project = project;
    this.byId = ArrayListMultimap.create();
    this.done = done;
    this.failed = failed;
  }

  /**
   * Collects the project's changes by current patch set commit, then walks
   * the repository and indexes them.
   *
   * @return always {@code null}.
   * @throws Exception if the repository or database cannot be read.
   */
  @Override
  public Void call() throws Exception {
    ReviewDb db = sysInjector.getInstance(ReviewDb.class);
    try {
      GitRepositoryManager mgr =
          sysInjector.getInstance(GitRepositoryManager.class);
      repo = mgr.openRepository(project);
      try {
        Map<String, Ref> refs = repo.getAllRefs();
        for (Change c : db.changes().byProject(project)) {
          Ref r = refs.get(c.currentPatchSetId().toRefName());
          // Changes whose patch set ref is missing are not indexed.
          if (r != null) {
            byId.put(r.getObjectId(), new ChangeData(c));
          }
        }
        walk();
      } finally {
        repo.close();
        RepositoryCache.close(repo); // Only used once per Reindex call.
      }
    } finally {
      // Release the database handle even if opening the repository failed.
      db.close();
    }
    return null;
  }

  /**
   * Indexes every change in {@link #byId}: first those reachable from the
   * branch heads, then whatever is left over.
   */
  private void walk() throws Exception {
    walk = new RevWalk(repo);
    try {
      // Walk only refs first to cover as many changes as we can without having
      // to mark every single change.
      for (Ref ref : repo.getRefDatabase().getRefs(Constants.R_HEADS).values()) {
        RevObject o = walk.parseAny(ref.getObjectId());
        if (o instanceof RevCommit) {
          walk.markStart((RevCommit) o);
        }
      }

      // Test for remaining work first so no commit is walked once every
      // change has been handled (or when there were none to begin with).
      RevCommit bCommit;
      while (!byId.isEmpty() && (bCommit = walk.next()) != null) {
        if (byId.containsKey(bCommit)) {
          getPathsAndIndex(bCommit);
          byId.removeAll(bCommit);
        }
      }

      // Anything still present was not reached from a branch head.
      for (ObjectId id : byId.keySet()) {
        getPathsAndIndex(walk.parseCommit(id));
      }
    } finally {
      walk.release();
    }
  }

  /**
   * Computes the paths touched by {@code bCommit} once and indexes every
   * change whose current patch set is that commit.
   * <p>
   * Failures are logged and counted on the {@code failed} task rather than
   * propagated; a failure for one change does not prevent the other changes
   * on the same commit from being indexed.
   *
   * @param bCommit commit of the current patch set; must be a key of
   *     {@link #byId} for anything to be indexed.
   */
  private void getPathsAndIndex(RevCommit bCommit) throws Exception {
    List<ChangeData> cds = byId.get(bCommit);
    if (cds.isEmpty()) {
      return;
    }

    List<String> paths;
    try {
      RevTree aTree = aFor(bCommit, walk);
      if (aTree == null) {
        // No base tree to diff against; record the skipped changes instead of
        // dropping them silently.
        log.warn("No base tree for commit " + bCommit.name() + " ("
            + bCommit.getParentCount()
            + " parents); its changes were not indexed");
        failed.update(cds.size());
        return;
      }
      DiffFormatter df = new DiffFormatter(DisabledOutputStream.INSTANCE);
      try {
        df.setRepository(repo);
        paths = getPaths(df.scan(aTree, bCommit.getTree()));
      } finally {
        df.release();
      }
    } catch (Exception e) {
      // Without the path list none of the changes on this commit can be
      // indexed, so all of them count as failed.
      log.warn("Failed to index changes for commit " + bCommit.name(), e);
      failed.update(cds.size());
      return;
    }

    for (ChangeData cd : cds) {
      try {
        cd.setCurrentFilePaths(paths);
        indexer.indexTask(cd).call();
        done.update(1);
      } catch (Exception e) {
        log.warn("Failed to index change " + cd.getId().get()
            + " for commit " + bCommit.name(), e);
        failed.update(1);
      }
    }
  }

  /**
   * Flattens diff entries into a sorted, duplicate-free list containing both
   * the old and the new path of each entry.
   */
  private List<String> getPaths(List<DiffEntry> filenames) {
    Set<String> paths = Sets.newTreeSet();
    for (DiffEntry e : filenames) {
      if (e.getOldPath() != null) {
        paths.add(e.getOldPath());
      }
      if (e.getNewPath() != null) {
        paths.add(e.getNewPath());
      }
    }
    return ImmutableList.copyOf(paths);
  }

  /**
   * Picks the tree that {@code b} is diffed against.
   *
   * @return the empty tree for a root commit, the parent's tree for a
   *     single-parent commit, the result of
   *     {@link PatchListLoader#automerge} for a two-parent merge, or
   *     {@code null} when {@code b} has more than two parents.
   */
  private RevTree aFor(RevCommit b, RevWalk walk) throws IOException {
    switch (b.getParentCount()) {
      case 0:
        return walk.parseTree(emptyTree());
      case 1:
        RevCommit a = b.getParent(0);
        walk.parseBody(a);
        return walk.parseTree(a.getTree());
      case 2:
        return PatchListLoader.automerge(repo, walk, b);
      default:
        return null;
    }
  }

  /** Inserts an empty tree object into the repository and returns its id. */
  private ObjectId emptyTree() throws IOException {
    ObjectInserter oi = repo.newObjectInserter();
    try {
      ObjectId id = oi.insert(Constants.OBJ_TREE, new byte[] {});
      oi.flush();
      return id;
    } finally {
      oi.release();
    }
  }
}
private void writeVersion() throws IOException, private void writeVersion() throws IOException,
ConfigInvalidException { ConfigInvalidException {
if (dryRun) { if (dryRun) {

View File

@@ -16,7 +16,11 @@ package com.google.gerrit.server.index;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.gerrit.reviewdb.client.Change; import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.server.query.change.ChangeData;
import java.util.concurrent.Callable;
/** /**
* Helper for (re)indexing a change document. * Helper for (re)indexing a change document.
@@ -24,20 +28,50 @@ import com.google.gerrit.reviewdb.client.Change;
* Indexing is run in the background, as it may require substantial work to * Indexing is run in the background, as it may require substantial work to
* compute some of the fields and/or update the index. * compute some of the fields and/or update the index.
*/ */
public interface ChangeIndexer { public abstract class ChangeIndexer {
/** Instance indicating secondary index is disabled. */ /** Instance indicating secondary index is disabled. */
public static final ChangeIndexer DISABLED = new ChangeIndexer() { public static final ChangeIndexer DISABLED = new ChangeIndexer(null) {
@Override @Override
public ListenableFuture<?> index(Change change) { public ListenableFuture<?> index(ChangeData cd) {
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }
public Callable<Void> indexTask(ChangeData cd) {
return new Callable<Void>() {
@Override
public Void call() {
return null;
}
};
}
}; };
private final ListeningScheduledExecutorService executor;
protected ChangeIndexer(ListeningScheduledExecutorService executor) {
this.executor = executor;
}
/** /**
* Start indexing a change. * Start indexing a change.
* *
* @param change change to index. * @param change change to index.
* @return future for the indexing task. * @return future for the indexing task.
*/ */
public ListenableFuture<?> index(Change change); public ListenableFuture<?> index(Change change) {
return index(new ChangeData(change));
}
/**
* Start indexing a change.
*
* @param cd change data of the change to index.
* @return future for the indexing task.
*/
public ListenableFuture<?> index(ChangeData cd) {
return executor.submit(indexTask(cd));
}
public abstract Callable<Void> indexTask(ChangeData cd);
} }

View File

@@ -14,9 +14,7 @@
package com.google.gerrit.server.index; package com.google.gerrit.server.index;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.server.ReviewDb; import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.CurrentUser; import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.query.change.ChangeData; import com.google.gerrit.server.query.change.ChangeData;
@@ -39,11 +37,10 @@ import java.util.concurrent.Callable;
* Indexing is run in the background, as it may require substantial work to * Indexing is run in the background, as it may require substantial work to
* compute some of the fields and/or update the index. * compute some of the fields and/or update the index.
*/ */
public class ChangeIndexerImpl implements ChangeIndexer { public class ChangeIndexerImpl extends ChangeIndexer {
private static final Logger log = private static final Logger log =
LoggerFactory.getLogger(ChangeIndexerImpl.class); LoggerFactory.getLogger(ChangeIndexerImpl.class);
private final ListeningScheduledExecutorService executor;
private final ChangeIndex index; private final ChangeIndex index;
private final SchemaFactory<ReviewDb> schemaFactory; private final SchemaFactory<ReviewDb> schemaFactory;
private final ThreadLocalRequestContext context; private final ThreadLocalRequestContext context;
@@ -53,22 +50,22 @@ public class ChangeIndexerImpl implements ChangeIndexer {
ChangeIndex index, ChangeIndex index,
SchemaFactory<ReviewDb> schemaFactory, SchemaFactory<ReviewDb> schemaFactory,
ThreadLocalRequestContext context) { ThreadLocalRequestContext context) {
this.executor = executor; super(executor);
this.index = index; this.index = index;
this.schemaFactory = schemaFactory; this.schemaFactory = schemaFactory;
this.context = context; this.context = context;
} }
@Override @Override
public ListenableFuture<?> index(Change change) { public Callable<Void> indexTask(ChangeData cd) {
return executor.submit(new Task(change)); return new Task(cd);
} }
private class Task implements Callable<Void> { private class Task implements Callable<Void> {
private final Change change; private final ChangeData cd;
private Task(Change change) { private Task(ChangeData cd) {
this.change = change; this.cd = cd;
} }
@Override @Override
@@ -87,7 +84,7 @@ public class ChangeIndexerImpl implements ChangeIndexer {
throw new OutOfScopeException("No user during ChangeIndexer"); throw new OutOfScopeException("No user during ChangeIndexer");
} }
}); });
index.replace(new ChangeData(change)); index.replace(cd);
return null; return null;
} finally { } finally {
context.setContext(null); context.setContext(null);
@@ -96,14 +93,14 @@ public class ChangeIndexerImpl implements ChangeIndexer {
} catch (Exception e) { } catch (Exception e) {
log.error(String.format( log.error(String.format(
"Failed to index change %d in %s", "Failed to index change %d in %s",
change.getChangeId(), change.getProject().get()), e); cd.getId().get(), cd.getChange().getProject().get()), e);
throw e; throw e;
} }
} }
@Override @Override
public String toString() { public String toString() {
return "index-change-" + change.getId().get(); return "index-change-" + cd.getId().get();
} }
} }
} }

View File

@@ -64,7 +64,7 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
class PatchListLoader extends CacheLoader<PatchListKey, PatchList> { public class PatchListLoader extends CacheLoader<PatchListKey, PatchList> {
static final Logger log = LoggerFactory.getLogger(PatchListLoader.class); static final Logger log = LoggerFactory.getLogger(PatchListLoader.class);
private final GitRepositoryManager repoManager; private final GitRepositoryManager repoManager;
@@ -241,7 +241,7 @@ class PatchListLoader extends CacheLoader<PatchListKey, PatchList> {
} }
} }
private static RevObject automerge(Repository repo, RevWalk rw, RevCommit b) public static RevTree automerge(Repository repo, RevWalk rw, RevCommit b)
throws IOException { throws IOException {
String hash = b.name(); String hash = b.name();
String refName = GitRepositoryManager.REFS_CACHE_AUTOMERGE String refName = GitRepositoryManager.REFS_CACHE_AUTOMERGE