RebuildNotedb: batch changes by project

If we batch changes by project before rebuilding them and writing them
to the notedb, we can use a single BatchRefUpdate for all of the
changes in a project, which reduces overhead for writing every change
to the notedb.

Additionally, within the code the rebuilds each change, I synchronized
on the BatchRefUpdate object because it is not thread safe. Since all
changes in a project will be using the same one, we can just
synchronize the function calls that modify that BatchRefUpdate.

Change-Id: I4af196fa720180b0846e9a6e7cc6d9083a75f695
This commit is contained in:
Yacob Yonas
2014-08-08 11:11:47 -07:00
parent 7c51481994
commit 904c08183c
2 changed files with 116 additions and 64 deletions

View File

@@ -17,7 +17,9 @@ package com.google.gerrit.pgm;
import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER; import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER;
import com.google.common.base.Stopwatch; import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
@@ -29,7 +31,9 @@ import com.google.gerrit.pgm.util.BatchProgramModule;
import com.google.gerrit.pgm.util.SiteProgram; import com.google.gerrit.pgm.util.SiteProgram;
import com.google.gerrit.pgm.util.ThreadLimiter; import com.google.gerrit.pgm.util.ThreadLimiter;
import com.google.gerrit.reviewdb.client.Change; import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.server.ReviewDb; import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.MultiProgressMonitor; import com.google.gerrit.server.git.MultiProgressMonitor;
import com.google.gerrit.server.git.MultiProgressMonitor.Task; import com.google.gerrit.server.git.MultiProgressMonitor.Task;
import com.google.gerrit.server.git.WorkQueue; import com.google.gerrit.server.git.WorkQueue;
@@ -43,6 +47,9 @@ import com.google.inject.Injector;
import com.google.inject.Key; import com.google.inject.Key;
import com.google.inject.TypeLiteral; import com.google.inject.TypeLiteral;
import org.eclipse.jgit.lib.BatchRefUpdate;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevWalk;
import org.kohsuke.args4j.Option; import org.kohsuke.args4j.Option;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -78,71 +85,105 @@ public class RebuildNotedb extends SiteProgram {
sysManager.start(); sysManager.start();
ListeningExecutorService executor = newExecutor(); ListeningExecutorService executor = newExecutor();
final MultiProgressMonitor mpm = System.out.println("Rebuilding the notedb");
new MultiProgressMonitor(System.out, "Rebuilding notedb");
final Task doneTask =
mpm.beginSubTask("changes", MultiProgressMonitor.UNKNOWN);
final Task failedTask =
mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
ChangeRebuilder rebuilder = sysInjector.getInstance(ChangeRebuilder.class); ChangeRebuilder rebuilder = sysInjector.getInstance(ChangeRebuilder.class);
List<Change> allChanges = getAllChanges(); Multimap<Project.NameKey, Change> changesByProject = getChangesByProject();
final List<ListenableFuture<?>> futures = Lists.newArrayList();
final AtomicBoolean ok = new AtomicBoolean(true); final AtomicBoolean ok = new AtomicBoolean(true);
Stopwatch sw = Stopwatch.createStarted(); Stopwatch sw = Stopwatch.createStarted();
for (final Change c : allChanges) { GitRepositoryManager repoManager =
final ListenableFuture<?> future = rebuilder.rebuildAsync(c, executor); sysInjector.getInstance(GitRepositoryManager.class);
futures.add(future);
future.addListener(new Runnable() {
@Override
public void run() {
try {
future.get();
doneTask.update(1);
} catch (ExecutionException | InterruptedException e) {
fail(e);
} catch (RuntimeException e) {
failAndThrow(e);
} catch (Error e) {
// Can't join with RuntimeException because "RuntimeException |
// Error" becomes Throwable, which messes with signatures.
failAndThrow(e);
}
}
private void fail(Throwable t) { for (final Project.NameKey project : changesByProject.keySet()) {
log.error("Failed to rebuild change " + c.getId(), t); final Repository repo = repoManager.openRepository(project);
ok.set(false); try {
failedTask.update(1); final BatchRefUpdate bru = repo.getRefDatabase().newBatchUpdate();
} List<ListenableFuture<?>> futures = Lists.newArrayList();
private void failAndThrow(RuntimeException e) { // Here, we truncate the project name to 50 characters to ensure that
fail(e); // the whole monitor line for a project fits on one line (<80 chars).
throw e; int monitorStringMaxLength = 50;
String projectString = project.toString();
String monitorString = (projectString.length() > monitorStringMaxLength)
? projectString.substring(0, monitorStringMaxLength)
: projectString;
if (projectString.length() > monitorString.length()) {
monitorString = monitorString + "...";
} }
final MultiProgressMonitor mpm = new MultiProgressMonitor(System.out,
monitorString);
final Task doneTask =
mpm.beginSubTask("done", changesByProject.get(project).size());
final Task failedTask = mpm.beginSubTask("failed",
MultiProgressMonitor.UNKNOWN);
private void failAndThrow(Error e) { for (final Change c : changesByProject.get(project)) {
fail(e); final ListenableFuture<?> future =
throw e; rebuilder.rebuildAsync(c, executor, bru);
} futures.add(future);
}, MoreExecutors.sameThreadExecutor()); future.addListener(new Runnable() {
}
try {
mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
new AsyncFunction<List<?>, Void>() {
@Override @Override
public ListenableFuture<Void> apply(List<?> input) { public void run() {
mpm.end(); try {
return Futures.immediateFuture(null); future.get();
doneTask.update(1);
} catch (ExecutionException | InterruptedException e) {
fail(e);
} catch (RuntimeException e) {
failAndThrow(e);
} catch (Error e) {
// Can't join with RuntimeException because "RuntimeException |
// Error" becomes Throwable, which messes with signatures.
failAndThrow(e);
}
} }
}));
} catch (ExecutionException e) { private void fail(Throwable t) {
log.error("Error rebuilding notedb", e); log.error("Failed to rebuild change " + c.getId(), t);
ok.set(false); ok.set(false);
failedTask.update(1);
}
private void failAndThrow(RuntimeException e) {
fail(e);
throw e;
}
private void failAndThrow(Error e) {
fail(e);
throw e;
}
}, MoreExecutors.sameThreadExecutor());
}
mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
new AsyncFunction<List<?>, Void>() {
@Override
public ListenableFuture<Void> apply(List<?> input)
throws Exception {
Task t = mpm.beginSubTask("update refs",
MultiProgressMonitor.UNKNOWN);
RevWalk walk = new RevWalk(repo);
try {
bru.execute(walk, t);
mpm.end();
return Futures.immediateFuture(null);
} finally {
walk.release();
}
}
}));
} catch (Exception e) {
log.error("Error rebuilding notedb", e);
ok.set(false);
break;
} finally {
repo.close();
}
} }
double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d; double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
System.out.format("Rebuild %d changes in %.01fs (%.01f/s)\n", System.out.format("Rebuild %d changes in %.01fs (%.01f/s)\n",
allChanges.size(), t, allChanges.size() / t); changesByProject.size(), t, changesByProject.size() / t);
return ok.get() ? 0 : 1; return ok.get() ? 0 : 1;
} }
@@ -168,17 +209,20 @@ public class RebuildNotedb extends SiteProgram {
} }
} }
private List<Change> getAllChanges() throws OrmException { private Multimap<Project.NameKey, Change> getChangesByProject()
// Memoize all changes to a list so we can close the db connection and allow throws OrmException {
// Memorize all changes so we can close the db connection and allow
// rebuilder threads to use the full connection pool. // rebuilder threads to use the full connection pool.
// TODO(dborowitz): May need to batch changes, e.g. by project (though note
// that unlike Reindex, we don't think there is an inherent benefit to
// grouping by project), to avoid wasting too much memory here.
SchemaFactory<ReviewDb> schemaFactory = sysInjector.getInstance(Key.get( SchemaFactory<ReviewDb> schemaFactory = sysInjector.getInstance(Key.get(
new TypeLiteral<SchemaFactory<ReviewDb>>() {})); new TypeLiteral<SchemaFactory<ReviewDb>>() {}));
ReviewDb db = schemaFactory.open(); ReviewDb db = schemaFactory.open();
Multimap<Project.NameKey, Change> changesByProject =
ArrayListMultimap.create();
try { try {
return db.changes().all().toList(); for (Change c : db.changes().all()) {
changesByProject.put(c.getProject(), c);
}
return changesByProject;
} finally { } finally {
db.close(); db.close();
} }

View File

@@ -72,12 +72,12 @@ public class ChangeRebuilder {
this.updateFactory = updateFactory; this.updateFactory = updateFactory;
} }
public ListenableFuture<?> rebuildAsync( public ListenableFuture<?> rebuildAsync(final Change change,
final Change change, ListeningExecutorService executor) { ListeningExecutorService executor, final BatchRefUpdate bru) {
return executor.submit(new Callable<Void>() { return executor.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
rebuild(change, null); rebuild(change, bru);
return null; return null;
} }
}); });
@@ -109,7 +109,7 @@ public class ChangeRebuilder {
controlFactory.controlFor(change, user), e.when); controlFactory.controlFor(change, user), e.when);
update.setPatchSetId(e.psId); update.setPatchSetId(e.psId);
if (batch == null) { if (batch == null) {
batch = update.openUpdate(); batch = update.openUpdateInBatch(bru);
} }
} }
e.apply(update); e.apply(update);
@@ -118,7 +118,15 @@ public class ChangeRebuilder {
if (update != null) { if (update != null) {
writeToBatch(batch, update); writeToBatch(batch, update);
} }
batch.commit();
// Since the BatchMetaDataUpdates generated by all ChangeRebuilders on a
// given project are backed by the same BatchRefUpdate, we need to
// synchronize on the BatchRefUpdate. Therefore, since commit on a
// BatchMetaDataUpdate is the only method that modifies a BatchRefUpdate,
// we can just synchronize this call.
synchronized (bru) {
batch.commit();
}
} }
} }