Pass only Change.Id for async background indexing

The background worker thread creates its own database connection
to read the record. The change could have been modified between
the time it is enqueued, and the time the indexing is processed.
Always read from the database in the background thread to ensure
the index gets the most recent possible data.

This may fix some index state errors on gerrit-review (and friends).

Change-Id: Ifab8d0ba7c9dcac54b7e168b86e9a82a3c681ea5
This commit is contained in:
Shawn Pearce
2013-12-30 14:42:08 -08:00
parent b9e64c2704
commit 6f3453863a
9 changed files with 26 additions and 23 deletions

View File

@@ -107,7 +107,8 @@ public class Abandon implements RestModifyView<ChangeResource, AbandonInput>,
db.rollback(); db.rollback();
} }
CheckedFuture<?, IOException> indexFuture = indexer.indexAsync(change); CheckedFuture<?, IOException> indexFuture =
indexer.indexAsync(change.getId());
try { try {
ReplyToChangeSender cm = abandonedSenderFactory.create(change); ReplyToChangeSender cm = abandonedSenderFactory.create(change);
cm.setFrom(caller.getAccountId()); cm.setFrom(caller.getAccountId());

View File

@@ -185,7 +185,8 @@ public class MergeabilityChecker implements GitReferenceUpdatedListener {
* reindexes the change (whether the mergeability flag was updated or * reindexes the change (whether the mergeability flag was updated or
* not) * not)
*/ */
public CheckedFuture<?, IOException> updateAndIndexAsync(final Change change) { public CheckedFuture<?, IOException> updateAndIndexAsync(Change change) {
final Change.Id id = change.getId();
return Futures.makeChecked( return Futures.makeChecked(
Futures.transform(updateAsync(change), Futures.transform(updateAsync(change),
new AsyncFunction<Boolean, Object>() { new AsyncFunction<Boolean, Object>() {
@@ -194,7 +195,7 @@ public class MergeabilityChecker implements GitReferenceUpdatedListener {
public ListenableFuture<Object> apply(Boolean indexUpdated) public ListenableFuture<Object> apply(Boolean indexUpdated)
throws Exception { throws Exception {
if (!indexUpdated) { if (!indexUpdated) {
return (ListenableFuture<Object>) indexer.indexAsync(change); return (ListenableFuture<Object>) indexer.indexAsync(id);
} }
return Futures.immediateFuture(null); return Futures.immediateFuture(null);
} }

View File

@@ -139,7 +139,7 @@ public class PostReview implements RestModifyView<RevisionResource, ReviewInput>
CheckedFuture<?, IOException> indexWrite; CheckedFuture<?, IOException> indexWrite;
if (dirty) { if (dirty) {
indexWrite = indexer.indexAsync(change); indexWrite = indexer.indexAsync(change.getId());
} else { } else {
indexWrite = Futures.<Void, IOException> immediateCheckedFuture(null); indexWrite = Futures.<Void, IOException> immediateCheckedFuture(null);
} }

View File

@@ -228,7 +228,8 @@ public class PostReviewers implements RestModifyView<ChangeResource, AddReviewer
db.rollback(); db.rollback();
} }
CheckedFuture<?, IOException> indexFuture = indexer.indexAsync(rsrc.getChange()); CheckedFuture<?, IOException> indexFuture =
indexer.indexAsync(rsrc.getChange().getId());
result.reviewers = Lists.newArrayListWithCapacity(added.size()); result.reviewers = Lists.newArrayListWithCapacity(added.size());
for (PatchSetApproval psa : added) { for (PatchSetApproval psa : added) {
result.reviewers.add(json.format( result.reviewers.add(json.format(

View File

@@ -77,7 +77,7 @@ public class Publish implements RestModifyView<RevisionResource, Input>,
if (!updatedPatchSet.isDraft() if (!updatedPatchSet.isDraft()
|| updatedChange.getStatus() == Change.Status.NEW) { || updatedChange.getStatus() == Change.Status.NEW) {
CheckedFuture<?, IOException> indexFuture = CheckedFuture<?, IOException> indexFuture =
indexer.indexAsync(updatedChange); indexer.indexAsync(updatedChange.getId());
hooks.doDraftPublishedHook(updatedChange, updatedPatchSet, dbProvider.get()); hooks.doDraftPublishedHook(updatedChange, updatedPatchSet, dbProvider.get());
sender.send(rsrc.getChange().getStatus() == Change.Status.DRAFT, sender.send(rsrc.getChange().getStatus() == Change.Status.DRAFT,
rsrc.getUser(), updatedChange, updatedPatchSet, rsrc.getUser(), updatedChange, updatedPatchSet,

View File

@@ -115,7 +115,8 @@ class PutTopic implements RestModifyView<ChangeResource, Input>,
} finally { } finally {
db.rollback(); db.rollback();
} }
CheckedFuture<?, IOException> indexFuture = indexer.indexAsync(change); CheckedFuture<?, IOException> indexFuture =
indexer.indexAsync(change.getId());
hooks.doTopicChangedHook(change, currentUser.getAccount(), hooks.doTopicChangedHook(change, currentUser.getAccount(),
oldTopicName, db); oldTopicName, db);
indexFuture.checkedGet(); indexFuture.checkedGet();

View File

@@ -108,7 +108,8 @@ public class Restore implements RestModifyView<ChangeResource, RestoreInput>,
db.rollback(); db.rollback();
} }
CheckedFuture<?, IOException> indexFuture = indexer.indexAsync(change); CheckedFuture<?, IOException> indexFuture =
indexer.indexAsync(change.getId());
try { try {
ReplyToChangeSender cm = restoredSenderFactory.create(change); ReplyToChangeSender cm = restoredSenderFactory.create(change);
cm.setFrom(caller.getAccountId()); cm.setFrom(caller.getAccountId());

View File

@@ -1043,7 +1043,7 @@ public class MergeOp {
CheckedFuture<?, IOException> indexFuture; CheckedFuture<?, IOException> indexFuture;
if (change != null) { if (change != null) {
indexFuture = indexer.indexAsync(change); indexFuture = indexer.indexAsync(change.getId());
} else { } else {
indexFuture = null; indexFuture = null;
} }

View File

@@ -112,12 +112,12 @@ public class ChangeIndexer {
/** /**
* Start indexing a change. * Start indexing a change.
* *
* @param change change to index. * @param id change to index.
* @return future for the indexing task. * @return future for the indexing task.
*/ */
public CheckedFuture<?, IOException> indexAsync(Change change) { public CheckedFuture<?, IOException> indexAsync(Change.Id id) {
return executor != null return executor != null
? submit(new Task(change, false)) ? submit(new Task(id, false))
: Futures.<Object, IOException> immediateCheckedFuture(null); : Futures.<Object, IOException> immediateCheckedFuture(null);
} }
@@ -145,12 +145,12 @@ public class ChangeIndexer {
/** /**
* Start deleting a change. * Start deleting a change.
* *
* @param change change to delete. * @param id change to delete.
* @return future for the deleting task. * @return future for the deleting task.
*/ */
public CheckedFuture<?, IOException> deleteAsync(Change change) { public CheckedFuture<?, IOException> deleteAsync(Change.Id id) {
return executor != null return executor != null
? submit(new Task(change, true)) ? submit(new Task(id, true))
: Futures.<Object, IOException> immediateCheckedFuture(null); : Futures.<Object, IOException> immediateCheckedFuture(null);
} }
@@ -186,11 +186,11 @@ public class ChangeIndexer {
} }
private class Task implements Callable<Void> { private class Task implements Callable<Void> {
private final Change change; private final Change.Id id;
private final boolean delete; private final boolean delete;
private Task(Change change, boolean delete) { private Task(Change.Id id, boolean delete) {
this.change = change; this.id = id;
this.delete = delete; this.delete = delete;
} }
@@ -225,7 +225,7 @@ public class ChangeIndexer {
RequestContext oldCtx = context.setContext(newCtx); RequestContext oldCtx = context.setContext(newCtx);
try { try {
ChangeData cd = changeDataFactory.create( ChangeData cd = changeDataFactory.create(
newCtx.getReviewDbProvider().get(), change); newCtx.getReviewDbProvider().get(), id);
if (delete) { if (delete) {
for (ChangeIndex i : getWriteIndexes()) { for (ChangeIndex i : getWriteIndexes()) {
i.delete(cd); i.delete(cd);
@@ -244,16 +244,14 @@ public class ChangeIndexer {
} }
} }
} catch (Exception e) { } catch (Exception e) {
log.error(String.format( log.error(String.format("Failed to index change %d", id), e);
"Failed to index change %d in %s",
change.getId().get(), change.getProject().get()), e);
throw e; throw e;
} }
} }
@Override @Override
public String toString() { public String toString() {
return "index-change-" + change.getId().get(); return "index-change-" + id.get();
} }
} }
} }