Fix ChangeIndexer to be runnable from REST API
The request scope should not be relied upon to obtain a database connection. The ChangeIndexer does not need the CurrentUser, it just expects a Provider<ReviewDb> to work. Temporarily obtain a ReviewDb when the task starts executing, pass it through as part of the RequestContext, and release the connection when the task is done. Change-Id: I3ebcb634bfb7112756cdb2ad49f3ead83cd1b4d7
This commit is contained in:
@@ -32,7 +32,6 @@ import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
|
|||||||
import com.google.gerrit.server.index.ChangeIndexer;
|
import com.google.gerrit.server.index.ChangeIndexer;
|
||||||
import com.google.gerrit.server.patch.PatchSetInfoFactory;
|
import com.google.gerrit.server.patch.PatchSetInfoFactory;
|
||||||
import com.google.gerrit.server.project.RefControl;
|
import com.google.gerrit.server.project.RefControl;
|
||||||
import com.google.gerrit.server.util.RequestScopePropagator;
|
|
||||||
import com.google.gwtorm.server.OrmException;
|
import com.google.gwtorm.server.OrmException;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Provider;
|
import com.google.inject.Provider;
|
||||||
@@ -62,7 +61,6 @@ public class ChangeInserter {
|
|||||||
private final RevCommit commit;
|
private final RevCommit commit;
|
||||||
private final PatchSetInfo patchSetInfo;
|
private final PatchSetInfo patchSetInfo;
|
||||||
|
|
||||||
private RequestScopePropagator requestScopePropagator;
|
|
||||||
private ChangeMessage changeMessage;
|
private ChangeMessage changeMessage;
|
||||||
private Set<Account.Id> reviewers;
|
private Set<Account.Id> reviewers;
|
||||||
|
|
||||||
@@ -99,11 +97,6 @@ public class ChangeInserter {
|
|||||||
ChangeUtil.computeSortKey(change);
|
ChangeUtil.computeSortKey(change);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ChangeInserter setRequestScopePropagator(RequestScopePropagator rsp) {
|
|
||||||
requestScopePropagator = rsp;
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public ChangeInserter setMessage(ChangeMessage changeMessage) {
|
public ChangeInserter setMessage(ChangeMessage changeMessage) {
|
||||||
this.changeMessage = changeMessage;
|
this.changeMessage = changeMessage;
|
||||||
return this;
|
return this;
|
||||||
@@ -147,7 +140,7 @@ public class ChangeInserter {
|
|||||||
db.changeMessages().insert(Collections.singleton(changeMessage));
|
db.changeMessages().insert(Collections.singleton(changeMessage));
|
||||||
}
|
}
|
||||||
|
|
||||||
indexer.index(change, requestScopePropagator);
|
indexer.index(change);
|
||||||
gitRefUpdated.fire(change.getProject(), patchSet.getRefName(),
|
gitRefUpdated.fire(change.getProject(), patchSet.getRefName(),
|
||||||
ObjectId.zeroId(), commit);
|
ObjectId.zeroId(), commit);
|
||||||
hooks.doPatchsetCreatedHook(change, patchSet, db);
|
hooks.doPatchsetCreatedHook(change, patchSet, db);
|
||||||
|
|||||||
@@ -41,7 +41,6 @@ import com.google.gerrit.server.patch.PatchSetInfoFactory;
|
|||||||
import com.google.gerrit.server.patch.PatchSetInfoNotAvailableException;
|
import com.google.gerrit.server.patch.PatchSetInfoNotAvailableException;
|
||||||
import com.google.gerrit.server.project.ChangeControl;
|
import com.google.gerrit.server.project.ChangeControl;
|
||||||
import com.google.gerrit.server.project.NoSuchChangeException;
|
import com.google.gerrit.server.project.NoSuchChangeException;
|
||||||
import com.google.gerrit.server.util.RequestScopePropagator;
|
|
||||||
import com.google.gwtorm.server.AtomicUpdate;
|
import com.google.gwtorm.server.AtomicUpdate;
|
||||||
import com.google.gwtorm.server.OrmException;
|
import com.google.gwtorm.server.OrmException;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
@@ -78,7 +77,6 @@ public class PublishDraft implements Callable<ReviewResult> {
|
|||||||
private final CreateChangeSender.Factory createChangeSenderFactory;
|
private final CreateChangeSender.Factory createChangeSenderFactory;
|
||||||
private final ReplacePatchSetSender.Factory replacePatchSetFactory;
|
private final ReplacePatchSetSender.Factory replacePatchSetFactory;
|
||||||
private final ChangeIndexer indexer;
|
private final ChangeIndexer indexer;
|
||||||
private final RequestScopePropagator requestScopePropagator;
|
|
||||||
|
|
||||||
private final PatchSet.Id patchSetId;
|
private final PatchSet.Id patchSetId;
|
||||||
|
|
||||||
@@ -92,7 +90,6 @@ public class PublishDraft implements Callable<ReviewResult> {
|
|||||||
final CreateChangeSender.Factory createChangeSenderFactory,
|
final CreateChangeSender.Factory createChangeSenderFactory,
|
||||||
final ReplacePatchSetSender.Factory replacePatchSetFactory,
|
final ReplacePatchSetSender.Factory replacePatchSetFactory,
|
||||||
final ChangeIndexer indexer,
|
final ChangeIndexer indexer,
|
||||||
final RequestScopePropagator requestScopePropagator,
|
|
||||||
@Assisted final PatchSet.Id patchSetId) {
|
@Assisted final PatchSet.Id patchSetId) {
|
||||||
this.changeControlFactory = changeControlFactory;
|
this.changeControlFactory = changeControlFactory;
|
||||||
this.db = db;
|
this.db = db;
|
||||||
@@ -104,7 +101,6 @@ public class PublishDraft implements Callable<ReviewResult> {
|
|||||||
this.createChangeSenderFactory = createChangeSenderFactory;
|
this.createChangeSenderFactory = createChangeSenderFactory;
|
||||||
this.replacePatchSetFactory = replacePatchSetFactory;
|
this.replacePatchSetFactory = replacePatchSetFactory;
|
||||||
this.indexer = indexer;
|
this.indexer = indexer;
|
||||||
this.requestScopePropagator = requestScopePropagator;
|
|
||||||
|
|
||||||
this.patchSetId = patchSetId;
|
this.patchSetId = patchSetId;
|
||||||
}
|
}
|
||||||
@@ -154,7 +150,7 @@ public class PublishDraft implements Callable<ReviewResult> {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (!updatedPatchSet.isDraft() || updatedChange.getStatus() == Change.Status.NEW) {
|
if (!updatedPatchSet.isDraft() || updatedChange.getStatus() == Change.Status.NEW) {
|
||||||
indexer.index(updatedChange, requestScopePropagator);
|
indexer.index(updatedChange);
|
||||||
hooks.doDraftPublishedHook(updatedChange, updatedPatchSet, db);
|
hooks.doDraftPublishedHook(updatedChange, updatedPatchSet, db);
|
||||||
|
|
||||||
sendNotifications(control.getChange().getStatus() == Change.Status.DRAFT,
|
sendNotifications(control.getChange().getStatus() == Change.Status.DRAFT,
|
||||||
|
|||||||
@@ -1476,8 +1476,7 @@ public class ReceiveCommits {
|
|||||||
currentUser.getAccountId(),
|
currentUser.getAccountId(),
|
||||||
magicBranch.dest);
|
magicBranch.dest);
|
||||||
change.setTopic(magicBranch.topic);
|
change.setTopic(magicBranch.topic);
|
||||||
ins = changeInserterFactory.create(ctl, change, c)
|
ins = changeInserterFactory.create(ctl, change, c);
|
||||||
.setRequestScopePropagator(requestScopePropagator);
|
|
||||||
if (magicBranch.isDraft()) {
|
if (magicBranch.isDraft()) {
|
||||||
ins.setDraft();
|
ins.setDraft();
|
||||||
}
|
}
|
||||||
@@ -1922,7 +1921,7 @@ public class ReceiveCommits {
|
|||||||
if (cmd.getResult() == NOT_ATTEMPTED) {
|
if (cmd.getResult() == NOT_ATTEMPTED) {
|
||||||
cmd.execute(rp);
|
cmd.execute(rp);
|
||||||
}
|
}
|
||||||
indexer.index(change, requestScopePropagator);
|
indexer.index(change);
|
||||||
gitRefUpdated.fire(project.getNameKey(), newPatchSet.getRefName(),
|
gitRefUpdated.fire(project.getNameKey(), newPatchSet.getRefName(),
|
||||||
ObjectId.zeroId(), newCommit);
|
ObjectId.zeroId(), newCommit);
|
||||||
hooks.doPatchsetCreatedHook(change, newPatchSet, db);
|
hooks.doPatchsetCreatedHook(change, newPatchSet, db);
|
||||||
@@ -2268,7 +2267,7 @@ public class ReceiveCommits {
|
|||||||
return change;
|
return change;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
indexer.index(change, requestScopePropagator);
|
indexer.index(change);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendMergedEmail(final ReplaceRequest result) {
|
private void sendMergedEmail(final ReplaceRequest result) {
|
||||||
|
|||||||
@@ -17,7 +17,6 @@ package com.google.gerrit.server.index;
|
|||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.gerrit.reviewdb.client.Change;
|
import com.google.gerrit.reviewdb.client.Change;
|
||||||
import com.google.gerrit.server.util.RequestScopePropagator;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper for (re)indexing a change document.
|
* Helper for (re)indexing a change document.
|
||||||
@@ -32,12 +31,6 @@ public interface ChangeIndexer {
|
|||||||
public ListenableFuture<?> index(Change change) {
|
public ListenableFuture<?> index(Change change) {
|
||||||
return Futures.immediateFuture(null);
|
return Futures.immediateFuture(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public ListenableFuture<?> index(Change change,
|
|
||||||
RequestScopePropagator prop) {
|
|
||||||
return Futures.immediateFuture(null);
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -47,13 +40,4 @@ public interface ChangeIndexer {
|
|||||||
* @return future for the indexing task.
|
* @return future for the indexing task.
|
||||||
*/
|
*/
|
||||||
public ListenableFuture<?> index(Change change);
|
public ListenableFuture<?> index(Change change);
|
||||||
|
|
||||||
/**
|
|
||||||
* Start indexing a change.
|
|
||||||
*
|
|
||||||
* @param change change to index.
|
|
||||||
* @param prop propagator to wrap any created runnables in.
|
|
||||||
* @return future for the indexing task.
|
|
||||||
*/
|
|
||||||
public ListenableFuture<?> index(Change change, RequestScopePropagator prop);
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,14 +17,20 @@ package com.google.gerrit.server.index;
|
|||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
||||||
import com.google.gerrit.reviewdb.client.Change;
|
import com.google.gerrit.reviewdb.client.Change;
|
||||||
|
import com.google.gerrit.reviewdb.server.ReviewDb;
|
||||||
|
import com.google.gerrit.server.CurrentUser;
|
||||||
import com.google.gerrit.server.query.change.ChangeData;
|
import com.google.gerrit.server.query.change.ChangeData;
|
||||||
import com.google.gerrit.server.util.RequestScopePropagator;
|
import com.google.gerrit.server.util.RequestContext;
|
||||||
|
import com.google.gerrit.server.util.ThreadLocalRequestContext;
|
||||||
|
import com.google.gwtorm.server.SchemaFactory;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
import com.google.inject.OutOfScopeException;
|
||||||
|
import com.google.inject.Provider;
|
||||||
|
import com.google.inject.util.Providers;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -39,27 +45,23 @@ public class ChangeIndexerImpl implements ChangeIndexer {
|
|||||||
|
|
||||||
private final ListeningScheduledExecutorService executor;
|
private final ListeningScheduledExecutorService executor;
|
||||||
private final ChangeIndex index;
|
private final ChangeIndex index;
|
||||||
|
private final SchemaFactory<ReviewDb> schemaFactory;
|
||||||
|
private final ThreadLocalRequestContext context;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
ChangeIndexerImpl(@IndexExecutor ListeningScheduledExecutorService executor,
|
ChangeIndexerImpl(@IndexExecutor ListeningScheduledExecutorService executor,
|
||||||
ChangeIndex index) throws IOException {
|
ChangeIndex index,
|
||||||
|
SchemaFactory<ReviewDb> schemaFactory,
|
||||||
|
ThreadLocalRequestContext context) {
|
||||||
this.executor = executor;
|
this.executor = executor;
|
||||||
this.index = index;
|
this.index = index;
|
||||||
|
this.schemaFactory = schemaFactory;
|
||||||
|
this.context = context;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ListenableFuture<?> index(Change change) {
|
public ListenableFuture<?> index(Change change) {
|
||||||
return index(change, null);
|
return executor.submit(new Task(change));
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ListenableFuture<?> index(Change change,
|
|
||||||
RequestScopePropagator prop) {
|
|
||||||
Callable<?> task = new Task(change);
|
|
||||||
if (prop != null) {
|
|
||||||
task = prop.wrap(task);
|
|
||||||
}
|
|
||||||
return executor.submit(task);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private class Task implements Callable<Void> {
|
private class Task implements Callable<Void> {
|
||||||
@@ -72,8 +74,25 @@ public class ChangeIndexerImpl implements ChangeIndexer {
|
|||||||
@Override
|
@Override
|
||||||
public Void call() throws Exception {
|
public Void call() throws Exception {
|
||||||
try {
|
try {
|
||||||
|
final ReviewDb db = schemaFactory.open();
|
||||||
|
try {
|
||||||
|
context.setContext(new RequestContext() {
|
||||||
|
@Override
|
||||||
|
public Provider<ReviewDb> getReviewDbProvider() {
|
||||||
|
return Providers.of(db);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CurrentUser getCurrentUser() {
|
||||||
|
throw new OutOfScopeException("No user during ChangeIndexer");
|
||||||
|
}
|
||||||
|
});
|
||||||
index.replace(new ChangeData(change));
|
index.replace(new ChangeData(change));
|
||||||
return null;
|
return null;
|
||||||
|
} finally {
|
||||||
|
context.setContext(null);
|
||||||
|
db.close();
|
||||||
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error(String.format(
|
log.error(String.format(
|
||||||
"Failed to index change %d in %s",
|
"Failed to index change %d in %s",
|
||||||
|
|||||||
Reference in New Issue
Block a user