Fix email delivery for merged notifications

There was no ReviewDb available to the email thread. Fix this by
using the same pattern as PostReview; a task is built from a factory
and the task sets up its own thread-local request scope to provide
a dynamic ReviewDb instance to the MergedSender.

Change-Id: Idbeb0d0f611efbd6c1409c5da57bd7408eb89019
This commit is contained in:
Shawn Pearce
2015-07-22 16:14:56 -07:00
parent eb10d12f41
commit 6c91d4966d
3 changed files with 134 additions and 124 deletions

View File

@@ -74,6 +74,7 @@ import com.google.gerrit.server.change.ChangeKindCacheImpl;
import com.google.gerrit.server.change.MergeabilityCacheImpl;
import com.google.gerrit.server.events.EventFactory;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.git.EmailMerge;
import com.google.gerrit.server.git.GitModule;
import com.google.gerrit.server.git.MergeUtil;
import com.google.gerrit.server.git.NotesBranchUtil;
@@ -192,6 +193,7 @@ public class GerritGlobalModule extends FactoryModule {
factory(GroupDetailFactory.Factory.class);
factory(GroupInfoCacheFactory.Factory.class);
factory(GroupMembers.Factory.class);
factory(EmailMerge.Factory.class);
factory(MergedSender.Factory.class);
factory(MergeFailSender.Factory.class);
factory(MergeUtil.Factory.class);

View File

@@ -0,0 +1,120 @@
// Copyright (C) 2015 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.git;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.reviewdb.client.Account;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.change.EmailReviewComments;
import com.google.gerrit.server.mail.MergedSender;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
import com.google.inject.OutOfScopeException;
import com.google.inject.Provider;
import com.google.inject.ProvisionException;
import com.google.inject.assistedinject.Assisted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ExecutorService;
public class EmailMerge implements Runnable, RequestContext {
private static final Logger log = LoggerFactory.getLogger(EmailReviewComments.class);
public interface Factory {
EmailMerge create(Change.Id changeId, Account.Id submitter);
}
private final ExecutorService sendEmailsExecutor;
private final MergedSender.Factory mergedSenderFactory;
private final SchemaFactory<ReviewDb> schemaFactory;
private final ThreadLocalRequestContext requestContext;
private final Change.Id changeId;
private final Account.Id submitter;
private ReviewDb db;
@Inject
EmailMerge(@EmailReviewCommentsExecutor ExecutorService executor,
MergedSender.Factory mergedSenderFactory,
SchemaFactory<ReviewDb> schemaFactory,
ThreadLocalRequestContext requestContext,
@Assisted Change.Id changeId,
@Assisted @Nullable Account.Id submitter) {
this.sendEmailsExecutor = executor;
this.mergedSenderFactory = mergedSenderFactory;
this.schemaFactory = schemaFactory;
this.requestContext = requestContext;
this.changeId = changeId;
this.submitter = submitter;
}
void sendAsync() {
sendEmailsExecutor.submit(this);
}
@Override
public void run() {
RequestContext old = requestContext.setContext(this);
try {
MergedSender cm = mergedSenderFactory.create(changeId);
if (submitter != null) {
cm.setFrom(submitter);
}
cm.send();
} catch (Exception e) {
log.error("Cannot email merged notification for " + changeId, e);
} finally {
requestContext.setContext(old);
if (db != null) {
db.close();
db = null;
}
}
}
@Override
public String toString() {
return "send-email merged";
}
@Override
public CurrentUser getCurrentUser() {
throw new OutOfScopeException("No user on email thread");
}
@Override
public Provider<ReviewDb> getReviewDbProvider() {
return new Provider<ReviewDb>() {
@Override
public ReviewDb get() {
if (db == null) {
try {
db = schemaFactory.open();
} catch (OrmException e) {
throw new ProvisionException("Cannot open ReviewDb", e);
}
}
return db;
}
};
}
}

View File

@@ -45,13 +45,9 @@ import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.ApprovalsUtil;
import com.google.gerrit.server.ChangeMessagesUtil;
import com.google.gerrit.server.ChangeUtil;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.GerritPersonIdent;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.RemotePeer;
import com.google.gerrit.server.account.AccountCache;
import com.google.gerrit.server.config.GerritRequestModule;
import com.google.gerrit.server.config.RequestScopedReviewDbProvider;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.git.VersionedMetaData.BatchMetaDataUpdate;
import com.google.gerrit.server.git.strategy.SubmitStrategy;
@@ -59,7 +55,6 @@ import com.google.gerrit.server.git.strategy.SubmitStrategyFactory;
import com.google.gerrit.server.git.validators.MergeValidationException;
import com.google.gerrit.server.git.validators.MergeValidators;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.mail.MergedSender;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.patch.PatchSetInfoFactory;
@@ -72,21 +67,10 @@ import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.project.SubmitRuleEvaluator;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.server.query.change.InternalChangeQuery;
import com.google.gerrit.server.ssh.SshInfo;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.gwtorm.server.AtomicUpdate;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.OutOfScopeException;
import com.google.inject.Provider;
import com.google.inject.Provides;
import com.google.inject.servlet.RequestScoped;
import com.jcraft.jsch.HostKey;
import org.eclipse.jgit.errors.IncorrectObjectTypeException;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
@@ -106,18 +90,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
/**
* Merges changes in submission order into a single branch.
@@ -148,22 +129,19 @@ public class MergeOp {
private final GitRepositoryManager repoManager;
private final IdentifiedUser.GenericFactory identifiedUserFactory;
private final LabelNormalizer labelNormalizer;
private final MergedSender.Factory mergedSenderFactory;
private final EmailMerge.Factory mergedSenderFactory;
private final MergeSuperSet mergeSuperSet;
private final MergeValidators.Factory mergeValidatorsFactory;
private final PatchSetInfoFactory patchSetInfoFactory;
private final ProjectCache projectCache;
private final InternalChangeQuery internalChangeQuery;
private final SchemaFactory<ReviewDb> schemaFactory;
private final PersonIdent serverIdent;
private final SubmitStrategyFactory submitStrategyFactory;
private final Provider<SubmoduleOp> subOpProvider;
private final TagCache tagCache;
private final WorkQueue workQueue;
private final Map<Change.Id, List<SubmitRecord>> records;
private final Map<Change.Id, CodeReviewCommit> commits;
private final PerThreadRequestScope.Scoper threadScoper;
private String logPrefix;
private ProjectState destProject;
@@ -184,25 +162,22 @@ public class MergeOp {
ChangeData.Factory changeDataFactory,
ChangeHooks hooks,
ChangeIndexer indexer,
Injector injector,
ChangeMessagesUtil cmUtil,
ChangeUpdate.Factory updateFactory,
GitReferenceUpdated gitRefUpdated,
GitRepositoryManager repoManager,
IdentifiedUser.GenericFactory identifiedUserFactory,
LabelNormalizer labelNormalizer,
MergedSender.Factory mergedSenderFactory,
EmailMerge.Factory mergedSenderFactory,
MergeSuperSet mergeSuperSet,
MergeValidators.Factory mergeValidatorsFactory,
PatchSetInfoFactory patchSetInfoFactory,
ProjectCache projectCache,
InternalChangeQuery internalChangeQuery,
SchemaFactory<ReviewDb> schemaFactory,
@GerritPersonIdent PersonIdent serverIdent,
SubmitStrategyFactory submitStrategyFactory,
Provider<SubmoduleOp> subOpProvider,
TagCache tagCache,
WorkQueue workQueue) {
TagCache tagCache) {
this.accountCache = accountCache;
this.approvalsUtil = approvalsUtil;
this.changeControlFactory = changeControlFactory;
@@ -221,67 +196,17 @@ public class MergeOp {
this.patchSetInfoFactory = patchSetInfoFactory;
this.projectCache = projectCache;
this.internalChangeQuery = internalChangeQuery;
this.schemaFactory = schemaFactory;
this.serverIdent = serverIdent;
this.submitStrategyFactory = submitStrategyFactory;
this.subOpProvider = subOpProvider;
this.tagCache = tagCache;
this.workQueue = workQueue;
commits = new HashMap<>();
pendingRefUpdates = new HashMap<>();
openBranches = new HashMap<>();
pendingRefUpdates = new HashMap<>();
records = new HashMap<>();
mergeTips = new HashMap<>();
Injector child = injector.createChildInjector(new AbstractModule() {
@Override
protected void configure() {
bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST);
bind(RequestScopePropagator.class)
.to(PerThreadRequestScope.Propagator.class);
bind(PerThreadRequestScope.Propagator.class);
install(new GerritRequestModule());
bind(SocketAddress.class).annotatedWith(RemotePeer.class).toProvider(
new Provider<SocketAddress>() {
@Override
public SocketAddress get() {
throw new OutOfScopeException("No remote peer on merge thread");
}
});
bind(SshInfo.class).toInstance(new SshInfo() {
@Override
public List<HostKey> getHostKeys() {
return Collections.emptyList();
}
});
}
@Provides
public PerThreadRequestScope.Scoper provideScoper(
final PerThreadRequestScope.Propagator propagator,
final Provider<RequestScopedReviewDbProvider> dbProvider) {
final RequestContext requestContext = new RequestContext() {
@Override
public CurrentUser getCurrentUser() {
throw new OutOfScopeException("No user on merge thread");
}
@Override
public Provider<ReviewDb> getReviewDbProvider() {
return dbProvider.get();
}
};
return new PerThreadRequestScope.Scoper() {
@Override
public <T> Callable<T> scope(Callable<T> callable) {
return propagator.scope(requestContext, callable);
}
};
}
});
threadScoper = child.getInstance(PerThreadRequestScope.Scoper.class);
}
private void setDestProject(Branch.NameKey destBranch) throws MergeException {
@@ -1020,20 +945,15 @@ public class MergeOp {
db.rollback();
}
update.commit();
final Change change = c;
try {
threadScoper.scope(new Callable<Void>(){
@Override
public Void call() throws Exception {
sendMergedEmail(change, submitter);
return null;
}
}).call();
} catch (Exception e) {
logError("internal server error", e);
}
indexer.index(db, c);
try {
mergedSenderFactory.create(
c.getId(),
submitter != null ? submitter.getAccountId() : null).sendAsync();
} catch (Exception e) {
log.error("Cannot email merged notification for " + c.getId(), e);
}
if (submitter != null && mergeResultRev != null) {
try {
hooks.doChangeMergedHook(c,
@@ -1188,38 +1108,6 @@ public class MergeOp {
}
}
private void sendMergedEmail(final Change c, final PatchSetApproval from) {
workQueue.getDefaultQueue()
.submit(new Runnable() {
@Override
public void run() {
PatchSet patchSet;
try (ReviewDb reviewDb = schemaFactory.open()) {
patchSet = reviewDb.patchSets().get(c.currentPatchSetId());
} catch (Exception e) {
logError("Cannot send email for submitted patch set " + c.getId(), e);
return;
}
try {
MergedSender cm = mergedSenderFactory.create(c.getId());
if (from != null) {
cm.setFrom(from.getAccountId());
}
cm.setPatchSet(patchSet);
cm.send();
} catch (Exception e) {
logError("Cannot send email for submitted patch set " + c.getId(), e);
}
}
@Override
public String toString() {
return "send-email merged";
}
});
}
private ChangeControl changeControl(Change c) throws NoSuchChangeException {
return changeControlFactory.controlFor(
c, identifiedUserFactory.create(c.getOwner()));