diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java index db5058e18d..f497444006 100644 --- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java +++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java @@ -17,7 +17,9 @@ package com.google.gerrit.pgm; import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER; import com.google.common.base.Stopwatch; +import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -29,7 +31,9 @@ import com.google.gerrit.pgm.util.BatchProgramModule; import com.google.gerrit.pgm.util.SiteProgram; import com.google.gerrit.pgm.util.ThreadLimiter; import com.google.gerrit.reviewdb.client.Change; +import com.google.gerrit.reviewdb.client.Project; import com.google.gerrit.reviewdb.server.ReviewDb; +import com.google.gerrit.server.git.GitRepositoryManager; import com.google.gerrit.server.git.MultiProgressMonitor; import com.google.gerrit.server.git.MultiProgressMonitor.Task; import com.google.gerrit.server.git.WorkQueue; @@ -43,6 +47,9 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.TypeLiteral; +import org.eclipse.jgit.lib.BatchRefUpdate; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevWalk; import org.kohsuke.args4j.Option; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,71 +85,105 @@ public class RebuildNotedb extends SiteProgram { sysManager.start(); ListeningExecutorService executor = newExecutor(); - final MultiProgressMonitor mpm = - new MultiProgressMonitor(System.out, "Rebuilding notedb"); - final Task doneTask = - mpm.beginSubTask("changes", MultiProgressMonitor.UNKNOWN); - final Task failedTask = - mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN); + System.out.println("Rebuilding the notedb"); ChangeRebuilder rebuilder = sysInjector.getInstance(ChangeRebuilder.class); - List allChanges = getAllChanges(); - final List> futures = Lists.newArrayList(); + Multimap changesByProject = getChangesByProject(); final AtomicBoolean ok = new AtomicBoolean(true); Stopwatch sw = Stopwatch.createStarted(); - for (final Change c : allChanges) { - final ListenableFuture future = rebuilder.rebuildAsync(c, executor); - futures.add(future); - future.addListener(new Runnable() { - @Override - public void run() { - try { - future.get(); - doneTask.update(1); - } catch (ExecutionException | InterruptedException e) { - fail(e); - } catch (RuntimeException e) { - failAndThrow(e); - } catch (Error e) { - // Can't join with RuntimeException because "RuntimeException | - // Error" becomes Throwable, which messes with signatures. - failAndThrow(e); - } - } + GitRepositoryManager repoManager = + sysInjector.getInstance(GitRepositoryManager.class); - private void fail(Throwable t) { - log.error("Failed to rebuild change " + c.getId(), t); - ok.set(false); - failedTask.update(1); - } + for (final Project.NameKey project : changesByProject.keySet()) { + final Repository repo = repoManager.openRepository(project); + try { + final BatchRefUpdate bru = repo.getRefDatabase().newBatchUpdate(); + List> futures = Lists.newArrayList(); - private void failAndThrow(RuntimeException e) { - fail(e); - throw e; + // Here, we truncate the project name to 50 characters to ensure that + // the whole monitor line for a project fits on one line (<80 chars). + int monitorStringMaxLength = 50; + String projectString = project.toString(); + String monitorString = (projectString.length() > monitorStringMaxLength) + ? projectString.substring(0, monitorStringMaxLength) + : projectString; + if (projectString.length() > monitorString.length()) { + monitorString = monitorString + "..."; } + final MultiProgressMonitor mpm = new MultiProgressMonitor(System.out, + monitorString); + final Task doneTask = + mpm.beginSubTask("done", changesByProject.get(project).size()); + final Task failedTask = mpm.beginSubTask("failed", + MultiProgressMonitor.UNKNOWN); - private void failAndThrow(Error e) { - fail(e); - throw e; - } - }, MoreExecutors.sameThreadExecutor()); - } - try { - mpm.waitFor(Futures.transform(Futures.successfulAsList(futures), - new AsyncFunction, Void>() { + for (final Change c : changesByProject.get(project)) { + final ListenableFuture future = + rebuilder.rebuildAsync(c, executor, bru); + futures.add(future); + future.addListener(new Runnable() { @Override - public ListenableFuture apply(List input) { - mpm.end(); - return Futures.immediateFuture(null); + public void run() { + try { + future.get(); + doneTask.update(1); + } catch (ExecutionException | InterruptedException e) { + fail(e); + } catch (RuntimeException e) { + failAndThrow(e); + } catch (Error e) { + // Can't join with RuntimeException because "RuntimeException | + // Error" becomes Throwable, which messes with signatures. + failAndThrow(e); + } } - })); - } catch (ExecutionException e) { - log.error("Error rebuilding notedb", e); - ok.set(false); + + private void fail(Throwable t) { + log.error("Failed to rebuild change " + c.getId(), t); + ok.set(false); + failedTask.update(1); + } + + private void failAndThrow(RuntimeException e) { + fail(e); + throw e; + } + + private void failAndThrow(Error e) { + fail(e); + throw e; + } + }, MoreExecutors.sameThreadExecutor()); + } + + mpm.waitFor(Futures.transform(Futures.successfulAsList(futures), + new AsyncFunction, Void>() { + @Override + public ListenableFuture apply(List input) + throws Exception { + Task t = mpm.beginSubTask("update refs", + MultiProgressMonitor.UNKNOWN); + RevWalk walk = new RevWalk(repo); + try { + bru.execute(walk, t); + mpm.end(); + return Futures.immediateFuture(null); + } finally { + walk.release(); + } + } + })); + } catch (Exception e) { + log.error("Error rebuilding notedb", e); + ok.set(false); + break; + } finally { + repo.close(); + } } double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d; System.out.format("Rebuild %d changes in %.01fs (%.01f/s)\n", - allChanges.size(), t, allChanges.size() / t); + changesByProject.size(), t, changesByProject.size() / t); return ok.get() ? 0 : 1; } @@ -168,17 +209,20 @@ public class RebuildNotedb extends SiteProgram { } } - private List getAllChanges() throws OrmException { - // Memoize all changes to a list so we can close the db connection and allow + private Multimap getChangesByProject() + throws OrmException { + // Memorize all changes so we can close the db connection and allow // rebuilder threads to use the full connection pool. - // TODO(dborowitz): May need to batch changes, e.g. by project (though note - // that unlike Reindex, we don't think there is an inherent benefit to - // grouping by project), to avoid wasting too much memory here. SchemaFactory schemaFactory = sysInjector.getInstance(Key.get( new TypeLiteral>() {})); ReviewDb db = schemaFactory.open(); + Multimap changesByProject = + ArrayListMultimap.create(); try { - return db.changes().all().toList(); + for (Change c : db.changes().all()) { + changesByProject.put(c.getProject(), c); + } + return changesByProject; } finally { db.close(); } diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java index c4a7c075ad..910edf9ef0 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java @@ -72,12 +72,12 @@ public class ChangeRebuilder { this.updateFactory = updateFactory; } - public ListenableFuture rebuildAsync( - final Change change, ListeningExecutorService executor) { + public ListenableFuture rebuildAsync(final Change change, + ListeningExecutorService executor, final BatchRefUpdate bru) { return executor.submit(new Callable() { - @Override + @Override public Void call() throws Exception { - rebuild(change, null); + rebuild(change, bru); return null; } }); @@ -109,7 +109,7 @@ public class ChangeRebuilder { controlFactory.controlFor(change, user), e.when); update.setPatchSetId(e.psId); if (batch == null) { - batch = update.openUpdate(); + batch = update.openUpdateInBatch(bru); } } e.apply(update); @@ -118,7 +118,15 @@ public class ChangeRebuilder { if (update != null) { writeToBatch(batch, update); } - batch.commit(); + + // Since the BatchMetaDataUpdates generated by all ChangeRebuilders on a + // given project are backed by the same BatchRefUpdate, we need to + // synchronize on the BatchRefUpdate. Therefore, since commit on a + // BatchMetaDataUpdate is the only method that modifies a BatchRefUpdate, + // we can just synchronize this call. + synchronized (bru) { + batch.commit(); + } } }