Update code refs and NoteDb meta refs in one batch
Within NoteDbBatchUpdate, always use the revwalk, inserter, and commands from the single RepoView associated with the update. All BatchUpdateOp#updateChange implementations have been converted to inspect the limited kinds of repo state exposed by RepoView, so they will see the results of updateRepo even though the ref updates have not been executed yet. One complication comes from the fact that NoteDbUpdateManager really wants to control execution of the BatchRefUpdates via its own execute method. We have to make sure the execute method is deferred until after updateRepo has been called. This requires expanding the previous ChangeResults returned by executeChangeUps into something a little more beefy. Change-Id: Ia5b34bae93630f6d4dbeddb4ad5ce725ba5c6147
This commit is contained in:
@@ -17,11 +17,9 @@ package com.google.gerrit.server.update;
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static java.util.Comparator.comparing;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.util.concurrent.CheckedFuture;
|
||||
import com.google.gerrit.common.Nullable;
|
||||
import com.google.gerrit.extensions.restapi.RestApiException;
|
||||
@@ -46,15 +44,13 @@ import java.io.IOException;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.TimeZone;
|
||||
import java.util.TreeMap;
|
||||
import org.eclipse.jgit.lib.NullProgressMonitor;
|
||||
import org.eclipse.jgit.lib.ObjectInserter;
|
||||
import org.eclipse.jgit.lib.ObjectReader;
|
||||
import org.eclipse.jgit.lib.PersonIdent;
|
||||
import org.eclipse.jgit.lib.Repository;
|
||||
import org.eclipse.jgit.revwalk.RevWalk;
|
||||
import org.eclipse.jgit.transport.ReceiveCommand;
|
||||
|
||||
@@ -82,43 +78,64 @@ class NoteDbBatchUpdate extends BatchUpdate {
|
||||
setRequestIds(updates, requestId);
|
||||
|
||||
try {
|
||||
List<CheckedFuture<?, IOException>> indexFutures = new ArrayList<>();
|
||||
List<ChangesHandle> handles = new ArrayList<>(updates.size());
|
||||
Order order = getOrder(updates, listener);
|
||||
// TODO(dborowitz): Fuse implementations to use a single BatchRefUpdate between phases. Note
|
||||
// that we may still need to respect the order, since op implementations may make assumptions
|
||||
// about the order in which their methods are called.
|
||||
switch (order) {
|
||||
case REPO_BEFORE_DB:
|
||||
for (NoteDbBatchUpdate u : updates) {
|
||||
u.executeUpdateRepo();
|
||||
}
|
||||
listener.afterUpdateRepos();
|
||||
for (NoteDbBatchUpdate u : updates) {
|
||||
u.executeRefUpdates(dryrun);
|
||||
}
|
||||
listener.afterUpdateRefs();
|
||||
for (NoteDbBatchUpdate u : updates) {
|
||||
u.reindexChanges(u.executeChangeOps(dryrun), dryrun);
|
||||
}
|
||||
listener.afterUpdateChanges();
|
||||
break;
|
||||
case DB_BEFORE_REPO:
|
||||
for (NoteDbBatchUpdate u : updates) {
|
||||
u.reindexChanges(u.executeChangeOps(dryrun), dryrun);
|
||||
}
|
||||
for (NoteDbBatchUpdate u : updates) {
|
||||
u.executeUpdateRepo();
|
||||
}
|
||||
for (NoteDbBatchUpdate u : updates) {
|
||||
u.executeRefUpdates(dryrun);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("invalid execution order: " + order);
|
||||
try {
|
||||
switch (order) {
|
||||
case REPO_BEFORE_DB:
|
||||
for (NoteDbBatchUpdate u : updates) {
|
||||
u.executeUpdateRepo();
|
||||
}
|
||||
listener.afterUpdateRepos();
|
||||
for (NoteDbBatchUpdate u : updates) {
|
||||
handles.add(u.executeChangeOps(dryrun));
|
||||
}
|
||||
for (ChangesHandle h : handles) {
|
||||
h.execute();
|
||||
indexFutures.addAll(h.startIndexFutures());
|
||||
}
|
||||
listener.afterUpdateRefs();
|
||||
listener.afterUpdateChanges();
|
||||
break;
|
||||
|
||||
case DB_BEFORE_REPO:
|
||||
// Call updateChange for each op before updateRepo, but defer executing the
|
||||
// NoteDbUpdateManager until after calling updateRepo. They share an inserter and
|
||||
// BatchRefUpdate, so it will all execute as a single batch. But we have to let
|
||||
// NoteDbUpdateManager actually execute the update, since it has to interleave it
|
||||
// properly with All-Users updates.
|
||||
//
|
||||
// TODO(dborowitz): This may still result in multiple updates to All-Users, but that's
|
||||
// currently not a big deal because multi-change batches generally aren't affecting
|
||||
// drafts anyway.
|
||||
for (NoteDbBatchUpdate u : updates) {
|
||||
handles.add(u.executeChangeOps(dryrun));
|
||||
}
|
||||
for (NoteDbBatchUpdate u : updates) {
|
||||
u.executeUpdateRepo();
|
||||
}
|
||||
for (ChangesHandle h : handles) {
|
||||
// TODO(dborowitz): This isn't quite good enough: in theory updateRepo may want to
|
||||
// see the results of change meta commands, but they aren't actually added to the
|
||||
// BatchUpdate until the body of execute. To fix this, execute needs to be split up
|
||||
// into a method that returns a BatchRefUpdate before execution. Not a big deal at the
|
||||
// moment, because this order is only used for deleting changes, and those updateRepo
|
||||
// implementations definitely don't need to observe the updated change meta refs.
|
||||
h.execute();
|
||||
indexFutures.addAll(h.startIndexFutures());
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("invalid execution order: " + order);
|
||||
}
|
||||
} finally {
|
||||
for (ChangesHandle h : handles) {
|
||||
h.close();
|
||||
}
|
||||
}
|
||||
|
||||
ChangeIndexer.allAsList(
|
||||
updates.stream().flatMap(u -> u.indexFutures.stream()).collect(toList()))
|
||||
.get();
|
||||
ChangeIndexer.allAsList(indexFutures).get();
|
||||
|
||||
// Fire ref update events only after all mutations are finished, since callers may assume a
|
||||
// patch set ref being created means the change was created, or a branch advancing meaning
|
||||
@@ -250,8 +267,6 @@ class NoteDbBatchUpdate extends BatchUpdate {
|
||||
private final GitReferenceUpdated gitRefUpdated;
|
||||
private final ReviewDb db;
|
||||
|
||||
private List<CheckedFuture<?, IOException>> indexFutures;
|
||||
|
||||
@Inject
|
||||
NoteDbBatchUpdate(
|
||||
GitRepositoryManager repoManager,
|
||||
@@ -275,7 +290,6 @@ class NoteDbBatchUpdate extends BatchUpdate {
|
||||
this.indexer = indexer;
|
||||
this.gitRefUpdated = gitRefUpdated;
|
||||
this.db = db;
|
||||
this.indexFutures = new ArrayList<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -309,101 +323,104 @@ class NoteDbBatchUpdate extends BatchUpdate {
|
||||
onSubmitValidators.validate(
|
||||
project, ctx.getRevWalk().getObjectReader(), repoView.getCommands());
|
||||
}
|
||||
|
||||
// TODO(dborowitz): Don't flush when fusing phases.
|
||||
if (repoView != null) {
|
||||
logDebug("Flushing inserter");
|
||||
repoView.getInserter().flush();
|
||||
} else {
|
||||
logDebug("No objects to flush");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
Throwables.throwIfInstanceOf(e, RestApiException.class);
|
||||
throw new UpdateException(e);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(dborowitz): Don't execute non-change ref updates separately when fusing phases.
|
||||
private void executeRefUpdates(boolean dryrun) throws IOException, RestApiException {
|
||||
if (getRefUpdates().isEmpty()) {
|
||||
logDebug("No ref updates to execute");
|
||||
return;
|
||||
}
|
||||
// May not be opened if the caller added ref updates but no new objects.
|
||||
initRepository();
|
||||
batchRefUpdate = repoView.getRepository().getRefDatabase().newBatchUpdate();
|
||||
repoView.getCommands().addTo(batchRefUpdate);
|
||||
logDebug("Executing batch of {} ref updates", batchRefUpdate.getCommands().size());
|
||||
if (dryrun) {
|
||||
return;
|
||||
private class ChangesHandle implements AutoCloseable {
|
||||
private final NoteDbUpdateManager manager;
|
||||
private final boolean dryrun;
|
||||
private final Map<Change.Id, ChangeResult> results;
|
||||
|
||||
ChangesHandle(NoteDbUpdateManager manager, boolean dryrun) {
|
||||
this.manager = manager;
|
||||
this.dryrun = dryrun;
|
||||
results = new HashMap<>();
|
||||
}
|
||||
|
||||
// Force BatchRefUpdate to read newly referenced objects using a new RevWalk, rather than one
|
||||
// that might have access to unflushed objects.
|
||||
try (RevWalk updateRw = new RevWalk(repoView.getRepository())) {
|
||||
batchRefUpdate.execute(updateRw, NullProgressMonitor.INSTANCE);
|
||||
@Override
|
||||
public void close() {
|
||||
manager.close();
|
||||
}
|
||||
boolean ok = true;
|
||||
for (ReceiveCommand cmd : batchRefUpdate.getCommands()) {
|
||||
if (cmd.getResult() != ReceiveCommand.Result.OK) {
|
||||
ok = false;
|
||||
break;
|
||||
|
||||
void setResult(Change.Id id, ChangeResult result) {
|
||||
ChangeResult old = results.putIfAbsent(id, result);
|
||||
checkArgument(old == null, "result for change %s already set: %s", id, old);
|
||||
}
|
||||
|
||||
void execute() throws OrmException, IOException {
|
||||
NoteDbBatchUpdate.this.batchRefUpdate = manager.execute(dryrun);
|
||||
}
|
||||
|
||||
List<CheckedFuture<?, IOException>> startIndexFutures() {
|
||||
if (dryrun) {
|
||||
return ImmutableList.of();
|
||||
}
|
||||
}
|
||||
if (!ok) {
|
||||
throw new RestApiException("BatchRefUpdate failed: " + batchRefUpdate);
|
||||
logDebug("Reindexing {} changes", results.size());
|
||||
List<CheckedFuture<?, IOException>> indexFutures = new ArrayList<>(results.size());
|
||||
for (Map.Entry<Change.Id, ChangeResult> e : results.entrySet()) {
|
||||
Change.Id id = e.getKey();
|
||||
switch (e.getValue()) {
|
||||
case UPSERTED:
|
||||
indexFutures.add(indexer.indexAsync(project, id));
|
||||
break;
|
||||
case DELETED:
|
||||
indexFutures.add(indexer.deleteAsync(id));
|
||||
break;
|
||||
case SKIPPED:
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("unexpected result: " + e.getValue());
|
||||
}
|
||||
}
|
||||
return indexFutures;
|
||||
}
|
||||
}
|
||||
|
||||
private Map<Change.Id, ChangeResult> executeChangeOps(boolean dryrun) throws Exception {
|
||||
private ChangesHandle executeChangeOps(boolean dryrun) throws Exception {
|
||||
logDebug("Executing change ops");
|
||||
Map<Change.Id, ChangeResult> result =
|
||||
Maps.newLinkedHashMapWithExpectedSize(ops.keySet().size());
|
||||
initRepository();
|
||||
Repository repo = repoView.getRepository();
|
||||
// TODO(dborowitz): Teach NoteDbUpdateManager to allow reusing the same inserter and batch ref
|
||||
// update as in executeUpdateRepo.
|
||||
try (ObjectInserter ins = repo.newObjectInserter();
|
||||
ObjectReader reader = ins.newReader();
|
||||
RevWalk rw = new RevWalk(reader);
|
||||
NoteDbUpdateManager updateManager =
|
||||
|
||||
ChangesHandle handle =
|
||||
new ChangesHandle(
|
||||
updateManagerFactory
|
||||
.create(project)
|
||||
.setChangeRepo(repo, rw, ins, new ChainedReceiveCommands(repo))) {
|
||||
if (user.isIdentifiedUser()) {
|
||||
updateManager.setRefLogIdent(user.asIdentifiedUser().newRefLogIdent(when, tz));
|
||||
.setChangeRepo(
|
||||
repoView.getRepository(),
|
||||
repoView.getRevWalk(),
|
||||
repoView.getInserter(),
|
||||
repoView.getCommands()),
|
||||
dryrun);
|
||||
if (user.isIdentifiedUser()) {
|
||||
handle.manager.setRefLogIdent(user.asIdentifiedUser().newRefLogIdent(when, tz));
|
||||
}
|
||||
for (Map.Entry<Change.Id, Collection<BatchUpdateOp>> e : ops.asMap().entrySet()) {
|
||||
Change.Id id = e.getKey();
|
||||
ChangeContextImpl ctx = newChangeContext(id);
|
||||
boolean dirty = false;
|
||||
logDebug("Applying {} ops for change {}", e.getValue().size(), id);
|
||||
for (BatchUpdateOp op : e.getValue()) {
|
||||
dirty |= op.updateChange(ctx);
|
||||
}
|
||||
for (Map.Entry<Change.Id, Collection<BatchUpdateOp>> e : ops.asMap().entrySet()) {
|
||||
Change.Id id = e.getKey();
|
||||
ChangeContextImpl ctx = newChangeContext(id);
|
||||
boolean dirty = false;
|
||||
logDebug("Applying {} ops for change {}", e.getValue().size(), id);
|
||||
for (BatchUpdateOp op : e.getValue()) {
|
||||
dirty |= op.updateChange(ctx);
|
||||
}
|
||||
if (!dirty) {
|
||||
logDebug("No ops reported dirty, short-circuiting");
|
||||
result.put(id, ChangeResult.SKIPPED);
|
||||
continue;
|
||||
}
|
||||
for (ChangeUpdate u : ctx.updates.values()) {
|
||||
updateManager.add(u);
|
||||
}
|
||||
if (ctx.deleted) {
|
||||
logDebug("Change {} was deleted", id);
|
||||
updateManager.deleteChange(id);
|
||||
result.put(id, ChangeResult.DELETED);
|
||||
} else {
|
||||
result.put(id, ChangeResult.UPSERTED);
|
||||
}
|
||||
if (!dirty) {
|
||||
logDebug("No ops reported dirty, short-circuiting");
|
||||
handle.setResult(id, ChangeResult.SKIPPED);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!dryrun) {
|
||||
logDebug("Executing NoteDb updates");
|
||||
updateManager.execute();
|
||||
for (ChangeUpdate u : ctx.updates.values()) {
|
||||
handle.manager.add(u);
|
||||
}
|
||||
if (ctx.deleted) {
|
||||
logDebug("Change {} was deleted", id);
|
||||
handle.manager.deleteChange(id);
|
||||
handle.setResult(id, ChangeResult.DELETED);
|
||||
} else {
|
||||
handle.setResult(id, ChangeResult.UPSERTED);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
return handle;
|
||||
}
|
||||
|
||||
private ChangeContextImpl newChangeContext(Change.Id id) throws OrmException {
|
||||
@@ -423,28 +440,6 @@ class NoteDbBatchUpdate extends BatchUpdate {
|
||||
return new ChangeContextImpl(ctl);
|
||||
}
|
||||
|
||||
private void reindexChanges(Map<Change.Id, ChangeResult> updateResults, boolean dryrun) {
|
||||
if (dryrun) {
|
||||
return;
|
||||
}
|
||||
logDebug("Reindexing {} changes", updateResults.size());
|
||||
for (Map.Entry<Change.Id, ChangeResult> e : updateResults.entrySet()) {
|
||||
Change.Id id = e.getKey();
|
||||
switch (e.getValue()) {
|
||||
case UPSERTED:
|
||||
indexFutures.add(indexer.indexAsync(project, id));
|
||||
break;
|
||||
case DELETED:
|
||||
indexFutures.add(indexer.deleteAsync(id));
|
||||
break;
|
||||
case SKIPPED:
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("unexpected result: " + e.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void executePostOps() throws Exception {
|
||||
ContextImpl ctx = new ContextImpl();
|
||||
for (BatchUpdateOp op : ops.values()) {
|
||||
|
||||
Reference in New Issue
Block a user