Add fluent interface for retrying actions

RetryHelper accumulated a lot of functionality and using it was not
straight-forward for callers. Clean-up the class and add fluent
interface for calling actions with retry:

  Object result = retryHelper.changeUpdate(
    "myActionName",
    batchUpdateFactory -> {
      try (BatchUpdate bu = newBatchUpdate(batchUpdateFactory)) {
        ...
      }
      return result;
    })
    .retryOn(LockFailureException.class::isInstance)
    ...
    .call();

With the fluent interface providing an action name is now mandatory
which makes the retry metrics more useful.

Signed-off-by: Edwin Kempin <ekempin@google.com>
Change-Id: Iecdfa5b153ab17f31c8ec1d2dca82b428fcf5800
This commit is contained in:
Edwin Kempin 2019-12-06 16:25:51 +01:00
parent 1fa522e414
commit aece3ffe75
17 changed files with 647 additions and 332 deletions

View File

@ -59,7 +59,6 @@ import com.google.gerrit.server.mail.send.AddKeySender;
import com.google.gerrit.server.mail.send.DeleteKeySender;
import com.google.gerrit.server.query.account.InternalAccountQuery;
import com.google.gerrit.server.update.RetryHelper;
import com.google.gerrit.server.update.RetryHelper.ActionType;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
@ -207,10 +206,10 @@ public class PostGpgKeys implements RestModifyView<AccountResource, GpgKeysInput
AccountResource rsrc, List<PGPPublicKeyRing> keyRings, Collection<Fingerprint> toRemove)
throws RestApiException, PGPException, IOException {
try {
retryHelper.execute(
ActionType.ACCOUNT_UPDATE,
() -> tryStoreKeys(rsrc, keyRings, toRemove),
LockFailureException.class::isInstance);
retryHelper
.accountUpdate("storeGpgKeys", () -> tryStoreKeys(rsrc, keyRings, toRemove))
.retryOn(LockFailureException.class::isInstance)
.call();
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
Throwables.throwIfInstanceOf(e, RestApiException.class);

View File

@ -126,8 +126,9 @@ import com.google.gerrit.server.quota.QuotaException;
import com.google.gerrit.server.restapi.change.ChangesCollection;
import com.google.gerrit.server.restapi.project.ProjectsCollection;
import com.google.gerrit.server.update.RetryHelper;
import com.google.gerrit.server.update.RetryHelper.Action;
import com.google.gerrit.server.update.RetryHelper.ActionType;
import com.google.gerrit.server.update.RetryableAction;
import com.google.gerrit.server.update.RetryableAction.Action;
import com.google.gerrit.server.update.RetryableAction.ActionType;
import com.google.gerrit.server.update.UpdateException;
import com.google.gerrit.server.util.time.TimeUtil;
import com.google.gerrit.util.http.CacheHeaders;
@ -814,27 +815,22 @@ public class RestApiServlet extends HttpServlet {
ActionType actionType,
Action<T> action)
throws Exception {
RetryableAction<T> retryableAction = globals.retryHelper.action(actionType, caller, action);
AtomicReference<Optional<String>> traceId = new AtomicReference<>(Optional.empty());
RetryHelper.Options.Builder retryOptionsBuilder = RetryHelper.options().caller(caller);
if (!traceContext.isTracing()) {
// enable automatic retry with tracing in case of non-recoverable failure
retryOptionsBuilder =
retryOptionsBuilder
.retryWithTrace(t -> !(t instanceof RestApiException))
.onAutoTrace(
autoTraceId -> {
traceId.set(Optional.of(autoTraceId));
retryableAction
.retryWithTrace(t -> !(t instanceof RestApiException))
.onAutoTrace(
autoTraceId -> {
traceId.set(Optional.of(autoTraceId));
// Include details of the request into the trace.
traceRequestData(req);
});
// Include details of the request into the trace.
traceRequestData(req);
});
}
try {
// ExceptionHookImpl controls on which exceptions we retry.
// The passed in exceptionPredicate allows to define additional exceptions on which retry
// should happen, but here we have none (hence pass in "t -> false" as exceptionPredicate).
return globals.retryHelper.execute(
actionType, action, retryOptionsBuilder.build(), t -> false);
return retryableAction.call();
} finally {
// If auto-tracing got triggered due to a non-recoverable failure, also trace the rest of
// this request. This means logging is forced for all further log statements and the logs are

View File

@ -41,8 +41,7 @@ import com.google.gerrit.server.git.meta.MetaDataUpdate;
import com.google.gerrit.server.index.change.ReindexAfterRefUpdate;
import com.google.gerrit.server.notedb.Sequences;
import com.google.gerrit.server.update.RetryHelper;
import com.google.gerrit.server.update.RetryHelper.Action;
import com.google.gerrit.server.update.RetryHelper.ActionType;
import com.google.gerrit.server.update.RetryableAction.Action;
import com.google.inject.Provider;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
@ -421,8 +420,10 @@ public class AccountsUpdate {
private Optional<AccountState> executeAccountUpdate(Action<Optional<AccountState>> action)
throws IOException, ConfigInvalidException {
try {
return retryHelper.execute(
ActionType.ACCOUNT_UPDATE, action, LockFailureException.class::isInstance);
return retryHelper
.accountUpdate("updateAccount", action)
.retryOn(LockFailureException.class::isInstance)
.call();
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
Throwables.throwIfInstanceOf(e, IOException.class);

View File

@ -29,8 +29,7 @@ import com.google.gerrit.server.account.externalids.ExternalId;
import com.google.gerrit.server.account.externalids.ExternalIds;
import com.google.gerrit.server.query.account.InternalAccountQuery;
import com.google.gerrit.server.update.RetryHelper;
import com.google.gerrit.server.update.RetryHelper.Action;
import com.google.gerrit.server.update.RetryHelper.ActionType;
import com.google.gerrit.server.update.RetryableAction.Action;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
@ -85,7 +84,9 @@ public class Emails {
return accounts;
}
return executeIndexQuery(() -> queryProvider.get().byPreferredEmail(email).stream())
return executeIndexQuery(
"queryAccountsByPreferredEmail",
() -> queryProvider.get().byPreferredEmail(email).stream())
.map(a -> a.account().id())
.collect(toImmutableSet());
}
@ -105,6 +106,7 @@ public class Emails {
Arrays.stream(emails).filter(e -> !result.containsKey(e)).collect(toImmutableList());
if (!emailsToBackfill.isEmpty()) {
executeIndexQuery(
"queryAccountsByPreferredEmails",
() -> queryProvider.get().byPreferredEmail(emailsToBackfill).entries().stream())
.forEach(e -> result.put(e.getKey(), e.getValue().account().id()));
}
@ -139,10 +141,12 @@ public class Emails {
return u;
}
private <T> T executeIndexQuery(Action<T> action) {
private <T> T executeIndexQuery(String actionName, Action<T> action) {
try {
return retryHelper.execute(
ActionType.INDEX_QUERY, action, StorageException.class::isInstance);
return retryHelper
.indexQuery(actionName, action)
.retryOn(StorageException.class::isInstance)
.call();
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new StorageException(e);

View File

@ -79,11 +79,14 @@ public class ChangeCleanupRunner implements Runnable {
// abandonInactiveOpenChanges skips failures instead of throwing, so retrying will never
// actually happen. For the purposes of this class that is fine: they'll get tried again the
// next time the scheduled task is run.
retryHelper.execute(
updateFactory -> {
abandonUtil.abandonInactiveOpenChanges(updateFactory);
return null;
});
retryHelper
.changeUpdate(
"abandonInactiveOpenChanges",
updateFactory -> {
abandonUtil.abandonInactiveOpenChanges(updateFactory);
return null;
})
.call();
} catch (RestApiException | UpdateException e) {
logger.atSevere().withCause(e).log("Failed to cleanup changes.");
}

View File

@ -170,26 +170,29 @@ public class ConsistencyChecker {
public Result check(ChangeNotes notes, @Nullable FixInput f) {
requireNonNull(notes);
try {
return retryHelper.execute(
buf -> {
try {
reset();
this.updateFactory = buf;
this.notes = notes;
fix = f;
checkImpl();
return result();
} finally {
if (rw != null) {
rw.getObjectReader().close();
rw.close();
oi.close();
}
if (repo != null) {
repo.close();
}
}
});
return retryHelper
.changeUpdate(
"checkChangeConsistency",
buf -> {
try {
reset();
this.updateFactory = buf;
this.notes = notes;
fix = f;
checkImpl();
return result();
} finally {
if (rw != null) {
rw.getObjectReader().close();
rw.close();
oi.close();
}
if (repo != null) {
repo.close();
}
}
})
.call();
} catch (RestApiException e) {
return logAndReturnOneProblem(e, notes, "Error checking change: " + e.getMessage());
} catch (UpdateException e) {

View File

@ -167,8 +167,7 @@ import com.google.gerrit.server.update.Context;
import com.google.gerrit.server.update.RepoContext;
import com.google.gerrit.server.update.RepoOnlyOp;
import com.google.gerrit.server.update.RetryHelper;
import com.google.gerrit.server.update.RetryHelper.Action;
import com.google.gerrit.server.update.RetryHelper.ActionType;
import com.google.gerrit.server.update.RetryableAction.Action;
import com.google.gerrit.server.update.UpdateException;
import com.google.gerrit.server.util.LabelVote;
import com.google.gerrit.server.util.MagicBranch;
@ -3258,122 +3257,130 @@ class ReceiveCommits {
// TODO(dborowitz): Combine this BatchUpdate with the main one in
// handleRegularCommands
try {
retryHelper.execute(
updateFactory -> {
try (BatchUpdate bu =
updateFactory.create(projectState.getNameKey(), user, TimeUtil.nowTs());
ObjectInserter ins = repo.newObjectInserter();
ObjectReader reader = ins.newReader();
RevWalk rw = new RevWalk(reader)) {
bu.setRepository(repo, rw, ins);
// TODO(dborowitz): Teach BatchUpdate to ignore missing changes.
retryHelper
.changeUpdate(
"autoCloseChanges",
updateFactory -> {
try (BatchUpdate bu =
updateFactory.create(projectState.getNameKey(), user, TimeUtil.nowTs());
ObjectInserter ins = repo.newObjectInserter();
ObjectReader reader = ins.newReader();
RevWalk rw = new RevWalk(reader)) {
bu.setRepository(repo, rw, ins);
// TODO(dborowitz): Teach BatchUpdate to ignore missing changes.
RevCommit newTip = rw.parseCommit(cmd.getNewId());
BranchNameKey branch = BranchNameKey.create(project.getNameKey(), refName);
RevCommit newTip = rw.parseCommit(cmd.getNewId());
BranchNameKey branch = BranchNameKey.create(project.getNameKey(), refName);
rw.reset();
rw.sort(RevSort.REVERSE);
rw.markStart(newTip);
if (!ObjectId.zeroId().equals(cmd.getOldId())) {
rw.markUninteresting(rw.parseCommit(cmd.getOldId()));
}
rw.reset();
rw.sort(RevSort.REVERSE);
rw.markStart(newTip);
if (!ObjectId.zeroId().equals(cmd.getOldId())) {
rw.markUninteresting(rw.parseCommit(cmd.getOldId()));
}
Map<Change.Key, ChangeNotes> byKey = null;
List<ReplaceRequest> replaceAndClose = new ArrayList<>();
Map<Change.Key, ChangeNotes> byKey = null;
List<ReplaceRequest> replaceAndClose = new ArrayList<>();
int existingPatchSets = 0;
int newPatchSets = 0;
SubmissionId submissionId = null;
COMMIT:
for (RevCommit c; (c = rw.next()) != null; ) {
rw.parseBody(c);
int existingPatchSets = 0;
int newPatchSets = 0;
SubmissionId submissionId = null;
COMMIT:
for (RevCommit c; (c = rw.next()) != null; ) {
rw.parseBody(c);
for (Ref ref :
receivePackRefCache.tipsFromObjectId(c.copy(), RefNames.REFS_CHANGES)) {
PatchSet.Id psId = PatchSet.Id.fromRef(ref.getName());
Optional<ChangeNotes> notes = getChangeNotes(psId.changeId());
if (notes.isPresent() && notes.get().getChange().getDest().equals(branch)) {
if (submissionId == null) {
submissionId = new SubmissionId(notes.get().getChange());
for (Ref ref :
receivePackRefCache.tipsFromObjectId(c.copy(), RefNames.REFS_CHANGES)) {
PatchSet.Id psId = PatchSet.Id.fromRef(ref.getName());
Optional<ChangeNotes> notes = getChangeNotes(psId.changeId());
if (notes.isPresent() && notes.get().getChange().getDest().equals(branch)) {
if (submissionId == null) {
submissionId = new SubmissionId(notes.get().getChange());
}
existingPatchSets++;
bu.addOp(
notes.get().getChangeId(), setPrivateOpFactory.create(false, null));
bu.addOp(
psId.changeId(),
mergedByPushOpFactory.create(
requestScopePropagator,
psId,
submissionId,
refName,
newTip.getId().getName()));
continue COMMIT;
}
}
existingPatchSets++;
bu.addOp(notes.get().getChangeId(), setPrivateOpFactory.create(false, null));
for (String changeId : c.getFooterLines(FooterConstants.CHANGE_ID)) {
if (byKey == null) {
byKey =
executeIndexQuery(
"queryOpenChangesByKeyByBranch",
() -> openChangesByKeyByBranch(branch));
}
ChangeNotes onto = byKey.get(Change.key(changeId.trim()));
if (onto != null) {
newPatchSets++;
// Hold onto this until we're done with the walk, as the call to
// req.validate below calls isMergedInto which resets the walk.
ReplaceRequest req =
new ReplaceRequest(onto.getChangeId(), c, cmd, false);
req.notes = onto;
replaceAndClose.add(req);
continue COMMIT;
}
}
}
for (ReplaceRequest req : replaceAndClose) {
Change.Id id = req.notes.getChangeId();
if (!req.validateNewPatchSetForAutoClose()) {
logger.atFine().log("Not closing %s because validation failed", id);
continue;
}
if (submissionId == null) {
submissionId = new SubmissionId(req.notes.getChange());
}
req.addOps(bu, null);
bu.addOp(id, setPrivateOpFactory.create(false, null));
bu.addOp(
psId.changeId(),
mergedByPushOpFactory.create(
requestScopePropagator,
psId,
submissionId,
refName,
newTip.getId().getName()));
continue COMMIT;
}
}
for (String changeId : c.getFooterLines(FooterConstants.CHANGE_ID)) {
if (byKey == null) {
byKey = executeIndexQuery(() -> openChangesByKeyByBranch(branch));
id,
mergedByPushOpFactory
.create(
requestScopePropagator,
req.psId,
submissionId,
refName,
newTip.getId().getName())
.setPatchSetProvider(req.replaceOp::getPatchSet));
bu.addOp(id, new ChangeProgressOp(progress));
ids.add(id);
}
ChangeNotes onto = byKey.get(Change.key(changeId.trim()));
if (onto != null) {
newPatchSets++;
// Hold onto this until we're done with the walk, as the call to
// req.validate below calls isMergedInto which resets the walk.
ReplaceRequest req = new ReplaceRequest(onto.getChangeId(), c, cmd, false);
req.notes = onto;
replaceAndClose.add(req);
continue COMMIT;
}
logger.atFine().log(
"Auto-closing %d changes with existing patch sets and %d with new patch sets",
existingPatchSets, newPatchSets);
bu.execute();
} catch (IOException | StorageException | PermissionBackendException e) {
logger.atSevere().withCause(e).log("Failed to auto-close changes");
return null;
}
}
for (ReplaceRequest req : replaceAndClose) {
Change.Id id = req.notes.getChangeId();
if (!req.validateNewPatchSetForAutoClose()) {
logger.atFine().log("Not closing %s because validation failed", id);
continue;
}
if (submissionId == null) {
submissionId = new SubmissionId(req.notes.getChange());
}
req.addOps(bu, null);
bu.addOp(id, setPrivateOpFactory.create(false, null));
bu.addOp(
id,
mergedByPushOpFactory
.create(
requestScopePropagator,
req.psId,
submissionId,
refName,
newTip.getId().getName())
.setPatchSetProvider(req.replaceOp::getPatchSet));
bu.addOp(id, new ChangeProgressOp(progress));
ids.add(id);
}
// If we are here, we didn't throw UpdateException. Record the result.
// The ordering is indeterminate due to the HashSet; unfortunately, Change.Id
// doesn't
// fit into TreeSet.
ids.stream()
.forEach(id -> resultChangeIds.add(ResultChangeIds.Key.AUTOCLOSED, id));
logger.atFine().log(
"Auto-closing %d changes with existing patch sets and %d with new patch sets",
existingPatchSets, newPatchSets);
bu.execute();
} catch (IOException | StorageException | PermissionBackendException e) {
logger.atSevere().withCause(e).log("Failed to auto-close changes");
return null;
}
// If we are here, we didn't throw UpdateException. Record the result.
// The ordering is indeterminate due to the HashSet; unfortunately, Change.Id doesn't
// fit into TreeSet.
ids.stream().forEach(id -> resultChangeIds.add(ResultChangeIds.Key.AUTOCLOSED, id));
return null;
},
return null;
})
// Use a multiple of the default timeout to account for inner retries that may otherwise
// eat up the whole timeout so that no time is left to retry this outer action.
RetryHelper.options()
.timeout(retryHelper.getDefaultTimeout(ActionType.CHANGE_UPDATE).multipliedBy(5))
.build());
.defaultTimeoutMultiplier(5)
.call();
} catch (RestApiException e) {
logger.atSevere().withCause(e).log("Can't insert patchset");
} catch (UpdateException e) {
@ -3390,10 +3397,12 @@ class ReceiveCommits {
}
}
private <T> T executeIndexQuery(Action<T> action) {
private <T> T executeIndexQuery(String actionName, Action<T> action) {
try (TraceTimer traceTimer = newTimer("executeIndexQuery")) {
return retryHelper.execute(
ActionType.INDEX_QUERY, action, StorageException.class::isInstance);
return retryHelper
.indexQuery(actionName, action)
.retryOn(StorageException.class::isInstance)
.call();
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new StorageException(e);

View File

@ -309,10 +309,10 @@ public class GroupsUpdate {
InternalGroupCreation groupCreation, InternalGroupUpdate groupUpdate)
throws IOException, ConfigInvalidException, DuplicateKeyException {
try {
return retryHelper.execute(
RetryHelper.ActionType.GROUP_UPDATE,
() -> createGroupInNoteDb(groupCreation, groupUpdate),
LockFailureException.class::isInstance);
return retryHelper
.groupUpdate("createGroup", () -> createGroupInNoteDb(groupCreation, groupUpdate))
.retryOn(LockFailureException.class::isInstance)
.call();
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
Throwables.throwIfInstanceOf(e, IOException.class);
@ -349,10 +349,10 @@ public class GroupsUpdate {
AccountGroup.UUID groupUuid, InternalGroupUpdate groupUpdate)
throws IOException, ConfigInvalidException, DuplicateKeyException, NoSuchGroupException {
try {
return retryHelper.execute(
RetryHelper.ActionType.GROUP_UPDATE,
() -> updateGroupInNoteDb(groupUuid, groupUpdate),
LockFailureException.class::isInstance);
return retryHelper
.groupUpdate("updateGroup", () -> updateGroupInNoteDb(groupUuid, groupUpdate))
.retryOn(LockFailureException.class::isInstance)
.call();
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
Throwables.throwIfInstanceOf(e, IOException.class);

View File

@ -156,11 +156,14 @@ public class MailProcessor {
* @param message {@link MailMessage} to process
*/
public void process(MailMessage message) throws RestApiException, UpdateException {
retryHelper.execute(
buf -> {
processImpl(buf, message);
return null;
});
retryHelper
.changeUpdate(
"processCommentsReceivedByEmail",
buf -> {
processImpl(buf, message);
return null;
})
.call();
}
private void processImpl(BatchUpdate.Factory buf, MailMessage message)

View File

@ -52,7 +52,6 @@ import com.google.gerrit.server.query.change.InternalChangeQuery;
import com.google.gerrit.server.query.change.ProjectPredicate;
import com.google.gerrit.server.query.change.RefPredicate;
import com.google.gerrit.server.update.RetryHelper;
import com.google.gerrit.server.update.RetryHelper.ActionType;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
@ -264,16 +263,16 @@ public class ProjectsConsistencyChecker {
try {
List<ChangeData> queryResult =
retryHelper.execute(
ActionType.INDEX_QUERY,
() -> {
// Execute the query.
return changeQueryProvider
.get()
.setRequestedFields(ChangeField.CHANGE, ChangeField.PATCH_SET)
.query(and(basePredicate, or(predicates)));
},
StorageException.class::isInstance);
retryHelper
.indexQuery(
"projectsConsistencyCheckerQueryChanges",
() ->
changeQueryProvider
.get()
.setRequestedFields(ChangeField.CHANGE, ChangeField.PATCH_SET)
.query(and(basePredicate, or(predicates))))
.retryOn(StorageException.class::isInstance)
.call();
// Result for this query that we want to return to the client.
List<ChangeInfo> autoCloseableChangesByBranch = new ArrayList<>();
@ -282,32 +281,35 @@ public class ProjectsConsistencyChecker {
// Skip changes that we have already processed, either by this query or by
// earlier queries.
if (seenChanges.add(autoCloseableChange.getId())) {
retryHelper.execute(
ActionType.CHANGE_UPDATE,
() -> {
// Auto-close by change
if (changeIdToMergedSha1.containsKey(autoCloseableChange.change().getKey())) {
autoCloseableChangesByBranch.add(
changeJson(
fix, changeIdToMergedSha1.get(autoCloseableChange.change().getKey()))
.format(autoCloseableChange));
return null;
}
retryHelper
.changeUpdate(
"projectsConsistencyCheckerAutoCloseChanges",
() -> {
// Auto-close by change
if (changeIdToMergedSha1.containsKey(autoCloseableChange.change().getKey())) {
autoCloseableChangesByBranch.add(
changeJson(
fix,
changeIdToMergedSha1.get(autoCloseableChange.change().getKey()))
.format(autoCloseableChange));
return null;
}
// Auto-close by commit
for (ObjectId patchSetSha1 :
autoCloseableChange.patchSets().stream()
.map(PatchSet::commitId)
.collect(toSet())) {
if (mergedSha1s.contains(patchSetSha1)) {
autoCloseableChangesByBranch.add(
changeJson(fix, patchSetSha1).format(autoCloseableChange));
break;
}
}
return null;
},
StorageException.class::isInstance);
// Auto-close by commit
for (ObjectId patchSetSha1 :
autoCloseableChange.patchSets().stream()
.map(PatchSet::commitId)
.collect(toSet())) {
if (mergedSha1s.contains(patchSetSha1)) {
autoCloseableChangesByBranch.add(
changeJson(fix, patchSetSha1).format(autoCloseableChange));
break;
}
}
return null;
})
.retryOn(StorageException.class::isInstance)
.call();
}
}

View File

@ -38,8 +38,7 @@ import com.google.gerrit.server.query.change.CommitPredicate;
import com.google.gerrit.server.query.change.InternalChangeQuery;
import com.google.gerrit.server.query.change.ProjectPredicate;
import com.google.gerrit.server.update.RetryHelper;
import com.google.gerrit.server.update.RetryHelper.Action;
import com.google.gerrit.server.update.RetryHelper.ActionType;
import com.google.gerrit.server.update.RetryableAction.Action;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
@ -132,6 +131,7 @@ public class CommitsCollection implements ChildCollection<ProjectResource, Commi
// cheaper than ref visibility filtering and reachability computation.
List<ChangeData> changes =
executeIndexQuery(
"queryChangesByProjectCommitWithLimit1",
() ->
queryProvider
.get()
@ -151,7 +151,10 @@ public class CommitsCollection implements ChildCollection<ProjectResource, Commi
Arrays.stream(commit.getParents())
.map(parent -> new CommitPredicate(parent.getId().getName()))
.collect(toImmutableList())));
changes = executeIndexQuery(() -> queryProvider.get().enforceVisibility(true).query(pred));
changes =
executeIndexQuery(
"queryChangesByProjectCommit",
() -> queryProvider.get().enforceVisibility(true).query(pred));
Set<Ref> branchesForCommitParents = new HashSet<>(changes.size());
for (ChangeData cd : changes) {
@ -175,10 +178,12 @@ public class CommitsCollection implements ChildCollection<ProjectResource, Commi
return reachable.fromRefs(project, repo, commit, refs);
}
private <T> T executeIndexQuery(Action<T> action) {
private <T> T executeIndexQuery(String actionName, Action<T> action) {
try {
return retryHelper.execute(
ActionType.INDEX_QUERY, action, StorageException.class::isInstance);
return retryHelper
.indexQuery(actionName, action)
.retryOn(StorageException.class::isInstance)
.call();
} catch (Exception e) {
Throwables.throwIfUnchecked(e);
throw new StorageException(e);

View File

@ -77,7 +77,6 @@ import com.google.gerrit.server.update.BatchUpdate;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.RetryHelper;
import com.google.gerrit.server.update.RetryHelper.ActionType;
import com.google.gerrit.server.update.UpdateException;
import com.google.gerrit.server.util.time.TimeUtil;
import com.google.inject.Inject;
@ -483,41 +482,39 @@ public class MergeOp implements AutoCloseable {
}
RetryTracker retryTracker = new RetryTracker();
retryHelper.execute(
updateFactory -> {
long attempt = retryTracker.lastAttemptNumber + 1;
boolean isRetry = attempt > 1;
if (isRetry) {
logger.atFine().log("Retrying, attempt #%d; skipping merged changes", attempt);
this.ts = TimeUtil.nowTs();
openRepoManager();
}
this.commitStatus = new CommitStatus(cs, isRetry);
if (checkSubmitRules) {
logger.atFine().log("Checking submit rules and state");
checkSubmitRulesAndState(cs, isRetry);
} else {
logger.atFine().log("Bypassing submit rules");
bypassSubmitRules(cs, isRetry);
}
try {
integrateIntoHistory(cs);
} catch (IntegrationException e) {
logger.atWarning().withCause(e).log("Error from integrateIntoHistory");
throw new ResourceConflictException(e.getMessage(), e);
}
return null;
},
RetryHelper.options()
.listener(retryTracker)
// Up to the entire submit operation is retried, including possibly many projects.
// Multiply the timeout by the number of projects we're actually attempting to
// submit.
.timeout(
retryHelper
.getDefaultTimeout(ActionType.CHANGE_UPDATE)
.multipliedBy(cs.projects().size()))
.build());
retryHelper
.changeUpdate(
"integrateIntoHistory",
updateFactory -> {
long attempt = retryTracker.lastAttemptNumber + 1;
boolean isRetry = attempt > 1;
if (isRetry) {
logger.atFine().log("Retrying, attempt #%d; skipping merged changes", attempt);
this.ts = TimeUtil.nowTs();
openRepoManager();
}
this.commitStatus = new CommitStatus(cs, isRetry);
if (checkSubmitRules) {
logger.atFine().log("Checking submit rules and state");
checkSubmitRulesAndState(cs, isRetry);
} else {
logger.atFine().log("Bypassing submit rules");
bypassSubmitRules(cs, isRetry);
}
try {
integrateIntoHistory(cs);
} catch (IntegrationException e) {
logger.atWarning().withCause(e).log("Error from integrateIntoHistory");
throw new ResourceConflictException(e.getMessage(), e);
}
return null;
})
.listener(retryTracker)
// Up to the entire submit operation is retried, including possibly many projects.
// Multiply the timeout by the number of projects we're actually attempting to
// submit.
.defaultTimeoutMultiplier(cs.projects().size())
.call();
if (projects > 1) {
topicMetrics.topicSubmissionsCompleted.increment();

View File

@ -28,12 +28,10 @@ import com.github.rholder.retry.WaitStrategies;
import com.github.rholder.retry.WaitStrategy;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.exceptions.StorageException;
import com.google.gerrit.extensions.restapi.RestApiException;
import com.google.gerrit.metrics.Counter3;
import com.google.gerrit.metrics.Description;
import com.google.gerrit.metrics.Field;
@ -44,6 +42,9 @@ import com.google.gerrit.server.logging.Metadata;
import com.google.gerrit.server.logging.RequestId;
import com.google.gerrit.server.logging.TraceContext;
import com.google.gerrit.server.plugincontext.PluginSetContext;
import com.google.gerrit.server.update.RetryableAction.Action;
import com.google.gerrit.server.update.RetryableAction.ActionType;
import com.google.gerrit.server.update.RetryableChangeAction.ChangeAction;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.time.Duration;
@ -59,26 +60,6 @@ import org.eclipse.jgit.lib.Config;
public class RetryHelper {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@FunctionalInterface
public interface ChangeAction<T> {
T call(BatchUpdate.Factory batchUpdateFactory) throws Exception;
}
@FunctionalInterface
public interface Action<T> {
T call() throws Exception;
}
public enum ActionType {
ACCOUNT_UPDATE,
CHANGE_UPDATE,
GROUP_UPDATE,
INDEX_QUERY,
PLUGIN_UPDATE,
REST_READ_REQUEST,
REST_WRITE_REQUEST,
}
/**
* Options for retrying a single operation.
*
@ -194,10 +175,6 @@ public class RetryHelper {
return new AutoValue_RetryHelper_Options.Builder();
}
private static Options defaults() {
return options().build();
}
private final Metrics metrics;
private final BatchUpdate.Factory updateFactory;
private final PluginSetContext<ExceptionHook> exceptionHooks;
@ -253,48 +230,100 @@ public class RetryHelper {
this.retryWithTraceOnFailure = cfg.getBoolean("retry", "retryWithTraceOnFailure", false);
}
public Duration getDefaultTimeout(ActionType actionType) {
/**
* Creates an action that is executed with retrying when called.
*
* @param actionType the type of the action, used as metric bucket
* @param actionName the name of the action, used as metric bucket
* @param action the action that should be executed
* @return the retryable action, callers need to call {@link RetryableAction#call()} to execute
* the action
*/
public <T> RetryableAction<T> action(ActionType actionType, String actionName, Action<T> action) {
return new RetryableAction<>(this, actionType, actionName, action);
}
/**
* Creates an action for updating an account that is executed with retrying when called.
*
* @param actionName the name of the action, used as metric bucket
* @param action the action that should be executed
* @return the retryable action, callers need to call {@link RetryableAction#call()} to execute
* the action
*/
public <T> RetryableAction<T> accountUpdate(String actionName, Action<T> action) {
return new RetryableAction<>(this, ActionType.ACCOUNT_UPDATE, actionName, action);
}
/**
* Creates an action for updating a change that is executed with retrying when called.
*
* @param actionName the name of the action, used as metric bucket
* @param action the action that should be executed
* @return the retryable action, callers need to call {@link RetryableAction#call()} to execute
* the action
*/
public <T> RetryableAction<T> changeUpdate(String actionName, Action<T> action) {
return new RetryableAction<>(this, ActionType.CHANGE_UPDATE, actionName, action);
}
/**
* Creates an action for updating a change that is executed with retrying when called.
*
* <p>The change action gets a {@link BatchUpdate.Factory} provided that can be used to update the
* change.
*
* @param actionName the name of the action, used as metric bucket
* @param changeAction the action that should be executed
* @return the retryable action, callers need to call {@link RetryableChangeAction#call()} to
* execute the action
*/
public <T> RetryableChangeAction<T> changeUpdate(
String actionName, ChangeAction<T> changeAction) {
return new RetryableChangeAction<>(this, updateFactory, actionName, changeAction);
}
/**
* Creates an action for updating a group that is executed with retrying when called.
*
* @param actionName the name of the action, used as metric bucket
* @param action the action that should be executed
* @return the retryable action, callers need to call {@link RetryableAction#call()} to execute
* the action
*/
public <T> RetryableAction<T> groupUpdate(String actionName, Action<T> action) {
return new RetryableAction<>(this, ActionType.GROUP_UPDATE, actionName, action);
}
/**
* Creates an action for updating of plugin-specific data that is executed with retrying when
* called.
*
* @param actionName the name of the action, used as metric bucket
* @param action the action that should be executed
* @return the retryable action, callers need to call {@link RetryableAction#call()} to execute
* the action
*/
public <T> RetryableAction<T> pluginUpdate(String actionName, Action<T> action) {
return new RetryableAction<>(this, ActionType.PLUGIN_UPDATE, actionName, action);
}
/**
* Creates an action for querying an index that is executed with retrying when called.
*
* @param actionName the name of the action, used as metric bucket
* @param action the action that should be executed
* @return the retryable action, callers need to call {@link RetryableAction#call()} to execute
* the action
*/
public <T> RetryableAction<T> indexQuery(String actionName, Action<T> action) {
return new RetryableAction<>(this, ActionType.INDEX_QUERY, actionName, action);
}
Duration getDefaultTimeout(ActionType actionType) {
return defaultTimeouts.get(actionType);
}
public <T> T execute(
ActionType actionType, Action<T> action, Predicate<Throwable> exceptionPredicate)
throws Exception {
return execute(actionType, action, defaults(), exceptionPredicate);
}
public <T> T execute(
ActionType actionType,
Action<T> action,
Options opts,
Predicate<Throwable> exceptionPredicate)
throws Exception {
try {
return executeWithAttemptAndTimeoutCount(actionType, action, opts, exceptionPredicate);
} catch (Throwable t) {
Throwables.throwIfUnchecked(t);
Throwables.throwIfInstanceOf(t, Exception.class);
throw new IllegalStateException(t);
}
}
public <T> T execute(ChangeAction<T> changeAction) throws RestApiException, UpdateException {
return execute(changeAction, defaults());
}
public <T> T execute(ChangeAction<T> changeAction, Options opts)
throws RestApiException, UpdateException {
try {
return execute(
ActionType.CHANGE_UPDATE, () -> changeAction.call(updateFactory), opts, t -> false);
} catch (Throwable t) {
Throwables.throwIfUnchecked(t);
Throwables.throwIfInstanceOf(t, UpdateException.class);
Throwables.throwIfInstanceOf(t, RestApiException.class);
throw new UpdateException(t);
}
}
/**
* Executes an action and records the number of attempts and the timeout as metrics.
*
@ -306,7 +335,7 @@ public class RetryHelper {
* @throws Throwable any error or exception that made the action fail, callers are expected to
* catch and inspect this Throwable to decide carefully whether it should be re-thrown
*/
private <T> T executeWithAttemptAndTimeoutCount(
<T> T execute(
ActionType actionType,
Action<T> action,
Options opts,

View File

@ -0,0 +1,168 @@
// Copyright (C) 2019 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.update;
import static java.util.Objects.requireNonNull;
import com.github.rholder.retry.RetryListener;
import com.google.common.base.Throwables;
import com.google.gerrit.server.ExceptionHook;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* An action that is executed with retrying.
*
* <p>Instances of this class are created via {@link RetryHelper} (see {@link
* RetryHelper#action(ActionType, String, Action)}, {@link RetryHelper#accountUpdate(String,
* Action)}, {@link RetryHelper#changeUpdate(String, Action)}, {@link
* RetryHelper#groupUpdate(String, Action)}, {@link RetryHelper#pluginUpdate(String, Action)},
* {@link RetryHelper#indexQuery(String, Action)}).
*
* <p>Which exceptions cause a retry is controlled by {@link ExceptionHook#shouldRetry(Throwable)}.
* In addition callers can specify additional exception that should cause a retry via {@link
* #retryOn(Predicate)}.
*/
public class RetryableAction<T> {
public enum ActionType {
ACCOUNT_UPDATE,
CHANGE_UPDATE,
GROUP_UPDATE,
INDEX_QUERY,
PLUGIN_UPDATE,
REST_READ_REQUEST,
REST_WRITE_REQUEST,
}
@FunctionalInterface
public interface Action<T> {
T call() throws Exception;
}
private final RetryHelper retryHelper;
private final ActionType actionType;
private final Action<T> action;
private final RetryHelper.Options.Builder options = RetryHelper.options();
private final List<Predicate<Throwable>> exceptionPredicates = new ArrayList<>();
RetryableAction(
RetryHelper retryHelper, ActionType actionType, String actionName, Action<T> action) {
this.retryHelper = requireNonNull(retryHelper, "retryHelper");
this.actionType = requireNonNull(actionType, "actionType");
this.action = requireNonNull(action, "action");
options.caller(requireNonNull(actionName, "actionName"));
}
/**
* Adds an additional condition that should trigger retries.
*
* <p>For some exceptions retrying is enabled globally (see {@link
* ExceptionHook#shouldRetry(Throwable)}). Conditions for those exceptions do not need to be
* specified here again.
*
* <p>This method can be invoked multiple times to add further conditions that should trigger
* retries.
*
* @param exceptionPredicate predicate that decides if the action should be retried for a given
* exception
* @return this instance to enable chaining of calls
*/
public RetryableAction<T> retryOn(Predicate<Throwable> exceptionPredicate) {
exceptionPredicates.add(exceptionPredicate);
return this;
}
/**
* Sets a condition that should trigger auto-retry with tracing.
*
* <p>This condition is only relevant if an exception occurs that doesn't trigger (normal) retry.
*
* <p>Auto-retry with tracing automatically captures traces for unexpected exceptions so that they
* can be investigated.
*
* <p>Every call of this method overwrites any previously set condition for auto-retry with
* tracing.
*
* @param exceptionPredicate predicate that decides if the action should be retried with tracing
* for a given exception
* @return this instance to enable chaining of calls
*/
public RetryableAction<T> retryWithTrace(Predicate<Throwable> exceptionPredicate) {
options.retryWithTrace(exceptionPredicate);
return this;
}
/**
* Sets a callback that is invoked when auto-retry with tracing is triggered.
*
* <p>Via the callback callers can find out with trace ID was used for the retry.
*
* <p>Every call of this method overwrites any previously set trace ID consumer.
*
* @param traceIdConsumer trace ID consumer
* @return this instance to enable chaining of calls
*/
public RetryableAction<T> onAutoTrace(Consumer<String> traceIdConsumer) {
options.onAutoTrace(traceIdConsumer);
return this;
}
/**
* Sets a listener that is invoked when the action is retried.
*
* <p>Every call of this method overwrites any previously set listener.
*
* @param retryListener retry listener
* @return this instance to enable chaining of calls
*/
public RetryableAction<T> listener(RetryListener retryListener) {
options.listener(retryListener);
return this;
}
/**
* Increases the default timeout by the given multiplier.
*
* <p>Every call of this method overwrites any previously set timeout.
*
* @param multiplier multiplier for the default timeout
* @return this instance to enable chaining of calls
*/
public RetryableAction<T> defaultTimeoutMultiplier(int multiplier) {
options.timeout(retryHelper.getDefaultTimeout(actionType).multipliedBy(multiplier));
return this;
}
/**
* Executes this action with retry.
*
* @return the result of the action
*/
public T call() throws Exception {
try {
return retryHelper.execute(
actionType,
action,
options.build(),
t -> exceptionPredicates.stream().anyMatch(p -> p.test(t)));
} catch (Throwable t) {
Throwables.throwIfUnchecked(t);
Throwables.throwIfInstanceOf(t, Exception.class);
throw new IllegalStateException(t);
}
}
}

View File

@ -0,0 +1,92 @@
// Copyright (C) 2019 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.update;
import com.github.rholder.retry.RetryListener;
import com.google.common.base.Throwables;
import com.google.gerrit.extensions.restapi.RestApiException;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
* A change action that is executed with retrying.
*
* <p>Instances of this class are created via {@link RetryHelper#changeUpdate(String,
* ChangeAction)}.
*
* <p>In contrast to normal {@link RetryableAction.Action}s that are called via {@link
* RetryableAction} {@link ChangeAction}s get a {@link BatchUpdate.Factory} provided.
*
* <p>In addition when a change action is called any exception that is not an unchecked exception
* and neither {@link UpdateException} nor {@link RestApiException} get wrapped into an {@link
* UpdateException}.
*/
public class RetryableChangeAction<T> extends RetryableAction<T> {
@FunctionalInterface
public interface ChangeAction<T> {
T call(BatchUpdate.Factory batchUpdateFactory) throws Exception;
}
RetryableChangeAction(
RetryHelper retryHelper,
BatchUpdate.Factory updateFactory,
String actionName,
ChangeAction<T> changeAction) {
super(
retryHelper, ActionType.CHANGE_UPDATE, actionName, () -> changeAction.call(updateFactory));
}
@Override
public RetryableChangeAction<T> retryOn(Predicate<Throwable> exceptionPredicate) {
super.retryOn(exceptionPredicate);
return this;
}
@Override
public RetryableChangeAction<T> retryWithTrace(Predicate<Throwable> exceptionPredicate) {
super.retryWithTrace(exceptionPredicate);
return this;
}
@Override
public RetryableChangeAction<T> onAutoTrace(Consumer<String> traceIdConsumer) {
super.onAutoTrace(traceIdConsumer);
return this;
}
@Override
public RetryableChangeAction<T> listener(RetryListener retryListener) {
super.listener(retryListener);
return this;
}
@Override
public RetryableChangeAction<T> defaultTimeoutMultiplier(int multiplier) {
super.defaultTimeoutMultiplier(multiplier);
return this;
}
@Override
public T call() throws UpdateException, RestApiException {
try {
return super.call();
} catch (Throwable t) {
Throwables.throwIfUnchecked(t);
Throwables.throwIfInstanceOf(t, UpdateException.class);
Throwables.throwIfInstanceOf(t, RestApiException.class);
throw new UpdateException(t);
}
}
}

View File

@ -154,16 +154,20 @@ public class NoteDbOnlyIT extends AbstractDaemonTest {
AtomicInteger afterUpdateReposCalledCount = new AtomicInteger();
String result =
retryHelper.execute(
batchUpdateFactory -> {
try (BatchUpdate bu = newBatchUpdate(batchUpdateFactory)) {
bu.addOp(
id,
new UpdateRefAndAddMessageOp(updateRepoCalledCount, updateChangeCalledCount));
bu.execute(new ConcurrentWritingListener(afterUpdateReposCalledCount));
}
return "Done";
});
retryHelper
.changeUpdate(
"testUpdateRefAndAddMessageOp",
batchUpdateFactory -> {
try (BatchUpdate bu = newBatchUpdate(batchUpdateFactory)) {
bu.addOp(
id,
new UpdateRefAndAddMessageOp(
updateRepoCalledCount, updateChangeCalledCount));
bu.execute(new ConcurrentWritingListener(afterUpdateReposCalledCount));
}
return "Done";
})
.call();
assertThat(result).isEqualTo("Done");
assertThat(updateRepoCalledCount.get()).isEqualTo(2);

@ -1 +1 @@
Subproject commit f1a36220e0ef31fb024de9ad589dfdfdf301c295
Subproject commit eac4cd97cb5818ff471c64914fb4e342baf28c05