Asynchronously send email.

On gerrit-review.googlesource.com, an email is taking about 3
seconds to create and send. The CommentSender is complex and
difficult to optimize, so I wrapped the email generation/sending
in a request scoped runnable and submitted it to the default
WorkQueue for PublishComments, MergeOp, and ReceiveCommits.

Change-Id: I471cd6c45af9ec851654b2b23c385081afe3de04
This commit is contained in:
Colby Ranger
2012-03-08 11:30:54 -08:00
parent 3e7ef3cc5f
commit c26eafc643
3 changed files with 216 additions and 82 deletions

View File

@@ -46,6 +46,7 @@ import com.google.gerrit.server.project.ChangeControl;
import com.google.gerrit.server.project.NoSuchChangeException;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.gerrit.server.workflow.CategoryFunction;
import com.google.gerrit.server.workflow.FunctionState;
import com.google.gwtorm.server.AtomicUpdate;
@@ -164,6 +165,8 @@ public class MergeOp {
private final TagCache tagCache;
private final CreateCodeReviewNotes.Factory codeReviewNotesFactory;
private final SubmoduleOp.Factory subOpFactory;
private final WorkQueue workQueue;
private final RequestScopePropagator requestScopePropagator;
@Inject
MergeOp(final GitRepositoryManager grm, final SchemaFactory<ReviewDb> sf,
@@ -178,7 +181,9 @@ public class MergeOp {
final MergeQueue mergeQueue, @Assisted final Branch.NameKey branch,
final ChangeHooks hooks, final AccountCache accountCache,
final TagCache tagCache, final CreateCodeReviewNotes.Factory crnf,
final SubmoduleOp.Factory subOpFactory) {
final SubmoduleOp.Factory subOpFactory,
final WorkQueue workQueue,
final RequestScopePropagator requestScopePropagator) {
repoManager = grm;
schemaFactory = sf;
functionState = fs;
@@ -197,7 +202,8 @@ public class MergeOp {
this.tagCache = tagCache;
codeReviewNotesFactory = crnf;
this.subOpFactory = subOpFactory;
this.workQueue = workQueue;
this.requestScopePropagator = requestScopePropagator;
this.myIdent = myIdent;
destBranch = branch;
toMerge = new ArrayList<CodeReviewCommit>();
@@ -603,7 +609,7 @@ public class MergeOp {
final List<CodeReviewCommit> codeReviewCommits) {
PatchSetApproval submitter = null;
for (final CodeReviewCommit c : codeReviewCommits) {
PatchSetApproval s = getSubmitter(c.patchsetId);
PatchSetApproval s = getSubmitter(db, c.patchsetId);
if (submitter == null
|| (s != null && s.getGranted().compareTo(submitter.getGranted()) > 0)) {
submitter = s;
@@ -663,7 +669,7 @@ public class MergeOp {
if (c.patchsetId != null) {
c.statusCode = CommitMergeStatus.CLEAN_MERGE;
if (branchUpdate.getRefLogIdent() == null) {
setRefLogIdent(getSubmitter(c.patchsetId));
setRefLogIdent(getSubmitter(db, c.patchsetId));
}
}
}
@@ -1025,7 +1031,7 @@ public class MergeOp {
.getName());
Account account = null;
final PatchSetApproval submitter = getSubmitter(mergeTip.patchsetId);
final PatchSetApproval submitter = getSubmitter(db, mergeTip.patchsetId);
if (submitter != null) {
account = accountCache.get(submitter.getAccountId()).getAccount();
}
@@ -1285,14 +1291,15 @@ public class MergeOp {
return m;
}
private PatchSetApproval getSubmitter(PatchSet.Id c) {
private static PatchSetApproval getSubmitter(ReviewDb reviewDb,
PatchSet.Id c) {
if (c == null) {
return null;
}
PatchSetApproval submitter = null;
try {
final List<PatchSetApproval> approvals =
db.patchSetApprovals().byPatchSet(c).toList();
reviewDb.patchSetApprovals().byPatchSet(c).toList();
for (PatchSetApproval a : approvals) {
if (a.getValue() > 0
&& ApprovalCategory.SUBMIT.equals(a.getCategoryId())) {
@@ -1307,7 +1314,7 @@ public class MergeOp {
return submitter;
}
private void setMerged(Change c, ChangeMessage msg) {
private void setMerged(final Change c, final ChangeMessage msg) {
final Change.Id changeId = c.getId();
// We must pull the patchset out of commits, because the patchset ID is
// modified when using the cherry-pick merge strategy.
@@ -1390,18 +1397,42 @@ public class MergeOp {
}
}
try {
final MergedSender cm = mergedSenderFactory.create(c);
if (submitter != null) {
cm.setFrom(submitter.getAccountId());
final PatchSetApproval from = submitter;
workQueue.getDefaultQueue()
.submit(requestScopePropagator.wrap(new Runnable() {
@Override
public void run() {
PatchSet patchSet;
try {
ReviewDb reviewDb = schemaFactory.open();
try {
patchSet = reviewDb.patchSets().get(c.currentPatchSetId());
} finally {
reviewDb.close();
}
} catch (OrmException e) {
log.error("Cannot send email for submitted patch set " + c.getId(), e);
return;
}
try {
final MergedSender cm = mergedSenderFactory.create(c);
if (from != null) {
cm.setFrom(from.getAccountId());
}
cm.setPatchSet(patchSet);
cm.send();
} catch (EmailException e) {
log.error("Cannot send email for submitted patch set " + c.getId(), e);
}
}
cm.setPatchSet(db.patchSets().get(c.currentPatchSetId()));
cm.send();
} catch (OrmException e) {
log.error("Cannot send email for submitted patch set " + c.getId(), e);
} catch (EmailException e) {
log.error("Cannot send email for submitted patch set " + c.getId(), e);
}
@Override
public String toString() {
return "send-email merged";
}
}));
try {
hooks.doChangeMergedHook(c, //
@@ -1416,7 +1447,8 @@ public class MergeOp {
sendMergeFail(c, msg, true);
}
private void sendMergeFail(Change c, ChangeMessage msg, final boolean makeNew) {
private void sendMergeFail(final Change c, final ChangeMessage msg,
final boolean makeNew) {
try {
db.changeMessages().insert(Collections.singleton(msg));
} catch (OrmException err) {
@@ -1447,19 +1479,42 @@ public class MergeOp {
}
}
try {
final MergeFailSender cm = mergeFailSenderFactory.create(c);
final PatchSetApproval submitter = getSubmitter(c.currentPatchSetId());
if (submitter != null) {
cm.setFrom(submitter.getAccountId());
workQueue.getDefaultQueue()
.submit(requestScopePropagator.wrap(new Runnable() {
@Override
public void run() {
PatchSet patchSet;
PatchSetApproval submitter;
try {
ReviewDb reviewDb = schemaFactory.open();
try {
patchSet = reviewDb.patchSets().get(c.currentPatchSetId());
submitter = getSubmitter(reviewDb, c.currentPatchSetId());
} finally {
reviewDb.close();
}
} catch (OrmException e) {
log.error("Cannot send email notifications about merge failure", e);
return;
}
try {
final MergeFailSender cm = mergeFailSenderFactory.create(c);
if (submitter != null) {
cm.setFrom(submitter.getAccountId());
}
cm.setPatchSet(patchSet);
cm.setChangeMessage(msg);
cm.send();
} catch (EmailException e) {
log.error("Cannot send email notifications about merge failure", e);
}
}
cm.setPatchSet(db.patchSets().get(c.currentPatchSetId()));
cm.setChangeMessage(msg);
cm.send();
} catch (OrmException e) {
log.error("Cannot send email notifications about merge failure", e);
} catch (EmailException e) {
log.error("Cannot send email notifications about merge failure", e);
}
@Override
public String toString() {
return "send-email merge-failed";
}
}));
}
}

View File

@@ -51,6 +51,7 @@ import com.google.gerrit.server.project.ProjectControl;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.project.RefControl;
import com.google.gerrit.server.util.MagicBranch;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.gwtorm.server.AtomicUpdate;
import com.google.gwtorm.server.OrmException;
import com.google.inject.Inject;
@@ -132,13 +133,14 @@ public class ReceiveCommits implements PreReceiveHook, PostReceiveHook {
private final PersonIdent gerritIdent;
private final TrackingFooters trackingFooters;
private final TagCache tagCache;
private final WorkQueue workQueue;
private final RequestScopePropagator requestScopePropagator;
private final ProjectControl projectControl;
private final Project project;
private final Repository repo;
private final ReceivePack rp;
private final NoteMap rejectCommits;
private ReceiveCommand newChange;
private Branch.NameKey destBranch;
private RefControl destBranchCtl;
@@ -171,6 +173,8 @@ public class ReceiveCommits implements PreReceiveHook, PostReceiveHook {
@CanonicalWebUrl @Nullable final String canonicalWebUrl,
@GerritPersonIdent final PersonIdent gerritIdent,
final TrackingFooters trackingFooters,
final WorkQueue workQueue,
final RequestScopePropagator requestScopePropagator,
@Assisted final ProjectControl projectControl,
@Assisted final Repository repo,
@@ -191,6 +195,8 @@ public class ReceiveCommits implements PreReceiveHook, PostReceiveHook {
this.gerritIdent = gerritIdent;
this.trackingFooters = trackingFooters;
this.tagCache = tagCache;
this.workQueue = workQueue;
this.requestScopePropagator = requestScopePropagator;
this.projectControl = projectControl;
this.project = projectControl.getProject();
@@ -998,17 +1004,28 @@ public class ReceiveCommits implements PreReceiveHook, PostReceiveHook {
allNewChanges.add(change);
try {
final CreateChangeSender cm;
cm = createChangeSenderFactory.create(change);
cm.setFrom(me);
cm.setPatchSet(ps, info);
cm.addReviewers(reviewers);
cm.addExtraCC(cc);
cm.send();
} catch (EmailException e) {
log.error("Cannot send email for new change " + change.getId(), e);
}
workQueue.getDefaultQueue()
.submit(requestScopePropagator.wrap(new Runnable() {
@Override
public void run() {
try {
final CreateChangeSender cm;
cm = createChangeSenderFactory.create(change);
cm.setFrom(me);
cm.setPatchSet(ps, info);
cm.addReviewers(reviewers);
cm.addExtraCC(cc);
cm.send();
} catch (EmailException e) {
log.error("Cannot send email for new change " + change.getId(), e);
}
}
@Override
public String toString() {
return "send-email newchange";
}
}));
hooks.doPatchsetCreatedHook(change, ps, db);
}
@@ -1334,20 +1351,31 @@ public class ReceiveCommits implements PreReceiveHook, PostReceiveHook {
hooks.doPatchsetCreatedHook(result.change, ps, db);
request.cmd.setResult(ReceiveCommand.Result.OK);
try {
final ReplacePatchSetSender cm;
cm = replacePatchSetFactory.create(result.change);
cm.setFrom(me);
cm.setPatchSet(ps, result.info);
cm.setChangeMessage(result.msg);
cm.addReviewers(reviewers);
cm.addExtraCC(cc);
cm.addReviewers(oldReviewers);
cm.addExtraCC(oldCC);
cm.send();
} catch (EmailException e) {
log.error("Cannot send email for new patch set " + ps.getId(), e);
}
workQueue.getDefaultQueue()
.submit(requestScopePropagator.wrap(new Runnable() {
@Override
public void run() {
try {
final ReplacePatchSetSender cm;
cm = replacePatchSetFactory.create(result.change);
cm.setFrom(me);
cm.setPatchSet(ps, result.info);
cm.setChangeMessage(result.msg);
cm.addReviewers(reviewers);
cm.addExtraCC(cc);
cm.addReviewers(oldReviewers);
cm.addExtraCC(oldCC);
cm.send();
} catch (EmailException e) {
log.error("Cannot send email for new patch set " + ps.getId(), e);
}
}
@Override
public String toString() {
return "send-email newpatchset";
}
}));
sendMergedEmail(result);
return result != null ? result.info.getKey() : null;
@@ -1867,15 +1895,26 @@ public class ReceiveCommits implements PreReceiveHook, PostReceiveHook {
private void sendMergedEmail(final ReplaceResult result) {
if (result != null && result.mergedIntoRef != null) {
try {
final MergedSender cm = mergedSenderFactory.create(result.change);
cm.setFrom(currentUser.getAccountId());
cm.setPatchSet(result.patchSet, result.info);
cm.send();
} catch (EmailException e) {
final PatchSet.Id psi = result.patchSet.getId();
log.error("Cannot send email for submitted patch set " + psi, e);
}
workQueue.getDefaultQueue()
.submit(requestScopePropagator.wrap(new Runnable() {
@Override
public void run() {
try {
final MergedSender cm = mergedSenderFactory.create(result.change);
cm.setFrom(currentUser.getAccountId());
cm.setPatchSet(result.patchSet, result.info);
cm.send();
} catch (EmailException e) {
final PatchSet.Id psi = result.patchSet.getId();
log.error("Cannot send email for submitted patch set " + psi, e);
}
}
@Override
public String toString() {
return "send-email merged";
}
}));
try {
hooks.doChangeMergedHook(result.change, currentUser.getAccount(),

View File

@@ -24,17 +24,21 @@ import com.google.gerrit.reviewdb.client.ChangeMessage;
import com.google.gerrit.reviewdb.client.PatchLineComment;
import com.google.gerrit.reviewdb.client.PatchSet;
import com.google.gerrit.reviewdb.client.PatchSetApproval;
import com.google.gerrit.reviewdb.client.PatchSetInfo;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.ChangeUtil;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.mail.CommentSender;
import com.google.gerrit.server.mail.EmailException;
import com.google.gerrit.server.project.ChangeControl;
import com.google.gerrit.server.project.InvalidChangeOperationException;
import com.google.gerrit.server.project.NoSuchChangeException;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.gerrit.server.workflow.FunctionState;
import com.google.gwtjsonrpc.client.VoidResult;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -60,6 +64,7 @@ public class PublishComments implements Callable<VoidResult> {
Set<ApprovalCategoryValue.Id> approvals, boolean forceMessage);
}
private final SchemaFactory<ReviewDb> schemaFactory;
private final ReviewDb db;
private final IdentifiedUser user;
private final ApprovalTypes types;
@@ -68,6 +73,8 @@ public class PublishComments implements Callable<VoidResult> {
private final ChangeControl.Factory changeControlFactory;
private final FunctionState.Factory functionStateFactory;
private final ChangeHooks hooks;
private final WorkQueue workQueue;
private final RequestScopePropagator requestScopePropagator;
private final PatchSet.Id patchSetId;
private final String messageText;
@@ -80,18 +87,22 @@ public class PublishComments implements Callable<VoidResult> {
private List<PatchLineComment> drafts;
@Inject
PublishComments(final ReviewDb db, final IdentifiedUser user,
PublishComments(final SchemaFactory<ReviewDb> sf, final ReviewDb db,
final IdentifiedUser user,
final ApprovalTypes approvalTypes,
final CommentSender.Factory commentSenderFactory,
final PatchSetInfoFactory patchSetInfoFactory,
final ChangeControl.Factory changeControlFactory,
final FunctionState.Factory functionStateFactory,
final ChangeHooks hooks,
final WorkQueue workQueue,
final RequestScopePropagator requestScopePropagator,
@Assisted final PatchSet.Id patchSetId,
@Assisted final String messageText,
@Assisted final Set<ApprovalCategoryValue.Id> approvals,
@Assisted final boolean forceMessage) {
this.schemaFactory = sf;
this.db = db;
this.user = user;
this.types = approvalTypes;
@@ -100,6 +111,8 @@ public class PublishComments implements Callable<VoidResult> {
this.changeControlFactory = changeControlFactory;
this.functionStateFactory = functionStateFactory;
this.hooks = hooks;
this.workQueue = workQueue;
this.requestScopePropagator = requestScopePropagator;
this.patchSetId = patchSetId;
this.messageText = messageText;
@@ -294,20 +307,47 @@ public class PublishComments implements Callable<VoidResult> {
}
private void email() {
try {
if (message != null) {
final CommentSender cm = commentSenderFactory.create(change);
cm.setFrom(user.getAccountId());
cm.setPatchSet(patchSet, patchSetInfoFactory.get(db, patchSetId));
cm.setChangeMessage(message);
cm.setPatchLineComments(drafts);
cm.send();
}
} catch (EmailException e) {
log.error("Cannot send comments by email for patch set " + patchSetId, e);
} catch (PatchSetInfoNotAvailableException e) {
log.error("Failed to obtain PatchSetInfo for patch set " + patchSetId, e);
if (message == null) {
return;
}
workQueue.getDefaultQueue()
.submit(requestScopePropagator.wrap(new Runnable() {
@Override
public void run() {
PatchSetInfo patchSetInfo;
try {
ReviewDb reviewDb = schemaFactory.open();
try {
patchSetInfo = patchSetInfoFactory.get(reviewDb, patchSetId);
} finally {
reviewDb.close();
}
} catch (PatchSetInfoNotAvailableException e) {
log.error("Cannot read PatchSetInfo of " + patchSetId, e);
return;
} catch (OrmException e) {
log.error("Cannot email comments for " + patchSetId, e);
return;
}
try {
final CommentSender cm = commentSenderFactory.create(change);
cm.setFrom(user.getAccountId());
cm.setPatchSet(patchSet, patchSetInfo);
cm.setChangeMessage(message);
cm.setPatchLineComments(drafts);
cm.send();
} catch (EmailException e) {
log.error("Cannot email comments for " + patchSetId, e);
}
}
@Override
public String toString() {
return "send-email comments";
}
}));
}
private void fireHook() throws OrmException {