More reliably get futures returned by ChangeIndexer

Convert futures to be CheckedFuture<?, IOException>s so calling
checkedGet() does not result in too many extra exception types. Rename
the methods to {index,delete}Async so it is clear that these just
launch a future. For all usages, make sure checkedGet() is invoked on
the future, either immediately or after another block of non-Change-
mutating code.

Change-Id: I18b55b0b03b2be0a70995a0b08a4fc3193db96e3
This commit is contained in:
Dave Borowitz
2013-10-01 19:43:39 -07:00
parent 7e057414e2
commit c3b436de36
17 changed files with 187 additions and 80 deletions

View File

@@ -15,6 +15,7 @@
package com.google.gerrit.server.change;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.extensions.restapi.AuthException;
import com.google.gerrit.extensions.restapi.BadRequestException;
@@ -29,6 +30,7 @@ import com.google.gerrit.server.ApprovalsUtil;
import com.google.gerrit.server.ChangeUtil;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.change.Abandon.Input;
import com.google.gerrit.server.change.ChangeJson.ChangeInfo;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.mail.AbandonedSender;
import com.google.gerrit.server.mail.ReplyToChangeSender;
@@ -41,6 +43,7 @@ import com.google.inject.Provider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class Abandon implements RestModifyView<ChangeResource, Input>,
@@ -113,7 +116,7 @@ public class Abandon implements RestModifyView<ChangeResource, Input>,
db.rollback();
}
indexer.index(change);
CheckedFuture<?, IOException> indexFuture = indexer.indexAsync(change);
try {
ReplyToChangeSender cm = abandonedSenderFactory.create(change);
cm.setFrom(caller.getAccountId());
@@ -127,7 +130,9 @@ public class Abandon implements RestModifyView<ChangeResource, Input>,
db.patchSets().get(change.currentPatchSetId()),
Strings.emptyToNull(input.message),
db);
return json.format(change);
ChangeInfo result = json.format(change);
indexFuture.checkedGet();
return result;
}
@Override

View File

@@ -16,6 +16,7 @@ package com.google.gerrit.server.change;
import static com.google.gerrit.reviewdb.client.Change.INITIAL_PATCH_SET_ID;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.common.data.LabelTypes;
import com.google.gerrit.reviewdb.client.Account;
@@ -43,6 +44,7 @@ import org.eclipse.jgit.revwalk.RevCommit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
@@ -150,7 +152,7 @@ public class ChangeInserter {
return patchSetInfo;
}
public void insert() throws OrmException {
public void insert() throws OrmException, IOException {
ReviewDb db = dbProvider.get();
db.changes().beginTransaction(change.getId());
try {
@@ -169,7 +171,7 @@ public class ChangeInserter {
db.changeMessages().insert(Collections.singleton(changeMessage));
}
indexer.index(change);
CheckedFuture<?, IOException> indexFuture = indexer.indexAsync(change);
gitRefUpdated.fire(change.getProject(), patchSet.getRefName(),
ObjectId.zeroId(), commit);
@@ -190,5 +192,6 @@ public class ChangeInserter {
log.error("Cannot send email for new change " + change.getId(), err);
}
}
indexFuture.checkedGet();
}
}

View File

@@ -33,6 +33,7 @@ import com.google.gwtorm.server.OrmException;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.io.IOException;
import java.util.List;
public class DeleteReviewer implements RestModifyView<ReviewerResource, Input> {
@@ -50,7 +51,8 @@ public class DeleteReviewer implements RestModifyView<ReviewerResource, Input> {
@Override
public Object apply(ReviewerResource rsrc, Input input)
throws AuthException, ResourceNotFoundException, OrmException {
throws AuthException, ResourceNotFoundException, OrmException,
IOException {
ChangeControl control = rsrc.getControl();
Change.Id changeId = rsrc.getChange().getId();
ReviewDb db = dbProvider.get();

View File

@@ -22,7 +22,7 @@ import com.google.gerrit.server.change.Index.Input;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.inject.Inject;
import java.util.concurrent.ExecutionException;
import java.io.IOException;
@RequiresCapability(GlobalCapability.ADMINISTRATE_SERVER)
public class Index implements RestModifyView<ChangeResource, Input> {
@@ -37,9 +37,8 @@ public class Index implements RestModifyView<ChangeResource, Input> {
}
@Override
public Object apply(ChangeResource rsrc, Input input)
throws InterruptedException, ExecutionException {
indexer.index(rsrc.getChange()).get();
public Object apply(ChangeResource rsrc, Input input) throws IOException {
indexer.index(rsrc.getChange());
return Response.none();
}
}

View File

@@ -18,6 +18,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.reviewdb.client.Account;
import com.google.gerrit.reviewdb.client.Change;
@@ -312,13 +314,14 @@ public class PatchSetInserter {
}
}
indexer.index(updatedChange);
if (runHooks) {
hooks.doPatchsetCreatedHook(updatedChange, patchSet, db);
}
} finally {
db.rollback();
}
CheckedFuture<?, IOException> e = indexer.indexAsync(updatedChange);
if (runHooks) {
hooks.doPatchsetCreatedHook(updatedChange, patchSet, db);
}
e.checkedGet();
return updatedChange;
}

View File

@@ -20,7 +20,8 @@ import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.common.changes.Side;
import com.google.gerrit.common.data.LabelType;
@@ -35,10 +36,10 @@ import com.google.gerrit.extensions.restapi.UnprocessableEntityException;
import com.google.gerrit.extensions.restapi.Url;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.ChangeMessage;
import com.google.gerrit.reviewdb.client.CommentRange;
import com.google.gerrit.reviewdb.client.Patch;
import com.google.gerrit.reviewdb.client.PatchLineComment;
import com.google.gerrit.reviewdb.client.PatchSetApproval;
import com.google.gerrit.reviewdb.client.CommentRange;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.ChangeUtil;
import com.google.gerrit.server.IdentifiedUser;
@@ -52,12 +53,12 @@ import com.google.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
public class PostReview implements RestModifyView<RevisionResource, Input> {
private static final Logger log = LoggerFactory.getLogger(PostReview.class);
@@ -153,7 +154,7 @@ public class PostReview implements RestModifyView<RevisionResource, Input> {
@Override
public Object apply(RevisionResource revision, Input input)
throws AuthException, BadRequestException, OrmException,
UnprocessableEntityException, InterruptedException, ExecutionException {
UnprocessableEntityException, IOException {
if (input.onBehalfOf != null) {
revision = onBehalfOf(revision, input);
}
@@ -169,28 +170,29 @@ public class PostReview implements RestModifyView<RevisionResource, Input> {
}
db.changes().beginTransaction(revision.getChange().getId());
boolean dirty = false;
try {
change = db.changes().get(revision.getChange().getId());
ChangeUtil.updated(change);
timestamp = change.getLastUpdatedOn();
boolean dirty = false;
dirty |= insertComments(revision, input.comments, input.drafts);
dirty |= updateLabels(revision, input.labels);
dirty |= insertMessage(revision, input.message);
if (dirty) {
db.changes().update(Collections.singleton(change));
db.commit();
ListenableFuture<?> indexWrite = indexer.index(change);
if (input.waitForCommit) {
indexWrite.get();
}
}
} finally {
db.rollback();
}
CheckedFuture<?, IOException> indexWrite;
if (dirty) {
indexWrite = indexer.indexAsync(change);
} else {
indexWrite = Futures.<Void, IOException> immediateCheckedFuture(null);
}
if (input.notify.compareTo(NotifyHandling.NONE) > 0 && message != null) {
email.create(
input.notify,
@@ -204,6 +206,9 @@ public class PostReview implements RestModifyView<RevisionResource, Input> {
Output output = new Output();
output.labels = input.labels;
if (input.waitForCommit) {
indexWrite.checkedGet();
}
return output;
}

View File

@@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.common.data.GroupDescription;
import com.google.gerrit.common.errors.EmailException;
@@ -151,7 +152,7 @@ public class PostReviewers implements RestModifyView<ChangeResource, Input> {
}
private PostResult putAccount(ReviewerResource rsrc) throws OrmException,
EmailException {
EmailException, IOException {
PostResult result = new PostResult();
addReviewers(rsrc, result, ImmutableSet.of(rsrc.getUser()));
return result;
@@ -219,7 +220,8 @@ public class PostReviewers implements RestModifyView<ChangeResource, Input> {
}
private void addReviewers(ChangeResource rsrc, PostResult result,
Set<IdentifiedUser> reviewers) throws OrmException, EmailException {
Set<IdentifiedUser> reviewers)
throws OrmException, EmailException, IOException {
if (reviewers.isEmpty()) {
result.reviewers = ImmutableList.of();
return;
@@ -259,9 +261,10 @@ public class PostReviewers implements RestModifyView<ChangeResource, Input> {
db.rollback();
}
indexer.index(rsrc.getChange());
CheckedFuture<?, IOException> indexFuture = indexer.indexAsync(rsrc.getChange());
accountLoaderFactory.create(true).fill(result.reviewers);
postAdd(rsrc.getChange(), result);
indexFuture.checkedGet();
}
private void postAdd(Change change, PostResult result)

View File

@@ -14,6 +14,7 @@
package com.google.gerrit.server.change;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.extensions.restapi.AuthException;
import com.google.gerrit.extensions.restapi.ResourceConflictException;
@@ -25,9 +26,9 @@ import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.PatchSet;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.ChangeUtil;
import com.google.gerrit.server.mail.PatchSetNotificationSender;
import com.google.gerrit.server.change.Publish.Input;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.mail.PatchSetNotificationSender;
import com.google.gerrit.server.patch.PatchSetInfoNotAvailableException;
import com.google.gwtorm.server.AtomicUpdate;
import com.google.gwtorm.server.OrmException;
@@ -75,11 +76,13 @@ public class Publish implements RestModifyView<RevisionResource, Input>,
try {
if (!updatedPatchSet.isDraft()
|| updatedChange.getStatus() == Change.Status.NEW) {
indexer.index(updatedChange);
CheckedFuture<?, IOException> indexFuture =
indexer.indexAsync(updatedChange);
hooks.doDraftPublishedHook(updatedChange, updatedPatchSet, dbProvider.get());
sender.send(rsrc.getChange().getStatus() == Change.Status.DRAFT,
rsrc.getUser(), updatedChange, updatedPatchSet,
rsrc.getControl().getLabelTypes());
indexFuture.checkedGet();
}
} catch (PatchSetInfoNotAvailableException e) {
throw new ResourceNotFoundException(e.getMessage());

View File

@@ -15,6 +15,7 @@
package com.google.gerrit.server.change;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.extensions.restapi.AuthException;
import com.google.gerrit.extensions.restapi.BadRequestException;
@@ -35,6 +36,7 @@ import com.google.gwtorm.server.AtomicUpdate;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.io.IOException;
import java.util.Collections;
class PutTopic implements RestModifyView<ChangeResource, Input>,
@@ -114,9 +116,10 @@ class PutTopic implements RestModifyView<ChangeResource, Input>,
} finally {
db.rollback();
}
indexer.index(change);
CheckedFuture<?, IOException> indexFuture = indexer.indexAsync(change);
hooks.doTopicChangedHook(change, currentUser.getAccount(),
oldTopicName, db);
indexFuture.checkedGet();
}
return Strings.isNullOrEmpty(newTopicName)
? Response.none()

View File

@@ -15,6 +15,7 @@
package com.google.gerrit.server.change;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.extensions.restapi.AuthException;
import com.google.gerrit.extensions.restapi.DefaultInput;
@@ -28,6 +29,7 @@ import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.ApprovalsUtil;
import com.google.gerrit.server.ChangeUtil;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.change.ChangeJson.ChangeInfo;
import com.google.gerrit.server.change.Restore.Input;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.mail.ReplyToChangeSender;
@@ -41,6 +43,7 @@ import com.google.inject.Provider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
public class Restore implements RestModifyView<ChangeResource, Input>,
@@ -104,7 +107,6 @@ public class Restore implements RestModifyView<ChangeResource, Input>,
throw new ResourceConflictException("change is "
+ status(db.changes().get(req.getChange().getId())));
}
indexer.index(change);
message = newMessage(input, caller, change);
db.changeMessages().insert(Collections.singleton(message));
new ApprovalsUtil(db).syncChangeStatus(change);
@@ -113,6 +115,7 @@ public class Restore implements RestModifyView<ChangeResource, Input>,
db.rollback();
}
CheckedFuture<?, IOException> indexFuture = indexer.indexAsync(change);
try {
ReplyToChangeSender cm = restoredSenderFactory.create(change);
cm.setFrom(caller.getAccountId());
@@ -126,7 +129,9 @@ public class Restore implements RestModifyView<ChangeResource, Input>,
db.patchSets().get(change.currentPatchSetId()),
Strings.emptyToNull(input.message),
dbProvider.get());
return json.format(change);
ChangeInfo result = json.format(change);
indexFuture.checkedGet();
return result;
}
@Override

View File

@@ -177,7 +177,7 @@ public class Submit implements RestModifyView<RevisionResource, Input>,
}
public Change submit(RevisionResource rsrc, IdentifiedUser caller)
throws OrmException {
throws OrmException, IOException {
final Timestamp timestamp = new Timestamp(System.currentTimeMillis());
Change change = rsrc.getChange();
ReviewDb db = dbProvider.get();

View File

@@ -15,6 +15,7 @@
package com.google.gerrit.server.changedetail;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.common.data.LabelTypes;
import com.google.gerrit.common.data.ReviewResult;
@@ -121,12 +122,13 @@ public class PublishDraft implements Callable<ReviewResult> {
});
if (!updatedPatchSet.isDraft() || updatedChange.getStatus() == Change.Status.NEW) {
indexer.index(updatedChange);
CheckedFuture<?, IOException> indexFuture = indexer.indexAsync(updatedChange);
hooks.doDraftPublishedHook(updatedChange, updatedPatchSet, db);
sender.send(control.getChange().getStatus() == Change.Status.DRAFT,
(IdentifiedUser) control.getCurrentUser(), updatedChange, updatedPatchSet,
labelTypes);
indexFuture.checkedGet();
}
}

View File

@@ -25,6 +25,7 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.common.data.Capable;
@@ -509,7 +510,6 @@ public class MergeOp {
commit.statusCode = CommitMergeStatus.ALREADY_MERGED;
try {
setMerged(chg, null);
indexer.index(chg);
} catch (OrmException e) {
log.error("Cannot mark change " + chg.getId() + " merged", e);
}
@@ -672,6 +672,8 @@ public class MergeOp {
}
} catch (OrmException err) {
log.warn("Error updating change status for " + c.getId(), err);
} catch (IOException err) {
log.warn("Error updating change status for " + c.getId(), err);
}
}
}
@@ -789,7 +791,7 @@ public class MergeOp {
}
private void setMerged(final Change c, final ChangeMessage msg)
throws OrmException {
throws OrmException, IOException {
try {
db.changes().beginTransaction(c.getId());
@@ -990,10 +992,11 @@ public class MergeOp {
}
final boolean setStatusNew = makeNew;
Change change = null;
try {
db.changes().beginTransaction(c.getId());
try {
Change change = db.changes().atomicUpdate(
change = db.changes().atomicUpdate(
c.getId(),
new AtomicUpdate<Change>() {
@Override
@@ -1009,7 +1012,6 @@ public class MergeOp {
});
db.changeMessages().insert(Collections.singleton(msg));
db.commit();
indexer.index(change);
} finally {
db.rollback();
}
@@ -1017,6 +1019,12 @@ public class MergeOp {
log.warn("Cannot record merge failure message", err);
}
CheckedFuture<?, IOException> indexFuture;
if (change != null) {
indexFuture = indexer.indexAsync(change);
} else {
indexFuture = null;
}
final PatchSetApproval from = submitter;
workQueue.getDefaultQueue()
.submit(requestScopePropagator.wrap(new Runnable() {
@@ -1063,9 +1071,17 @@ public class MergeOp {
log.error("Cannot run hook for merge failed " + c.getId(), ex);
}
}
if (indexFuture != null) {
try {
indexFuture.checkedGet();
} catch (IOException e) {
log.error("Failed to index new change message", e);
}
}
}
private void abandonAllOpenChanges() {
Exception err = null;
try {
openSchema();
for (Change c : db.changes().byProjectOpenAll(destBranch.getParentKey())) {
@@ -1073,14 +1089,20 @@ public class MergeOp {
}
db.close();
db = null;
} catch (IOException e) {
err = e;
} catch (OrmException e) {
err = e;
}
if (err != null) {
log.warn(String.format(
"Cannot abandon changes for deleted project %s",
destBranch.getParentKey().get()), e);
destBranch.getParentKey().get()), err);
}
}
private void abandonOneChange(Change change) throws OrmException {
private void abandonOneChange(Change change) throws OrmException,
IOException {
db.changes().beginTransaction(change.getId());
try {
change = db.changes().atomicUpdate(
@@ -1107,10 +1129,10 @@ public class MergeOp {
db.changeMessages().insert(Collections.singleton(msg));
new ApprovalsUtil(db).syncChangeStatus(change);
db.commit();
indexer.index(change);
}
} finally {
db.rollback();
}
indexer.index(change);
}
}

View File

@@ -1432,7 +1432,7 @@ public class ReceiveCommits {
ListenableFuture<Void> future = changeUpdateExector.submit(
requestScopePropagator.wrap(new Callable<Void>() {
@Override
public Void call() throws OrmException {
public Void call() throws OrmException, IOException {
if (caller == Thread.currentThread()) {
insertChange(db);
} else {
@@ -1452,7 +1452,7 @@ public class ReceiveCommits {
return Futures.makeChecked(future, ORM_EXCEPTION);
}
private void insertChange(ReviewDb db) throws OrmException {
private void insertChange(ReviewDb db) throws OrmException, IOException {
final PatchSet ps = ins.getPatchSet();
final Account.Id me = currentUser.getAccountId();
final List<FooterLine> footerLines = commit.getFooterLines();
@@ -1504,7 +1504,8 @@ public class ReceiveCommits {
}
}
private void submit(ChangeControl changeCtl, PatchSet ps) throws OrmException {
private void submit(ChangeControl changeCtl, PatchSet ps)
throws OrmException, IOException {
Submit submit = submitProvider.get();
RevisionResource rsrc = new RevisionResource(new ChangeResource(changeCtl), ps);
Change c = submit.submit(rsrc, currentUser);
@@ -1751,7 +1752,7 @@ public class ReceiveCommits {
ListenableFuture<PatchSet.Id> future = changeUpdateExector.submit(
requestScopePropagator.wrap(new Callable<PatchSet.Id>() {
@Override
public PatchSet.Id call() throws OrmException {
public PatchSet.Id call() throws OrmException, IOException {
try {
if (caller == Thread.currentThread()) {
return insertPatchSet(db);
@@ -1773,7 +1774,7 @@ public class ReceiveCommits {
return Futures.makeChecked(future, ORM_EXCEPTION);
}
PatchSet.Id insertPatchSet(ReviewDb db) throws OrmException {
PatchSet.Id insertPatchSet(ReviewDb db) throws OrmException, IOException {
final Account.Id me = currentUser.getAccountId();
final List<FooterLine> footerLines = newCommit.getFooterLines();
final MailRecipients recipients = new MailRecipients();
@@ -1877,7 +1878,7 @@ public class ReceiveCommits {
if (cmd.getResult() == NOT_ATTEMPTED) {
cmd.execute(rp);
}
indexer.index(change);
CheckedFuture<?, IOException> indexFuture = indexer.indexAsync(change);
gitRefUpdated.fire(project.getNameKey(), newPatchSet.getRefName(),
ObjectId.zeroId(), newCommit);
hooks.doPatchsetCreatedHook(change, newPatchSet, db);
@@ -1911,6 +1912,7 @@ public class ReceiveCommits {
return "send-email newpatchset";
}
}));
indexFuture.checkedGet();
if (magicBranch != null && magicBranch.isSubmit()) {
submit(changeCtl, newPatchSet);
@@ -2145,7 +2147,7 @@ public class ReceiveCommits {
}
private Change.Key closeChange(final ReceiveCommand cmd, final PatchSet.Id psi,
final RevCommit commit) throws OrmException {
final RevCommit commit) throws OrmException, IOException {
final String refName = cmd.getRefName();
final Change.Id cid = psi.getParentKey();
@@ -2200,7 +2202,8 @@ public class ReceiveCommits {
}
private void markChangeMergedByPush(final ReviewDb db,
final ReplaceRequest result) throws OrmException {
final ReplaceRequest result) throws OrmException,
IOException {
Change change = result.change;
final String mergedIntoRef = result.mergedIntoRef;

View File

@@ -297,7 +297,7 @@ public class ChangeBatchIndexer {
cd = cdit.next();
try {
cd.setCurrentFilePaths(paths);
indexer.indexTask(cd).call();
indexer.index(cd);
done.update(1);
if (verboseWriter != null) {
verboseWriter.println("Reindexed change " + cd.getId());

View File

@@ -14,13 +14,17 @@
package com.google.gerrit.server.index;
import com.google.common.base.Function;
import com.google.common.util.concurrent.Callables;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.server.query.change.ChangeData;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
/**
* Helper for (re)indexing a change document.
@@ -37,28 +41,33 @@ public abstract class ChangeIndexer {
/** Instance indicating secondary index is disabled. */
public static final ChangeIndexer DISABLED = new ChangeIndexer(null) {
@Override
public ListenableFuture<?> index(ChangeData cd) {
return Futures.immediateFuture(null);
public CheckedFuture<?, IOException> indexAsync(ChangeData cd) {
return Futures.immediateCheckedFuture(null);
}
@Override
public Callable<?> indexTask(ChangeData cd) {
return new Callable<Void>() {
@Override
public Void call() {
return null;
}
};
protected Callable<?> indexTask(ChangeData cd) {
return Callables.returning(null);
}
@Override
public Callable<?> deleteTask(ChangeData cd) {
return new Callable<Void>() {
@Override
public Void call() {
return null;
protected Callable<?> deleteTask(ChangeData cd) {
return Callables.returning(null);
}
};
private static final Function<Exception, IOException> MAPPER =
new Function<Exception, IOException>() {
@Override
public IOException apply(Exception in) {
if (in instanceof IOException) {
return (IOException) in;
} else if (in instanceof ExecutionException
&& in.getCause() instanceof IOException) {
return (IOException) in.getCause();
} else {
return new IOException(in);
}
}
};
@@ -74,8 +83,8 @@ public abstract class ChangeIndexer {
* @param change change to index.
* @return future for the indexing task.
*/
public ListenableFuture<?> index(Change change) {
return index(new ChangeData(change));
public CheckedFuture<?, IOException> indexAsync(Change change) {
return indexAsync(new ChangeData(change));
}
/**
@@ -84,10 +93,28 @@ public abstract class ChangeIndexer {
* @param cd change to index.
* @return future for the indexing task.
*/
public ListenableFuture<?> index(ChangeData cd) {
public CheckedFuture<?, IOException> indexAsync(ChangeData cd) {
return executor != null
? executor.submit(indexTask(cd))
: Futures.immediateFuture(null);
? submit(indexTask(cd))
: Futures.<Object, IOException> immediateCheckedFuture(null);
}
/**
* Synchronously index a change.
*
* @param change change to index.
*/
public void index(Change change) throws IOException {
indexAsync(change).checkedGet();
}
/**
* Synchronously index a change.
*
* @param cd change to index.
*/
public void index(ChangeData cd) throws IOException {
indexAsync(cd).checkedGet();
}
/**
@@ -96,7 +123,7 @@ public abstract class ChangeIndexer {
* @param cd change to index.
* @return unstarted runnable to index the change.
*/
public abstract Callable<?> indexTask(ChangeData cd);
protected abstract Callable<?> indexTask(ChangeData cd);
/**
* Start deleting a change.
@@ -104,8 +131,8 @@ public abstract class ChangeIndexer {
* @param change change to delete.
* @return future for the deleting task.
*/
public ListenableFuture<?> delete(Change change) {
return delete(new ChangeData(change));
public CheckedFuture<?, IOException> deleteAsync(Change change) {
return deleteAsync(new ChangeData(change));
}
/**
@@ -114,10 +141,28 @@ public abstract class ChangeIndexer {
* @param cd change to delete.
* @return future for the deleting task.
*/
public ListenableFuture<?> delete(ChangeData cd) {
public CheckedFuture<?, IOException> deleteAsync(ChangeData cd) {
return executor != null
? executor.submit(deleteTask(cd))
: Futures.immediateFuture(null);
? submit(deleteTask(cd))
: Futures.<Object, IOException> immediateCheckedFuture(null);
}
/**
* Synchronously delete a change.
*
* @param change change to delete.
*/
public void delete(Change change) throws IOException {
deleteAsync(change).checkedGet();
}
/**
* Synchronously delete a change.
*
* @param cd change to delete.
*/
public void delete(ChangeData cd) throws IOException {
deleteAsync(cd).checkedGet();
}
/**
@@ -126,5 +171,9 @@ public abstract class ChangeIndexer {
* @param cd change to delete.
* @return unstarted runnable to delete the change.
*/
public abstract Callable<?> deleteTask(ChangeData cd);
protected abstract Callable<?> deleteTask(ChangeData cd);
private CheckedFuture<?, IOException> submit(Callable<?> task) {
return Futures.makeChecked(executor.submit(task), MAPPER);
}
}

View File

@@ -77,12 +77,12 @@ public class ChangeIndexerImpl extends ChangeIndexer {
}
@Override
public Callable<Void> indexTask(ChangeData cd) {
protected Callable<Void> indexTask(ChangeData cd) {
return new Task(cd, false);
}
@Override
public Callable<Void> deleteTask(ChangeData cd) {
protected Callable<Void> deleteTask(ChangeData cd) {
return new Task(cd, true);
}