Merge "Add PatchLineComments to RebuildNotedb"

This commit is contained in:
Dave Borowitz 2014-08-18 20:24:17 +00:00 committed by Gerrit Code Review
commit 64802216fa
2 changed files with 265 additions and 103 deletions

View File

@ -32,7 +32,9 @@ import com.google.gerrit.pgm.util.SiteProgram;
import com.google.gerrit.pgm.util.ThreadLimiter;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.client.RefNames;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.config.AllUsersName;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.MultiProgressMonitor;
import com.google.gerrit.server.git.MultiProgressMonitor.Task;
@ -48,13 +50,20 @@ import com.google.inject.Key;
import com.google.inject.TypeLiteral;
import org.eclipse.jgit.lib.BatchRefUpdate;
import org.eclipse.jgit.lib.NullProgressMonitor;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.RefDatabase;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.transport.ReceiveCommand;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -93,100 +102,98 @@ public class RebuildNotedb extends SiteProgram {
Stopwatch sw = Stopwatch.createStarted();
GitRepositoryManager repoManager =
sysInjector.getInstance(GitRepositoryManager.class);
final Project.NameKey allUsersName =
sysInjector.getInstance(AllUsersName.class);
final Repository allUsersRepo = repoManager.openRepository(allUsersName);
try {
deleteDraftRefs(allUsersRepo);
for (final Project.NameKey project : changesByProject.keySet()) {
final Repository repo = repoManager.openRepository(project);
try {
final BatchRefUpdate bru = repo.getRefDatabase().newBatchUpdate();
final BatchRefUpdate bruForDrafts =
allUsersRepo.getRefDatabase().newBatchUpdate();
List<ListenableFuture<?>> futures = Lists.newArrayList();
for (final Project.NameKey project : changesByProject.keySet()) {
final Repository repo = repoManager.openRepository(project);
try {
final BatchRefUpdate bru = repo.getRefDatabase().newBatchUpdate();
List<ListenableFuture<?>> futures = Lists.newArrayList();
// Here, we truncate the project name to 50 characters to ensure that
// the whole monitor line for a project fits on one line (<80 chars).
final MultiProgressMonitor mpm = new MultiProgressMonitor(System.out,
truncateProjectName(project.get()));
final Task doneTask =
mpm.beginSubTask("done", changesByProject.get(project).size());
final Task failedTask =
mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
// Here, we truncate the project name to 50 characters to ensure that
// the whole monitor line for a project fits on one line (<80 chars).
int monitorStringMaxLength = 50;
String projectString = project.toString();
String monitorString = (projectString.length() > monitorStringMaxLength)
? projectString.substring(0, monitorStringMaxLength)
: projectString;
if (projectString.length() > monitorString.length()) {
monitorString = monitorString + "...";
}
final MultiProgressMonitor mpm = new MultiProgressMonitor(System.out,
monitorString);
final Task doneTask =
mpm.beginSubTask("done", changesByProject.get(project).size());
final Task failedTask = mpm.beginSubTask("failed",
MultiProgressMonitor.UNKNOWN);
for (final Change c : changesByProject.get(project)) {
final ListenableFuture<?> future = rebuilder.rebuildAsync(c,
executor, bru, bruForDrafts, repo, allUsersRepo);
futures.add(future);
future.addListener(
new RebuildListener(c.getId(), future, ok, doneTask, failedTask),
MoreExecutors.sameThreadExecutor());
}
for (final Change c : changesByProject.get(project)) {
final ListenableFuture<?> future =
rebuilder.rebuildAsync(c, executor, bru);
futures.add(future);
future.addListener(new Runnable() {
@Override
public void run() {
try {
future.get();
doneTask.update(1);
} catch (ExecutionException | InterruptedException e) {
fail(e);
} catch (RuntimeException e) {
failAndThrow(e);
} catch (Error e) {
// Can't join with RuntimeException because "RuntimeException |
// Error" becomes Throwable, which messes with signatures.
failAndThrow(e);
}
}
private void fail(Throwable t) {
log.error("Failed to rebuild change " + c.getId(), t);
ok.set(false);
failedTask.update(1);
}
private void failAndThrow(RuntimeException e) {
fail(e);
throw e;
}
private void failAndThrow(Error e) {
fail(e);
throw e;
}
}, MoreExecutors.sameThreadExecutor());
}
mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
new AsyncFunction<List<?>, Void>() {
@Override
public ListenableFuture<Void> apply(List<?> input)
throws Exception {
Task t = mpm.beginSubTask("update refs",
MultiProgressMonitor.UNKNOWN);
RevWalk walk = new RevWalk(repo);
try {
bru.execute(walk, t);
mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
new AsyncFunction<List<?>, Void>() {
@Override
public ListenableFuture<Void> apply(List<?> input)
throws Exception {
execute(bru, repo);
execute(bruForDrafts, allUsersRepo);
mpm.end();
return Futures.immediateFuture(null);
} finally {
walk.release();
}
}
}));
} catch (Exception e) {
log.error("Error rebuilding notedb", e);
ok.set(false);
break;
} finally {
repo.close();
}));
} catch (Exception e) {
log.error("Error rebuilding notedb", e);
ok.set(false);
break;
} finally {
repo.close();
}
}
} finally {
allUsersRepo.close();
}
double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
System.out.format("Rebuild %d changes in %.01fs (%.01f/s)\n",
changesByProject.size(), t, changesByProject.size() / t);
return ok.get() ? 0 : 1;
}
private static String truncateProjectName(String projectName) {
int monitorStringMaxLength = 50;
String monitorString = (projectName.length() > monitorStringMaxLength)
? projectName.substring(0, monitorStringMaxLength)
: projectName;
if (projectName.length() > monitorString.length()) {
monitorString = monitorString + "...";
}
return monitorString;
}
private static void execute(BatchRefUpdate bru, Repository repo)
throws IOException {
RevWalk rw = new RevWalk(repo);
try {
bru.execute(rw, NullProgressMonitor.INSTANCE);
} finally {
rw.release();
}
}
private void deleteDraftRefs(Repository allUsersRepo) throws IOException {
RefDatabase refDb = allUsersRepo.getRefDatabase();
Map<String, Ref> allRefs = refDb.getRefs(RefNames.REFS_DRAFT_COMMENTS);
BatchRefUpdate bru = refDb.newBatchUpdate();
for (Map.Entry<String, Ref> ref : allRefs.entrySet()) {
bru.addCommand(new ReceiveCommand(ref.getValue().getObjectId(),
ObjectId.zeroId(), RefNames.REFS_DRAFT_COMMENTS + ref.getKey()));
}
execute(bru, allUsersRepo);
}
private Injector createSysInjector() {
return dbInjector.createChildInjector(new AbstractModule() {
@Override
@ -227,4 +234,54 @@ public class RebuildNotedb extends SiteProgram {
db.close();
}
}
private class RebuildListener implements Runnable {
private Change.Id changeId;
private ListenableFuture<?> future;
private AtomicBoolean ok;
private Task doneTask;
private Task failedTask;
private RebuildListener(Change.Id changeId, ListenableFuture<?> future,
AtomicBoolean ok, Task doneTask, Task failedTask) {
this.changeId = changeId;
this.future = future;
this.ok = ok;
this.doneTask = doneTask;
this.failedTask = failedTask;
}
@Override
public void run() {
try {
future.get();
doneTask.update(1);
} catch (ExecutionException | InterruptedException e) {
fail(e);
} catch (RuntimeException e) {
failAndThrow(e);
} catch (Error e) {
// Can't join with RuntimeException because "RuntimeException
// | Error" becomes Throwable, which messes with signatures.
failAndThrow(e);
}
}
private void fail(Throwable t) {
log.error("Failed to rebuild change " + changeId, t);
ok.set(false);
failedTask.update(1);
}
private void failAndThrow(RuntimeException e) {
fail(e);
throw e;
}
private void failAndThrow(Error e) {
fail(e);
throw e;
}
}
}

View File

@ -15,21 +15,27 @@
package com.google.gerrit.server.notedb;
import static com.google.common.base.Preconditions.checkState;
import static com.google.gerrit.server.notedb.CommentsInNotesUtil.getCommentPsId;
import static com.google.gerrit.server.PatchLineCommentsUtil.setCommentRevId;
import com.google.common.base.Objects;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.gerrit.reviewdb.client.Account;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.PatchLineComment;
import com.google.gerrit.reviewdb.client.PatchLineComment.Status;
import com.google.gerrit.reviewdb.client.PatchSet;
import com.google.gerrit.reviewdb.client.PatchSetApproval;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.GerritPersonIdent;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.PatchLineCommentsUtil;
import com.google.gerrit.server.git.VersionedMetaData.BatchMetaDataUpdate;
import com.google.gerrit.server.patch.PatchListCache;
import com.google.gerrit.server.project.ChangeControl;
import com.google.gerrit.server.project.NoSuchChangeException;
import com.google.gwtorm.server.OrmException;
@ -37,8 +43,9 @@ import com.google.inject.Inject;
import com.google.inject.Provider;
import org.eclipse.jgit.lib.BatchRefUpdate;
import org.eclipse.jgit.lib.CommitBuilder;
import org.eclipse.jgit.lib.PersonIdent;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.RefUpdate;
import org.eclipse.jgit.lib.Repository;
import java.io.IOException;
import java.sql.Timestamp;
@ -52,57 +59,82 @@ public class ChangeRebuilder {
private static final long TS_WINDOW_MS =
TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS);
private final PersonIdent serverIdent;
private final Provider<ReviewDb> dbProvider;
private final ChangeControl.GenericFactory controlFactory;
private final IdentifiedUser.GenericFactory userFactory;
private final PatchListCache patchListCache;
private final ChangeUpdate.Factory updateFactory;
private final ChangeDraftUpdate.Factory draftUpdateFactory;
@Inject
ChangeRebuilder(@GerritPersonIdent PersonIdent serverIdent,
Provider<ReviewDb> dbProvider,
GitRepositoryManager repoManager,
ChangeRebuilder(Provider<ReviewDb> dbProvider,
ChangeControl.GenericFactory controlFactory,
IdentifiedUser.GenericFactory userFactory,
ChangeUpdate.Factory updateFactory) {
this.serverIdent = serverIdent;
PatchListCache patchListCache,
PatchLineCommentsUtil plcUtil,
ChangeUpdate.Factory updateFactory,
ChangeDraftUpdate.Factory draftUpdateFactory) {
this.dbProvider = dbProvider;
this.controlFactory = controlFactory;
this.userFactory = userFactory;
this.patchListCache = patchListCache;
this.updateFactory = updateFactory;
this.draftUpdateFactory = draftUpdateFactory;
}
public ListenableFuture<?> rebuildAsync(final Change change,
ListeningExecutorService executor, final BatchRefUpdate bru) {
ListeningExecutorService executor, final BatchRefUpdate bru,
final BatchRefUpdate bruForDrafts, final Repository changeRepo,
final Repository allUsersRepo) {
return executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
rebuild(change, bru);
rebuild(change, bru, bruForDrafts, changeRepo, allUsersRepo);
return null;
}
});
}
public void rebuild(Change change, BatchRefUpdate bru)
throws NoSuchChangeException, IOException, OrmException {
public void rebuild(Change change, BatchRefUpdate bru,
BatchRefUpdate bruForDrafts, Repository changeRepo,
Repository allUsersRepo) throws NoSuchChangeException, IOException,
OrmException {
deleteRef(change, changeRepo);
ReviewDb db = dbProvider.get();
Change.Id changeId = change.getId();
// We will rebuild all events, except for draft comments, in buckets based
// on author and timestamp. However, all draft comments for a given change
// and author will be written as one commit in the notedb.
List<Event> events = Lists.newArrayList();
Multimap<Account.Id, PatchLineCommentEvent> draftCommentEvents =
ArrayListMultimap.create();
for (PatchSet ps : db.patchSets().byChange(changeId)) {
events.add(new PatchSetEvent(ps));
for (PatchLineComment c : db.patchComments().byPatchSet(ps.getId())) {
PatchLineCommentEvent e =
new PatchLineCommentEvent(c, change, ps, patchListCache);
if (c.getStatus() == Status.PUBLISHED) {
events.add(e);
} else {
draftCommentEvents.put(c.getAuthor(), e);
}
}
}
for (PatchSetApproval psa : db.patchSetApprovals().byChange(changeId)) {
events.add(new ApprovalEvent(psa));
}
Collections.sort(events);
Collections.sort(events);
BatchMetaDataUpdate batch = null;
ChangeUpdate update = null;
for (Event e : events) {
if (!sameUpdate(e, update)) {
if (update != null) {
writeToBatch(batch, update);
writeToBatch(batch, update, changeRepo);
}
IdentifiedUser user = userFactory.create(dbProvider, e.who);
update = updateFactory.create(
@ -116,7 +148,7 @@ public class ChangeRebuilder {
}
if (batch != null) {
if (update != null) {
writeToBatch(batch, update);
writeToBatch(batch, update, changeRepo);
}
// Since the BatchMetaDataUpdates generated by all ChangeRebuilders on a
@ -128,13 +160,54 @@ public class ChangeRebuilder {
batch.commit();
}
}
for (Account.Id author : draftCommentEvents.keys()) {
IdentifiedUser user = userFactory.create(dbProvider, author);
ChangeDraftUpdate draftUpdate = null;
BatchMetaDataUpdate batchForDrafts = null;
for (PatchLineCommentEvent e : draftCommentEvents.get(author)) {
if (draftUpdate == null) {
draftUpdate = draftUpdateFactory.create(
controlFactory.controlFor(change, user), e.when);
draftUpdate.setPatchSetId(e.psId);
batchForDrafts = draftUpdate.openUpdateInBatch(bruForDrafts);
}
e.applyDraft(draftUpdate);
}
writeToBatch(batchForDrafts, draftUpdate, allUsersRepo);
synchronized(bruForDrafts) {
batchForDrafts.commit();
}
}
}
private void writeToBatch(BatchMetaDataUpdate batch, ChangeUpdate update)
private void deleteRef(Change change, Repository changeRepo)
throws IOException {
CommitBuilder commit = new CommitBuilder();
commit.setCommitter(new PersonIdent(serverIdent, update.getWhen()));
batch.write(update, commit);
String refName = ChangeNoteUtil.changeRefName(change.getId());
RefUpdate ru = changeRepo.updateRef(refName, true);
ru.setForceUpdate(true);
RefUpdate.Result result = ru.delete();
switch (result) {
case FORCED:
case NEW:
case NO_CHANGE:
break;
default:
throw new IOException(
String.format("Failed to delete ref %s: %s", refName, result));
}
}
private void writeToBatch(BatchMetaDataUpdate batch,
AbstractChangeUpdate update, Repository repo) throws IOException,
OrmException {
ObjectInserter inserter = repo.newObjectInserter();
try {
update.setInserter(inserter);
update.writeCommit(batch);
} finally {
inserter.release();
}
}
private static long round(Date when) {
@ -159,7 +232,7 @@ public class ChangeRebuilder {
this.when = when;
}
protected void checkUpdate(ChangeUpdate update) {
protected void checkUpdate(AbstractChangeUpdate update) {
checkState(Objects.equal(update.getPatchSetId(), psId),
"cannot apply event for %s to update for %s",
update.getPatchSetId(), psId);
@ -171,7 +244,7 @@ public class ChangeRebuilder {
who, update.getUser().getAccountId());
}
abstract void apply(ChangeUpdate update);
abstract void apply(ChangeUpdate update) throws OrmException;
@Override
public int compareTo(Event other) {
@ -228,4 +301,36 @@ public class ChangeRebuilder {
}
}
}
private static class PatchLineCommentEvent extends Event {
public final PatchLineComment c;
private final Change change;
private final PatchSet ps;
private final PatchListCache cache;
PatchLineCommentEvent(PatchLineComment c, Change change, PatchSet ps,
PatchListCache cache) {
super(getCommentPsId(c), c.getAuthor(), c.getWrittenOn());
this.c = c;
this.change = change;
this.ps = ps;
this.cache = cache;
}
@Override
void apply(ChangeUpdate update) throws OrmException {
checkUpdate(update);
if (c.getRevId() == null) {
setCommentRevId(c, cache, change, ps);
}
update.insertComment(c);
}
void applyDraft(ChangeDraftUpdate draftUpdate) throws OrmException {
if (c.getRevId() == null) {
setCommentRevId(c, cache, change, ps);
}
draftUpdate.insertComment(c);
}
}
}