Revert "Migrate all emails to send async via asyncPostUpdate"

This reverts commit ce4e7c6609.

Reason for revert:
The code has threading issues. For instance, we see an NPE for submitter in SubmitStrategyOp#asyncPostUpdate on real servers. Looking at the code, there doesn't seem to be any precautions for the use of the existing objects on different threads. Objects like SubmitStrategyOp are not built to be thread-safe at the moment. We can't simply pass them to another thread and expect them to work correctly.

Change-Id: If2e8c7c4d40231fdc9390f52c883401a1c0710cb
This commit is contained in:
Alice Kober-Sotzek
2020-09-18 09:25:15 +00:00
parent ce4e7c6609
commit da0559e065
42 changed files with 481 additions and 551 deletions

View File

@@ -48,7 +48,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import org.eclipse.jgit.junit.TestRepository;
@@ -98,7 +97,7 @@ public abstract class AbstractNotificationTest extends AbstractDaemonTest {
protected static class FakeEmailSenderSubject extends Subject {
private final FakeEmailSender fakeEmailSender;
private Optional<Message> message;
private Message message;
private StagedUsers users;
private Map<RecipientType, List<String>> recipients = new HashMap<>();
private Set<String> accountedFor = new HashSet<>();
@@ -117,29 +116,35 @@ public abstract class AbstractNotificationTest extends AbstractDaemonTest {
}
public FakeEmailSenderSubject sent(String messageType, StagedUsers users) {
fakeEmailSender.readOneMessage();
message =
fakeEmailSender.getMessages().stream()
.filter(
m ->
m.headers()
.get("X-Gerrit-MessageType")
.equals(new EmailHeader.String(messageType)))
.findFirst();
if (!message.isPresent()) {
failWithoutActual(
fact(String.format("expected message of type %s", messageType), "not sent"));
message = fakeEmailSender.nextMessage();
if (message == null) {
failWithoutActual(fact("expected message", "not sent"));
}
recipients = new HashMap<>();
recipients.put(TO, parseAddresses(message.get(), "To"));
recipients.put(CC, parseAddresses(message.get(), "Cc"));
recipients.put(TO, parseAddresses(message, "To"));
recipients.put(CC, parseAddresses(message, "Cc"));
recipients.put(
BCC,
message.get().rcpt().stream()
message.rcpt().stream()
.map(Address::email)
.filter(e -> !recipients.get(TO).contains(e) && !recipients.get(CC).contains(e))
.collect(toList()));
this.users = users;
if (!message.headers().containsKey("X-Gerrit-MessageType")) {
failWithoutActual(
fact("expected to have message sent with", "X-Gerrit-MessageType header"));
}
EmailHeader header = message.headers().get("X-Gerrit-MessageType");
if (!header.equals(new EmailHeader.String(messageType))) {
failWithoutActual(
fact("expected message of type", messageType),
fact(
"actual",
header instanceof EmailHeader.String
? ((EmailHeader.String) header).getString()
: header));
}
return this;
}

View File

@@ -28,7 +28,6 @@ import com.google.gerrit.server.extensions.events.CommentAdded;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.patch.PatchListNotAvailableException;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.CommentsRejectedException;
@@ -47,7 +46,7 @@ import java.util.List;
* <p>This class uses the {@link PublishCommentUtil} to publish draft comments and fires the
* necessary event for this.
*/
public class PublishCommentsOp implements BatchUpdateOp, AsyncPostUpdateOp {
public class PublishCommentsOp implements BatchUpdateOp {
private final PatchSetUtil psUtil;
private final ChangeNotes.Factory changeNotesFactory;
private final ChangeMessagesUtil cmUtil;
@@ -62,8 +61,6 @@ public class PublishCommentsOp implements BatchUpdateOp, AsyncPostUpdateOp {
private List<HumanComment> comments = new ArrayList<>();
private ChangeMessage message;
private IdentifiedUser user;
private ChangeNotes changeNotes;
private PatchSet patchset;
public interface Factory {
PublishCommentsOp create(PatchSet.Id psId, Project.NameKey projectNameKey);
@@ -112,24 +109,11 @@ public class PublishCommentsOp implements BatchUpdateOp, AsyncPostUpdateOp {
@Override
public void postUpdate(Context ctx) {
changeNotes = changeNotesFactory.createChecked(projectNameKey, psId.changeId());
patchset = psUtil.get(changeNotes, psId);
commentAdded.fire(
changeNotes.getChange(),
patchset,
ctx.getAccount(),
message.getMessage(),
ImmutableMap.of(),
ImmutableMap.of(),
ctx.getWhen());
}
@Override
public void asyncPostUpdate(Context ctx) {
if (message == null || comments.isEmpty()) {
return;
}
ChangeNotes changeNotes = changeNotesFactory.createChecked(projectNameKey, psId.changeId());
PatchSet ps = psUtil.get(changeNotes, psId);
NotifyResolver.Result notify = ctx.getNotify(changeNotes.getChangeId());
if (notify.shouldNotify()) {
RepoView repoView;
@@ -140,10 +124,17 @@ public class PublishCommentsOp implements BatchUpdateOp, AsyncPostUpdateOp {
String.format("Repository %s not found", ctx.getProject().get()), ex);
}
email
.create(
notify, changeNotes, patchset, user, message, comments, null, labelDelta, repoView)
.send();
.create(notify, changeNotes, ps, user, message, comments, null, labelDelta, repoView)
.sendAsync();
}
commentAdded.fire(
changeNotes.getChange(),
ps,
ctx.getAccount(),
message.getMessage(),
ImmutableMap.of(),
ImmutableMap.of(),
ctx.getWhen());
}
private boolean insertMessage(ChangeContext ctx, ChangeUpdate changeUpdate) {

View File

@@ -30,14 +30,13 @@ import com.google.gerrit.server.mail.send.AbandonedSender;
import com.google.gerrit.server.mail.send.MessageIdGenerator;
import com.google.gerrit.server.mail.send.ReplyToChangeSender;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
public class AbandonOp implements BatchUpdateOp, AsyncPostUpdateOp {
public class AbandonOp implements BatchUpdateOp {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final AbandonedSender.Factory abandonedSenderFactory;
@@ -52,7 +51,6 @@ public class AbandonOp implements BatchUpdateOp, AsyncPostUpdateOp {
private Change change;
private PatchSet patchSet;
private ChangeMessage message;
private NotifyResolver.Result notify;
public interface Factory {
AbandonOp create(
@@ -98,7 +96,6 @@ public class AbandonOp implements BatchUpdateOp, AsyncPostUpdateOp {
update.setStatus(change.getStatus());
message = newMessage(ctx);
cmUtil.addChangeMessage(update, message);
notify = ctx.getNotify(change.getId());
return true;
}
@@ -115,11 +112,7 @@ public class AbandonOp implements BatchUpdateOp, AsyncPostUpdateOp {
@Override
public void postUpdate(Context ctx) {
changeAbandoned.fire(change, patchSet, accountState, msgTxt, ctx.getWhen(), notify.handling());
}
@Override
public void asyncPostUpdate(Context ctx) {
NotifyResolver.Result notify = ctx.getNotify(change.getId());
try {
ReplyToChangeSender emailSender =
abandonedSenderFactory.create(ctx.getProject(), change.getId());
@@ -134,5 +127,6 @@ public class AbandonOp implements BatchUpdateOp, AsyncPostUpdateOp {
} catch (Exception e) {
logger.atSevere().withCause(e).log("Cannot email update for change %s", change.getId());
}
changeAbandoned.fire(change, patchSet, accountState, msgTxt, ctx.getWhen(), notify.handling());
}
}

View File

@@ -23,27 +23,34 @@ import com.google.gerrit.entities.Address;
import com.google.gerrit.entities.Change;
import com.google.gerrit.entities.Project;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.config.SendEmailExecutor;
import com.google.gerrit.server.mail.send.AddReviewerSender;
import com.google.gerrit.server.mail.send.MessageIdGenerator;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@Singleton
public class AddReviewersEmail {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final AddReviewerSender.Factory addReviewerSenderFactory;
private final ExecutorService sendEmailsExecutor;
private final MessageIdGenerator messageIdGenerator;
@Inject
AddReviewersEmail(
AddReviewerSender.Factory addReviewerSenderFactory, MessageIdGenerator messageIdGenerator) {
AddReviewerSender.Factory addReviewerSenderFactory,
@SendEmailExecutor ExecutorService sendEmailsExecutor,
MessageIdGenerator messageIdGenerator) {
this.addReviewerSenderFactory = addReviewerSenderFactory;
this.sendEmailsExecutor = sendEmailsExecutor;
this.messageIdGenerator = messageIdGenerator;
}
public void emailReviewers(
public void emailReviewersAsync(
IdentifiedUser user,
Change change,
Collection<Account.Id> added,
@@ -71,20 +78,27 @@ public class AddReviewersEmail {
ImmutableList<Address> immutableAddedByEmail = ImmutableList.copyOf(addedByEmail);
ImmutableList<Address> immutableCopiedByEmail = ImmutableList.copyOf(copiedByEmail);
try {
AddReviewerSender emailSender = addReviewerSenderFactory.create(projectNameKey, cId);
emailSender.setNotify(notify);
emailSender.setFrom(userId);
emailSender.addReviewers(immutableToMail);
emailSender.addReviewersByEmail(immutableAddedByEmail);
emailSender.addExtraCC(immutableToCopy);
emailSender.addExtraCCByEmail(immutableCopiedByEmail);
emailSender.setMessageId(
messageIdGenerator.fromChangeUpdate(change.getProject(), change.currentPatchSetId()));
emailSender.send();
} catch (Exception err) {
logger.atSevere().withCause(err).log(
"Cannot send email to new reviewers of change %s", change.getId());
}
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
sendEmailsExecutor.submit(
() -> {
try {
AddReviewerSender emailSender =
addReviewerSenderFactory.create(projectNameKey, cId);
emailSender.setNotify(notify);
emailSender.setFrom(userId);
emailSender.addReviewers(immutableToMail);
emailSender.addReviewersByEmail(immutableAddedByEmail);
emailSender.addExtraCC(immutableToCopy);
emailSender.addExtraCCByEmail(immutableCopiedByEmail);
emailSender.setMessageId(
messageIdGenerator.fromChangeUpdate(
change.getProject(), change.currentPatchSetId()));
emailSender.send();
} catch (Exception err) {
logger.atSevere().withCause(err).log(
"Cannot send email to new reviewers of change %s", change.getId());
}
});
}
}

View File

@@ -42,7 +42,6 @@ import com.google.gerrit.server.account.AccountState;
import com.google.gerrit.server.extensions.events.ReviewerAdded;
import com.google.gerrit.server.notedb.ReviewerStateInternal;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
@@ -53,7 +52,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
public class AddReviewersOp implements BatchUpdateOp, AsyncPostUpdateOp {
public class AddReviewersOp implements BatchUpdateOp {
public interface Factory {
/**
@@ -239,7 +238,7 @@ public class AddReviewersOp implements BatchUpdateOp, AsyncPostUpdateOp {
}
@Override
public void postUpdate(Context ctx) {
public void postUpdate(Context ctx) throws Exception {
opResult =
Result.builder()
.setAddedReviewers(addedReviewers)
@@ -247,6 +246,16 @@ public class AddReviewersOp implements BatchUpdateOp, AsyncPostUpdateOp {
.setAddedCCs(addedCCs)
.setAddedCCsByEmail(addedCCsByEmail)
.build();
if (sendEmail) {
addReviewersEmail.emailReviewersAsync(
ctx.getUser().asIdentifiedUser(),
change,
Lists.transform(addedReviewers, PatchSetApproval::accountId),
addedCCs,
addedReviewersByEmail,
addedCCsByEmail,
ctx.getNotify(change.getId()));
}
if (!addedReviewers.isEmpty()) {
List<AccountState> reviewers =
addedReviewers.stream()
@@ -257,20 +266,6 @@ public class AddReviewersOp implements BatchUpdateOp, AsyncPostUpdateOp {
}
}
@Override
public void asyncPostUpdate(Context ctx) {
if (sendEmail) {
addReviewersEmail.emailReviewers(
ctx.getUser().asIdentifiedUser(),
change,
Lists.transform(addedReviewers, PatchSetApproval::accountId),
addedCCs,
addedReviewersByEmail,
addedCCsByEmail,
ctx.getNotify(change.getId()));
}
}
public Result getResult() {
checkState(opResult != null, "Batch update wasn't executed yet");
return opResult;

View File

@@ -25,7 +25,6 @@ import com.google.gerrit.server.mail.send.AddToAttentionSetSender;
import com.google.gerrit.server.mail.send.MessageIdGenerator;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
@@ -35,7 +34,7 @@ import com.google.inject.assistedinject.Assisted;
import java.io.IOException;
/** Add a specified user to the attention set. */
public class AddToAttentionSetOp implements BatchUpdateOp, AsyncPostUpdateOp {
public class AddToAttentionSetOp implements BatchUpdateOp {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public interface Factory {
@@ -102,7 +101,7 @@ public class AddToAttentionSetOp implements BatchUpdateOp, AsyncPostUpdateOp {
}
@Override
public void asyncPostUpdate(Context ctx) {
public void postUpdate(Context ctx) {
if (!notify) {
return;
}
@@ -115,7 +114,7 @@ public class AddToAttentionSetOp implements BatchUpdateOp, AsyncPostUpdateOp {
reason,
messageIdGenerator.fromChangeUpdate(ctx.getRepoView(), change.currentPatchSetId()),
attentionUserId)
.send();
.sendAsync();
} catch (IOException e) {
logger.atSevere().withCause(e).log(e.getMessage(), change.getId());
}

View File

@@ -52,6 +52,7 @@ import com.google.gerrit.server.PatchSetUtil;
import com.google.gerrit.server.change.ReviewerAdder.InternalAddReviewerInput;
import com.google.gerrit.server.change.ReviewerAdder.ReviewerAddition;
import com.google.gerrit.server.change.ReviewerAdder.ReviewerAdditionList;
import com.google.gerrit.server.config.SendEmailExecutor;
import com.google.gerrit.server.config.UrlFormatter;
import com.google.gerrit.server.events.CommitReceivedEvent;
import com.google.gerrit.server.extensions.events.CommentAdded;
@@ -68,12 +69,12 @@ import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.ssh.NoSshInfo;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
import com.google.gerrit.server.update.InsertChangeOp;
import com.google.gerrit.server.update.RepoContext;
import com.google.gerrit.server.util.CommitMessageUtil;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.gerrit.server.validators.ValidationException;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -83,13 +84,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.transport.ReceiveCommand;
public class ChangeInserter implements InsertChangeOp, AsyncPostUpdateOp {
public class ChangeInserter implements InsertChangeOp {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public interface Factory {
@@ -103,6 +106,7 @@ public class ChangeInserter implements InsertChangeOp, AsyncPostUpdateOp {
private final ApprovalsUtil approvalsUtil;
private final ChangeMessagesUtil cmUtil;
private final CreateChangeSender.Factory createChangeSenderFactory;
private final ExecutorService sendEmailExecutor;
private final CommitValidators.Factory commitValidatorsFactory;
private final RevisionCreated revisionCreated;
private final CommentAdded commentAdded;
@@ -126,6 +130,7 @@ public class ChangeInserter implements InsertChangeOp, AsyncPostUpdateOp {
private List<String> groups = Collections.emptyList();
private boolean validate = true;
private Map<String, Short> approvals;
private RequestScopePropagator requestScopePropagator;
private boolean fireRevisionCreated;
private boolean sendMail;
private boolean updateRef;
@@ -141,7 +146,6 @@ public class ChangeInserter implements InsertChangeOp, AsyncPostUpdateOp {
private String pushCert;
private ProjectState projectState;
private ReviewerAdditionList reviewerAdditions;
private NotifyResolver.Result notify;
@Inject
ChangeInserter(
@@ -152,6 +156,7 @@ public class ChangeInserter implements InsertChangeOp, AsyncPostUpdateOp {
ApprovalsUtil approvalsUtil,
ChangeMessagesUtil cmUtil,
CreateChangeSender.Factory createChangeSenderFactory,
@SendEmailExecutor ExecutorService sendEmailExecutor,
CommitValidators.Factory commitValidatorsFactory,
CommentAdded commentAdded,
RevisionCreated revisionCreated,
@@ -168,6 +173,7 @@ public class ChangeInserter implements InsertChangeOp, AsyncPostUpdateOp {
this.approvalsUtil = approvalsUtil;
this.cmUtil = cmUtil;
this.createChangeSenderFactory = createChangeSenderFactory;
this.sendEmailExecutor = sendEmailExecutor;
this.commitValidatorsFactory = commitValidatorsFactory;
this.revisionCreated = revisionCreated;
this.commentAdded = commentAdded;
@@ -309,6 +315,11 @@ public class ChangeInserter implements InsertChangeOp, AsyncPostUpdateOp {
return this;
}
public ChangeInserter setRequestScopePropagator(RequestScopePropagator r) {
this.requestScopePropagator = r;
return this;
}
public ChangeInserter setRevertOf(Change.Id revertOf) {
this.revertOf = revertOf;
return this;
@@ -446,13 +457,57 @@ public class ChangeInserter implements InsertChangeOp, AsyncPostUpdateOp {
ChangeMessagesUtil.uploadedPatchSetTag(workInProgress));
cmUtil.addChangeMessage(update, changeMessage);
}
notify = ctx.getNotify(change.getId());
return true;
}
@Override
public void postUpdate(Context ctx) {
public void postUpdate(Context ctx) throws Exception {
reviewerAdditions.postUpdate(ctx);
NotifyResolver.Result notify = ctx.getNotify(change.getId());
if (sendMail && notify.shouldNotify()) {
Runnable sender =
new Runnable() {
@Override
public void run() {
try {
CreateChangeSender emailSender =
createChangeSenderFactory.create(change.getProject(), change.getId());
emailSender.setFrom(change.getOwner());
emailSender.setPatchSet(patchSet, patchSetInfo);
emailSender.setNotify(notify);
emailSender.addReviewers(
reviewerAdditions.flattenResults(AddReviewersOp.Result::addedReviewers).stream()
.map(PatchSetApproval::accountId)
.collect(toImmutableSet()));
emailSender.addReviewersByEmail(
reviewerAdditions.flattenResults(AddReviewersOp.Result::addedReviewersByEmail));
emailSender.addExtraCC(
reviewerAdditions.flattenResults(AddReviewersOp.Result::addedCCs));
emailSender.addExtraCCByEmail(
reviewerAdditions.flattenResults(AddReviewersOp.Result::addedCCsByEmail));
emailSender.setMessageId(
messageIdGenerator.fromChangeUpdate(ctx.getRepoView(), patchSet.id()));
emailSender.send();
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"Cannot send email for new change %s", change.getId());
}
}
@Override
public String toString() {
return "send-email newchange";
}
};
if (requestScopePropagator != null) {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError =
sendEmailExecutor.submit(requestScopePropagator.wrap(sender));
} else {
sender.run();
}
}
/* For labels that are not set in this operation, show the "current" value
* of 0, and no oldValue as the value was not modified by this operation.
* For labels that are set in this operation, the value was modified, so
@@ -480,34 +535,6 @@ public class ChangeInserter implements InsertChangeOp, AsyncPostUpdateOp {
}
}
@Override
public void asyncPostUpdate(Context ctx) {
reviewerAdditions.asyncPostUpdate(ctx);
if (sendMail && notify.shouldNotify()) {
try {
CreateChangeSender emailSender =
createChangeSenderFactory.create(change.getProject(), change.getId());
emailSender.setFrom(change.getOwner());
emailSender.setPatchSet(patchSet, patchSetInfo);
emailSender.setNotify(notify);
emailSender.addReviewers(
reviewerAdditions.flattenResults(AddReviewersOp.Result::addedReviewers).stream()
.map(PatchSetApproval::accountId)
.collect(toImmutableSet()));
emailSender.addReviewersByEmail(
reviewerAdditions.flattenResults(AddReviewersOp.Result::addedReviewersByEmail));
emailSender.addExtraCC(reviewerAdditions.flattenResults(AddReviewersOp.Result::addedCCs));
emailSender.addExtraCCByEmail(
reviewerAdditions.flattenResults(AddReviewersOp.Result::addedCCsByEmail));
emailSender.setMessageId(
messageIdGenerator.fromChangeUpdate(ctx.getRepoView(), patchSet.id()));
emailSender.send();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Cannot send email for new change %s", change.getId());
}
}
}
private void validate(RepoContext ctx) throws IOException, ResourceConflictException {
if (!validate) {
return;

View File

@@ -22,7 +22,6 @@ import com.google.gerrit.entities.PatchSet;
import com.google.gerrit.server.ChangeUtil;
import com.google.gerrit.server.mail.send.DeleteReviewerSender;
import com.google.gerrit.server.mail.send.MessageIdGenerator;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
@@ -30,7 +29,7 @@ import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.util.Collections;
public class DeleteReviewerByEmailOp implements BatchUpdateOp, AsyncPostUpdateOp {
public class DeleteReviewerByEmailOp implements BatchUpdateOp {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public interface Factory {
@@ -74,7 +73,7 @@ public class DeleteReviewerByEmailOp implements BatchUpdateOp, AsyncPostUpdateOp
}
@Override
public void asyncPostUpdate(Context ctx) {
public void postUpdate(Context ctx) {
try {
NotifyResolver.Result notify = ctx.getNotify(change.getId());
if (!notify.shouldNotify()) {

View File

@@ -43,7 +43,6 @@ import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.project.RemoveReviewerControl;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
@@ -56,7 +55,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class DeleteReviewerOp implements BatchUpdateOp, AsyncPostUpdateOp {
public class DeleteReviewerOp implements BatchUpdateOp {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public interface Factory {
@@ -77,12 +76,11 @@ public class DeleteReviewerOp implements BatchUpdateOp, AsyncPostUpdateOp {
private final AccountState reviewer;
private final DeleteReviewerInput input;
private ChangeMessage changeMessage;
private Change currChange;
private PatchSet currPs;
private Map<String, Short> newApprovals = new HashMap<>();
private Map<String, Short> oldApprovals = new HashMap<>();
private NotifyResolver.Result notify;
ChangeMessage changeMessage;
Change currChange;
PatchSet currPs;
Map<String, Short> newApprovals = new HashMap<>();
Map<String, Short> oldApprovals = new HashMap<>();
@Inject
DeleteReviewerOp(
@@ -167,27 +165,13 @@ public class DeleteReviewerOp implements BatchUpdateOp, AsyncPostUpdateOp {
changeMessage =
ChangeMessagesUtil.newMessage(ctx, msg.toString(), ChangeMessagesUtil.TAG_DELETE_REVIEWER);
cmUtil.addChangeMessage(update, changeMessage);
notify = ctx.getNotify(currChange.getId());
return true;
}
@Override
public void postUpdate(Context ctx) {
reviewerDeleted.fire(
currChange,
currPs,
reviewer,
ctx.getAccount(),
changeMessage.getMessage(),
newApprovals,
oldApprovals,
notify.handling(),
ctx.getWhen());
}
@Override
public void asyncPostUpdate(Context ctx) {
NotifyResolver.Result notify = ctx.getNotify(currChange.getId());
if (input.notify == null
&& currChange.isWorkInProgress()
&& !oldApprovals.isEmpty()
@@ -203,6 +187,16 @@ public class DeleteReviewerOp implements BatchUpdateOp, AsyncPostUpdateOp {
} catch (Exception err) {
logger.atSevere().withCause(err).log("Cannot email update for change %s", currChange.getId());
}
reviewerDeleted.fire(
currChange,
currPs,
reviewer,
ctx.getAccount(),
changeMessage.getMessage(),
newApprovals,
oldApprovals,
notify.handling(),
ctx.getWhen());
}
private Iterable<PatchSetApproval> approvals(ChangeContext ctx, Account.Id accountId) {

View File

@@ -21,18 +21,24 @@ import com.google.gerrit.common.Nullable;
import com.google.gerrit.entities.ChangeMessage;
import com.google.gerrit.entities.Comment;
import com.google.gerrit.entities.PatchSet;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.config.SendEmailExecutor;
import com.google.gerrit.server.mail.send.CommentSender;
import com.google.gerrit.server.mail.send.MessageIdGenerator;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.patch.PatchSetInfoFactory;
import com.google.gerrit.server.update.RepoView;
import com.google.gerrit.server.util.LabelVote;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class EmailReviewComments {
public class EmailReviewComments implements Runnable, RequestContext {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public interface Factory {
@@ -65,8 +71,10 @@ public class EmailReviewComments {
RepoView repoView);
}
private final ExecutorService sendEmailsExecutor;
private final PatchSetInfoFactory patchSetInfoFactory;
private final CommentSender.Factory commentSenderFactory;
private final ThreadLocalRequestContext requestContext;
private final MessageIdGenerator messageIdGenerator;
private final NotifyResolver.Result notify;
@@ -81,8 +89,10 @@ public class EmailReviewComments {
@Inject
EmailReviewComments(
@SendEmailExecutor ExecutorService executor,
PatchSetInfoFactory patchSetInfoFactory,
CommentSender.Factory commentSenderFactory,
ThreadLocalRequestContext requestContext,
MessageIdGenerator messageIdGenerator,
@Assisted NotifyResolver.Result notify,
@Assisted ChangeNotes notes,
@@ -93,8 +103,10 @@ public class EmailReviewComments {
@Nullable @Assisted String patchSetComment,
@Assisted List<LabelVote> labels,
@Assisted RepoView repoView) {
this.sendEmailsExecutor = executor;
this.patchSetInfoFactory = patchSetInfoFactory;
this.commentSenderFactory = commentSenderFactory;
this.requestContext = requestContext;
this.messageIdGenerator = messageIdGenerator;
this.notify = notify;
this.notes = notes;
@@ -107,7 +119,14 @@ public class EmailReviewComments {
this.repoView = repoView;
}
public void send() {
public void sendAsync() {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError = sendEmailsExecutor.submit(this);
}
@Override
public void run() {
RequestContext old = requestContext.setContext(this);
try {
CommentSender emailSender =
commentSenderFactory.create(notes.getProjectName(), notes.getChangeId());
@@ -124,6 +143,18 @@ public class EmailReviewComments {
emailSender.send();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Cannot email comments for %s", patchSet.id());
} finally {
requestContext.setContext(old);
}
}
@Override
public String toString() {
return "send-email comments";
}
@Override
public CurrentUser getUser() {
return user.getRealUser();
}
}

View File

@@ -49,7 +49,6 @@ import com.google.gerrit.server.permissions.PermissionBackend;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.ssh.NoSshInfo;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
@@ -63,7 +62,7 @@ import java.util.List;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.transport.ReceiveCommand;
public class PatchSetInserter implements BatchUpdateOp, AsyncPostUpdateOp {
public class PatchSetInserter implements BatchUpdateOp {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public interface Factory {
@@ -110,7 +109,6 @@ public class PatchSetInserter implements BatchUpdateOp, AsyncPostUpdateOp {
private ChangeMessage changeMessage;
private ReviewerSet oldReviewers;
private boolean oldWorkInProgressState;
private NotifyResolver.Result notify;
@Inject
public PatchSetInserter(
@@ -281,12 +279,12 @@ public class PatchSetInserter implements BatchUpdateOp, AsyncPostUpdateOp {
throw new BadRequestException(ex.getMessage());
}
}
notify = ctx.getNotify(change.getId());
return true;
}
@Override
public void asyncPostUpdate(Context ctx) {
public void postUpdate(Context ctx) {
NotifyResolver.Result notify = ctx.getNotify(change.getId());
if (notify.shouldNotify() && sendEmail) {
requireNonNull(changeMessage);
try {
@@ -306,10 +304,7 @@ public class PatchSetInserter implements BatchUpdateOp, AsyncPostUpdateOp {
"Cannot send email for new patch set on change %s", change.getId());
}
}
}
@Override
public void postUpdate(Context ctx) {
if (fireRevisionCreated) {
revisionCreated.fire(change, patchSet, ctx.getAccount(), ctx.getWhen(), notify);
}

View File

@@ -34,7 +34,6 @@ import com.google.gerrit.server.project.InvalidChangeOperationException;
import com.google.gerrit.server.project.NoSuchChangeException;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
@@ -49,7 +48,7 @@ import org.eclipse.jgit.merge.ThreeWayMerger;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevWalk;
public class RebaseChangeOp implements BatchUpdateOp, AsyncPostUpdateOp {
public class RebaseChangeOp implements BatchUpdateOp {
public interface Factory {
RebaseChangeOp create(ChangeNotes notes, PatchSet originalPatchSet, ObjectId baseCommitId);
}
@@ -222,11 +221,6 @@ public class RebaseChangeOp implements BatchUpdateOp, AsyncPostUpdateOp {
patchSetInserter.postUpdate(ctx);
}
@Override
public void asyncPostUpdate(Context ctx) {
patchSetInserter.asyncPostUpdate(ctx);
}
public RevCommit getRebasedCommit() {
checkState(rebasedCommit != null, "getRebasedCommit() only valid after updateRepo");
return rebasedCommit;

View File

@@ -26,7 +26,6 @@ import com.google.gerrit.server.mail.send.MessageIdGenerator;
import com.google.gerrit.server.mail.send.RemoveFromAttentionSetSender;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
@@ -37,7 +36,7 @@ import java.io.IOException;
import java.util.Optional;
/** Remove a specified user from the attention set. */
public class RemoveFromAttentionSetOp implements BatchUpdateOp, AsyncPostUpdateOp {
public class RemoveFromAttentionSetOp implements BatchUpdateOp {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public interface Factory {
@@ -102,7 +101,7 @@ public class RemoveFromAttentionSetOp implements BatchUpdateOp, AsyncPostUpdateO
}
@Override
public void asyncPostUpdate(Context ctx) {
public void postUpdate(Context ctx) {
if (!notify) {
return;
}
@@ -115,7 +114,7 @@ public class RemoveFromAttentionSetOp implements BatchUpdateOp, AsyncPostUpdateO
reason,
messageIdGenerator.fromChangeUpdate(ctx.getRepoView(), change.currentPatchSetId()),
attentionUserId)
.send();
.sendAsync();
} catch (IOException e) {
logger.atSevere().withCause(e).log(e.getMessage(), change.getId());
}

View File

@@ -587,7 +587,7 @@ public class ReviewerAdder {
}
}
public void postUpdate(Context ctx) {
public void postUpdate(Context ctx) throws Exception {
for (ReviewerAddition addition : additions()) {
if (addition.op != null) {
addition.op.postUpdate(ctx);
@@ -595,14 +595,6 @@ public class ReviewerAdder {
}
}
public void asyncPostUpdate(Context ctx) {
for (ReviewerAddition addition : additions()) {
if (addition.op != null) {
addition.op.asyncPostUpdate(ctx);
}
}
}
public <T> ImmutableSet<T> flattenResults(
Function<AddReviewersOp.Result, ? extends Collection<T>> func) {
additions()

View File

@@ -28,7 +28,6 @@ import com.google.gerrit.server.mail.send.MessageIdGenerator;
import com.google.gerrit.server.mail.send.SetAssigneeSender;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.plugincontext.PluginSetContext;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
@@ -38,7 +37,7 @@ import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.assistedinject.Assisted;
public class SetAssigneeOp implements BatchUpdateOp, AsyncPostUpdateOp {
public class SetAssigneeOp implements BatchUpdateOp {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public interface Factory {
@@ -121,7 +120,7 @@ public class SetAssigneeOp implements BatchUpdateOp, AsyncPostUpdateOp {
}
@Override
public void asyncPostUpdate(Context ctx) {
public void postUpdate(Context ctx) {
try {
SetAssigneeSender emailSender =
setAssigneeSenderFactory.create(
@@ -134,10 +133,6 @@ public class SetAssigneeOp implements BatchUpdateOp, AsyncPostUpdateOp {
logger.atSevere().withCause(err).log(
"Cannot send email to new assignee of change %s", change.getId());
}
}
@Override
public void postUpdate(Context ctx) {
assigneeChanged.fire(
change, ctx.getAccount(), oldAssignee != null ? oldAssignee.state() : null, ctx.getWhen());
}

View File

@@ -28,7 +28,6 @@ import com.google.gerrit.server.PatchSetUtil;
import com.google.gerrit.server.extensions.events.WorkInProgressStateChanged;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
@@ -38,7 +37,7 @@ import com.google.inject.assistedinject.Assisted;
import java.io.IOException;
/* Set work in progress or ready for review state on a change */
public class WorkInProgressOp implements BatchUpdateOp, AsyncPostUpdateOp {
public class WorkInProgressOp implements BatchUpdateOp {
public static class Input extends InputWithMessage {
@Nullable public NotifyHandling notify;
@@ -127,7 +126,8 @@ public class WorkInProgressOp implements BatchUpdateOp, AsyncPostUpdateOp {
}
@Override
public void asyncPostUpdate(Context ctx) {
public void postUpdate(Context ctx) {
stateChanged.fire(change, ps, ctx.getAccount(), ctx.getWhen());
NotifyResolver.Result notify = ctx.getNotify(change.getId());
if (workInProgress
|| notify.handling().compareTo(NotifyHandling.OWNER_REVIEWERS) < 0
@@ -152,11 +152,6 @@ public class WorkInProgressOp implements BatchUpdateOp, AsyncPostUpdateOp {
cmsg.getMessage(),
ImmutableList.of(),
repoView)
.send();
}
@Override
public void postUpdate(Context ctx) {
stateChanged.fire(change, ps, ctx.getAccount(), ctx.getWhen());
.sendAsync();
}
}

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2020 The Android Open Source Project
// Copyright (C) 2014 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -23,4 +23,4 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
/** Marker on the global {@link ScheduledThreadPoolExecutor} used to send email. */
@Retention(RUNTIME)
@BindingAnnotation
public @interface AsyncPostUpdateExecutor {}
public @interface SendEmailExecutor {}

View File

@@ -51,20 +51,14 @@ public class SysExecutorModule extends AbstractModule {
@Provides
@Singleton
@AsyncPostUpdateExecutor
@SendEmailExecutor
public ExecutorService provideSendEmailExecutor(
@GerritServerConfig Config config, WorkQueue queues) {
// sendemail.threadPoolSize is deprecated and overridden by asyncPostUpdate.threadPoolSize.
int poolSize =
config.getInt(
"asyncPostUpdate",
null,
"threadPoolSize",
config.getInt("sendemail", null, "threadPoolSize", 1));
int poolSize = config.getInt("sendemail", null, "threadPoolSize", 1);
if (poolSize == 0) {
return newDirectExecutorService();
}
return queues.createQueue(poolSize, "AsyncPostUpdate", true);
return queues.createQueue(poolSize, "SendEmail", true);
}
@Provides

View File

@@ -45,7 +45,6 @@ import com.google.gerrit.server.mail.send.RevertedSender;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.notedb.ReviewerStateInternal;
import com.google.gerrit.server.notedb.Sequences;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdate;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
@@ -297,7 +296,7 @@ public class CommitUtil {
return changeId;
}
private class NotifyOp implements BatchUpdateOp, AsyncPostUpdateOp {
private class NotifyOp implements BatchUpdateOp {
private final Change change;
private final ChangeInserter ins;
@@ -307,12 +306,8 @@ public class CommitUtil {
}
@Override
public void postUpdate(Context ctx) {
public void postUpdate(Context ctx) throws Exception {
changeReverted.fire(change, ins.getChange(), ctx.getWhen());
}
@Override
public void asyncPostUpdate(Context ctx) {
try {
RevertedSender emailSender = revertedSenderFactory.create(ctx.getProject(), change.getId());
emailSender.setFrom(ctx.getAccountId());

View File

@@ -25,19 +25,22 @@ import com.google.gerrit.entities.PatchSetInfo;
import com.google.gerrit.entities.SubmissionId;
import com.google.gerrit.server.ChangeMessagesUtil;
import com.google.gerrit.server.PatchSetUtil;
import com.google.gerrit.server.config.SendEmailExecutor;
import com.google.gerrit.server.extensions.events.ChangeMerged;
import com.google.gerrit.server.mail.send.MergedSender;
import com.google.gerrit.server.mail.send.MessageIdGenerator;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.patch.PatchSetInfoFactory;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.assistedinject.Assisted;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit;
@@ -49,21 +52,24 @@ import org.eclipse.jgit.revwalk.RevWalk;
* <p>When we find a change corresponding to a commit that is pushed to a branch directly, we close
* the change. This class marks the change as merged, and sends out the email notification.
*/
public class MergedByPushOp implements BatchUpdateOp, AsyncPostUpdateOp {
public class MergedByPushOp implements BatchUpdateOp {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public interface Factory {
MergedByPushOp create(
RequestScopePropagator requestScopePropagator,
PatchSet.Id psId,
@Assisted SubmissionId submissionId,
@Assisted("refName") String refName,
@Assisted("mergeResultRevId") String mergeResultRevId);
}
private final RequestScopePropagator requestScopePropagator;
private final PatchSetInfoFactory patchSetInfoFactory;
private final ChangeMessagesUtil cmUtil;
private final MergedSender.Factory mergedSenderFactory;
private final PatchSetUtil psUtil;
private final ExecutorService sendEmailExecutor;
private final ChangeMerged changeMerged;
private final MessageIdGenerator messageIdGenerator;
@@ -84,8 +90,10 @@ public class MergedByPushOp implements BatchUpdateOp, AsyncPostUpdateOp {
ChangeMessagesUtil cmUtil,
MergedSender.Factory mergedSenderFactory,
PatchSetUtil psUtil,
@SendEmailExecutor ExecutorService sendEmailExecutor,
ChangeMerged changeMerged,
MessageIdGenerator messageIdGenerator,
@Assisted RequestScopePropagator requestScopePropagator,
@Assisted PatchSet.Id psId,
@Assisted SubmissionId submissionId,
@Assisted("refName") String refName,
@@ -94,8 +102,10 @@ public class MergedByPushOp implements BatchUpdateOp, AsyncPostUpdateOp {
this.cmUtil = cmUtil;
this.mergedSenderFactory = mergedSenderFactory;
this.psUtil = psUtil;
this.sendEmailExecutor = sendEmailExecutor;
this.changeMerged = changeMerged;
this.messageIdGenerator = messageIdGenerator;
this.requestScopePropagator = requestScopePropagator;
this.submissionId = submissionId;
this.psId = psId;
this.refName = refName;
@@ -171,27 +181,36 @@ public class MergedByPushOp implements BatchUpdateOp, AsyncPostUpdateOp {
if (!correctBranch) {
return;
}
@SuppressWarnings("unused") // Runnable already handles errors
Future<?> possiblyIgnoredError =
sendEmailExecutor.submit(
requestScopePropagator.wrap(
new Runnable() {
@Override
public void run() {
try {
MergedSender emailSender =
mergedSenderFactory.create(ctx.getProject(), psId.changeId());
emailSender.setFrom(ctx.getAccountId());
emailSender.setPatchSet(patchSet, info);
emailSender.setMessageId(
messageIdGenerator.fromChangeUpdate(ctx.getRepoView(), patchSet.id()));
emailSender.send();
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"Cannot send email for submitted patch set %s", psId);
}
}
@Override
public String toString() {
return "send-email merged";
}
}));
changeMerged.fire(change, patchSet, ctx.getAccount(), mergeResultRevId, ctx.getWhen());
}
@Override
public void asyncPostUpdate(Context ctx) {
if (!correctBranch) {
return;
}
try {
MergedSender emailSender = mergedSenderFactory.create(ctx.getProject(), psId.changeId());
emailSender.setFrom(ctx.getAccountId());
emailSender.setPatchSet(patchSet, info);
emailSender.setMessageId(
messageIdGenerator.fromChangeUpdate(ctx.getRepoView(), patchSet.id()));
emailSender.send();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Cannot send email for submitted patch set %s", psId);
}
}
private PatchSetInfo getPatchSetInfo(ChangeContext ctx) throws IOException {
RevWalk rw = ctx.getRevWalk();
RevCommit commit = rw.parseCommit(requireNonNull(patchSet).commitId());

View File

@@ -175,6 +175,7 @@ import com.google.gerrit.server.update.RetryHelper;
import com.google.gerrit.server.update.UpdateException;
import com.google.gerrit.server.util.LabelVote;
import com.google.gerrit.server.util.MagicBranch;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.gerrit.server.util.time.TimeUtil;
import com.google.gerrit.server.validators.ValidationException;
import com.google.gerrit.util.cli.CmdLineParser;
@@ -338,6 +339,7 @@ class ReceiveCommits {
private final PluginSetContext<RequestListener> requestListeners;
private final PublishCommentsOp.Factory publishCommentsOp;
private final RetryHelper retryHelper;
private final RequestScopePropagator requestScopePropagator;
private final Sequences seq;
private final SetHashtagsOp.Factory hashtagsFactory;
private final SubmoduleOp.Factory subOpFactory;
@@ -419,6 +421,7 @@ class ReceiveCommits {
ReplaceOp.Factory replaceOpFactory,
PluginSetContext<RequestListener> requestListeners,
RetryHelper retryHelper,
RequestScopePropagator requestScopePropagator,
Sequences seq,
SetHashtagsOp.Factory hashtagsFactory,
SubmoduleOp.Factory subOpFactory,
@@ -467,6 +470,7 @@ class ReceiveCommits {
this.replaceOpFactory = replaceOpFactory;
this.requestListeners = requestListeners;
this.retryHelper = retryHelper;
this.requestScopePropagator = requestScopePropagator;
this.seq = seq;
this.subOpFactory = subOpFactory;
this.tagCache = tagCache;
@@ -2582,6 +2586,7 @@ class ReceiveCommits {
magicBranch.getCombinedCcs(fromFooters))
.setApprovals(approvals)
.setMessage(msg.toString())
.setRequestScopePropagator(requestScopePropagator)
.setSendMail(true)
.setPatchSetDescription(magicBranch.message));
if (!magicBranch.hashtags.isEmpty()) {
@@ -3008,20 +3013,22 @@ class ReceiveCommits {
RevCommit priorCommit = revisions.inverse().get(priorPatchSet);
replaceOp =
replaceOpFactory.create(
projectState,
notes.getChange().getDest(),
checkMergedInto,
checkMergedInto ? inputCommand.getNewId().name() : null,
priorPatchSet,
priorCommit,
psId,
newCommit,
info,
groups,
magicBranch,
receivePack.getPushCertificate(),
notes.getChange());
replaceOpFactory
.create(
projectState,
notes.getChange().getDest(),
checkMergedInto,
checkMergedInto ? inputCommand.getNewId().name() : null,
priorPatchSet,
priorCommit,
psId,
newCommit,
info,
groups,
magicBranch,
receivePack.getPushCertificate(),
notes.getChange())
.setRequestScopePropagator(requestScopePropagator);
bu.addOp(notes.getChangeId(), replaceOp);
if (progress != null) {
bu.addOp(notes.getChangeId(), new ChangeProgressOp(progress));
@@ -3302,7 +3309,11 @@ class ReceiveCommits {
bu.addOp(
psId.changeId(),
mergedByPushOpFactory.create(
psId, submissionId, refName, newTip.getId().getName()));
requestScopePropagator,
psId,
submissionId,
refName,
newTip.getId().getName()));
continue COMMIT;
}
}
@@ -3346,7 +3357,12 @@ class ReceiveCommits {
bu.addOp(
id,
mergedByPushOpFactory
.create(req.psId, submissionId, refName, newTip.getId().getName())
.create(
requestScopePropagator,
req.psId,
submissionId,
refName,
newTip.getId().getName())
.setPatchSetProvider(req.replaceOp::getPatchSet));
bu.addOp(id, new ChangeProgressOp(progress));
ids.add(id);

View File

@@ -56,6 +56,7 @@ import com.google.gerrit.server.change.ReviewerAdder;
import com.google.gerrit.server.change.ReviewerAdder.InternalAddReviewerInput;
import com.google.gerrit.server.change.ReviewerAdder.ReviewerAddition;
import com.google.gerrit.server.change.ReviewerAdder.ReviewerAdditionList;
import com.google.gerrit.server.config.SendEmailExecutor;
import com.google.gerrit.server.config.UrlFormatter;
import com.google.gerrit.server.extensions.events.CommentAdded;
import com.google.gerrit.server.extensions.events.RevisionCreated;
@@ -70,11 +71,11 @@ import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
import com.google.gerrit.server.update.RepoContext;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.gerrit.server.validators.ValidationException;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
@@ -85,6 +86,8 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Stream;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.ObjectId;
@@ -93,7 +96,7 @@ import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.transport.PushCertificate;
import org.eclipse.jgit.transport.ReceiveCommand;
public class ReplaceOp implements BatchUpdateOp, AsyncPostUpdateOp {
public class ReplaceOp implements BatchUpdateOp {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public interface Factory {
@@ -120,6 +123,7 @@ public class ReplaceOp implements BatchUpdateOp, AsyncPostUpdateOp {
private final ChangeData.Factory changeDataFactory;
private final ChangeKindCache changeKindCache;
private final ChangeMessagesUtil cmUtil;
private final ExecutorService sendEmailExecutor;
private final RevisionCreated revisionCreated;
private final CommentAdded commentAdded;
private final MergedByPushOp.Factory mergedByPushOpFactory;
@@ -153,6 +157,7 @@ public class ReplaceOp implements BatchUpdateOp, AsyncPostUpdateOp {
private ChangeMessage msg;
private String rejectMessage;
private MergedByPushOp mergedByPushOp;
private RequestScopePropagator requestScopePropagator;
private ReviewerAdditionList reviewerAdditions;
private MailRecipients oldRecipients;
@@ -169,6 +174,7 @@ public class ReplaceOp implements BatchUpdateOp, AsyncPostUpdateOp {
PatchSetUtil psUtil,
ReplacePatchSetSender.Factory replacePatchSetFactory,
ProjectCache projectCache,
@SendEmailExecutor ExecutorService sendEmailExecutor,
ReviewerAdder reviewerAdder,
Change change,
MessageIdGenerator messageIdGenerator,
@@ -196,6 +202,7 @@ public class ReplaceOp implements BatchUpdateOp, AsyncPostUpdateOp {
this.psUtil = psUtil;
this.replacePatchSetFactory = replacePatchSetFactory;
this.projectCache = projectCache;
this.sendEmailExecutor = sendEmailExecutor;
this.reviewerAdder = reviewerAdder;
this.change = change;
this.messageIdGenerator = messageIdGenerator;
@@ -232,7 +239,11 @@ public class ReplaceOp implements BatchUpdateOp, AsyncPostUpdateOp {
if (mergedInto != null) {
mergedByPushOp =
mergedByPushOpFactory.create(
patchSetId, new SubmissionId(change), mergedInto, mergeResultRevId);
requestScopePropagator,
patchSetId,
new SubmissionId(change),
mergedInto,
mergeResultRevId);
}
}
@@ -482,8 +493,18 @@ public class ReplaceOp implements BatchUpdateOp, AsyncPostUpdateOp {
}
@Override
public void postUpdate(Context ctx) {
public void postUpdate(Context ctx) throws Exception {
reviewerAdditions.postUpdate(ctx);
if (changeKind != ChangeKind.TRIVIAL_REBASE) {
// TODO(dborowitz): Merge email templates so we only have to send one.
Runnable e = new ReplaceEmailTask(ctx);
if (requestScopePropagator != null) {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError = sendEmailExecutor.submit(requestScopePropagator.wrap(e));
} else {
e.run();
}
}
NotifyResolver.Result notify = ctx.getNotify(notes.getChangeId());
revisionCreated.fire(notes.getChange(), newPatchSet, ctx.getAccount(), ctx.getWhen(), notify);
try {
@@ -496,11 +517,15 @@ public class ReplaceOp implements BatchUpdateOp, AsyncPostUpdateOp {
}
}
@Override
public void asyncPostUpdate(Context ctx) {
reviewerAdditions.asyncPostUpdate(ctx);
if (changeKind != ChangeKind.TRIVIAL_REBASE) {
// TODO(dborowitz): Merge email templates so we only have to send one.
private class ReplaceEmailTask implements Runnable {
private final Context ctx;
private ReplaceEmailTask(Context ctx) {
this.ctx = ctx;
}
@Override
public void run() {
try {
ReplacePatchSetSender emailSender =
replacePatchSetFactory.create(projectState.getNameKey(), notes.getChangeId());
@@ -528,8 +553,10 @@ public class ReplaceOp implements BatchUpdateOp, AsyncPostUpdateOp {
"Cannot send email for new patch set %s", newPatchSet.id());
}
}
if (mergedByPushOp != null) {
mergedByPushOp.asyncPostUpdate(ctx);
@Override
public String toString() {
return "send-email newpatchset";
}
}
@@ -586,6 +613,11 @@ public class ReplaceOp implements BatchUpdateOp, AsyncPostUpdateOp {
return cmd;
}
public ReplaceOp setRequestScopePropagator(RequestScopePropagator requestScopePropagator) {
this.requestScopePropagator = requestScopePropagator;
return this;
}
private static String findMergedInto(Context ctx, String first, RevCommit commit) {
try {
RevWalk rw = ctx.getRevWalk();

View File

@@ -64,7 +64,6 @@ import com.google.gerrit.server.patch.PatchListNotAvailableException;
import com.google.gerrit.server.plugincontext.PluginSetContext;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.server.query.change.InternalChangeQuery;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdate;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
@@ -315,7 +314,7 @@ public class MailProcessor {
}
}
private class Op implements BatchUpdateOp, AsyncPostUpdateOp {
private class Op implements BatchUpdateOp {
private final PatchSet.Id psId;
private final List<MailComment> parsedComments;
private final String tag;
@@ -360,26 +359,6 @@ public class MailProcessor {
@Override
public void postUpdate(Context ctx) throws Exception {
// Get previous approvals from this user
Map<String, Short> approvals = new HashMap<>();
approvalsUtil
.byPatchSetUser(
notes, psId, ctx.getAccountId(), ctx.getRevWalk(), ctx.getRepoView().getConfig())
.forEach(a -> approvals.put(a.label(), a.value()));
// Fire Gerrit event. Note that approvals can't be granted via email, so old and new approvals
// are always the same here.
commentAdded.fire(
notes.getChange(),
patchSet,
ctx.getAccount(),
changeMessage.getMessage(),
approvals,
approvals,
ctx.getWhen());
}
@Override
public void asyncPostUpdate(Context ctx) throws Exception {
String patchSetComment = null;
if (parsedComments.get(0).getType() == MailComment.CommentType.CHANGE_MESSAGE) {
patchSetComment = parsedComments.get(0).getMessage();
@@ -396,7 +375,23 @@ public class MailProcessor {
patchSetComment,
ImmutableList.of(),
ctx.getRepoView())
.send();
.sendAsync();
// Get previous approvals from this user
Map<String, Short> approvals = new HashMap<>();
approvalsUtil
.byPatchSetUser(
notes, psId, ctx.getAccountId(), ctx.getRevWalk(), ctx.getRepoView().getConfig())
.forEach(a -> approvals.put(a.label(), a.value()));
// Fire Gerrit event. Note that approvals can't be granted via email, so old and new approvals
// are always the same here.
commentAdded.fire(
notes.getChange(),
patchSet,
ctx.getAccount(),
changeMessage.getMessage(),
approvals,
approvals,
ctx.getWhen());
}
private ChangeMessage generateChangeMessage(ChangeContext ctx) {

View File

@@ -50,7 +50,6 @@ import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.project.RemoveReviewerControl;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdate;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
@@ -147,7 +146,7 @@ public class DeleteVote implements RestModifyView<VoteResource, DeleteVoteInput>
return Response.none();
}
private class Op implements BatchUpdateOp, AsyncPostUpdateOp {
private class Op implements BatchUpdateOp {
private final ProjectState projectState;
private final AccountState accountState;
private final String label;
@@ -222,30 +221,17 @@ public class DeleteVote implements RestModifyView<VoteResource, DeleteVoteInput>
@Override
public void postUpdate(Context ctx) {
voteDeleted.fire(
change,
ps,
accountState,
newApprovals,
oldApprovals,
input.notify,
changeMessage.getMessage(),
ctx.getIdentifiedUser().state(),
ctx.getWhen());
}
@Override
public void asyncPostUpdate(Context ctx) {
if (changeMessage == null) {
return;
}
IdentifiedUser user = ctx.getIdentifiedUser();
try {
NotifyResolver.Result notify = ctx.getNotify(change.getId());
if (notify.shouldNotify()) {
ReplyToChangeSender emailSender =
deleteVoteSenderFactory.create(ctx.getProject(), change.getId());
emailSender.setFrom(ctx.getIdentifiedUser().getAccountId());
emailSender.setFrom(user.getAccountId());
emailSender.setChangeMessage(changeMessage.getMessage(), ctx.getWhen());
emailSender.setNotify(notify);
emailSender.setMessageId(
@@ -255,6 +241,17 @@ public class DeleteVote implements RestModifyView<VoteResource, DeleteVoteInput>
} catch (Exception e) {
logger.atSevere().withCause(e).log("Cannot email update for change %s", change.getId());
}
voteDeleted.fire(
change,
ps,
accountState,
newApprovals,
oldApprovals,
input.notify,
changeMessage.getMessage(),
user.state(),
ctx.getWhen());
}
}
}

View File

@@ -120,7 +120,6 @@ import com.google.gerrit.server.plugincontext.PluginSetContext;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdate;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
@@ -449,7 +448,7 @@ public class PostReview implements RestModifyView<RevisionResource, ReviewInput>
ccByEmail.addAll(addition.reviewersByEmail);
}
}
addReviewersEmail.emailReviewers(
addReviewersEmail.emailReviewersAsync(
user.asIdentifiedUser(), change, to, cc, toByEmail, ccByEmail, notify);
}
}
@@ -860,7 +859,7 @@ public class PostReview implements RestModifyView<RevisionResource, ReviewInput>
abstract Comment.Range range();
}
private class Op implements BatchUpdateOp, AsyncPostUpdateOp {
private class Op implements BatchUpdateOp {
private final ProjectState projectState;
private final PatchSet.Id psId;
private final ReviewInput in;
@@ -906,7 +905,7 @@ public class PostReview implements RestModifyView<RevisionResource, ReviewInput>
}
@Override
public void asyncPostUpdate(Context ctx) {
public void postUpdate(Context ctx) {
if (message == null) {
return;
}
@@ -924,20 +923,12 @@ public class PostReview implements RestModifyView<RevisionResource, ReviewInput>
in.message,
labelDelta,
ctx.getRepoView())
.send();
.sendAsync();
} catch (IOException ex) {
throw new StorageException(
String.format("Repository %s not found", ctx.getProject().get()), ex);
}
}
}
@Override
public void postUpdate(Context ctx) {
if (message == null) {
return;
}
commentAdded.fire(
notes.getChange(),
ps,

View File

@@ -44,7 +44,6 @@ import com.google.gerrit.server.permissions.ChangePermission;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdate;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
@@ -109,7 +108,7 @@ public class Restore
return Response.ok(json.noOptions().format(op.change));
}
private class Op implements BatchUpdateOp, AsyncPostUpdateOp {
private class Op implements BatchUpdateOp {
private final RestoreInput input;
private Change change;
@@ -150,12 +149,6 @@ public class Restore
@Override
public void postUpdate(Context ctx) {
changeRestored.fire(
change, patchSet, ctx.getAccount(), Strings.emptyToNull(input.message), ctx.getWhen());
}
@Override
public void asyncPostUpdate(Context ctx) {
try {
ReplyToChangeSender emailSender =
restoredSenderFactory.create(ctx.getProject(), change.getId());
@@ -167,6 +160,8 @@ public class Restore
} catch (Exception e) {
logger.atSevere().withCause(e).log("Cannot email update for change %s", change.getId());
}
changeRestored.fire(
change, patchSet, ctx.getAccount(), Strings.emptyToNull(input.message), ctx.getWhen());
}
}

View File

@@ -72,7 +72,6 @@ import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.server.query.change.InternalChangeQuery;
import com.google.gerrit.server.restapi.change.CherryPickChange.Result;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdate;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
@@ -589,7 +588,7 @@ public class RevertSubmission
}
}
private class NotifyOp implements BatchUpdateOp, AsyncPostUpdateOp {
private class NotifyOp implements BatchUpdateOp {
private final Change change;
private final Change.Id revertChangeId;
@@ -599,13 +598,9 @@ public class RevertSubmission
}
@Override
public void postUpdate(Context ctx) {
public void postUpdate(Context ctx) throws Exception {
changeReverted.fire(
change, changeNotesFactory.createChecked(revertChangeId).getChange(), ctx.getWhen());
}
@Override
public void asyncPostUpdate(Context ctx) {
try {
RevertedSender emailSender = revertedSenderFactory.create(ctx.getProject(), change.getId());
emailSender.setFrom(ctx.getAccountId());

View File

@@ -19,14 +19,22 @@ import com.google.gerrit.common.Nullable;
import com.google.gerrit.entities.Account;
import com.google.gerrit.entities.Change;
import com.google.gerrit.entities.Project;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.change.NotifyResolver;
import com.google.gerrit.server.config.SendEmailExecutor;
import com.google.gerrit.server.mail.send.MergedSender;
import com.google.gerrit.server.mail.send.MessageIdGenerator;
import com.google.gerrit.server.update.RepoView;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
import com.google.inject.Inject;
import com.google.inject.OutOfScopeException;
import com.google.inject.assistedinject.Assisted;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
class EmailMerge {
class EmailMerge implements Runnable, RequestContext {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
interface Factory {
@@ -38,7 +46,10 @@ class EmailMerge {
RepoView repoView);
}
private final ExecutorService sendEmailsExecutor;
private final MergedSender.Factory mergedSenderFactory;
private final ThreadLocalRequestContext requestContext;
private final IdentifiedUser.GenericFactory identifiedUserFactory;
private final MessageIdGenerator messageIdGenerator;
private final Project.NameKey project;
@@ -49,14 +60,20 @@ class EmailMerge {
@Inject
EmailMerge(
@SendEmailExecutor ExecutorService executor,
MergedSender.Factory mergedSenderFactory,
ThreadLocalRequestContext requestContext,
IdentifiedUser.GenericFactory identifiedUserFactory,
MessageIdGenerator messageIdGenerator,
@Assisted Project.NameKey project,
@Assisted Change change,
@Assisted @Nullable Account.Id submitter,
@Assisted NotifyResolver.Result notify,
@Assisted RepoView repoView) {
this.sendEmailsExecutor = executor;
this.mergedSenderFactory = mergedSenderFactory;
this.requestContext = requestContext;
this.identifiedUserFactory = identifiedUserFactory;
this.messageIdGenerator = messageIdGenerator;
this.project = project;
this.change = change;
@@ -65,7 +82,14 @@ class EmailMerge {
this.repoView = repoView;
}
public void send() {
void sendAsync() {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError = sendEmailsExecutor.submit(this);
}
@Override
public void run() {
RequestContext old = requestContext.setContext(this);
try {
MergedSender emailSender = mergedSenderFactory.create(project, change.getId());
if (submitter != null) {
@@ -77,6 +101,21 @@ class EmailMerge {
emailSender.send();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Cannot email merged notification for %s", change.getId());
} finally {
requestContext.setContext(old);
}
}
@Override
public String toString() {
return "send-email merged";
}
@Override
public CurrentUser getUser() {
if (submitter != null) {
return identifiedUserFactory.create(submitter).getRealUser();
}
throw new OutOfScopeException("No user on email thread");
}
}

View File

@@ -275,13 +275,6 @@ public class RebaseSubmitStrategy extends SubmitStrategy {
rebaseOp.postUpdate(ctx);
}
}
@Override
public void asyncPostUpdateImpl(Context ctx) {
if (rebaseOp != null) {
rebaseOp.asyncPostUpdate(ctx);
}
}
}
private class MergeIfNecessaryOp extends SubmitStrategyOp {

View File

@@ -43,7 +43,6 @@ import com.google.gerrit.server.git.MergeUtil;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.project.ProjectConfig;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.update.AsyncPostUpdateOp;
import com.google.gerrit.server.update.BatchUpdateOp;
import com.google.gerrit.server.update.ChangeContext;
import com.google.gerrit.server.update.Context;
@@ -61,7 +60,7 @@ import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.transport.ReceiveCommand;
abstract class SubmitStrategyOp implements BatchUpdateOp, AsyncPostUpdateOp {
abstract class SubmitStrategyOp implements BatchUpdateOp {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
protected final SubmitStrategy.Arguments args;
@@ -484,22 +483,8 @@ abstract class SubmitStrategyOp implements BatchUpdateOp, AsyncPostUpdateOp {
}
}
if (mergeResultRev != null && !args.dryrun) {
args.changeMerged.fire(
updatedChange,
mergedPatchSet,
args.accountCache.get(submitter.accountId()).orElse(null),
args.mergeTip.getCurrentTip().name(),
ctx.getWhen());
}
}
/**
* Assume the change must have been merged at this point, otherwise we would have failed in one of
* the other steps in postUpdate (which is done prior to this method).
*/
@Override
public final void asyncPostUpdate(Context ctx) {
// Assume the change must have been merged at this point, otherwise we would
// have failed fast in one of the other steps.
try {
args.mergedSenderFactory
.create(
@@ -508,11 +493,18 @@ abstract class SubmitStrategyOp implements BatchUpdateOp, AsyncPostUpdateOp {
submitter.accountId(),
ctx.getNotify(getId()),
ctx.getRepoView())
.send();
.sendAsync();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Cannot email merged notification for %s", getId());
}
asyncPostUpdateImpl(ctx);
if (mergeResultRev != null && !args.dryrun) {
args.changeMerged.fire(
updatedChange,
mergedPatchSet,
args.accountCache.get(submitter.accountId()).orElse(null),
args.mergeTip.getCurrentTip().name(),
ctx.getWhen());
}
}
/**
@@ -536,12 +528,6 @@ abstract class SubmitStrategyOp implements BatchUpdateOp, AsyncPostUpdateOp {
*/
protected void postUpdateImpl(Context ctx) throws Exception {}
/**
* @see #asyncPostUpdate(Context)
* @param ctx
*/
protected void asyncPostUpdateImpl(Context ctx) {}
/**
* Amend the commit with gitlink update
*

View File

@@ -1,38 +0,0 @@
// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.update;
import com.google.gerrit.server.config.SysExecutorModule;
/** Base interface for operations performed asynchronously as part of a {@link BatchUpdate}. */
public interface AsyncPostUpdateOp {
/**
* Override this method to do something after the update e.g. send emails. This method will be
* invoked asynchronously, and when invoked, the invoking method will not wait for the async
* updates to finish. This method will be called after {@link BatchUpdateOp} operations and {@link
* RepoOnlyOp} are finished.
*
* <p>The maximum amount of threads in the thread pool is decided by
* asyncPostUpdate.threadPoolSize. When asyncPostUpdate.threadPoolSize is not specified, the
* deprecated sendemail.threadPoolSize is used (see {@link
* SysExecutorModule#provideSendEmailExecutor}). This is the case for legacy reasons, since in the
* past only some emails were sent async (and sendemail.threadPoolSize) was used, and now all
* emails (and possibly others) are done async, so asyncPostUpdate.threadPoolSize is used.
*
* @param ctx context
*/
default void asyncPostUpdate(Context ctx) throws Exception {}
}

View File

@@ -47,7 +47,6 @@ import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.GerritPersonIdent;
import com.google.gerrit.server.account.AccountState;
import com.google.gerrit.server.change.NotifyResolver;
import com.google.gerrit.server.config.AsyncPostUpdateExecutor;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.validators.OnSubmitValidators;
@@ -63,7 +62,6 @@ import com.google.gerrit.server.project.InvalidChangeOperationException;
import com.google.gerrit.server.project.NoSuchChangeException;
import com.google.gerrit.server.project.NoSuchProjectException;
import com.google.gerrit.server.project.NoSuchRefException;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.assistedinject.Assisted;
@@ -77,7 +75,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import org.eclipse.jgit.lib.BatchRefUpdate;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.PersonIdent;
@@ -335,8 +332,6 @@ public class BatchUpdate implements AutoCloseable {
private final NoteDbUpdateManager.Factory updateManagerFactory;
private final ChangeIndexer indexer;
private final GitReferenceUpdated gitRefUpdated;
private final ThreadLocalRequestContext requestContext;
private final ExecutorService executorService;
private final Project.NameKey project;
private final CurrentUser user;
@@ -365,8 +360,6 @@ public class BatchUpdate implements AutoCloseable {
NoteDbUpdateManager.Factory updateManagerFactory,
ChangeIndexer indexer,
GitReferenceUpdated gitRefUpdated,
ThreadLocalRequestContext requestContext,
@AsyncPostUpdateExecutor ExecutorService executorService,
@Assisted Project.NameKey project,
@Assisted CurrentUser user,
@Assisted Timestamp when) {
@@ -376,8 +369,6 @@ public class BatchUpdate implements AutoCloseable {
this.updateManagerFactory = updateManagerFactory;
this.indexer = indexer;
this.gitRefUpdated = gitRefUpdated;
this.requestContext = requestContext;
this.executorService = executorService;
this.project = project;
this.user = user;
this.when = when;
@@ -646,40 +637,23 @@ public class BatchUpdate implements AutoCloseable {
return new ChangeContextImpl(notes);
}
private void executePostOps() {
private void executePostOps() throws Exception {
ContextImpl ctx = new ContextImpl();
for (BatchUpdateOp op : ops.values()) {
postUpdate(ctx, op);
if (op instanceof AsyncPostUpdateOp) {
asyncPostUpdate(ctx, ((AsyncPostUpdateOp) op));
try (TraceContext.TraceTimer ignored =
TraceContext.newTimer(op.getClass().getSimpleName() + "#postUpdate", Metadata.empty())) {
op.postUpdate(ctx);
}
}
for (RepoOnlyOp op : repoOnlyOps) {
postUpdate(ctx, op);
if (op instanceof AsyncPostUpdateOp) {
asyncPostUpdate(ctx, ((AsyncPostUpdateOp) op));
try (TraceContext.TraceTimer ignored =
TraceContext.newTimer(op.getClass().getSimpleName() + "#postUpdate", Metadata.empty())) {
op.postUpdate(ctx);
}
}
}
/** Invoke the postUpdate methods synchronously. */
private void postUpdate(ContextImpl ctx, RepoOnlyOp op) {
try (TraceContext.TraceTimer ignored =
TraceContext.newTimer(op.getClass().getSimpleName() + "#postUpdate", Metadata.empty())) {
op.postUpdate(ctx);
} catch (Exception ex) {
logDebug(
String.format(
"postUpdate for project %s failed for user %s", ctx.getProject(), ctx.getUser()));
}
}
/** Invoke the asyncPostUpdate methods asynchronously. */
private void asyncPostUpdate(ContextImpl ctx, AsyncPostUpdateOp op) {
executorService.execute(new ExecuteAsyncPostUpdate(op, ctx, user, requestContext));
}
private static void logDebug(String msg) {
// Only log if there is a requestId assigned, since those are the
// expensive/complicated requests like MergeOp. Doing it every time would be

View File

@@ -1,65 +0,0 @@
// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.update;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
/** Executes {@link AsyncPostUpdateOp#asyncPostUpdate(Context)} on a specific op, asynchronously. */
public class ExecuteAsyncPostUpdate implements Runnable, RequestContext {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
private final AsyncPostUpdateOp op;
private final Context ctx;
private final CurrentUser user;
private final ThreadLocalRequestContext threadLocalRequestContext;
ExecuteAsyncPostUpdate(
AsyncPostUpdateOp op,
Context ctx,
CurrentUser user,
ThreadLocalRequestContext threadLocalRequestContext) {
this.op = op;
this.ctx = ctx;
this.user = user;
this.threadLocalRequestContext = threadLocalRequestContext;
}
@Override
public void run() {
RequestContext old = threadLocalRequestContext.setContext(this);
try {
op.asyncPostUpdate(ctx);
} catch (Exception e) {
logger.atSevere().withCause(e).log(
"Cannot perform async post update for repo %s and user %s",
ctx.getProject(), ctx.getAccount().account().getName());
} finally {
threadLocalRequestContext.setContext(old);
}
}
@Override
public String toString() {
return "async-post-update";
}
@Override
public CurrentUser getUser() {
return user;
}
}

View File

@@ -30,11 +30,10 @@ public interface RepoOnlyOp {
default void updateRepo(RepoContext ctx) throws Exception {}
/**
* Override this method to do something after the update e.g. run hooks. This method will
* <strong>NOT</strong> be invoked asynchronously. This method will be finished before {@link
* AsyncPostUpdateOp#asyncPostUpdate} is called.
* Override this method to do something after the update e.g. send email or run hooks
*
* @param ctx context
*/
// TODO(dborowitz): Support async operations?
default void postUpdate(Context ctx) throws Exception {}
}

View File

@@ -17,7 +17,9 @@ package com.google.gerrit.server.util;
import com.google.common.flogger.FluentLogger;
import com.google.gerrit.entities.Account;
import com.google.gerrit.entities.Change;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.account.AccountState;
import com.google.gerrit.server.config.SendEmailExecutor;
import com.google.gerrit.server.mail.send.AddToAttentionSetSender;
import com.google.gerrit.server.mail.send.AttentionSetSender;
import com.google.gerrit.server.mail.send.MessageIdGenerator;
@@ -25,8 +27,10 @@ import com.google.gerrit.server.mail.send.RemoveFromAttentionSetSender;
import com.google.gerrit.server.update.Context;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
public class AttentionSetEmail {
public class AttentionSetEmail implements Runnable, RequestContext {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
public interface Factory {
@@ -51,6 +55,7 @@ public class AttentionSetEmail {
Account.Id attentionUserId);
}
private ExecutorService sendEmailsExecutor;
private AttentionSetSender sender;
private Context ctx;
private Change change;
@@ -61,12 +66,14 @@ public class AttentionSetEmail {
@Inject
AttentionSetEmail(
@SendEmailExecutor ExecutorService executor,
@Assisted AttentionSetSender sender,
@Assisted Context ctx,
@Assisted Change change,
@Assisted String reason,
@Assisted MessageIdGenerator.MessageId messageId,
@Assisted Account.Id attentionUserId) {
this.sendEmailsExecutor = executor;
this.sender = sender;
this.ctx = ctx;
this.change = change;
@@ -75,7 +82,13 @@ public class AttentionSetEmail {
this.attentionUserId = attentionUserId;
}
public void send() {
public void sendAsync() {
@SuppressWarnings("unused")
Future<?> possiblyIgnoredError = sendEmailsExecutor.submit(this);
}
@Override
public void run() {
try {
AccountState accountState =
ctx.getUser().isIdentifiedUser() ? ctx.getUser().asIdentifiedUser().state() : null;
@@ -91,4 +104,14 @@ public class AttentionSetEmail {
logger.atSevere().withCause(e).log("Cannot email update for change %s", change.getId());
}
}
@Override
public String toString() {
return "send-email comments";
}
@Override
public CurrentUser getUser() {
return ctx.getUser();
}
}

View File

@@ -128,7 +128,6 @@ public class FakeEmailSender implements EmailSender {
}
public synchronized @Nullable Message peekMessage() {
waitForEmails();
if (messagesRead >= messages.size()) {
return null;
}
@@ -136,14 +135,9 @@ public class FakeEmailSender implements EmailSender {
}
public synchronized @Nullable Message nextMessage() {
waitForEmails();
Message msg = peekMessage();
readOneMessage();
return msg;
}
public synchronized void readOneMessage() {
messagesRead++;
return msg;
}
public ImmutableList<Message> getMessages() {
@@ -166,7 +160,7 @@ public class FakeEmailSender implements EmailSender {
// a single thread in tests (tricky because most callers just use the
// default executor).
for (WorkQueue.Task<?> task : workQueue.getTasks()) {
if (task.toString().contains("async-post-update")) {
if (task.toString().contains("send-email")) {
try {
task.get();
} catch (ExecutionException | InterruptedException e) {

View File

@@ -47,7 +47,6 @@ import com.google.gerrit.server.config.AllUsersName;
import com.google.gerrit.server.config.AllUsersNameProvider;
import com.google.gerrit.server.config.AnonymousCowardName;
import com.google.gerrit.server.config.AnonymousCowardNameProvider;
import com.google.gerrit.server.config.AsyncPostUpdateExecutor;
import com.google.gerrit.server.config.CanonicalWebUrlModule;
import com.google.gerrit.server.config.CanonicalWebUrlProvider;
import com.google.gerrit.server.config.DefaultUrlFormatter;
@@ -59,6 +58,7 @@ import com.google.gerrit.server.config.GerritRuntime;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.GerritServerId;
import com.google.gerrit.server.config.GerritServerIdProvider;
import com.google.gerrit.server.config.SendEmailExecutor;
import com.google.gerrit.server.config.SitePath;
import com.google.gerrit.server.config.TrackingFooters;
import com.google.gerrit.server.config.TrackingFootersProvider;
@@ -279,7 +279,7 @@ public class InMemoryModule extends FactoryModule {
@Provides
@Singleton
@AsyncPostUpdateExecutor
@SendEmailExecutor
public ExecutorService createSendEmailExecutor() {
return newDirectExecutorService();
}