Rewrite ChangeRebuilder to better use NoteDbUpdateManager

Let's give up on the idea for now of reusing a single update manager
across multiple changes. With the common case of a FileRepository,
opening the repository is cheap (it's probably held open in the cache)
and writing out disjoint sets of loose objects and refs doesn't
benefit from batching them together. On googlesource.com's DFS
backend, we will be handling aggregation using custom code anyway.

However, we can now use the ChainedReceiveCommands exposed by
NoteDbUpdateManager to batch together updates to a single ref, in
particular the delete of the old value and the add of the new value.

Change-Id: I2f2c471856d803e5986853144afd76b375b314b0
This commit is contained in:
Dave Borowitz
2016-02-19 14:52:06 -05:00
parent 0e8077dc8d
commit 103db472e6
3 changed files with 75 additions and 67 deletions

View File

@@ -108,44 +108,43 @@ public class RebuildNotedb extends SiteProgram {
System.out.println("Rebuilding the notedb"); System.out.println("Rebuilding the notedb");
ChangeRebuilder rebuilder = sysInjector.getInstance(ChangeRebuilder.class); ChangeRebuilder rebuilder = sysInjector.getInstance(ChangeRebuilder.class);
Multimap<Project.NameKey, Change> changesByProject = getChangesByProject(); Multimap<Project.NameKey, Change.Id> changesByProject =
final AtomicBoolean ok = new AtomicBoolean(true); getChangesByProject();
AtomicBoolean ok = new AtomicBoolean(true);
Stopwatch sw = Stopwatch.createStarted(); Stopwatch sw = Stopwatch.createStarted();
GitRepositoryManager repoManager = GitRepositoryManager repoManager =
sysInjector.getInstance(GitRepositoryManager.class); sysInjector.getInstance(GitRepositoryManager.class);
final Project.NameKey allUsersName = Project.NameKey allUsersName =
sysInjector.getInstance(AllUsersName.class); sysInjector.getInstance(AllUsersName.class);
try (Repository allUsersRepo = try (Repository allUsersRepo =
repoManager.openMetadataRepository(allUsersName)) { repoManager.openMetadataRepository(allUsersName)) {
deleteRefs(RefNames.REFS_DRAFT_COMMENTS, allUsersRepo); deleteRefs(RefNames.REFS_DRAFT_COMMENTS, allUsersRepo);
deleteRefs(RefNames.REFS_STARRED_CHANGES, allUsersRepo); deleteRefs(RefNames.REFS_STARRED_CHANGES, allUsersRepo);
for (final Project.NameKey project : changesByProject.keySet()) { for (Project.NameKey project : changesByProject.keySet()) {
try (Repository repo = repoManager.openMetadataRepository(project)) { try {
List<ListenableFuture<?>> futures = Lists.newArrayList(); List<ListenableFuture<?>> futures = Lists.newArrayList();
// Here, we elide the project name to 50 characters to ensure that // Here, we elide the project name to 50 characters to ensure that
// the whole monitor line for a project fits on one line (<80 chars). // the whole monitor line for a project fits on one line (<80 chars).
final MultiProgressMonitor mpm = new MultiProgressMonitor(System.out, final MultiProgressMonitor mpm = new MultiProgressMonitor(System.out,
FormatUtil.elide(project.get(), 50)); FormatUtil.elide(project.get(), 50));
final Task doneTask = Task doneTask =
mpm.beginSubTask("done", changesByProject.get(project).size()); mpm.beginSubTask("done", changesByProject.get(project).size());
final Task failedTask = Task failedTask =
mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN); mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
for (final Change c : changesByProject.get(project)) { for (Change.Id id : changesByProject.get(project)) {
final ListenableFuture<?> future = ListenableFuture<?> future = rebuilder.rebuildAsync(id, executor);
rebuilder.rebuildAsync(c, executor, repo);
futures.add(future); futures.add(future);
future.addListener( future.addListener(
new RebuildListener(c.getId(), future, ok, doneTask, failedTask), new RebuildListener(id, future, ok, doneTask, failedTask),
MoreExecutors.directExecutor()); MoreExecutors.directExecutor());
} }
mpm.waitFor(Futures.transformAsync(Futures.successfulAsList(futures), mpm.waitFor(Futures.transformAsync(Futures.successfulAsList(futures),
new AsyncFunction<List<?>, Void>() { new AsyncFunction<List<?>, Void>() {
@Override @Override
public ListenableFuture<Void> apply(List<?> input) public ListenableFuture<Void> apply(List<?> input) {
throws Exception {
mpm.end(); mpm.end();
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }
@@ -213,17 +212,17 @@ public class RebuildNotedb extends SiteProgram {
} }
} }
private Multimap<Project.NameKey, Change> getChangesByProject() private Multimap<Project.NameKey, Change.Id> getChangesByProject()
throws OrmException { throws OrmException {
// Memorize all changes so we can close the db connection and allow // Memorize all changes so we can close the db connection and allow
// rebuilder threads to use the full connection pool. // rebuilder threads to use the full connection pool.
SchemaFactory<ReviewDb> schemaFactory = sysInjector.getInstance(Key.get( SchemaFactory<ReviewDb> schemaFactory = sysInjector.getInstance(Key.get(
new TypeLiteral<SchemaFactory<ReviewDb>>() {})); new TypeLiteral<SchemaFactory<ReviewDb>>() {}));
Multimap<Project.NameKey, Change> changesByProject = Multimap<Project.NameKey, Change.Id> changesByProject =
ArrayListMultimap.create(); ArrayListMultimap.create();
try (ReviewDb db = schemaFactory.open()) { try (ReviewDb db = schemaFactory.open()) {
for (Change c : db.changes().all()) { for (Change c : db.changes().all()) {
changesByProject.put(c.getProject(), c); changesByProject.put(c.getProject(), c.getId());
} }
return changesByProject; return changesByProject;
} }

View File

@@ -43,18 +43,18 @@ import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.IdentifiedUser; import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.PatchLineCommentsUtil; import com.google.gerrit.server.PatchLineCommentsUtil;
import com.google.gerrit.server.git.ChainedReceiveCommands; import com.google.gerrit.server.git.ChainedReceiveCommands;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.patch.PatchListCache; import com.google.gerrit.server.patch.PatchListCache;
import com.google.gerrit.server.project.ChangeControl; import com.google.gerrit.server.project.ChangeControl;
import com.google.gerrit.server.project.NoSuchChangeException; import com.google.gerrit.server.project.NoSuchChangeException;
import com.google.gwtorm.server.OrmException; import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Provider; import com.google.inject.util.Providers;
import org.eclipse.jgit.lib.Constants; import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter; import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.lib.Repository; import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit; import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevWalk; import org.eclipse.jgit.revwalk.RevWalk;
@@ -77,7 +77,8 @@ public class ChangeRebuilder {
private static final long TS_WINDOW_MS = private static final long TS_WINDOW_MS =
TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS); TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS);
private final Provider<ReviewDb> dbProvider; private final SchemaFactory<ReviewDb> schemaFactory;
private final GitRepositoryManager repoManager;
private final ChangeControl.GenericFactory controlFactory; private final ChangeControl.GenericFactory controlFactory;
private final IdentifiedUser.GenericFactory userFactory; private final IdentifiedUser.GenericFactory userFactory;
private final PatchListCache patchListCache; private final PatchListCache patchListCache;
@@ -86,14 +87,16 @@ public class ChangeRebuilder {
private final NoteDbUpdateManager.Factory updateManagerFactory; private final NoteDbUpdateManager.Factory updateManagerFactory;
@Inject @Inject
ChangeRebuilder(Provider<ReviewDb> dbProvider, ChangeRebuilder(SchemaFactory<ReviewDb> schemaFactory,
GitRepositoryManager repoManager,
ChangeControl.GenericFactory controlFactory, ChangeControl.GenericFactory controlFactory,
IdentifiedUser.GenericFactory userFactory, IdentifiedUser.GenericFactory userFactory,
PatchListCache patchListCache, PatchListCache patchListCache,
ChangeUpdate.Factory updateFactory, ChangeUpdate.Factory updateFactory,
ChangeDraftUpdate.Factory draftUpdateFactory, ChangeDraftUpdate.Factory draftUpdateFactory,
NoteDbUpdateManager.Factory updateManagerFactory) { NoteDbUpdateManager.Factory updateManagerFactory) {
this.dbProvider = dbProvider; this.schemaFactory = schemaFactory;
this.repoManager = repoManager;
this.controlFactory = controlFactory; this.controlFactory = controlFactory;
this.userFactory = userFactory; this.userFactory = userFactory;
this.patchListCache = patchListCache; this.patchListCache = patchListCache;
@@ -102,34 +105,44 @@ public class ChangeRebuilder {
this.updateManagerFactory = updateManagerFactory; this.updateManagerFactory = updateManagerFactory;
} }
public ListenableFuture<?> rebuildAsync(final Change change, public ListenableFuture<?> rebuildAsync(final Change.Id id,
ListeningExecutorService executor, final Repository changeRepo) { ListeningExecutorService executor) {
return executor.submit(new Callable<Void>() { return executor.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
rebuild(change, changeRepo); try (ReviewDb db = schemaFactory.open()) {
rebuild(db, id);
}
return null; return null;
} }
}); });
} }
public void rebuild(Change change, Repository changeRepo) public void rebuild(ReviewDb db, Change.Id changeId)
throws NoSuchChangeException, IOException, OrmException { throws NoSuchChangeException, IOException, OrmException {
ReviewDb db = dbProvider.get(); Change change = db.changes().get(changeId);
Change.Id changeId = change.getId(); if (change == null) {
return;
}
NoteDbUpdateManager manager =
updateManagerFactory.create(change.getProject());
// We will rebuild all events, except for draft comments, in buckets based // We will rebuild all events, except for draft comments, in buckets based
// on author and timestamp. // on author and timestamp.
List<Event> events = Lists.newArrayList(); List<Event> events = Lists.newArrayList();
Multimap<Account.Id, PatchLineCommentEvent> draftCommentEvents = Multimap<Account.Id, PatchLineCommentEvent> draftCommentEvents =
ArrayListMultimap.create(); ArrayListMultimap.create();
try (RevWalk rw = new RevWalk(changeRepo)) { Repository changeMetaRepo = manager.getChangeRepo();
events.addAll(getHashtagsEvents(change, changeRepo, rw)); events.addAll(getHashtagsEvents(change, manager));
deleteRef(change, changeRepo); // Delete ref only after hashtags have been read
deleteRef(change, changeMetaRepo, manager.getChangeCommands());
try (Repository codeRepo = repoManager.openRepository(change.getProject());
RevWalk codeRw = new RevWalk(codeRepo)) {
for (PatchSet ps : db.patchSets().byChange(changeId)) { for (PatchSet ps : db.patchSets().byChange(changeId)) {
events.add(new PatchSetEvent(change, ps, rw)); events.add(new PatchSetEvent(change, ps, codeRw));
for (PatchLineComment c : db.patchComments().byPatchSet(ps.getId())) { for (PatchLineComment c : db.patchComments().byPatchSet(ps.getId())) {
PatchLineCommentEvent e = PatchLineCommentEvent e =
new PatchLineCommentEvent(c, change, ps, patchListCache); new PatchLineCommentEvent(c, change, ps, patchListCache);
@@ -153,16 +166,13 @@ public class ChangeRebuilder {
Collections.sort(events); Collections.sort(events);
events.add(new FinalUpdatesEvent(change, notedbChange)); events.add(new FinalUpdatesEvent(change, notedbChange));
// TODO(dborowitz): share manager across changes.
NoteDbUpdateManager manager =
updateManagerFactory.create(change.getProject());
ChangeUpdate update = null; ChangeUpdate update = null;
for (Event e : events) { for (Event e : events) {
if (!sameUpdate(e, update)) { if (!sameUpdate(e, update)) {
if (update != null) { if (update != null) {
manager.add(update); manager.add(update);
} }
IdentifiedUser user = userFactory.create(dbProvider, e.who); IdentifiedUser user = userFactory.create(Providers.of(db), e.who);
update = updateFactory.create( update = updateFactory.create(
controlFactory.controlFor(db, change, user), e.when); controlFactory.controlFor(db, change, user), e.when);
update.setPatchSetId(e.psId); update.setPatchSetId(e.psId);
@@ -172,7 +182,7 @@ public class ChangeRebuilder {
manager.add(update); manager.add(update);
for (Account.Id author : draftCommentEvents.keys()) { for (Account.Id author : draftCommentEvents.keys()) {
IdentifiedUser user = userFactory.create(dbProvider, author); IdentifiedUser user = userFactory.create(Providers.of(db), author);
ChangeDraftUpdate draftUpdate = null; ChangeDraftUpdate draftUpdate = null;
for (PatchLineCommentEvent e : draftCommentEvents.get(author)) { for (PatchLineCommentEvent e : draftCommentEvents.get(author)) {
if (!sameUpdate(e, draftUpdate)) { if (!sameUpdate(e, draftUpdate)) {
@@ -188,17 +198,16 @@ public class ChangeRebuilder {
manager.add(draftUpdate); manager.add(draftUpdate);
} }
createStarredChangesRefs(changeId, manager.getAllUsersCommands(), createStarredChangesRefs(db, changeId, manager.getAllUsersCommands(),
manager.getAllUsersRepo()); manager.getAllUsersRepo());
manager.execute(); manager.execute();
} }
private void createStarredChangesRefs(Change.Id changeId, private void createStarredChangesRefs(ReviewDb db, Change.Id changeId,
ChainedReceiveCommands allUsersCmds, Repository allUsersRepo) ChainedReceiveCommands allUsersCmds, Repository allUsersRepo)
throws IOException, OrmException { throws IOException, OrmException {
ObjectId emptyTree = emptyTree(allUsersRepo); ObjectId emptyTree = emptyTree(allUsersRepo);
for (StarredChange starred : dbProvider.get().starredChanges() for (StarredChange starred : db.starredChanges().byChange(changeId)) {
.byChange(changeId)) {
allUsersCmds.add(new ReceiveCommand(ObjectId.zeroId(), emptyTree, allUsersCmds.add(new ReceiveCommand(ObjectId.zeroId(), emptyTree,
RefNames.refsStarredChanges(starred.getAccountId(), changeId))); RefNames.refsStarredChanges(starred.getAccountId(), changeId)));
} }
@@ -213,16 +222,18 @@ public class ChangeRebuilder {
} }
private List<HashtagsEvent> getHashtagsEvents(Change change, private List<HashtagsEvent> getHashtagsEvents(Change change,
Repository changeRepo, RevWalk rw) throws IOException { NoteDbUpdateManager manager) throws IOException {
String refName = ChangeNoteUtil.changeRefName(change.getId()); String refName = ChangeNoteUtil.changeRefName(change.getId());
Ref ref = changeRepo.exactRef(refName); ObjectId old = manager.getChangeCommands()
if (ref == null) { .getObjectId(manager.getChangeRepo(), refName);
if (old == null) {
return Collections.emptyList(); return Collections.emptyList();
} }
RevWalk rw = manager.getChangeRevWalk();
List<HashtagsEvent> events = new ArrayList<>(); List<HashtagsEvent> events = new ArrayList<>();
rw.reset(); rw.reset();
rw.markStart(rw.parseCommit(ref.getObjectId())); rw.markStart(rw.parseCommit(old));
for (RevCommit commit : rw) { for (RevCommit commit : rw) {
Account.Id authorId = Account.Id authorId =
ChangeNoteUtil.parseIdent(commit.getAuthorIdent()); ChangeNoteUtil.parseIdent(commit.getAuthorIdent());
@@ -264,28 +275,11 @@ public class ChangeRebuilder {
return new PatchSet.Id(change.getId(), psId); return new PatchSet.Id(change.getId(), psId);
} }
private void deleteRef(Change change, Repository changeRepo) private void deleteRef(Change change, Repository repo,
throws IOException { ChainedReceiveCommands cmds) throws IOException {
String refName = ChangeNoteUtil.changeRefName(change.getId()); String refName = ChangeNoteUtil.changeRefName(change.getId());
RefUpdate ru = changeRepo.updateRef(refName, true); ObjectId old = cmds.getObjectId(repo, refName);
ru.setForceUpdate(true); cmds.add(new ReceiveCommand(old, ObjectId.zeroId(), refName));
RefUpdate.Result result = ru.delete();
switch (result) {
case FORCED:
case NEW:
case NO_CHANGE:
break;
case FAST_FORWARD:
case IO_FAILURE:
case LOCK_FAILURE:
case NOT_ATTEMPTED:
case REJECTED:
case REJECTED_CURRENT_BRANCH:
case RENAMED:
default:
throw new IOException(
String.format("Failed to delete ref %s: %s", refName, result));
}
} }
private static long round(Date when) { private static long round(Date when) {

View File

@@ -100,6 +100,21 @@ public class NoteDbUpdateManager {
return this; return this;
} }
Repository getChangeRepo() throws IOException {
initChangeRepo();
return changeRepo.repo;
}
RevWalk getChangeRevWalk() throws IOException {
initChangeRepo();
return changeRepo.rw;
}
ChainedReceiveCommands getChangeCommands() throws IOException {
initChangeRepo();
return changeRepo.cmds;
}
public NoteDbUpdateManager setAllUsersRepo(Repository repo, RevWalk rw, public NoteDbUpdateManager setAllUsersRepo(Repository repo, RevWalk rw,
ObjectInserter ins, ChainedReceiveCommands cmds) { ObjectInserter ins, ChainedReceiveCommands cmds) {
checkState(allUsersRepo == null, "allUsers repo already initialized"); checkState(allUsersRepo == null, "allUsers repo already initialized");