Support async change operations directly in BatchUpdate
This replaces the manual threadpool management in ReceiveCommits with a more general approach. That support was added for slow NoSQL backends, which may also have poor performance characteristics when updating lots of NoteDb refs in parallel. This approach substantially rewrites the BatchUpdate loop to stage all NoteDb update operations in memory in separate threads, each with its own copy of the ReviewDb and change repo, and then aggregates the results together in the main thread with a single ObjectInserter and BatchRefUpdate. One nice thing about this approach is it doesn't require any changes to BatchUpdate.Op implementations: each Op's methods are executed sequentially, in one thread at a time, with proper barriers (an executor) to ensure later methods see the results of writes in the background thread. Use an in-memory implementation of ObjectInserter in each of the background threads' NoteDbUpdateManagers to buffer writes completely in memory until it's time for the caller to flush. This wastes a small amount of memory for the buffer, but these are just NoteDb objects, so they should be quite small. This implementation using immutable result types is preferable to trying to share a Repository/ObjectReader/ ObjectInserter across threads, which requires manual locking. That is not only painful but also produces deadlocks when mixing repo-level locks and SQL-level locks (e.g. H2's transaction implementation) across threads. Change-Id: I40545a4d48fcfa892bd3e4c0cd9b72ab7fac9436
This commit is contained in:
@@ -329,7 +329,6 @@ public class PushOneCommit {
|
|||||||
public void assertMessage(String expectedMessage) {
|
public void assertMessage(String expectedMessage) {
|
||||||
RemoteRefUpdate refUpdate = result.getRemoteUpdate(ref);
|
RemoteRefUpdate refUpdate = result.getRemoteUpdate(ref);
|
||||||
assertThat(message(refUpdate).toLowerCase())
|
assertThat(message(refUpdate).toLowerCase())
|
||||||
.named(message(refUpdate))
|
|
||||||
.contains(expectedMessage.toLowerCase());
|
.contains(expectedMessage.toLowerCase());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -47,6 +47,7 @@ import com.google.gerrit.server.config.GitReceivePackGroups;
|
|||||||
import com.google.gerrit.server.config.GitUploadPackGroups;
|
import com.google.gerrit.server.config.GitUploadPackGroups;
|
||||||
import com.google.gerrit.server.git.BatchUpdate;
|
import com.google.gerrit.server.git.BatchUpdate;
|
||||||
import com.google.gerrit.server.git.MergeUtil;
|
import com.google.gerrit.server.git.MergeUtil;
|
||||||
|
import com.google.gerrit.server.git.ReceiveCommitsExecutorModule;
|
||||||
import com.google.gerrit.server.git.SearchingChangeCacheImpl;
|
import com.google.gerrit.server.git.SearchingChangeCacheImpl;
|
||||||
import com.google.gerrit.server.git.TagCache;
|
import com.google.gerrit.server.git.TagCache;
|
||||||
import com.google.gerrit.server.group.GroupModule;
|
import com.google.gerrit.server.group.GroupModule;
|
||||||
@@ -96,6 +97,7 @@ public class BatchProgramModule extends FactoryModule {
|
|||||||
protected void configure() {
|
protected void configure() {
|
||||||
install(reviewDbModule);
|
install(reviewDbModule);
|
||||||
install(new DiffExecutorModule());
|
install(new DiffExecutorModule());
|
||||||
|
install(new ReceiveCommitsExecutorModule());
|
||||||
install(PatchListCacheImpl.module());
|
install(PatchListCacheImpl.module());
|
||||||
|
|
||||||
// Plugins are not loaded and we're just running through each change
|
// Plugins are not loaded and we're just running through each change
|
||||||
|
|||||||
@@ -23,6 +23,10 @@ import com.google.common.collect.ImmutableList;
|
|||||||
import com.google.common.collect.ListMultimap;
|
import com.google.common.collect.ListMultimap;
|
||||||
import com.google.common.collect.MultimapBuilder;
|
import com.google.common.collect.MultimapBuilder;
|
||||||
import com.google.common.util.concurrent.CheckedFuture;
|
import com.google.common.util.concurrent.CheckedFuture;
|
||||||
|
import com.google.common.util.concurrent.Futures;
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.gerrit.extensions.restapi.ResourceConflictException;
|
import com.google.gerrit.extensions.restapi.ResourceConflictException;
|
||||||
import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
|
import com.google.gerrit.extensions.restapi.ResourceNotFoundException;
|
||||||
import com.google.gerrit.extensions.restapi.RestApiException;
|
import com.google.gerrit.extensions.restapi.RestApiException;
|
||||||
@@ -34,12 +38,12 @@ import com.google.gerrit.reviewdb.server.ReviewDbUtil;
|
|||||||
import com.google.gerrit.reviewdb.server.ReviewDbWrapper;
|
import com.google.gerrit.reviewdb.server.ReviewDbWrapper;
|
||||||
import com.google.gerrit.server.CurrentUser;
|
import com.google.gerrit.server.CurrentUser;
|
||||||
import com.google.gerrit.server.GerritPersonIdent;
|
import com.google.gerrit.server.GerritPersonIdent;
|
||||||
import com.google.gerrit.server.PatchLineCommentsUtil;
|
import com.google.gerrit.server.config.AllUsersName;
|
||||||
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
|
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
|
||||||
import com.google.gerrit.server.index.change.ChangeIndexer;
|
import com.google.gerrit.server.index.change.ChangeIndexer;
|
||||||
import com.google.gerrit.server.notedb.ChangeDelete;
|
|
||||||
import com.google.gerrit.server.notedb.ChangeNotes;
|
import com.google.gerrit.server.notedb.ChangeNotes;
|
||||||
import com.google.gerrit.server.notedb.ChangeUpdate;
|
import com.google.gerrit.server.notedb.ChangeUpdate;
|
||||||
|
import com.google.gerrit.server.notedb.InsertedObject;
|
||||||
import com.google.gerrit.server.notedb.NoteDbChangeState;
|
import com.google.gerrit.server.notedb.NoteDbChangeState;
|
||||||
import com.google.gerrit.server.notedb.NoteDbUpdateManager;
|
import com.google.gerrit.server.notedb.NoteDbUpdateManager;
|
||||||
import com.google.gerrit.server.notedb.NotesMigration;
|
import com.google.gerrit.server.notedb.NotesMigration;
|
||||||
@@ -51,12 +55,14 @@ import com.google.gerrit.server.project.NoSuchRefException;
|
|||||||
import com.google.gerrit.server.schema.DisabledChangesReviewDbWrapper;
|
import com.google.gerrit.server.schema.DisabledChangesReviewDbWrapper;
|
||||||
import com.google.gwtorm.server.OrmConcurrencyException;
|
import com.google.gwtorm.server.OrmConcurrencyException;
|
||||||
import com.google.gwtorm.server.OrmException;
|
import com.google.gwtorm.server.OrmException;
|
||||||
|
import com.google.gwtorm.server.SchemaFactory;
|
||||||
import com.google.inject.assistedinject.Assisted;
|
import com.google.inject.assistedinject.Assisted;
|
||||||
import com.google.inject.assistedinject.AssistedInject;
|
import com.google.inject.assistedinject.AssistedInject;
|
||||||
|
|
||||||
import org.eclipse.jgit.lib.BatchRefUpdate;
|
import org.eclipse.jgit.lib.BatchRefUpdate;
|
||||||
import org.eclipse.jgit.lib.NullProgressMonitor;
|
import org.eclipse.jgit.lib.NullProgressMonitor;
|
||||||
import org.eclipse.jgit.lib.ObjectInserter;
|
import org.eclipse.jgit.lib.ObjectInserter;
|
||||||
|
import org.eclipse.jgit.lib.ObjectReader;
|
||||||
import org.eclipse.jgit.lib.PersonIdent;
|
import org.eclipse.jgit.lib.PersonIdent;
|
||||||
import org.eclipse.jgit.lib.Repository;
|
import org.eclipse.jgit.lib.Repository;
|
||||||
import org.eclipse.jgit.revwalk.RevWalk;
|
import org.eclipse.jgit.revwalk.RevWalk;
|
||||||
@@ -74,6 +80,8 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Context for a set of updates that should be applied for a site.
|
* Context for a set of updates that should be applied for a site.
|
||||||
@@ -184,13 +192,18 @@ public class BatchUpdate implements AutoCloseable {
|
|||||||
private final ChangeControl ctl;
|
private final ChangeControl ctl;
|
||||||
private final Map<PatchSet.Id, ChangeUpdate> updates;
|
private final Map<PatchSet.Id, ChangeUpdate> updates;
|
||||||
private final ReviewDbWrapper dbWrapper;
|
private final ReviewDbWrapper dbWrapper;
|
||||||
|
private final Repository threadLocalRepo;
|
||||||
|
private final RevWalk threadLocalRevWalk;
|
||||||
|
|
||||||
private boolean deleted;
|
private boolean deleted;
|
||||||
private boolean bumpLastUpdatedOn = true;
|
private boolean bumpLastUpdatedOn = true;
|
||||||
|
|
||||||
private ChangeContext(ChangeControl ctl, ReviewDbWrapper dbWrapper) {
|
protected ChangeContext(ChangeControl ctl, ReviewDbWrapper dbWrapper,
|
||||||
|
Repository repo, RevWalk rw) {
|
||||||
this.ctl = ctl;
|
this.ctl = ctl;
|
||||||
this.dbWrapper = dbWrapper;
|
this.dbWrapper = dbWrapper;
|
||||||
|
this.threadLocalRepo = repo;
|
||||||
|
this.threadLocalRevWalk = rw;
|
||||||
updates = new TreeMap<>(ReviewDbUtil.intKeyOrdering());
|
updates = new TreeMap<>(ReviewDbUtil.intKeyOrdering());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -200,6 +213,16 @@ public class BatchUpdate implements AutoCloseable {
|
|||||||
return dbWrapper;
|
return dbWrapper;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Repository getRepository() {
|
||||||
|
return threadLocalRepo;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RevWalk getRevWalk() {
|
||||||
|
return threadLocalRevWalk;
|
||||||
|
}
|
||||||
|
|
||||||
public ChangeUpdate getUpdate(PatchSet.Id psId) {
|
public ChangeUpdate getUpdate(PatchSet.Id psId) {
|
||||||
ChangeUpdate u = updates.get(psId);
|
ChangeUpdate u = updates.get(psId);
|
||||||
if (u == null) {
|
if (u == null) {
|
||||||
@@ -310,6 +333,26 @@ public class BatchUpdate implements AutoCloseable {
|
|||||||
return o;
|
return o;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean getUpdateChangesInParallel(
|
||||||
|
Collection<BatchUpdate> updates) {
|
||||||
|
checkArgument(!updates.isEmpty());
|
||||||
|
Boolean p = null;
|
||||||
|
for (BatchUpdate u : updates) {
|
||||||
|
if (p == null) {
|
||||||
|
p = u.updateChangesInParallel;
|
||||||
|
} else if (u.updateChangesInParallel != p) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"cannot mix parallel and non-parallel operations");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Properly implementing this would involve hoisting the parallel loop up
|
||||||
|
// even further. As of this writing, the only user is ReceiveCommits,
|
||||||
|
// which only executes a single BatchUpdate at a time. So bail for now.
|
||||||
|
checkArgument(!p || updates.size() <= 1,
|
||||||
|
"cannot execute ChangeOps in parallel with more than 1 BatchUpdate");
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
static void execute(Collection<BatchUpdate> updates, Listener listener)
|
static void execute(Collection<BatchUpdate> updates, Listener listener)
|
||||||
throws UpdateException, RestApiException {
|
throws UpdateException, RestApiException {
|
||||||
if (updates.isEmpty()) {
|
if (updates.isEmpty()) {
|
||||||
@@ -317,6 +360,7 @@ public class BatchUpdate implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
Order order = getOrder(updates);
|
Order order = getOrder(updates);
|
||||||
|
boolean updateChangesInParallel = getUpdateChangesInParallel(updates);
|
||||||
switch (order) {
|
switch (order) {
|
||||||
case REPO_BEFORE_DB:
|
case REPO_BEFORE_DB:
|
||||||
for (BatchUpdate u : updates) {
|
for (BatchUpdate u : updates) {
|
||||||
@@ -328,13 +372,13 @@ public class BatchUpdate implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
listener.afterRefUpdates();
|
listener.afterRefUpdates();
|
||||||
for (BatchUpdate u : updates) {
|
for (BatchUpdate u : updates) {
|
||||||
u.executeChangeOps();
|
u.executeChangeOps(updateChangesInParallel);
|
||||||
}
|
}
|
||||||
listener.afterUpdateChanges();
|
listener.afterUpdateChanges();
|
||||||
break;
|
break;
|
||||||
case DB_BEFORE_REPO:
|
case DB_BEFORE_REPO:
|
||||||
for (BatchUpdate u : updates) {
|
for (BatchUpdate u : updates) {
|
||||||
u.executeChangeOps();
|
u.executeChangeOps(updateChangesInParallel);
|
||||||
}
|
}
|
||||||
listener.afterUpdateChanges();
|
listener.afterUpdateChanges();
|
||||||
for (BatchUpdate u : updates) {
|
for (BatchUpdate u : updates) {
|
||||||
@@ -392,16 +436,18 @@ public class BatchUpdate implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final ReviewDb db;
|
private final AllUsersName allUsers;
|
||||||
private final GitRepositoryManager repoManager;
|
|
||||||
private final ChangeIndexer indexer;
|
|
||||||
private final ChangeControl.GenericFactory changeControlFactory;
|
private final ChangeControl.GenericFactory changeControlFactory;
|
||||||
|
private final ChangeIndexer indexer;
|
||||||
private final ChangeNotes.Factory changeNotesFactory;
|
private final ChangeNotes.Factory changeNotesFactory;
|
||||||
private final ChangeUpdate.Factory changeUpdateFactory;
|
private final ChangeUpdate.Factory changeUpdateFactory;
|
||||||
private final NoteDbUpdateManager.Factory updateManagerFactory;
|
|
||||||
private final GitReferenceUpdated gitRefUpdated;
|
private final GitReferenceUpdated gitRefUpdated;
|
||||||
|
private final GitRepositoryManager repoManager;
|
||||||
|
private final ListeningExecutorService changeUpdateExector;
|
||||||
|
private final NoteDbUpdateManager.Factory updateManagerFactory;
|
||||||
private final NotesMigration notesMigration;
|
private final NotesMigration notesMigration;
|
||||||
private final PatchLineCommentsUtil plcUtil;
|
private final ReviewDb db;
|
||||||
|
private final SchemaFactory<ReviewDb> schemaFactory;
|
||||||
|
|
||||||
private final Project.NameKey project;
|
private final Project.NameKey project;
|
||||||
private final CurrentUser user;
|
private final CurrentUser user;
|
||||||
@@ -421,32 +467,39 @@ public class BatchUpdate implements AutoCloseable {
|
|||||||
private BatchRefUpdate batchRefUpdate;
|
private BatchRefUpdate batchRefUpdate;
|
||||||
private boolean closeRepo;
|
private boolean closeRepo;
|
||||||
private Order order;
|
private Order order;
|
||||||
|
private boolean updateChangesInParallel;
|
||||||
|
|
||||||
@AssistedInject
|
@AssistedInject
|
||||||
BatchUpdate(GitRepositoryManager repoManager,
|
BatchUpdate(
|
||||||
ChangeIndexer indexer,
|
AllUsersName allUsers,
|
||||||
ChangeControl.GenericFactory changeControlFactory,
|
ChangeControl.GenericFactory changeControlFactory,
|
||||||
|
ChangeIndexer indexer,
|
||||||
ChangeNotes.Factory changeNotesFactory,
|
ChangeNotes.Factory changeNotesFactory,
|
||||||
|
@ChangeUpdateExecutor ListeningExecutorService changeUpdateExector,
|
||||||
ChangeUpdate.Factory changeUpdateFactory,
|
ChangeUpdate.Factory changeUpdateFactory,
|
||||||
NoteDbUpdateManager.Factory updateManagerFactory,
|
|
||||||
GitReferenceUpdated gitRefUpdated,
|
|
||||||
NotesMigration notesMigration,
|
|
||||||
PatchLineCommentsUtil plcUtil,
|
|
||||||
@GerritPersonIdent PersonIdent serverIdent,
|
@GerritPersonIdent PersonIdent serverIdent,
|
||||||
|
GitReferenceUpdated gitRefUpdated,
|
||||||
|
GitRepositoryManager repoManager,
|
||||||
|
NoteDbUpdateManager.Factory updateManagerFactory,
|
||||||
|
NotesMigration notesMigration,
|
||||||
|
SchemaFactory<ReviewDb> schemaFactory,
|
||||||
@Assisted ReviewDb db,
|
@Assisted ReviewDb db,
|
||||||
@Assisted Project.NameKey project,
|
@Assisted Project.NameKey project,
|
||||||
@Assisted CurrentUser user,
|
@Assisted CurrentUser user,
|
||||||
@Assisted Timestamp when) {
|
@Assisted Timestamp when) {
|
||||||
this.db = db;
|
this.allUsers = allUsers;
|
||||||
this.repoManager = repoManager;
|
|
||||||
this.indexer = indexer;
|
|
||||||
this.changeControlFactory = changeControlFactory;
|
this.changeControlFactory = changeControlFactory;
|
||||||
this.changeNotesFactory = changeNotesFactory;
|
this.changeNotesFactory = changeNotesFactory;
|
||||||
|
this.changeUpdateExector = changeUpdateExector;
|
||||||
this.changeUpdateFactory = changeUpdateFactory;
|
this.changeUpdateFactory = changeUpdateFactory;
|
||||||
this.updateManagerFactory = updateManagerFactory;
|
|
||||||
this.gitRefUpdated = gitRefUpdated;
|
this.gitRefUpdated = gitRefUpdated;
|
||||||
|
this.indexer = indexer;
|
||||||
this.notesMigration = notesMigration;
|
this.notesMigration = notesMigration;
|
||||||
this.plcUtil = plcUtil;
|
this.repoManager = repoManager;
|
||||||
|
this.schemaFactory = schemaFactory;
|
||||||
|
this.updateManagerFactory = updateManagerFactory;
|
||||||
|
|
||||||
|
this.db = db;
|
||||||
this.project = project;
|
this.project = project;
|
||||||
this.user = user;
|
this.user = user;
|
||||||
this.when = when;
|
this.when = when;
|
||||||
@@ -479,6 +532,14 @@ public class BatchUpdate implements AutoCloseable {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute {@link Op#updateChange(ChangeContext)} in parallel for each change.
|
||||||
|
*/
|
||||||
|
public BatchUpdate updateChangesInParallel() {
|
||||||
|
this.updateChangesInParallel = true;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
private void initRepository() throws IOException {
|
private void initRepository() throws IOException {
|
||||||
if (repo == null) {
|
if (repo == null) {
|
||||||
this.repo = repoManager.openRepository(project);
|
this.repo = repoManager.openRepository(project);
|
||||||
@@ -569,27 +630,182 @@ public class BatchUpdate implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void executeChangeOps() throws UpdateException, RestApiException {
|
private void executeChangeOps(boolean parallel)
|
||||||
|
throws UpdateException, RestApiException {
|
||||||
|
ListeningExecutorService executor = parallel
|
||||||
|
? changeUpdateExector
|
||||||
|
: MoreExecutors.newDirectExecutorService();
|
||||||
|
|
||||||
|
List<ChangeTask> tasks = new ArrayList<>(ops.keySet().size());
|
||||||
try {
|
try {
|
||||||
|
List<ListenableFuture<?>> futures = new ArrayList<>(ops.keySet().size());
|
||||||
for (Map.Entry<Change.Id, Collection<Op>> e : ops.asMap().entrySet()) {
|
for (Map.Entry<Change.Id, Collection<Op>> e : ops.asMap().entrySet()) {
|
||||||
Change.Id id = e.getKey();
|
ChangeTask task =
|
||||||
db.changes().beginTransaction(id);
|
new ChangeTask(e.getKey(), e.getValue(), Thread.currentThread());
|
||||||
|
tasks.add(task);
|
||||||
|
futures.add(executor.submit(task));
|
||||||
|
}
|
||||||
|
Futures.allAsList(futures).get();
|
||||||
|
|
||||||
|
if (notesMigration.writeChanges()) {
|
||||||
|
executeNoteDbUpdates(tasks);
|
||||||
|
}
|
||||||
|
} catch (ExecutionException | InterruptedException e) {
|
||||||
|
Throwables.propagateIfInstanceOf(e.getCause(), UpdateException.class);
|
||||||
|
Throwables.propagateIfInstanceOf(e.getCause(), RestApiException.class);
|
||||||
|
throw new UpdateException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reindex changes.
|
||||||
|
for (ChangeTask task : tasks) {
|
||||||
|
if (task.deleted) {
|
||||||
|
indexFutures.add(indexer.deleteAsync(task.id));
|
||||||
|
} else {
|
||||||
|
indexFutures.add(indexer.indexAsync(project, task.id));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void executeNoteDbUpdates(List<ChangeTask> tasks) {
|
||||||
|
// Aggregate together all NoteDb ref updates from the ops we executed,
|
||||||
|
// possibly in parallel. Each task had its own NoteDbUpdateManager instance
|
||||||
|
// with its own thread-local copy of the repo(s), but each of those was just
|
||||||
|
// used for staging updates and was never executed.
|
||||||
|
//
|
||||||
|
// Use a new BatchRefUpdate as the original batchRefUpdate field is intended
|
||||||
|
// for use only by the updateRepo phase.
|
||||||
|
//
|
||||||
|
// See the comments in NoteDbUpdateManager#execute() for why we execute the
|
||||||
|
// updates on the change repo first.
|
||||||
|
try {
|
||||||
|
BatchRefUpdate changeRefUpdate =
|
||||||
|
getRepository().getRefDatabase().newBatchUpdate();
|
||||||
|
boolean hasAllUsersCommands = false;
|
||||||
|
try (ObjectInserter ins = getRepository().newObjectInserter()) {
|
||||||
|
for (ChangeTask task : tasks) {
|
||||||
|
if (task.noteDbResult == null) {
|
||||||
|
continue; // No-op update.
|
||||||
|
}
|
||||||
|
for (ReceiveCommand cmd : task.noteDbResult.changeCommands()) {
|
||||||
|
changeRefUpdate.addCommand(cmd);
|
||||||
|
}
|
||||||
|
for (InsertedObject obj : task.noteDbResult.changeObjects()) {
|
||||||
|
ins.insert(obj.type(), obj.data().toByteArray());
|
||||||
|
}
|
||||||
|
hasAllUsersCommands |=
|
||||||
|
!task.noteDbResult.allUsersCommands().isEmpty();
|
||||||
|
}
|
||||||
|
executeNoteDbUpdate(getRevWalk(), ins, changeRefUpdate);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasAllUsersCommands) {
|
||||||
|
try (Repository allUsersRepo = repoManager.openRepository(allUsers);
|
||||||
|
RevWalk allUsersRw = new RevWalk(allUsersRepo);
|
||||||
|
ObjectInserter allUsersIns = allUsersRepo.newObjectInserter()) {
|
||||||
|
BatchRefUpdate allUsersRefUpdate =
|
||||||
|
allUsersRepo.getRefDatabase().newBatchUpdate();
|
||||||
|
for (ChangeTask task : tasks) {
|
||||||
|
for (ReceiveCommand cmd : task.noteDbResult.allUsersCommands()) {
|
||||||
|
allUsersRefUpdate.addCommand(cmd);
|
||||||
|
}
|
||||||
|
for (InsertedObject obj : task.noteDbResult.allUsersObjects()) {
|
||||||
|
allUsersIns.insert(obj.type(), obj.data().toByteArray());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
executeNoteDbUpdate(allUsersRw, allUsersIns, allUsersRefUpdate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
// Ignore all errors trying to update NoteDb at this point. We've
|
||||||
|
// already written the NoteDbChangeState to ReviewDb, which means
|
||||||
|
// if the state is out of date it will be rebuilt the next time it
|
||||||
|
// is needed.
|
||||||
|
log.debug(
|
||||||
|
"Ignoring NoteDb update error after ReviewDb write", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void executeNoteDbUpdate(RevWalk rw, ObjectInserter ins,
|
||||||
|
BatchRefUpdate bru) throws IOException {
|
||||||
|
if (bru.getCommands().isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ins.flush();
|
||||||
|
bru.setAllowNonFastForwards(true);
|
||||||
|
bru.execute(rw, NullProgressMonitor.INSTANCE);
|
||||||
|
for (ReceiveCommand cmd : bru.getCommands()) {
|
||||||
|
if (cmd.getResult() != ReceiveCommand.Result.OK) {
|
||||||
|
// TODO(dborowitz): Not necessary once JGit is updated to include
|
||||||
|
// ba8eb931734d990c5a6a9352e4629fc84a191808.
|
||||||
|
StringBuilder sb = new StringBuilder("Update failed: [\n");
|
||||||
|
for (ReceiveCommand cmd2 : bru.getCommands()) {
|
||||||
|
sb.append(cmd2).append(": ").append(cmd2.getMessage()).append('\n');
|
||||||
|
}
|
||||||
|
throw new IOException(sb.append(']').toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ChangeTask implements Callable<Void> {
|
||||||
|
final Change.Id id;
|
||||||
|
private final Collection<Op> changeOps;
|
||||||
|
private final Thread mainThread;
|
||||||
|
|
||||||
|
NoteDbUpdateManager.StagedResult noteDbResult;
|
||||||
|
boolean deleted;
|
||||||
|
|
||||||
|
private ChangeTask(Change.Id id, Collection<Op> changeOps,
|
||||||
|
Thread mainThread) {
|
||||||
|
this.id = id;
|
||||||
|
this.changeOps = changeOps;
|
||||||
|
this.mainThread = mainThread;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Void call() throws Exception {
|
||||||
|
if (Thread.currentThread() == mainThread) {
|
||||||
|
Repository repo = getRepository();
|
||||||
|
try (ObjectReader reader = repo.newObjectReader();
|
||||||
|
RevWalk rw = new RevWalk(repo)) {
|
||||||
|
call(BatchUpdate.this.db, repo, rw);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Possible optimization: allow Ops to declare whether they need to
|
||||||
|
// access the repo from updateChange, and don't open in this thread
|
||||||
|
// unless we need it. However, as of this writing the only operations
|
||||||
|
// that are executed in parallel are during ReceiveCommits, and they
|
||||||
|
// all need the repo open anyway. (The non-parallel case above does not
|
||||||
|
// reopen the repo.)
|
||||||
|
try (ReviewDb threadLocalDb = schemaFactory.open();
|
||||||
|
Repository repo = repoManager.openRepository(project);
|
||||||
|
RevWalk rw = new RevWalk(repo)) {
|
||||||
|
call(threadLocalDb, repo, rw);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void call(ReviewDb db, Repository repo, RevWalk rw)
|
||||||
|
throws Exception {
|
||||||
|
try {
|
||||||
ChangeContext ctx;
|
ChangeContext ctx;
|
||||||
NoteDbUpdateManager updateManager = null;
|
NoteDbUpdateManager updateManager = null;
|
||||||
boolean dirty = false;
|
boolean dirty = false;
|
||||||
|
db.changes().beginTransaction(id);
|
||||||
try {
|
try {
|
||||||
ctx = newChangeContext(id);
|
ctx = newChangeContext(db, repo, rw, id);
|
||||||
// Call updateChange on each op.
|
// Call updateChange on each op.
|
||||||
for (Op op : e.getValue()) {
|
for (Op op : changeOps) {
|
||||||
dirty |= op.updateChange(ctx);
|
dirty |= op.updateChange(ctx);
|
||||||
}
|
}
|
||||||
if (!dirty) {
|
if (!dirty) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
deleted = ctx.deleted;
|
||||||
|
|
||||||
// Stage the NoteDb update and store its state in the Change.
|
// Stage the NoteDb update and store its state in the Change.
|
||||||
if (!ctx.deleted && notesMigration.writeChanges()) {
|
if (notesMigration.writeChanges()) {
|
||||||
updateManager = stageNoteDbUpdate(ctx);
|
updateManager = stageNoteDbUpdate(ctx, deleted);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bump lastUpdatedOn or rowVersion and commit.
|
// Bump lastUpdatedOn or rowVersion and commit.
|
||||||
@@ -597,7 +813,7 @@ public class BatchUpdate implements AutoCloseable {
|
|||||||
if (newChanges.containsKey(id)) {
|
if (newChanges.containsKey(id)) {
|
||||||
// Insert rather than upsert in case of a race on change IDs.
|
// Insert rather than upsert in case of a race on change IDs.
|
||||||
db.changes().insert(cs);
|
db.changes().insert(cs);
|
||||||
} else if (ctx.deleted) {
|
} else if (deleted) {
|
||||||
db.changes().delete(cs);
|
db.changes().delete(cs);
|
||||||
} else {
|
} else {
|
||||||
db.changes().update(cs);
|
db.changes().update(cs);
|
||||||
@@ -609,52 +825,63 @@ public class BatchUpdate implements AutoCloseable {
|
|||||||
|
|
||||||
if (notesMigration.writeChanges()) {
|
if (notesMigration.writeChanges()) {
|
||||||
try {
|
try {
|
||||||
if (updateManager != null) {
|
// Do not execute the NoteDbUpdateManager, as we don't want too much
|
||||||
// Execute NoteDb updates after committing ReviewDb updates.
|
// contention on the underlying repo, and we would rather use a
|
||||||
updateManager.execute();
|
// single ObjectInserter/BatchRefUpdate later.
|
||||||
}
|
//
|
||||||
if (ctx.deleted) {
|
// TODO(dborowitz): May or may not be worth trying to batch
|
||||||
new ChangeDelete(plcUtil, getRepository(), ctx.getNotes()).delete();
|
// together flushed inserters as well.
|
||||||
}
|
noteDbResult = updateManager.stage().get(id);
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
// Ignore all errors trying to update NoteDb at this point. We've
|
// Ignore all errors trying to update NoteDb at this point. We've
|
||||||
// already written the NoteDbChangeState to ReviewDb, which means
|
// already written the NoteDbChangeState to ReviewDb, which means
|
||||||
// if the state is out of date it will be rebuilt the next time it
|
// if the state is out of date it will be rebuilt the next time it
|
||||||
// is needed.
|
// is needed.
|
||||||
log.debug("Ignoring NoteDb update error after ReviewDb write", ex);
|
log.debug(
|
||||||
|
"Ignoring NoteDb update error after ReviewDb write", ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
// Reindex changes.
|
Throwables.propagateIfPossible(e, RestApiException.class);
|
||||||
if (ctx.deleted) {
|
throw new UpdateException(e);
|
||||||
indexFutures.add(indexer.deleteAsync(id));
|
|
||||||
} else {
|
|
||||||
indexFutures.add(indexer.indexAsync(ctx.getProject(), id));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
|
||||||
Throwables.propagateIfPossible(e, RestApiException.class);
|
|
||||||
throw new UpdateException(e);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
private NoteDbUpdateManager stageNoteDbUpdate(ChangeContext ctx)
|
private ChangeContext newChangeContext(ReviewDb db, Repository repo,
|
||||||
throws OrmException, IOException {
|
RevWalk rw, Change.Id id) throws Exception {
|
||||||
NoteDbUpdateManager updateManager =
|
Change c = newChanges.get(id);
|
||||||
updateManagerFactory.create(ctx.getProject());
|
if (c == null) {
|
||||||
for (ChangeUpdate u : ctx.updates.values()) {
|
c = unwrap(db).changes().get(id);
|
||||||
updateManager.add(u);
|
}
|
||||||
|
// Pass in preloaded change to controlFor, to avoid:
|
||||||
|
// - reading from a db that does not belong to this update
|
||||||
|
// - attempting to read a change that doesn't exist yet
|
||||||
|
ChangeNotes notes = changeNotesFactory.createForNew(c);
|
||||||
|
ChangeControl ctl = changeControlFactory.controlFor(notes, user);
|
||||||
|
return new ChangeContext(ctl, new BatchUpdateReviewDb(db), repo, rw);
|
||||||
}
|
}
|
||||||
try {
|
|
||||||
NoteDbChangeState.applyDelta(
|
private NoteDbUpdateManager stageNoteDbUpdate(ChangeContext ctx,
|
||||||
ctx.getChange(),
|
boolean deleted) throws OrmException, IOException {
|
||||||
updateManager.stage().get(ctx.getChange().getId()));
|
NoteDbUpdateManager updateManager = updateManagerFactory
|
||||||
} catch (OrmConcurrencyException ex) {
|
.create(ctx.getProject())
|
||||||
// Refused to apply update because NoteDb was out of sync. Go ahead with
|
.setChangeRepo(ctx.getRepository(), ctx.getRevWalk(), null,
|
||||||
// this ReviewDb update; it's still out of sync, but this is no worse than
|
new ChainedReceiveCommands(repo));
|
||||||
// before, and it will eventually get rebuilt.
|
for (ChangeUpdate u : ctx.updates.values()) {
|
||||||
|
updateManager.add(u);
|
||||||
|
}
|
||||||
|
if (deleted) {
|
||||||
|
updateManager.deleteChange(ctx.getChange().getId());
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
NoteDbChangeState.applyDelta(ctx.getChange(), updateManager.stage());
|
||||||
|
} catch (OrmConcurrencyException ex) {
|
||||||
|
// Refused to apply update because NoteDb was out of sync. Go ahead with
|
||||||
|
// this ReviewDb update; it's still out of sync, but this is no worse
|
||||||
|
// than before, and it will eventually get rebuilt.
|
||||||
|
}
|
||||||
|
return updateManager;
|
||||||
}
|
}
|
||||||
return updateManager;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Iterable<Change> changesToUpdate(ChangeContext ctx) {
|
private static Iterable<Change> changesToUpdate(ChangeContext ctx) {
|
||||||
@@ -665,20 +892,6 @@ public class BatchUpdate implements AutoCloseable {
|
|||||||
return Collections.singleton(c);
|
return Collections.singleton(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ChangeContext newChangeContext(Change.Id id) throws Exception {
|
|
||||||
Change c = newChanges.get(id);
|
|
||||||
if (c == null) {
|
|
||||||
c = unwrap(db).changes().get(id);
|
|
||||||
}
|
|
||||||
// Pass in preloaded change to controlFor, to avoid:
|
|
||||||
// - reading from a db that does not belong to this update
|
|
||||||
// - attempting to read a change that doesn't exist yet
|
|
||||||
ChangeNotes notes = changeNotesFactory.createForNew(c);
|
|
||||||
ChangeContext ctx = new ChangeContext(
|
|
||||||
changeControlFactory.controlFor(notes, user), new BatchUpdateReviewDb(db));
|
|
||||||
return ctx;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void executePostOps() throws Exception {
|
private void executePostOps() throws Exception {
|
||||||
Context ctx = new Context();
|
Context ctx = new Context();
|
||||||
for (Op op : ops.values()) {
|
for (Op op : ops.values()) {
|
||||||
|
|||||||
@@ -801,8 +801,8 @@ public class ReceiveCommits {
|
|||||||
try (BatchUpdate bu = batchUpdateFactory.create(db,
|
try (BatchUpdate bu = batchUpdateFactory.create(db,
|
||||||
magicBranch.dest.getParentKey(), user, TimeUtil.nowTs());
|
magicBranch.dest.getParentKey(), user, TimeUtil.nowTs());
|
||||||
ObjectInserter ins = repo.newObjectInserter()) {
|
ObjectInserter ins = repo.newObjectInserter()) {
|
||||||
bu.setRepository(repo, rp.getRevWalk(), ins);
|
bu.setRepository(repo, rp.getRevWalk(), ins)
|
||||||
// TODO(dborowitz): Support parallel operations in BatchUpdate.
|
.updateChangesInParallel();
|
||||||
for (ReplaceRequest replace : replaceByChange.values()) {
|
for (ReplaceRequest replace : replaceByChange.values()) {
|
||||||
if (replace.inputCommand == magicBranch.cmd) {
|
if (replace.inputCommand == magicBranch.cmd) {
|
||||||
replace.addOps(bu);
|
replace.addOps(bu);
|
||||||
@@ -2337,8 +2337,8 @@ public class ReceiveCommits {
|
|||||||
try (BatchUpdate bu = batchUpdateFactory.create(db,
|
try (BatchUpdate bu = batchUpdateFactory.create(db,
|
||||||
projectControl.getProject().getNameKey(), user, TimeUtil.nowTs());
|
projectControl.getProject().getNameKey(), user, TimeUtil.nowTs());
|
||||||
ObjectInserter ins = repo.newObjectInserter()) {
|
ObjectInserter ins = repo.newObjectInserter()) {
|
||||||
bu.setRepository(repo, rp.getRevWalk(), ins);
|
bu.setRepository(repo, rp.getRevWalk(), ins)
|
||||||
// TODO(dborowitz): Change updates in parallel.
|
.updateChangesInParallel();
|
||||||
// TODO(dborowitz): Teach BatchUpdate to ignore missing changes.
|
// TODO(dborowitz): Teach BatchUpdate to ignore missing changes.
|
||||||
|
|
||||||
RevCommit newTip = rw.parseCommit(cmd.getNewId());
|
RevCommit newTip = rw.parseCommit(cmd.getNewId());
|
||||||
|
|||||||
@@ -192,6 +192,7 @@ public abstract class AbstractChangeUpdate {
|
|||||||
if (isEmpty()) {
|
if (isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
checkArgument(rw.getObjectReader().getCreatedFromInserter() == ins);
|
||||||
ObjectId z = ObjectId.zeroId();
|
ObjectId z = ObjectId.zeroId();
|
||||||
CommitBuilder cb = applyImpl(rw, ins, curr);
|
CommitBuilder cb = applyImpl(rw, ins, curr);
|
||||||
if (cb == null) {
|
if (cb == null) {
|
||||||
|
|||||||
@@ -1,66 +0,0 @@
|
|||||||
// Copyright (C) 2016 The Android Open Source Project
|
|
||||||
//
|
|
||||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
// you may not use this file except in compliance with the License.
|
|
||||||
// You may obtain a copy of the License at
|
|
||||||
//
|
|
||||||
// http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
//
|
|
||||||
// Unless required by applicable law or agreed to in writing, software
|
|
||||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
// See the License for the specific language governing permissions and
|
|
||||||
// limitations under the License.
|
|
||||||
|
|
||||||
package com.google.gerrit.server.notedb;
|
|
||||||
|
|
||||||
import com.google.gerrit.server.PatchLineCommentsUtil;
|
|
||||||
import com.google.gwtorm.server.OrmException;
|
|
||||||
|
|
||||||
import org.eclipse.jgit.lib.ObjectId;
|
|
||||||
import org.eclipse.jgit.lib.RefUpdate;
|
|
||||||
import org.eclipse.jgit.lib.Repository;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
public class ChangeDelete {
|
|
||||||
private final PatchLineCommentsUtil plcUtil;
|
|
||||||
private final Repository repo;
|
|
||||||
private final ChangeNotes notes;
|
|
||||||
|
|
||||||
public ChangeDelete(PatchLineCommentsUtil plcUtil, Repository repo,
|
|
||||||
ChangeNotes notes) {
|
|
||||||
this.plcUtil = plcUtil;
|
|
||||||
this.repo = repo;
|
|
||||||
this.notes = notes;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void delete() throws OrmException, IOException {
|
|
||||||
plcUtil.deleteAllDraftsFromAllUsers(notes.getChangeId());
|
|
||||||
|
|
||||||
RefUpdate ru = repo.updateRef(notes.getRefName());
|
|
||||||
ru.setExpectedOldObjectId(notes.load().getRevision());
|
|
||||||
ru.setNewObjectId(ObjectId.zeroId());
|
|
||||||
ru.setForceUpdate(true);
|
|
||||||
ru.setRefLogMessage("Delete change from NoteDb", false);
|
|
||||||
RefUpdate.Result result = ru.delete();
|
|
||||||
switch (result) {
|
|
||||||
case FAST_FORWARD:
|
|
||||||
case FORCED:
|
|
||||||
case NO_CHANGE:
|
|
||||||
break;
|
|
||||||
|
|
||||||
case IO_FAILURE:
|
|
||||||
case LOCK_FAILURE:
|
|
||||||
case NEW:
|
|
||||||
case NOT_ATTEMPTED:
|
|
||||||
case REJECTED:
|
|
||||||
case REJECTED_CURRENT_BRANCH:
|
|
||||||
case RENAMED:
|
|
||||||
default:
|
|
||||||
throw new IOException(String.format(
|
|
||||||
"Failed to delete change ref %s at %s: %s",
|
|
||||||
notes.getRefName(), notes.getRevision(), result));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -174,8 +174,7 @@ public class ChangeRebuilderImpl extends ChangeRebuilder {
|
|||||||
if (change == null) {
|
if (change == null) {
|
||||||
throw new NoSuchChangeException(changeId);
|
throw new NoSuchChangeException(changeId);
|
||||||
}
|
}
|
||||||
newState = NoteDbChangeState.applyDelta(
|
newState = NoteDbChangeState.applyDelta(change, manager.stage());
|
||||||
change, manager.stage().get(changeId));
|
|
||||||
db.changes().update(Collections.singleton(change));
|
db.changes().update(Collections.singleton(change));
|
||||||
db.commit();
|
db.commit();
|
||||||
} finally {
|
} finally {
|
||||||
@@ -191,8 +190,7 @@ public class ChangeRebuilderImpl extends ChangeRebuilder {
|
|||||||
OrmException, ConfigInvalidException {
|
OrmException, ConfigInvalidException {
|
||||||
Change change = new Change(bundle.getChange());
|
Change change = new Change(bundle.getChange());
|
||||||
buildUpdates(manager, bundle);
|
buildUpdates(manager, bundle);
|
||||||
return NoteDbChangeState.applyDelta(
|
return NoteDbChangeState.applyDelta(change, manager.stage());
|
||||||
change, manager.stage().get(change.getId()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -221,9 +221,7 @@ public class ChangeUpdate extends AbstractChangeUpdate {
|
|||||||
NoteDbUpdateManager updateManager =
|
NoteDbUpdateManager updateManager =
|
||||||
updateManagerFactory.create(getProjectName());
|
updateManagerFactory.create(getProjectName());
|
||||||
updateManager.add(this);
|
updateManager.add(this);
|
||||||
NoteDbChangeState.applyDelta(
|
NoteDbChangeState.applyDelta(getChange(), updateManager.stage());
|
||||||
getChange(),
|
|
||||||
updateManager.stage().get(getId()));
|
|
||||||
updateManager.execute();
|
updateManager.execute();
|
||||||
return getResult();
|
return getResult();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,135 @@
|
|||||||
|
// Copyright (C) 2016 The Android Open Source Project
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package com.google.gerrit.server.notedb;
|
||||||
|
|
||||||
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
|
||||||
|
import org.eclipse.jgit.errors.IncorrectObjectTypeException;
|
||||||
|
import org.eclipse.jgit.lib.AbbreviatedObjectId;
|
||||||
|
import org.eclipse.jgit.lib.AnyObjectId;
|
||||||
|
import org.eclipse.jgit.lib.ObjectId;
|
||||||
|
import org.eclipse.jgit.lib.ObjectInserter;
|
||||||
|
import org.eclipse.jgit.lib.ObjectLoader;
|
||||||
|
import org.eclipse.jgit.lib.ObjectReader;
|
||||||
|
import org.eclipse.jgit.transport.PackParser;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
class InMemoryInserter extends ObjectInserter {
|
||||||
|
private final ObjectReader reader;
|
||||||
|
private final Map<ObjectId, InsertedObject> inserted;
|
||||||
|
|
||||||
|
InMemoryInserter(ObjectReader reader) {
|
||||||
|
this.reader = checkNotNull(reader);
|
||||||
|
inserted = new LinkedHashMap<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ObjectId insert(int type, long length, InputStream in)
|
||||||
|
throws IOException {
|
||||||
|
return insert(InsertedObject.create(type, in));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ObjectId insert(int type, byte[] data) {
|
||||||
|
return insert(type, data, 0, data.length);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ObjectId insert(int type, byte[] data, int off, int len) {
|
||||||
|
return insert(InsertedObject.create(type, data, off, len));
|
||||||
|
}
|
||||||
|
|
||||||
|
private ObjectId insert(InsertedObject obj) {
|
||||||
|
inserted.put(obj.id(), obj);
|
||||||
|
return obj.id();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public PackParser newPackParser(InputStream in) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ObjectReader newReader() {
|
||||||
|
return new Reader();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void flush() {
|
||||||
|
// Do nothing; objects are not written to the repo.
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
// Do nothing; this class owns no open resources.
|
||||||
|
}
|
||||||
|
|
||||||
|
public ImmutableList<InsertedObject> getInsertedObjects() {
|
||||||
|
return ImmutableList.copyOf(inserted.values());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void clear() {
|
||||||
|
inserted.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
private class Reader extends ObjectReader {
|
||||||
|
@Override
|
||||||
|
public ObjectReader newReader() {
|
||||||
|
return new Reader();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Collection<ObjectId> resolve(AbbreviatedObjectId id) {
|
||||||
|
// This method should be unused by ChangeRebuilder.
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ObjectLoader open(AnyObjectId objectId, int typeHint)
|
||||||
|
throws IOException {
|
||||||
|
InsertedObject obj = inserted.get(objectId);
|
||||||
|
if (obj == null) {
|
||||||
|
return reader.open(objectId, typeHint);
|
||||||
|
}
|
||||||
|
if (typeHint != OBJ_ANY && obj.type() != typeHint) {
|
||||||
|
throw new IncorrectObjectTypeException(objectId.copy(), typeHint);
|
||||||
|
}
|
||||||
|
return obj.newLoader();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<ObjectId> getShallowCommits() throws IOException {
|
||||||
|
return reader.getShallowCommits();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
// Do nothing; this class owns no open resources.
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ObjectInserter getCreatedFromInserter() {
|
||||||
|
return InMemoryInserter.this;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,54 @@
|
|||||||
|
// Copyright (C) 2016 The Android Open Source Project
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package com.google.gerrit.server.notedb;
|
||||||
|
|
||||||
|
import com.google.auto.value.AutoValue;
|
||||||
|
import com.google.protobuf.ByteString;
|
||||||
|
|
||||||
|
import org.eclipse.jgit.lib.ObjectId;
|
||||||
|
import org.eclipse.jgit.lib.ObjectInserter;
|
||||||
|
import org.eclipse.jgit.lib.ObjectLoader;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
|
||||||
|
@AutoValue
|
||||||
|
public abstract class InsertedObject {
|
||||||
|
static InsertedObject create(int type, InputStream in) throws IOException {
|
||||||
|
return create(type, ByteString.readFrom(in));
|
||||||
|
}
|
||||||
|
|
||||||
|
static InsertedObject create(int type, ByteString bs) {
|
||||||
|
ObjectId id;
|
||||||
|
try (ObjectInserter.Formatter fmt = new ObjectInserter.Formatter()) {
|
||||||
|
id = fmt.idFor(type, bs.size(), bs.newInput());
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IllegalStateException(e);
|
||||||
|
}
|
||||||
|
return new AutoValue_InsertedObject(id, type, bs);
|
||||||
|
}
|
||||||
|
|
||||||
|
static InsertedObject create(int type, byte[] src, int off, int len) {
|
||||||
|
return create(type, ByteString.copyFrom(src, off, len));
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract ObjectId id();
|
||||||
|
public abstract int type();
|
||||||
|
public abstract ByteString data();
|
||||||
|
|
||||||
|
ObjectLoader newLoader() {
|
||||||
|
return new ObjectLoader.SmallObject(type(), data().toByteArray());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -100,6 +100,12 @@ public class NoteDbChangeState {
|
|||||||
return new NoteDbChangeState(id, changeMetaId, draftIds);
|
return new NoteDbChangeState(id, changeMetaId, draftIds);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static NoteDbChangeState applyDelta(Change change,
|
||||||
|
Map<Change.Id, NoteDbUpdateManager.StagedResult> stagedResults) {
|
||||||
|
NoteDbUpdateManager.StagedResult r = stagedResults.get(change.getId());
|
||||||
|
return applyDelta(change, r != null ? r.delta() : null);
|
||||||
|
}
|
||||||
|
|
||||||
public static NoteDbChangeState applyDelta(Change change, Delta delta) {
|
public static NoteDbChangeState applyDelta(Change change, Delta delta) {
|
||||||
if (delta == null) {
|
if (delta == null) {
|
||||||
return null;
|
return null;
|
||||||
|
|||||||
@@ -20,15 +20,19 @@ import static com.google.common.base.Preconditions.checkState;
|
|||||||
import static com.google.gerrit.reviewdb.client.RefNames.REFS_DRAFT_COMMENTS;
|
import static com.google.gerrit.reviewdb.client.RefNames.REFS_DRAFT_COMMENTS;
|
||||||
import static com.google.gerrit.server.notedb.NoteDbTable.CHANGES;
|
import static com.google.gerrit.server.notedb.NoteDbTable.CHANGES;
|
||||||
|
|
||||||
|
import com.google.auto.value.AutoValue;
|
||||||
import com.google.common.base.Optional;
|
import com.google.common.base.Optional;
|
||||||
import com.google.common.collect.ArrayListMultimap;
|
import com.google.common.collect.ArrayListMultimap;
|
||||||
import com.google.common.collect.HashBasedTable;
|
import com.google.common.collect.HashBasedTable;
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ListMultimap;
|
import com.google.common.collect.ListMultimap;
|
||||||
import com.google.common.collect.Table;
|
import com.google.common.collect.Table;
|
||||||
|
import com.google.gerrit.common.Nullable;
|
||||||
import com.google.gerrit.metrics.Timer1;
|
import com.google.gerrit.metrics.Timer1;
|
||||||
import com.google.gerrit.reviewdb.client.Account;
|
import com.google.gerrit.reviewdb.client.Account;
|
||||||
import com.google.gerrit.reviewdb.client.Change;
|
import com.google.gerrit.reviewdb.client.Change;
|
||||||
import com.google.gerrit.reviewdb.client.Project;
|
import com.google.gerrit.reviewdb.client.Project;
|
||||||
|
import com.google.gerrit.reviewdb.client.RefNames;
|
||||||
import com.google.gerrit.server.config.AllUsersName;
|
import com.google.gerrit.server.config.AllUsersName;
|
||||||
import com.google.gerrit.server.git.ChainedReceiveCommands;
|
import com.google.gerrit.server.git.ChainedReceiveCommands;
|
||||||
import com.google.gerrit.server.git.GitRepositoryManager;
|
import com.google.gerrit.server.git.GitRepositoryManager;
|
||||||
@@ -41,6 +45,8 @@ import org.eclipse.jgit.lib.BatchRefUpdate;
|
|||||||
import org.eclipse.jgit.lib.NullProgressMonitor;
|
import org.eclipse.jgit.lib.NullProgressMonitor;
|
||||||
import org.eclipse.jgit.lib.ObjectId;
|
import org.eclipse.jgit.lib.ObjectId;
|
||||||
import org.eclipse.jgit.lib.ObjectInserter;
|
import org.eclipse.jgit.lib.ObjectInserter;
|
||||||
|
import org.eclipse.jgit.lib.ObjectReader;
|
||||||
|
import org.eclipse.jgit.lib.Ref;
|
||||||
import org.eclipse.jgit.lib.Repository;
|
import org.eclipse.jgit.lib.Repository;
|
||||||
import org.eclipse.jgit.revwalk.RevWalk;
|
import org.eclipse.jgit.revwalk.RevWalk;
|
||||||
import org.eclipse.jgit.transport.ReceiveCommand;
|
import org.eclipse.jgit.transport.ReceiveCommand;
|
||||||
@@ -66,19 +72,58 @@ public class NoteDbUpdateManager {
|
|||||||
NoteDbUpdateManager create(Project.NameKey projectName);
|
NoteDbUpdateManager create(Project.NameKey projectName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@AutoValue
|
||||||
|
public abstract static class StagedResult {
|
||||||
|
private static StagedResult create(Change.Id id, NoteDbChangeState.Delta delta,
|
||||||
|
OpenRepo changeRepo, OpenRepo allUsersRepo) {
|
||||||
|
ImmutableList<ReceiveCommand> changeCommands = ImmutableList.of();
|
||||||
|
ImmutableList<InsertedObject> changeObjects = ImmutableList.of();
|
||||||
|
if (changeRepo != null) {
|
||||||
|
changeCommands = changeRepo.getCommandsSnapshot();
|
||||||
|
changeObjects = changeRepo.tempIns.getInsertedObjects();
|
||||||
|
}
|
||||||
|
ImmutableList<ReceiveCommand> allUsersCommands = ImmutableList.of();
|
||||||
|
ImmutableList<InsertedObject> allUsersObjects = ImmutableList.of();
|
||||||
|
if (allUsersRepo != null) {
|
||||||
|
allUsersCommands = allUsersRepo.getCommandsSnapshot();
|
||||||
|
allUsersObjects = allUsersRepo.tempIns.getInsertedObjects();
|
||||||
|
}
|
||||||
|
return new AutoValue_NoteDbUpdateManager_StagedResult(
|
||||||
|
id, delta,
|
||||||
|
changeCommands, changeObjects,
|
||||||
|
allUsersCommands, allUsersObjects);
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract Change.Id id();
|
||||||
|
@Nullable public abstract NoteDbChangeState.Delta delta();
|
||||||
|
public abstract ImmutableList<ReceiveCommand> changeCommands();
|
||||||
|
public abstract ImmutableList<InsertedObject> changeObjects();
|
||||||
|
|
||||||
|
public abstract ImmutableList<ReceiveCommand> allUsersCommands();
|
||||||
|
public abstract ImmutableList<InsertedObject> allUsersObjects();
|
||||||
|
}
|
||||||
|
|
||||||
static class OpenRepo implements AutoCloseable {
|
static class OpenRepo implements AutoCloseable {
|
||||||
final Repository repo;
|
final Repository repo;
|
||||||
final RevWalk rw;
|
final RevWalk rw;
|
||||||
final ObjectInserter ins;
|
|
||||||
final ChainedReceiveCommands cmds;
|
final ChainedReceiveCommands cmds;
|
||||||
|
|
||||||
|
private final InMemoryInserter tempIns;
|
||||||
|
@Nullable private final ObjectInserter finalIns;
|
||||||
|
|
||||||
private final boolean close;
|
private final boolean close;
|
||||||
|
|
||||||
OpenRepo(Repository repo, RevWalk rw, ObjectInserter ins,
|
private OpenRepo(Repository repo, RevWalk rw, @Nullable ObjectInserter ins,
|
||||||
ChainedReceiveCommands cmds, boolean close) {
|
ChainedReceiveCommands cmds, boolean close) {
|
||||||
|
ObjectReader reader = rw.getObjectReader();
|
||||||
|
checkArgument(ins == null || reader.getCreatedFromInserter() == ins,
|
||||||
|
"expected reader to be created from %s, but was %s",
|
||||||
|
ins, reader.getCreatedFromInserter());
|
||||||
this.repo = checkNotNull(repo);
|
this.repo = checkNotNull(repo);
|
||||||
this.rw = checkNotNull(rw);
|
this.tempIns = new InMemoryInserter(rw.getObjectReader());
|
||||||
this.ins = ins;
|
this.rw = new RevWalk(tempIns.newReader());
|
||||||
this.cmds = cmds;
|
this.finalIns = ins;
|
||||||
|
this.cmds = checkNotNull(cmds);
|
||||||
this.close = close;
|
this.close = close;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -86,11 +131,26 @@ public class NoteDbUpdateManager {
|
|||||||
return cmds.get(refName);
|
return cmds.get(refName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ImmutableList<ReceiveCommand> getCommandsSnapshot() {
|
||||||
|
return ImmutableList.copyOf(cmds.getCommands().values());
|
||||||
|
}
|
||||||
|
|
||||||
|
void flush() throws IOException {
|
||||||
|
checkState(finalIns != null);
|
||||||
|
for (InsertedObject obj : tempIns.getInsertedObjects()) {
|
||||||
|
finalIns.insert(obj.type(), obj.data().toByteArray());
|
||||||
|
}
|
||||||
|
finalIns.flush();
|
||||||
|
tempIns.clear();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
|
rw.close();
|
||||||
if (close) {
|
if (close) {
|
||||||
ins.close();
|
if (finalIns != null) {
|
||||||
rw.close();
|
finalIns.close();
|
||||||
|
}
|
||||||
repo.close();
|
repo.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -103,10 +163,11 @@ public class NoteDbUpdateManager {
|
|||||||
private final Project.NameKey projectName;
|
private final Project.NameKey projectName;
|
||||||
private final ListMultimap<String, ChangeUpdate> changeUpdates;
|
private final ListMultimap<String, ChangeUpdate> changeUpdates;
|
||||||
private final ListMultimap<String, ChangeDraftUpdate> draftUpdates;
|
private final ListMultimap<String, ChangeDraftUpdate> draftUpdates;
|
||||||
|
private final Set<Change.Id> toDelete;
|
||||||
|
|
||||||
private OpenRepo changeRepo;
|
private OpenRepo changeRepo;
|
||||||
private OpenRepo allUsersRepo;
|
private OpenRepo allUsersRepo;
|
||||||
private Map<Change.Id, NoteDbChangeState.Delta> staged;
|
private Map<Change.Id, StagedResult> staged;
|
||||||
private boolean checkExpectedState;
|
private boolean checkExpectedState;
|
||||||
|
|
||||||
@AssistedInject
|
@AssistedInject
|
||||||
@@ -122,17 +183,18 @@ public class NoteDbUpdateManager {
|
|||||||
this.projectName = projectName;
|
this.projectName = projectName;
|
||||||
changeUpdates = ArrayListMultimap.create();
|
changeUpdates = ArrayListMultimap.create();
|
||||||
draftUpdates = ArrayListMultimap.create();
|
draftUpdates = ArrayListMultimap.create();
|
||||||
|
toDelete = new HashSet<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
public NoteDbUpdateManager setChangeRepo(Repository repo, RevWalk rw,
|
public NoteDbUpdateManager setChangeRepo(Repository repo, RevWalk rw,
|
||||||
ObjectInserter ins, ChainedReceiveCommands cmds) {
|
@Nullable ObjectInserter ins, ChainedReceiveCommands cmds) {
|
||||||
checkState(changeRepo == null, "change repo already initialized");
|
checkState(changeRepo == null, "change repo already initialized");
|
||||||
changeRepo = new OpenRepo(repo, rw, ins, cmds, false);
|
changeRepo = new OpenRepo(repo, rw, ins, cmds, false);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public NoteDbUpdateManager setAllUsersRepo(Repository repo, RevWalk rw,
|
public NoteDbUpdateManager setAllUsersRepo(Repository repo, RevWalk rw,
|
||||||
ObjectInserter ins, ChainedReceiveCommands cmds) {
|
@Nullable ObjectInserter ins, ChainedReceiveCommands cmds) {
|
||||||
checkState(allUsersRepo == null, "All-Users repo already initialized");
|
checkState(allUsersRepo == null, "All-Users repo already initialized");
|
||||||
allUsersRepo = new OpenRepo(repo, rw, ins, cmds, false);
|
allUsersRepo = new OpenRepo(repo, rw, ins, cmds, false);
|
||||||
return this;
|
return this;
|
||||||
@@ -177,7 +239,8 @@ public class NoteDbUpdateManager {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return changeUpdates.isEmpty()
|
return changeUpdates.isEmpty()
|
||||||
&& draftUpdates.isEmpty();
|
&& draftUpdates.isEmpty()
|
||||||
|
&& toDelete.isEmpty();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -205,6 +268,11 @@ public class NoteDbUpdateManager {
|
|||||||
draftUpdates.put(draftUpdate.getRefName(), draftUpdate);
|
draftUpdates.put(draftUpdate.getRefName(), draftUpdate);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void deleteChange(Change.Id id) {
|
||||||
|
checkState(staged == null, "cannot add new change to delete after staging");
|
||||||
|
toDelete.add(id);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stage updates in the manager's internal list of commands.
|
* Stage updates in the manager's internal list of commands.
|
||||||
*
|
*
|
||||||
@@ -213,7 +281,7 @@ public class NoteDbUpdateManager {
|
|||||||
* @throws OrmException if a database layer error occurs.
|
* @throws OrmException if a database layer error occurs.
|
||||||
* @throws IOException if a storage layer error occurs.
|
* @throws IOException if a storage layer error occurs.
|
||||||
*/
|
*/
|
||||||
public Map<Change.Id, NoteDbChangeState.Delta> stage()
|
public Map<Change.Id, StagedResult> stage()
|
||||||
throws OrmException, IOException {
|
throws OrmException, IOException {
|
||||||
if (staged != null) {
|
if (staged != null) {
|
||||||
return staged;
|
return staged;
|
||||||
@@ -225,7 +293,7 @@ public class NoteDbUpdateManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
initChangeRepo();
|
initChangeRepo();
|
||||||
if (!draftUpdates.isEmpty()) {
|
if (!draftUpdates.isEmpty() || !toDelete.isEmpty()) {
|
||||||
initAllUsersRepo();
|
initAllUsersRepo();
|
||||||
}
|
}
|
||||||
checkExpectedState();
|
checkExpectedState();
|
||||||
@@ -233,24 +301,31 @@ public class NoteDbUpdateManager {
|
|||||||
|
|
||||||
Table<Change.Id, Account.Id, ObjectId> allDraftIds = getDraftIds();
|
Table<Change.Id, Account.Id, ObjectId> allDraftIds = getDraftIds();
|
||||||
Set<Change.Id> changeIds = new HashSet<>();
|
Set<Change.Id> changeIds = new HashSet<>();
|
||||||
for (ReceiveCommand cmd : changeRepo.cmds.getCommands().values()) {
|
for (ReceiveCommand cmd : changeRepo.getCommandsSnapshot()) {
|
||||||
Change.Id changeId = Change.Id.fromRef(cmd.getRefName());
|
Change.Id changeId = Change.Id.fromRef(cmd.getRefName());
|
||||||
changeIds.add(changeId);
|
changeIds.add(changeId);
|
||||||
Optional<ObjectId> metaId = Optional.of(cmd.getNewId());
|
Optional<ObjectId> metaId = Optional.of(cmd.getNewId());
|
||||||
staged.put(
|
staged.put(
|
||||||
changeId,
|
changeId,
|
||||||
NoteDbChangeState.Delta.create(
|
StagedResult.create(
|
||||||
changeId, metaId, allDraftIds.rowMap().remove(changeId)));
|
changeId,
|
||||||
|
NoteDbChangeState.Delta.create(
|
||||||
|
changeId, metaId, allDraftIds.rowMap().remove(changeId)),
|
||||||
|
changeRepo, allUsersRepo));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Map.Entry<Change.Id, Map<Account.Id, ObjectId>> e
|
for (Map.Entry<Change.Id, Map<Account.Id, ObjectId>> e
|
||||||
: allDraftIds.rowMap().entrySet()) {
|
: allDraftIds.rowMap().entrySet()) {
|
||||||
// If a change remains in the table at this point, it means we are
|
// If a change remains in the table at this point, it means we are
|
||||||
// updating its drafts but not the change itself.
|
// updating its drafts but not the change itself.
|
||||||
staged.put(
|
StagedResult r = StagedResult.create(
|
||||||
e.getKey(),
|
e.getKey(),
|
||||||
NoteDbChangeState.Delta.create(
|
NoteDbChangeState.Delta.create(
|
||||||
e.getKey(), Optional.<ObjectId>absent(), e.getValue()));
|
e.getKey(), Optional.<ObjectId>absent(), e.getValue()),
|
||||||
|
changeRepo, allUsersRepo);
|
||||||
|
checkState(r.changeCommands().isEmpty(),
|
||||||
|
"should not have change commands when updating only drafts: %s", r);
|
||||||
|
staged.put(r.id(), r);
|
||||||
}
|
}
|
||||||
|
|
||||||
return staged;
|
return staged;
|
||||||
@@ -262,7 +337,7 @@ public class NoteDbUpdateManager {
|
|||||||
if (allUsersRepo == null) {
|
if (allUsersRepo == null) {
|
||||||
return draftIds;
|
return draftIds;
|
||||||
}
|
}
|
||||||
for (ReceiveCommand cmd : allUsersRepo.cmds.getCommands().values()) {
|
for (ReceiveCommand cmd : allUsersRepo.getCommandsSnapshot()) {
|
||||||
String r = cmd.getRefName();
|
String r = cmd.getRefName();
|
||||||
if (r.startsWith(REFS_DRAFT_COMMENTS)) {
|
if (r.startsWith(REFS_DRAFT_COMMENTS)) {
|
||||||
Change.Id changeId =
|
Change.Id changeId =
|
||||||
@@ -281,7 +356,6 @@ public class NoteDbUpdateManager {
|
|||||||
}
|
}
|
||||||
try (Timer1.Context timer = metrics.updateLatency.start(CHANGES)) {
|
try (Timer1.Context timer = metrics.updateLatency.start(CHANGES)) {
|
||||||
stage();
|
stage();
|
||||||
|
|
||||||
// ChangeUpdates must execute before ChangeDraftUpdates.
|
// ChangeUpdates must execute before ChangeDraftUpdates.
|
||||||
//
|
//
|
||||||
// ChangeUpdate will automatically delete draft comments for any published
|
// ChangeUpdate will automatically delete draft comments for any published
|
||||||
@@ -306,7 +380,7 @@ public class NoteDbUpdateManager {
|
|||||||
if (or == null || or.cmds.isEmpty()) {
|
if (or == null || or.cmds.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
or.ins.flush();
|
or.flush();
|
||||||
BatchRefUpdate bru = or.repo.getRefDatabase().newBatchUpdate();
|
BatchRefUpdate bru = or.repo.getRefDatabase().newBatchUpdate();
|
||||||
or.cmds.addTo(bru);
|
or.cmds.addTo(bru);
|
||||||
bru.setAllowNonFastForwards(true);
|
bru.setAllowNonFastForwards(true);
|
||||||
@@ -330,9 +404,31 @@ public class NoteDbUpdateManager {
|
|||||||
if (!draftUpdates.isEmpty()) {
|
if (!draftUpdates.isEmpty()) {
|
||||||
addUpdates(draftUpdates, allUsersRepo);
|
addUpdates(draftUpdates, allUsersRepo);
|
||||||
}
|
}
|
||||||
|
for (Change.Id id : toDelete) {
|
||||||
|
doDelete(id);
|
||||||
|
}
|
||||||
checkExpectedState();
|
checkExpectedState();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void doDelete(Change.Id id) throws IOException {
|
||||||
|
String metaRef = RefNames.changeMetaRef(id);
|
||||||
|
Optional<ObjectId> old = changeRepo.cmds.get(metaRef);
|
||||||
|
if (old.isPresent()) {
|
||||||
|
changeRepo.cmds.add(
|
||||||
|
new ReceiveCommand(old.get(), ObjectId.zeroId(), metaRef));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Just scan repo for ref names, but get "old" values from cmds.
|
||||||
|
for (Ref r : allUsersRepo.repo.getRefDatabase().getRefs(
|
||||||
|
RefNames.refsDraftCommentsPrefix(id)).values()) {
|
||||||
|
old = allUsersRepo.cmds.get(r.getName());
|
||||||
|
if (old.isPresent()) {
|
||||||
|
allUsersRepo.cmds.add(
|
||||||
|
new ReceiveCommand(old.get(), ObjectId.zeroId(), r.getName()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void checkExpectedState() throws OrmException, IOException {
|
private void checkExpectedState() throws OrmException, IOException {
|
||||||
if (!checkExpectedState) {
|
if (!checkExpectedState) {
|
||||||
return;
|
return;
|
||||||
@@ -404,7 +500,7 @@ public class NoteDbUpdateManager {
|
|||||||
|
|
||||||
ObjectId curr = old;
|
ObjectId curr = old;
|
||||||
for (U u : updates) {
|
for (U u : updates) {
|
||||||
ObjectId next = u.apply(or.rw, or.ins, curr);
|
ObjectId next = u.apply(or.rw, or.tempIns, curr);
|
||||||
if (next == null) {
|
if (next == null) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ package com.google.gerrit.testutil;
|
|||||||
|
|
||||||
import static com.google.inject.Scopes.SINGLETON;
|
import static com.google.inject.Scopes.SINGLETON;
|
||||||
|
|
||||||
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
import com.google.gerrit.common.ChangeHooks;
|
import com.google.gerrit.common.ChangeHooks;
|
||||||
import com.google.gerrit.common.DisabledChangeHooks;
|
import com.google.gerrit.common.DisabledChangeHooks;
|
||||||
@@ -44,6 +45,7 @@ import com.google.gerrit.server.config.SitePath;
|
|||||||
import com.google.gerrit.server.config.TrackingFooters;
|
import com.google.gerrit.server.config.TrackingFooters;
|
||||||
import com.google.gerrit.server.config.TrackingFootersProvider;
|
import com.google.gerrit.server.config.TrackingFootersProvider;
|
||||||
import com.google.gerrit.server.git.ChangeCacheImplModule;
|
import com.google.gerrit.server.git.ChangeCacheImplModule;
|
||||||
|
import com.google.gerrit.server.git.ChangeUpdateExecutor;
|
||||||
import com.google.gerrit.server.git.GarbageCollection;
|
import com.google.gerrit.server.git.GarbageCollection;
|
||||||
import com.google.gerrit.server.git.GitRepositoryManager;
|
import com.google.gerrit.server.git.GitRepositoryManager;
|
||||||
import com.google.gerrit.server.git.PerThreadRequestScope;
|
import com.google.gerrit.server.git.PerThreadRequestScope;
|
||||||
@@ -167,6 +169,9 @@ public class InMemoryModule extends FactoryModule {
|
|||||||
bind(TrackingFooters.class).toProvider(TrackingFootersProvider.class)
|
bind(TrackingFooters.class).toProvider(TrackingFootersProvider.class)
|
||||||
.in(SINGLETON);
|
.in(SINGLETON);
|
||||||
bind(NotesMigration.class).toInstance(notesMigration);
|
bind(NotesMigration.class).toInstance(notesMigration);
|
||||||
|
bind(ListeningExecutorService.class)
|
||||||
|
.annotatedWith(ChangeUpdateExecutor.class)
|
||||||
|
.toInstance(MoreExecutors.newDirectExecutorService());
|
||||||
|
|
||||||
bind(DataSourceType.class)
|
bind(DataSourceType.class)
|
||||||
.to(InMemoryH2Type.class);
|
.to(InMemoryH2Type.class);
|
||||||
|
|||||||
Reference in New Issue
Block a user