Merge "Update code refs and NoteDb meta refs in one batch"

This commit is contained in:
Dave Borowitz
2017-04-12 18:50:05 +00:00
committed by Gerrit Code Review
10 changed files with 372 additions and 163 deletions

View File

@@ -25,6 +25,7 @@ import com.google.gerrit.extensions.restapi.RestApiException;
import com.google.gerrit.extensions.restapi.RestReadView;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.client.RefNames;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.change.LimitedByteArrayOutputStream.LimitExceededException;
@@ -151,7 +152,9 @@ public class PreviewSubmit implements RestReadView<RevisionResource> {
for (ReceiveCommand r : refs) {
bw.include(r.getRefName(), r.getNewId());
ObjectId oldId = r.getOldId();
if (!oldId.equals(ObjectId.zeroId())) {
if (!oldId.equals(ObjectId.zeroId())
// Probably the client doesn't already have NoteDb data.
&& !RefNames.isNoteDbMetaRef(r.getRefName())) {
bw.assume(or.getCodeReviewRevWalk().parseCommit(oldId));
}
}

View File

@@ -168,12 +168,16 @@ public class NoteDbUpdateManager implements AutoCloseable {
}
void flush() throws IOException {
flushToFinalInserter();
finalIns.flush();
tempIns.clear();
}
void flushToFinalInserter() throws IOException {
checkState(finalIns != null);
for (InsertedObject obj : tempIns.getInsertedObjects()) {
finalIns.insert(obj.type(), obj.data().toByteArray());
}
finalIns.flush();
tempIns.clear();
}
@Override
@@ -317,7 +321,13 @@ public class NoteDbUpdateManager implements AutoCloseable {
return changeUpdates.isEmpty()
&& draftUpdates.isEmpty()
&& robotCommentUpdates.isEmpty()
&& toDelete.isEmpty();
&& toDelete.isEmpty()
&& !hasCommands(changeRepo)
&& !hasCommands(allUsersRepo);
}
private static boolean hasCommands(@Nullable OpenRepo or) {
return or != null && !or.cmds.isEmpty();
}
/**
@@ -385,6 +395,10 @@ public class NoteDbUpdateManager implements AutoCloseable {
Set<Change.Id> changeIds = new HashSet<>();
for (ReceiveCommand cmd : changeRepo.getCommandsSnapshot()) {
Change.Id changeId = Change.Id.fromRef(cmd.getRefName());
if (changeId == null || !cmd.getRefName().equals(RefNames.changeMetaRef(changeId))) {
// Not a meta ref update, likely due to a repo update along with the change meta update.
continue;
}
changeIds.add(changeId);
Optional<ObjectId> metaId = Optional.of(cmd.getNewId());
staged.put(
@@ -450,13 +464,19 @@ public class NoteDbUpdateManager implements AutoCloseable {
}
}
public void execute() throws OrmException, IOException {
@Nullable
public BatchRefUpdate execute() throws OrmException, IOException {
return execute(false);
}
@Nullable
public BatchRefUpdate execute(boolean dryrun) throws OrmException, IOException {
// Check before even inspecting the list, as this is a programmer error.
if (migration.failChangeWrites()) {
throw new OrmException(CHANGES_READ_ONLY);
}
if (isEmpty()) {
return;
return null;
}
try (Timer1.Context timer = metrics.updateLatency.start(CHANGES)) {
stage();
@@ -468,36 +488,51 @@ public class NoteDbUpdateManager implements AutoCloseable {
// we may have stale draft comments. Doing it in this order allows stale
// comments to be filtered out by ChangeNotes, reflecting the fact that
// comments can only go from DRAFT to PUBLISHED, not vice versa.
execute(changeRepo);
execute(allUsersRepo);
BatchRefUpdate result = execute(changeRepo, dryrun);
execute(allUsersRepo, dryrun);
return result;
} finally {
close();
}
}
private void execute(OpenRepo or) throws IOException {
private BatchRefUpdate execute(OpenRepo or, boolean dryrun) throws IOException {
if (or == null || or.cmds.isEmpty()) {
return;
return null;
}
or.flush();
if (!dryrun) {
or.flush();
} else {
// OpenRepo buffers objects separately; caller may assume that objects are available in the
// inserter it previously passed via setChangeRepo.
// TODO(dborowitz): This should be flushToFinalInserter to avoid flushing objects to the
// underlying repo during a dry run. However, the only user of this is PreviewSubmit, which
// uses BundleWriter, which only takes a Repository so it can't read unflushed objects. Fix
// BundleWriter, then fix this call.
or.flush();
}
BatchRefUpdate bru = or.repo.getRefDatabase().newBatchUpdate();
bru.setRefLogMessage(firstNonNull(refLogMessage, "Update NoteDb refs"), false);
bru.setRefLogIdent(refLogIdent != null ? refLogIdent : serverIdent.get());
or.cmds.addTo(bru);
bru.setAllowNonFastForwards(true);
bru.execute(or.rw, NullProgressMonitor.INSTANCE);
boolean lockFailure = false;
for (ReceiveCommand cmd : bru.getCommands()) {
if (cmd.getResult() == ReceiveCommand.Result.LOCK_FAILURE) {
lockFailure = true;
} else if (cmd.getResult() != ReceiveCommand.Result.OK) {
throw new IOException("Update failed: " + bru);
if (!dryrun) {
bru.execute(or.rw, NullProgressMonitor.INSTANCE);
boolean lockFailure = false;
for (ReceiveCommand cmd : bru.getCommands()) {
if (cmd.getResult() == ReceiveCommand.Result.LOCK_FAILURE) {
lockFailure = true;
} else if (cmd.getResult() != ReceiveCommand.Result.OK) {
throw new IOException("Update failed: " + bru);
}
}
if (lockFailure) {
throw new LockFailureException("Update failed with one or more lock failures: " + bru);
}
}
if (lockFailure) {
throw new LockFailureException("Update failed with one or more lock failures: " + bru);
}
return bru;
}
private void addCommands() throws OrmException, IOException {

View File

@@ -17,11 +17,9 @@ package com.google.gerrit.server.update;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.extensions.restapi.RestApiException;
@@ -46,15 +44,13 @@ import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.TreeMap;
import org.eclipse.jgit.lib.NullProgressMonitor;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ObjectReader;
import org.eclipse.jgit.lib.PersonIdent;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.transport.ReceiveCommand;
@@ -82,43 +78,64 @@ class NoteDbBatchUpdate extends BatchUpdate {
setRequestIds(updates, requestId);
try {
List<CheckedFuture<?, IOException>> indexFutures = new ArrayList<>();
List<ChangesHandle> handles = new ArrayList<>(updates.size());
Order order = getOrder(updates, listener);
// TODO(dborowitz): Fuse implementations to use a single BatchRefUpdate between phases. Note
// that we may still need to respect the order, since op implementations may make assumptions
// about the order in which their methods are called.
switch (order) {
case REPO_BEFORE_DB:
for (NoteDbBatchUpdate u : updates) {
u.executeUpdateRepo();
}
listener.afterUpdateRepos();
for (NoteDbBatchUpdate u : updates) {
u.executeRefUpdates(dryrun);
}
listener.afterUpdateRefs();
for (NoteDbBatchUpdate u : updates) {
u.reindexChanges(u.executeChangeOps(dryrun), dryrun);
}
listener.afterUpdateChanges();
break;
case DB_BEFORE_REPO:
for (NoteDbBatchUpdate u : updates) {
u.reindexChanges(u.executeChangeOps(dryrun), dryrun);
}
for (NoteDbBatchUpdate u : updates) {
u.executeUpdateRepo();
}
for (NoteDbBatchUpdate u : updates) {
u.executeRefUpdates(dryrun);
}
break;
default:
throw new IllegalStateException("invalid execution order: " + order);
try {
switch (order) {
case REPO_BEFORE_DB:
for (NoteDbBatchUpdate u : updates) {
u.executeUpdateRepo();
}
listener.afterUpdateRepos();
for (NoteDbBatchUpdate u : updates) {
handles.add(u.executeChangeOps(dryrun));
}
for (ChangesHandle h : handles) {
h.execute();
indexFutures.addAll(h.startIndexFutures());
}
listener.afterUpdateRefs();
listener.afterUpdateChanges();
break;
case DB_BEFORE_REPO:
// Call updateChange for each op before updateRepo, but defer executing the
// NoteDbUpdateManager until after calling updateRepo. They share an inserter and
// BatchRefUpdate, so it will all execute as a single batch. But we have to let
// NoteDbUpdateManager actually execute the update, since it has to interleave it
// properly with All-Users updates.
//
// TODO(dborowitz): This may still result in multiple updates to All-Users, but that's
// currently not a big deal because multi-change batches generally aren't affecting
// drafts anyway.
for (NoteDbBatchUpdate u : updates) {
handles.add(u.executeChangeOps(dryrun));
}
for (NoteDbBatchUpdate u : updates) {
u.executeUpdateRepo();
}
for (ChangesHandle h : handles) {
// TODO(dborowitz): This isn't quite good enough: in theory updateRepo may want to
// see the results of change meta commands, but they aren't actually added to the
// BatchUpdate until the body of execute. To fix this, execute needs to be split up
// into a method that returns a BatchRefUpdate before execution. Not a big deal at the
// moment, because this order is only used for deleting changes, and those updateRepo
// implementations definitely don't need to observe the updated change meta refs.
h.execute();
indexFutures.addAll(h.startIndexFutures());
}
break;
default:
throw new IllegalStateException("invalid execution order: " + order);
}
} finally {
for (ChangesHandle h : handles) {
h.close();
}
}
ChangeIndexer.allAsList(
updates.stream().flatMap(u -> u.indexFutures.stream()).collect(toList()))
.get();
ChangeIndexer.allAsList(indexFutures).get();
// Fire ref update events only after all mutations are finished, since callers may assume a
// patch set ref being created means the change was created, or a branch advancing meaning
@@ -250,8 +267,6 @@ class NoteDbBatchUpdate extends BatchUpdate {
private final GitReferenceUpdated gitRefUpdated;
private final ReviewDb db;
private List<CheckedFuture<?, IOException>> indexFutures;
@Inject
NoteDbBatchUpdate(
GitRepositoryManager repoManager,
@@ -275,7 +290,6 @@ class NoteDbBatchUpdate extends BatchUpdate {
this.indexer = indexer;
this.gitRefUpdated = gitRefUpdated;
this.db = db;
this.indexFutures = new ArrayList<>();
}
@Override
@@ -309,101 +323,104 @@ class NoteDbBatchUpdate extends BatchUpdate {
onSubmitValidators.validate(
project, ctx.getRevWalk().getObjectReader(), repoView.getCommands());
}
// TODO(dborowitz): Don't flush when fusing phases.
if (repoView != null) {
logDebug("Flushing inserter");
repoView.getInserter().flush();
} else {
logDebug("No objects to flush");
}
} catch (Exception e) {
Throwables.throwIfInstanceOf(e, RestApiException.class);
throw new UpdateException(e);
}
}
// TODO(dborowitz): Don't execute non-change ref updates separately when fusing phases.
private void executeRefUpdates(boolean dryrun) throws IOException, RestApiException {
if (getRefUpdates().isEmpty()) {
logDebug("No ref updates to execute");
return;
}
// May not be opened if the caller added ref updates but no new objects.
initRepository();
batchRefUpdate = repoView.getRepository().getRefDatabase().newBatchUpdate();
repoView.getCommands().addTo(batchRefUpdate);
logDebug("Executing batch of {} ref updates", batchRefUpdate.getCommands().size());
if (dryrun) {
return;
private class ChangesHandle implements AutoCloseable {
private final NoteDbUpdateManager manager;
private final boolean dryrun;
private final Map<Change.Id, ChangeResult> results;
ChangesHandle(NoteDbUpdateManager manager, boolean dryrun) {
this.manager = manager;
this.dryrun = dryrun;
results = new HashMap<>();
}
// Force BatchRefUpdate to read newly referenced objects using a new RevWalk, rather than one
// that might have access to unflushed objects.
try (RevWalk updateRw = new RevWalk(repoView.getRepository())) {
batchRefUpdate.execute(updateRw, NullProgressMonitor.INSTANCE);
@Override
public void close() {
manager.close();
}
boolean ok = true;
for (ReceiveCommand cmd : batchRefUpdate.getCommands()) {
if (cmd.getResult() != ReceiveCommand.Result.OK) {
ok = false;
break;
void setResult(Change.Id id, ChangeResult result) {
ChangeResult old = results.putIfAbsent(id, result);
checkArgument(old == null, "result for change %s already set: %s", id, old);
}
void execute() throws OrmException, IOException {
NoteDbBatchUpdate.this.batchRefUpdate = manager.execute(dryrun);
}
List<CheckedFuture<?, IOException>> startIndexFutures() {
if (dryrun) {
return ImmutableList.of();
}
}
if (!ok) {
throw new RestApiException("BatchRefUpdate failed: " + batchRefUpdate);
logDebug("Reindexing {} changes", results.size());
List<CheckedFuture<?, IOException>> indexFutures = new ArrayList<>(results.size());
for (Map.Entry<Change.Id, ChangeResult> e : results.entrySet()) {
Change.Id id = e.getKey();
switch (e.getValue()) {
case UPSERTED:
indexFutures.add(indexer.indexAsync(project, id));
break;
case DELETED:
indexFutures.add(indexer.deleteAsync(id));
break;
case SKIPPED:
break;
default:
throw new IllegalStateException("unexpected result: " + e.getValue());
}
}
return indexFutures;
}
}
private Map<Change.Id, ChangeResult> executeChangeOps(boolean dryrun) throws Exception {
private ChangesHandle executeChangeOps(boolean dryrun) throws Exception {
logDebug("Executing change ops");
Map<Change.Id, ChangeResult> result =
Maps.newLinkedHashMapWithExpectedSize(ops.keySet().size());
initRepository();
Repository repo = repoView.getRepository();
// TODO(dborowitz): Teach NoteDbUpdateManager to allow reusing the same inserter and batch ref
// update as in executeUpdateRepo.
try (ObjectInserter ins = repo.newObjectInserter();
ObjectReader reader = ins.newReader();
RevWalk rw = new RevWalk(reader);
NoteDbUpdateManager updateManager =
ChangesHandle handle =
new ChangesHandle(
updateManagerFactory
.create(project)
.setChangeRepo(repo, rw, ins, new ChainedReceiveCommands(repo))) {
if (user.isIdentifiedUser()) {
updateManager.setRefLogIdent(user.asIdentifiedUser().newRefLogIdent(when, tz));
.setChangeRepo(
repoView.getRepository(),
repoView.getRevWalk(),
repoView.getInserter(),
repoView.getCommands()),
dryrun);
if (user.isIdentifiedUser()) {
handle.manager.setRefLogIdent(user.asIdentifiedUser().newRefLogIdent(when, tz));
}
for (Map.Entry<Change.Id, Collection<BatchUpdateOp>> e : ops.asMap().entrySet()) {
Change.Id id = e.getKey();
ChangeContextImpl ctx = newChangeContext(id);
boolean dirty = false;
logDebug("Applying {} ops for change {}", e.getValue().size(), id);
for (BatchUpdateOp op : e.getValue()) {
dirty |= op.updateChange(ctx);
}
for (Map.Entry<Change.Id, Collection<BatchUpdateOp>> e : ops.asMap().entrySet()) {
Change.Id id = e.getKey();
ChangeContextImpl ctx = newChangeContext(id);
boolean dirty = false;
logDebug("Applying {} ops for change {}", e.getValue().size(), id);
for (BatchUpdateOp op : e.getValue()) {
dirty |= op.updateChange(ctx);
}
if (!dirty) {
logDebug("No ops reported dirty, short-circuiting");
result.put(id, ChangeResult.SKIPPED);
continue;
}
for (ChangeUpdate u : ctx.updates.values()) {
updateManager.add(u);
}
if (ctx.deleted) {
logDebug("Change {} was deleted", id);
updateManager.deleteChange(id);
result.put(id, ChangeResult.DELETED);
} else {
result.put(id, ChangeResult.UPSERTED);
}
if (!dirty) {
logDebug("No ops reported dirty, short-circuiting");
handle.setResult(id, ChangeResult.SKIPPED);
continue;
}
if (!dryrun) {
logDebug("Executing NoteDb updates");
updateManager.execute();
for (ChangeUpdate u : ctx.updates.values()) {
handle.manager.add(u);
}
if (ctx.deleted) {
logDebug("Change {} was deleted", id);
handle.manager.deleteChange(id);
handle.setResult(id, ChangeResult.DELETED);
} else {
handle.setResult(id, ChangeResult.UPSERTED);
}
}
return result;
return handle;
}
private ChangeContextImpl newChangeContext(Change.Id id) throws OrmException {
@@ -423,28 +440,6 @@ class NoteDbBatchUpdate extends BatchUpdate {
return new ChangeContextImpl(ctl);
}
private void reindexChanges(Map<Change.Id, ChangeResult> updateResults, boolean dryrun) {
if (dryrun) {
return;
}
logDebug("Reindexing {} changes", updateResults.size());
for (Map.Entry<Change.Id, ChangeResult> e : updateResults.entrySet()) {
Change.Id id = e.getKey();
switch (e.getValue()) {
case UPSERTED:
indexFutures.add(indexer.indexAsync(project, id));
break;
case DELETED:
indexFutures.add(indexer.deleteAsync(id));
break;
case SKIPPED:
break;
default:
throw new IllegalStateException("unexpected result: " + e.getValue());
}
}
}
private void executePostOps() throws Exception {
ContextImpl ctx = new ContextImpl();
for (BatchUpdateOp op : ops.values()) {