ReceiveCommits: Reopen repo in background threads
JGit Repository instances are not inherently threadsafe. We already detect the background thread case and reopen the ReviewDb; we should also reopen the repo. Encapsulate repo, RevWalk, and db in a new AutoCloseable, RequestState. This can be constructed either with new objects or reusing the ones from the current thread, allowing us to save a bit of code at call sites. Change-Id: I9333f68ac222c149e48680d61167d2c6261fa4f2
This commit is contained in:
@@ -138,6 +138,7 @@ import com.google.inject.assistedinject.Assisted;
|
||||
|
||||
import org.eclipse.jgit.errors.IncorrectObjectTypeException;
|
||||
import org.eclipse.jgit.errors.MissingObjectException;
|
||||
import org.eclipse.jgit.errors.RepositoryNotFoundException;
|
||||
import org.eclipse.jgit.lib.BatchRefUpdate;
|
||||
import org.eclipse.jgit.lib.Constants;
|
||||
import org.eclipse.jgit.lib.ObjectId;
|
||||
@@ -1757,13 +1758,9 @@ public class ReceiveCommits {
|
||||
requestScopePropagator.wrap(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws OrmException, RestApiException,
|
||||
UpdateException {
|
||||
if (caller == Thread.currentThread()) {
|
||||
insertChange(ReceiveCommits.this.db);
|
||||
} else {
|
||||
try (ReviewDb threadLocalDb = schemaFactory.open()) {
|
||||
insertChange(threadLocalDb);
|
||||
}
|
||||
UpdateException, RepositoryNotFoundException, IOException {
|
||||
try (RequestState state = requestState(caller)) {
|
||||
insertChange(state);
|
||||
}
|
||||
synchronizedIncrement(newProgress);
|
||||
return null;
|
||||
@@ -1772,7 +1769,7 @@ public class ReceiveCommits {
|
||||
return Futures.makeChecked(future, INSERT_EXCEPTION);
|
||||
}
|
||||
|
||||
private void insertChange(ReviewDb threadLocalDb)
|
||||
private void insertChange(RequestState state)
|
||||
throws OrmException, RestApiException, UpdateException {
|
||||
final PatchSet.Id psId = ins.setGroups(groups).getPatchSetId();
|
||||
final Account.Id me = user.getAccountId();
|
||||
@@ -1788,10 +1785,9 @@ public class ReceiveCommits {
|
||||
recipients.remove(me);
|
||||
String msg = renderMessageWithApprovals(psId.get(), null,
|
||||
approvals, Collections.<String, PatchSetApproval> emptyMap());
|
||||
try (ObjectInserter oi = repo.newObjectInserter();
|
||||
BatchUpdate bu = batchUpdateFactory.create(threadLocalDb,
|
||||
magicBranch.dest.getParentKey(), user, TimeUtil.nowTs())) {
|
||||
bu.setRepository(repo, rp.getRevWalk(), oi);
|
||||
try (BatchUpdate bu = batchUpdateFactory.create(state.db,
|
||||
magicBranch.dest.getParentKey(), user, TimeUtil.nowTs())) {
|
||||
bu.setRepository(state.repo, state.rw, state.ins);
|
||||
bu.insertChange(ins
|
||||
.setReviewers(recipients.getReviewers())
|
||||
.setExtraCC(recipients.getCcOnly())
|
||||
@@ -2163,12 +2159,9 @@ public class ReceiveCommits {
|
||||
try {
|
||||
if (magicBranch != null && magicBranch.edit) {
|
||||
return upsertEdit();
|
||||
} else if (caller == Thread.currentThread()) {
|
||||
return insertPatchSet(db);
|
||||
} else {
|
||||
try (ReviewDb db = schemaFactory.open()) {
|
||||
return insertPatchSet(db);
|
||||
}
|
||||
}
|
||||
try (RequestState state = requestState(caller)) {
|
||||
return insertPatchSet(state);
|
||||
}
|
||||
} finally {
|
||||
synchronizedIncrement(replaceProgress);
|
||||
@@ -2232,8 +2225,10 @@ public class ReceiveCommits {
|
||||
return psId;
|
||||
}
|
||||
|
||||
PatchSet.Id insertPatchSet(ReviewDb db) throws OrmException, IOException,
|
||||
RestApiException {
|
||||
PatchSet.Id insertPatchSet(RequestState state)
|
||||
throws OrmException, IOException, RestApiException {
|
||||
ReviewDb db = state.db;
|
||||
Repository repo = state.repo;
|
||||
final Account.Id me = user.getAccountId();
|
||||
final List<FooterLine> footerLines = newCommit.getFooterLines();
|
||||
final MailRecipients recipients = new MailRecipients();
|
||||
@@ -2425,7 +2420,9 @@ public class ReceiveCommits {
|
||||
this.commit = commit;
|
||||
}
|
||||
|
||||
private void updateGroups(ReviewDb db) throws OrmException, IOException {
|
||||
private void updateGroups(RequestState state)
|
||||
throws OrmException, IOException {
|
||||
ReviewDb db = state.db;
|
||||
PatchSet ps = db.patchSets().atomicUpdate(psId,
|
||||
new AtomicUpdate<PatchSet>() {
|
||||
@Override
|
||||
@@ -2456,12 +2453,8 @@ public class ReceiveCommits {
|
||||
requestScopePropagator.wrap(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws OrmException, IOException {
|
||||
if (caller == Thread.currentThread()) {
|
||||
updateGroups(db);
|
||||
} else {
|
||||
try (ReviewDb db = schemaFactory.open()) {
|
||||
updateGroups(db);
|
||||
}
|
||||
try (RequestState state = requestState(caller)) {
|
||||
updateGroups(state);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@@ -2847,4 +2840,53 @@ public class ReceiveCommits {
|
||||
p.update(1);
|
||||
}
|
||||
}
|
||||
|
||||
private RequestState requestState(Thread caller)
|
||||
throws OrmException, IOException {
|
||||
if (caller == Thread.currentThread()) {
|
||||
return new RequestState(db, repo, rp.getRevWalk());
|
||||
} else {
|
||||
return new RequestState(project.getNameKey());
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("hiding")
|
||||
private class RequestState implements AutoCloseable {
|
||||
private final ReviewDb db;
|
||||
private final Repository repo;
|
||||
private final RevWalk rw;
|
||||
private final ObjectInserter ins;
|
||||
private final boolean close;
|
||||
|
||||
RequestState(ReviewDb db, Repository repo, RevWalk rw) {
|
||||
this.db = db;
|
||||
this.repo = repo;
|
||||
this.rw = rw;
|
||||
close = false;
|
||||
ins = repo.newObjectInserter();
|
||||
}
|
||||
|
||||
RequestState(Project.NameKey projectName) throws OrmException, IOException {
|
||||
repo = repoManager.openRepository(projectName);
|
||||
try {
|
||||
db = schemaFactory.open();
|
||||
} catch (OrmException e) {
|
||||
repo.close();
|
||||
throw e;
|
||||
}
|
||||
rw = new RevWalk(repo);
|
||||
close = true;
|
||||
ins = repo.newObjectInserter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
ins.close();
|
||||
if (close) {
|
||||
rw.close();
|
||||
repo.close();
|
||||
db.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user