RebuildNoteDb: Organize by project
Repeated reopening and closing large projects was causing heap thrashing due to paging stuff in and out of JGit's internal window cache. This problem persisted even when preopening the repos to get them in JGit's internal RepositoryCache. We can simplify the code and avoid heap thrashing by just rebuilding one project at a time in each thread. This way we can also reuse a single NoteDbUpdateManager to make the ref update batching behavior more transparent. This does not completely eliminate performance issues, particularly on spinning disk, but as I write this I am rebuilding AOSP with no particular CPU thrashing issues. Change-Id: I2250eb01c8e57e690138dc38ee2cf12b8d0b3aa7
This commit is contained in:
@@ -17,18 +17,17 @@ package com.google.gerrit.pgm;
|
||||
import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.ImmutableMultimap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.util.concurrent.AsyncFunction;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.gerrit.common.FormatUtil;
|
||||
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
|
||||
import com.google.gerrit.extensions.registration.DynamicSet;
|
||||
import com.google.gerrit.lifecycle.LifecycleManager;
|
||||
@@ -42,8 +41,6 @@ import com.google.gerrit.reviewdb.server.ReviewDb;
|
||||
import com.google.gerrit.server.config.AllUsersName;
|
||||
import com.google.gerrit.server.config.GerritServerConfig;
|
||||
import com.google.gerrit.server.git.GitRepositoryManager;
|
||||
import com.google.gerrit.server.git.MultiProgressMonitor;
|
||||
import com.google.gerrit.server.git.MultiProgressMonitor.Task;
|
||||
import com.google.gerrit.server.git.SearchingChangeCacheImpl;
|
||||
import com.google.gerrit.server.git.WorkQueue;
|
||||
import com.google.gerrit.server.index.DummyIndexModule;
|
||||
@@ -57,7 +54,6 @@ import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
|
||||
import org.eclipse.jgit.errors.RepositoryNotFoundException;
|
||||
import org.eclipse.jgit.lib.BatchRefUpdate;
|
||||
import org.eclipse.jgit.lib.Config;
|
||||
import org.eclipse.jgit.lib.NullProgressMonitor;
|
||||
@@ -75,9 +71,9 @@ import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class RebuildNoteDb extends SiteProgram {
|
||||
private static final Logger log =
|
||||
@@ -142,58 +138,46 @@ public class RebuildNoteDb extends SiteProgram {
|
||||
ListeningExecutorService executor = newExecutor();
|
||||
System.out.println("Rebuilding the NoteDb");
|
||||
|
||||
Multimap<Project.NameKey, Change.Id> changesByProject =
|
||||
final ImmutableMultimap<Project.NameKey, Change.Id> changesByProject =
|
||||
getChangesByProject();
|
||||
AtomicBoolean ok = new AtomicBoolean(true);
|
||||
boolean ok;
|
||||
Stopwatch sw = Stopwatch.createStarted();
|
||||
try (Repository allUsersRepo = repoManager.openRepository(allUsersName)) {
|
||||
deleteRefs(RefNames.REFS_DRAFT_COMMENTS, allUsersRepo);
|
||||
|
||||
List<ListenableFuture<Boolean>> futures = new ArrayList<>();
|
||||
List<Project.NameKey> projectNames = Ordering.usingToString()
|
||||
.sortedCopy(changesByProject.keySet());
|
||||
for (Project.NameKey project : projectNames) {
|
||||
try {
|
||||
List<ListenableFuture<?>> futures = Lists.newArrayList();
|
||||
|
||||
// Here, we elide the project name to 50 characters to ensure that
|
||||
// the whole monitor line for a project fits on one line (<80 chars).
|
||||
final MultiProgressMonitor mpm = new MultiProgressMonitor(System.out,
|
||||
FormatUtil.elide(project.get(), 50));
|
||||
Task doneTask =
|
||||
mpm.beginSubTask("done", changesByProject.get(project).size());
|
||||
Task failedTask =
|
||||
mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
|
||||
|
||||
for (Change.Id id : changesByProject.get(project)) {
|
||||
// TODO(dborowitz): This can fail if the project no longer exists.
|
||||
// We might not want to just skip conversion of those changes, and
|
||||
// instead move them somewhere like a special lost+found repo.
|
||||
ListenableFuture<?> future = rebuilder.rebuildAsync(id, executor);
|
||||
futures.add(future);
|
||||
future.addListener(
|
||||
new RebuildListener(id, future, ok, doneTask, failedTask),
|
||||
MoreExecutors.directExecutor());
|
||||
}
|
||||
|
||||
mpm.waitFor(Futures.transformAsync(Futures.successfulAsList(futures),
|
||||
new AsyncFunction<List<?>, Void>() {
|
||||
@Override
|
||||
public ListenableFuture<Void> apply(List<?> input) {
|
||||
mpm.end();
|
||||
return Futures.immediateFuture(null);
|
||||
for (final Project.NameKey project : projectNames) {
|
||||
ListenableFuture<Boolean> future = executor.submit(
|
||||
new Callable<Boolean>() {
|
||||
@Override
|
||||
public Boolean call() {
|
||||
try (ReviewDb db = unwrap(schemaFactory.open())) {
|
||||
return rebuilder.rebuildProject(
|
||||
db, changesByProject, project, allUsersRepo);
|
||||
} catch (Exception e) {
|
||||
log.error("Error rebuilding project " + project, e);
|
||||
return false;
|
||||
}
|
||||
}));
|
||||
} catch (Exception e) {
|
||||
log.error("Error rebuilding NoteDb", e);
|
||||
ok.set(false);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
futures.add(future);
|
||||
}
|
||||
|
||||
try {
|
||||
ok = Iterables.all(
|
||||
Futures.allAsList(futures).get(), Predicates.equalTo(true));
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
log.error("Error rebuilding projects", e);
|
||||
ok = false;
|
||||
}
|
||||
}
|
||||
|
||||
double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
|
||||
System.out.format("Rebuild %d changes in %.01fs (%.01f/s)\n",
|
||||
changesByProject.size(), t, changesByProject.size() / t);
|
||||
return ok.get() ? 0 : 1;
|
||||
return ok ? 0 : 1;
|
||||
}
|
||||
|
||||
private static void execute(BatchRefUpdate bru, Repository repo)
|
||||
@@ -243,7 +227,7 @@ public class RebuildNoteDb extends SiteProgram {
|
||||
}
|
||||
}
|
||||
|
||||
private Multimap<Project.NameKey, Change.Id> getChangesByProject()
|
||||
private ImmutableMultimap<Project.NameKey, Change.Id> getChangesByProject()
|
||||
throws OrmException {
|
||||
// Memorize all changes so we can close the db connection and allow
|
||||
// rebuilder threads to use the full connection pool.
|
||||
@@ -277,7 +261,7 @@ public class RebuildNoteDb extends SiteProgram {
|
||||
}
|
||||
}
|
||||
}
|
||||
return changesByProject;
|
||||
return ImmutableMultimap.copyOf(changesByProject);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -287,63 +271,4 @@ public class RebuildNoteDb extends SiteProgram {
|
||||
}
|
||||
return db;
|
||||
}
|
||||
|
||||
private static class RebuildListener implements Runnable {
|
||||
private Change.Id changeId;
|
||||
private ListenableFuture<?> future;
|
||||
private AtomicBoolean ok;
|
||||
private Task doneTask;
|
||||
private Task failedTask;
|
||||
|
||||
private RebuildListener(Change.Id changeId, ListenableFuture<?> future,
|
||||
AtomicBoolean ok, Task doneTask, Task failedTask) {
|
||||
this.changeId = changeId;
|
||||
this.future = future;
|
||||
this.ok = ok;
|
||||
this.doneTask = doneTask;
|
||||
this.failedTask = failedTask;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
future.get();
|
||||
doneTask.update(1);
|
||||
} catch (ExecutionException | InterruptedException e) {
|
||||
if (e.getCause() instanceof RepositoryNotFoundException) {
|
||||
noRepo((RepositoryNotFoundException) e.getCause());
|
||||
} else {
|
||||
fail(e);
|
||||
}
|
||||
} catch (RuntimeException e) {
|
||||
failAndThrow(e);
|
||||
} catch (Error e) {
|
||||
// Can't join with RuntimeException because "RuntimeException
|
||||
// | Error" becomes Throwable, which messes with signatures.
|
||||
failAndThrow(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void fail(Throwable t) {
|
||||
log.error("Failed to rebuild change " + changeId, t);
|
||||
ok.set(false);
|
||||
failedTask.update(1);
|
||||
}
|
||||
|
||||
private void noRepo(RepositoryNotFoundException e) {
|
||||
log.warn("Skipped rebuilding change " + changeId + ": " + e.getMessage());
|
||||
// Don't flip ok bit.
|
||||
failedTask.update(1);
|
||||
}
|
||||
|
||||
private void failAndThrow(RuntimeException e) {
|
||||
fail(e);
|
||||
throw e;
|
||||
}
|
||||
|
||||
private void failAndThrow(Error e) {
|
||||
fail(e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user