Finish indexing before sending email
Project watches need to see the most up-to-date information about a change when evaluating their watch predicates. The match() implementation for predicates over full-text fields is to search the secondary index for "id:X field:{text}", which means the secondary index update needs to be completed before even beginning to send the email. For example, if there is a watch for "message:foo", the user wants to be notified on a new patch set only if the new commit message contains "foo". Since the predicate "new commit message contains 'foo'" is evaluated by executing a full-text search on the secondary index, we need to ensure the secondary index is up to date. Prior to this change, if the user pushed a new patch set _not_ containing "foo", the watch evaluation would be racing with the index update, resulting in the project watch possibly matching against an old version of the commit message. To ensure this in all cases, we have to do a lot more synchronous indexing steps. Very few index operations can now be async, but that's fine. (In theory we could split up the email interface to collect recipient information synchronously but still do the actual sending asynchronously, but that's considerably more work, and will probably wait until a hypothetical future when we rewrite the email processing entirely.) Checked all callers of indexAsync to find all instances of this bug. Change-Id: I7f6d6e42fe45c64749205cdbc1a0105a39e90fbe
This commit is contained in:
@@ -15,7 +15,6 @@
|
||||
package com.google.gerrit.server.change;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.util.concurrent.CheckedFuture;
|
||||
import com.google.gerrit.common.ChangeHooks;
|
||||
import com.google.gerrit.extensions.api.changes.AbandonInput;
|
||||
import com.google.gerrit.extensions.common.ChangeInfo;
|
||||
@@ -123,8 +122,7 @@ public class Abandon implements RestModifyView<ChangeResource, AbandonInput>,
|
||||
}
|
||||
update.commit();
|
||||
|
||||
CheckedFuture<?, IOException> indexFuture =
|
||||
indexer.indexAsync(change.getId());
|
||||
indexer.index(db, change);
|
||||
try {
|
||||
ReplyToChangeSender cm = abandonedSenderFactory.create(change.getId());
|
||||
cm.setFrom(caller.getAccountId());
|
||||
@@ -133,7 +131,6 @@ public class Abandon implements RestModifyView<ChangeResource, AbandonInput>,
|
||||
} catch (Exception e) {
|
||||
log.error("Cannot email update for change " + change.getChangeId(), e);
|
||||
}
|
||||
indexFuture.checkedGet();
|
||||
hooks.doChangeAbandonedHook(change,
|
||||
caller.getAccount(),
|
||||
db.patchSets().get(change.currentPatchSetId()),
|
||||
|
@@ -233,10 +233,10 @@ public class ChangeInserter {
|
||||
}
|
||||
|
||||
CheckedFuture<?, IOException> f = indexer.indexAsync(change.getId());
|
||||
|
||||
if (!messageIsForChange()) {
|
||||
commitMessageNotForChange();
|
||||
}
|
||||
f.checkedGet();
|
||||
|
||||
if (sendMail) {
|
||||
Runnable sender = new Runnable() {
|
||||
@@ -266,7 +266,6 @@ public class ChangeInserter {
|
||||
sender.run();
|
||||
}
|
||||
}
|
||||
f.checkedGet();
|
||||
|
||||
gitRefUpdated.fire(change.getProject(), patchSet.getRefName(),
|
||||
ObjectId.zeroId(), commit);
|
||||
|
@@ -22,8 +22,6 @@ import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.util.concurrent.CheckedFuture;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.gerrit.common.ChangeHooks;
|
||||
import com.google.gerrit.common.TimeUtil;
|
||||
import com.google.gerrit.common.data.LabelType;
|
||||
@@ -180,11 +178,8 @@ public class PostReview implements RestModifyView<RevisionResource, ReviewInput>
|
||||
db.get().rollback();
|
||||
}
|
||||
|
||||
CheckedFuture<?, IOException> indexWrite;
|
||||
if (dirty) {
|
||||
indexWrite = indexer.indexAsync(change.getId());
|
||||
} else {
|
||||
indexWrite = Futures.<Void, IOException> immediateCheckedFuture(null);
|
||||
indexer.index(db.get(), change);
|
||||
}
|
||||
if (message != null && input.notify.compareTo(NotifyHandling.NONE) > 0) {
|
||||
email.create(
|
||||
@@ -198,7 +193,6 @@ public class PostReview implements RestModifyView<RevisionResource, ReviewInput>
|
||||
|
||||
Output output = new Output();
|
||||
output.labels = input.labels;
|
||||
indexWrite.checkedGet();
|
||||
if (message != null) {
|
||||
fireCommentAddedHook(revision);
|
||||
}
|
||||
|
@@ -255,8 +255,8 @@ public class PostReviewers implements RestModifyView<ChangeResource, AddReviewer
|
||||
ImmutableList.of(psa)));
|
||||
}
|
||||
accountLoaderFactory.create(true).fill(result.reviewers);
|
||||
emailReviewers(rsrc.getChange(), added);
|
||||
indexFuture.checkedGet();
|
||||
emailReviewers(rsrc.getChange(), added);
|
||||
if (!added.isEmpty()) {
|
||||
PatchSet patchSet = dbProvider.get().patchSets().get(rsrc.getChange().currentPatchSetId());
|
||||
for (PatchSetApproval psa : added) {
|
||||
|
@@ -14,7 +14,6 @@
|
||||
|
||||
package com.google.gerrit.server.change;
|
||||
|
||||
import com.google.common.util.concurrent.CheckedFuture;
|
||||
import com.google.gerrit.common.ChangeHooks;
|
||||
import com.google.gerrit.extensions.restapi.AuthException;
|
||||
import com.google.gerrit.extensions.restapi.ResourceConflictException;
|
||||
@@ -82,13 +81,11 @@ public class PublishDraftPatchSet implements RestModifyView<RevisionResource, In
|
||||
|
||||
if (!updatedPatchSet.isDraft()
|
||||
|| updatedChange.getStatus() == Change.Status.NEW) {
|
||||
CheckedFuture<?, IOException> indexFuture =
|
||||
indexer.indexAsync(updatedChange.getId());
|
||||
indexer.index(dbProvider.get(), updatedChange);
|
||||
sender.send(rsrc.getNotes(), update,
|
||||
rsrc.getChange().getStatus() == Change.Status.DRAFT,
|
||||
rsrc.getUser(), updatedChange, updatedPatchSet,
|
||||
rsrc.getControl().getLabelTypes());
|
||||
indexFuture.checkedGet();
|
||||
hooks.doDraftPublishedHook(updatedChange, updatedPatchSet,
|
||||
dbProvider.get());
|
||||
}
|
||||
|
@@ -15,7 +15,6 @@
|
||||
package com.google.gerrit.server.change;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.util.concurrent.CheckedFuture;
|
||||
import com.google.gerrit.common.ChangeHooks;
|
||||
import com.google.gerrit.extensions.api.changes.RestoreInput;
|
||||
import com.google.gerrit.extensions.common.ChangeInfo;
|
||||
@@ -122,7 +121,7 @@ public class Restore implements RestModifyView<ChangeResource, RestoreInput>,
|
||||
}
|
||||
update.commit();
|
||||
|
||||
CheckedFuture<?, IOException> f = indexer.indexAsync(change.getId());
|
||||
indexer.index(db, change);
|
||||
|
||||
try {
|
||||
ReplyToChangeSender cm = restoredSenderFactory.create(change.getId());
|
||||
@@ -132,7 +131,6 @@ public class Restore implements RestModifyView<ChangeResource, RestoreInput>,
|
||||
} catch (Exception e) {
|
||||
log.error("Cannot email update for change " + change.getChangeId(), e);
|
||||
}
|
||||
f.checkedGet();
|
||||
hooks.doChangeRestoredHook(change,
|
||||
caller.getAccount(),
|
||||
db.patchSets().get(change.currentPatchSetId()),
|
||||
|
@@ -24,7 +24,6 @@ import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.ListMultimap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.util.concurrent.CheckedFuture;
|
||||
import com.google.gerrit.common.ChangeHooks;
|
||||
import com.google.gerrit.common.Nullable;
|
||||
import com.google.gerrit.common.TimeUtil;
|
||||
@@ -1187,12 +1186,7 @@ public class MergeOp {
|
||||
update.commit();
|
||||
}
|
||||
|
||||
CheckedFuture<?, IOException> indexFuture;
|
||||
if (change != null) {
|
||||
indexFuture = indexer.indexAsync(change.getId());
|
||||
} else {
|
||||
indexFuture = null;
|
||||
}
|
||||
indexer.index(db, change);
|
||||
final PatchSetApproval from = submitter;
|
||||
workQueue.getDefaultQueue()
|
||||
.submit(requestScopePropagator.wrap(new Runnable() {
|
||||
@@ -1230,14 +1224,6 @@ public class MergeOp {
|
||||
}
|
||||
}));
|
||||
|
||||
if (indexFuture != null) {
|
||||
try {
|
||||
indexFuture.checkedGet();
|
||||
} catch (IOException e) {
|
||||
logError("Failed to index new change message", e);
|
||||
}
|
||||
}
|
||||
|
||||
if (submitter != null) {
|
||||
try {
|
||||
hooks.doMergeFailedHook(c,
|
||||
|
@@ -2206,7 +2206,7 @@ public class ReceiveCommits {
|
||||
if (cmd.getResult() == NOT_ATTEMPTED) {
|
||||
cmd.execute(rp);
|
||||
}
|
||||
CheckedFuture<?, IOException> f = indexer.indexAsync(change.getId());
|
||||
indexer.index(db, change);
|
||||
if (changeKind != ChangeKind.TRIVIAL_REBASE) {
|
||||
workQueue.getDefaultQueue()
|
||||
.submit(requestScopePropagator.wrap(new Runnable() {
|
||||
@@ -2235,7 +2235,6 @@ public class ReceiveCommits {
|
||||
}
|
||||
}));
|
||||
}
|
||||
f.checkedGet();
|
||||
|
||||
gitRefUpdated.fire(project.getNameKey(), newPatchSet.getRefName(),
|
||||
ObjectId.zeroId(), newCommit);
|
||||
|
Reference in New Issue
Block a user