Perform change update on multiple threads

When multiple changes need to be created or updated for a single push
operation they are now inserted into the database by parallel threads,
up to the maximum allowed thread count. The current thread is used
when the thread pool is already fully in use, falling back to the
prior behavior where each concurrent push operation can do its own
concurrent database update. The thread pool exists to reduce latency
so long as there are sufficient threads available.

This helps push times on databases that are high latency, such as
database servers that are running on a different machine from the
Gerrit server itself, e.g. gerrit.googlesource.com.

The new thread pool is disabled by default, limiting the overhead to
servers that have good latency with their database, such as using
in-process H2 database, or a MySQL or PostgreSQL on the same host.

Change-Id: I7d7368cee99a47e3f2ad1e753cc3f7e1c82d37b0
This commit is contained in:
Shawn O. Pearce
2012-07-27 16:38:55 -07:00
parent e6298f7216
commit c545c09012
5 changed files with 189 additions and 33 deletions

View File

@@ -1796,6 +1796,18 @@ processed.
+ +
Defaults to the number of available CPUs according to the Java runtime. Defaults to the number of available CPUs according to the Java runtime.
[[receive.changeUpdateThreads]]receive.changeUpdateThreads::
+
Number of threads to perform change creation or patch set updates
concurrently. Each thread uses its own database connection from
the database connection pool, and if all threads are busy then
main receive thread will also perform a change creation or patch
set update.
+
Defaults to 1, using only the main receive thread. This feature is for
databases with very high latency that can benfit from concurrent
operations when multiple changes are impacted at once.
[[receive.timeout]]receive.timeout:: [[receive.timeout]]receive.timeout::
+ +
Overall timeout on the time taken to process the change data in Overall timeout on the time taken to process the change data in

View File

@@ -29,7 +29,6 @@ import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gwtorm.server.OrmException; import com.google.gwtorm.server.OrmException;
import com.google.inject.Inject; import com.google.inject.Inject;
import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@@ -74,16 +73,29 @@ public class ApprovalsUtil {
* *
* @param change Change to update * @param change Change to update
* @throws OrmException * @throws OrmException
* @throws IOException
* @return List<PatchSetApproval> The previous approvals * @return List<PatchSetApproval> The previous approvals
*/ */
public List<PatchSetApproval> copyVetosToLatestPatchSet(Change change) public List<PatchSetApproval> copyVetosToLatestPatchSet(Change change)
throws OrmException, IOException { throws OrmException {
return copyVetosToLatestPatchSet(db, change);
}
/**
* Moves the PatchSetApprovals to the last PatchSet on the change while
* keeping the vetos.
*
* @param db database connection to use for updates.
* @param change Change to update
* @throws OrmException
* @return List<PatchSetApproval> The previous approvals
*/
public List<PatchSetApproval> copyVetosToLatestPatchSet(ReviewDb db,
Change change) throws OrmException {
PatchSet.Id source; PatchSet.Id source;
if (change.getNumberOfPatchSets() > 1) { if (change.getNumberOfPatchSets() > 1) {
source = new PatchSet.Id(change.getId(), change.getNumberOfPatchSets() - 1); source = new PatchSet.Id(change.getId(), change.getNumberOfPatchSets() - 1);
} else { } else {
throw new IOException("Previous patch set could not be found"); throw new OrmException("Previous patch set could not be found");
} }
PatchSet.Id dest = change.currPatchSetId(); PatchSet.Id dest = change.currPatchSetId();
@@ -103,18 +115,9 @@ public class ApprovalsUtil {
return patchSetApprovals; return patchSetApprovals;
} }
public void addReviewers(ReviewDb db, Change change, PatchSet ps,
/** Attach reviewers to a change. */ PatchSetInfo info, Set<Id> wantReviewers,
public void addReviewers(Change change, PatchSet ps, PatchSetInfo info, Set<Account.Id> existingReviewers) throws OrmException {
Set<Account.Id> wantReviewers) throws OrmException {
Set<Id> existing = Sets.<Account.Id> newHashSet();
addReviewers(change, ps, info, wantReviewers, existing);
}
/** Attach reviewers to a change. */
public void addReviewers(Change change, PatchSet ps, PatchSetInfo info,
Set<Account.Id> wantReviewers, Set<Account.Id> existingReviewers)
throws OrmException {
List<ApprovalType> allTypes = approvalTypes.getApprovalTypes(); List<ApprovalType> allTypes = approvalTypes.getApprovalTypes();
if (allTypes.isEmpty()) { if (allTypes.isEmpty()) {
return; return;

View File

@@ -0,0 +1,31 @@
// Copyright (C) 2012 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.git;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
/**
* Marker on the global {@link ListeningExecutorService} used by
* {@link ReceiveCommits} to create or replace changes.
*/
@Retention(RUNTIME)
@BindingAnnotation
public @interface ChangeUpdateExecutor {
}

View File

@@ -21,6 +21,7 @@ import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_MISSING_
import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_NONFASTFORWARD; import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_NONFASTFORWARD;
import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_OTHER_REASON; import static org.eclipse.jgit.transport.ReceiveCommand.Result.REJECTED_OTHER_REASON;
import com.google.common.base.Function;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.LinkedListMultimap;
@@ -28,6 +29,9 @@ import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.gerrit.common.ChangeHooks; import com.google.gerrit.common.ChangeHooks;
import com.google.gerrit.common.PageLinks; import com.google.gerrit.common.PageLinks;
import com.google.gerrit.common.data.Capable; import com.google.gerrit.common.data.Capable;
@@ -67,6 +71,7 @@ import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.gwtorm.server.AtomicUpdate; import com.google.gwtorm.server.AtomicUpdate;
import com.google.gwtorm.server.OrmException; import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.ResultSet; import com.google.gwtorm.server.ResultSet;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted; import com.google.inject.assistedinject.Assisted;
@@ -106,6 +111,7 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@@ -208,11 +214,23 @@ public class ReceiveCommits {
} }
} }
private static final Function<Exception, OrmException> ORM_EXCEPTION =
new Function<Exception, OrmException>() {
@Override
public OrmException apply(Exception input) {
if (input instanceof OrmException) {
return (OrmException) input;
}
return new OrmException("Error updating database", input);
}
};
private final Set<Account.Id> reviewerId = new HashSet<Account.Id>(); private final Set<Account.Id> reviewerId = new HashSet<Account.Id>();
private final Set<Account.Id> ccId = new HashSet<Account.Id>(); private final Set<Account.Id> ccId = new HashSet<Account.Id>();
private final IdentifiedUser currentUser; private final IdentifiedUser currentUser;
private final ReviewDb db; private final ReviewDb db;
private final SchemaFactory<ReviewDb> schemaFactory;
private final AccountResolver accountResolver; private final AccountResolver accountResolver;
private final CreateChangeSender.Factory createChangeSenderFactory; private final CreateChangeSender.Factory createChangeSenderFactory;
private final MergedSender.Factory mergedSenderFactory; private final MergedSender.Factory mergedSenderFactory;
@@ -228,6 +246,7 @@ public class ReceiveCommits {
private final TrackingFooters trackingFooters; private final TrackingFooters trackingFooters;
private final TagCache tagCache; private final TagCache tagCache;
private final WorkQueue workQueue; private final WorkQueue workQueue;
private final ListeningExecutorService changeUpdateExector;
private final RequestScopePropagator requestScopePropagator; private final RequestScopePropagator requestScopePropagator;
private final ProjectControl projectControl; private final ProjectControl projectControl;
@@ -263,6 +282,7 @@ public class ReceiveCommits {
@Inject @Inject
ReceiveCommits(final ReviewDb db, ReceiveCommits(final ReviewDb db,
final SchemaFactory<ReviewDb> schemaFactory,
final AccountResolver accountResolver, final AccountResolver accountResolver,
final CreateChangeSender.Factory createChangeSenderFactory, final CreateChangeSender.Factory createChangeSenderFactory,
final MergedSender.Factory mergedSenderFactory, final MergedSender.Factory mergedSenderFactory,
@@ -278,6 +298,7 @@ public class ReceiveCommits {
@GerritPersonIdent final PersonIdent gerritIdent, @GerritPersonIdent final PersonIdent gerritIdent,
final TrackingFooters trackingFooters, final TrackingFooters trackingFooters,
final WorkQueue workQueue, final WorkQueue workQueue,
@ChangeUpdateExecutor ListeningExecutorService changeUpdateExector,
final RequestScopePropagator requestScopePropagator, final RequestScopePropagator requestScopePropagator,
@Assisted final ProjectControl projectControl, @Assisted final ProjectControl projectControl,
@@ -285,6 +306,7 @@ public class ReceiveCommits {
final SubmoduleOp.Factory subOpFactory) throws IOException { final SubmoduleOp.Factory subOpFactory) throws IOException {
this.currentUser = (IdentifiedUser) projectControl.getCurrentUser(); this.currentUser = (IdentifiedUser) projectControl.getCurrentUser();
this.db = db; this.db = db;
this.schemaFactory = schemaFactory;
this.accountResolver = accountResolver; this.accountResolver = accountResolver;
this.createChangeSenderFactory = createChangeSenderFactory; this.createChangeSenderFactory = createChangeSenderFactory;
this.mergedSenderFactory = mergedSenderFactory; this.mergedSenderFactory = mergedSenderFactory;
@@ -300,6 +322,7 @@ public class ReceiveCommits {
this.trackingFooters = trackingFooters; this.trackingFooters = trackingFooters;
this.tagCache = tagCache; this.tagCache = tagCache;
this.workQueue = workQueue; this.workQueue = workQueue;
this.changeUpdateExector = changeUpdateExector;
this.requestScopePropagator = requestScopePropagator; this.requestScopePropagator = requestScopePropagator;
this.projectControl = projectControl; this.projectControl = projectControl;
@@ -593,7 +616,7 @@ public class ReceiveCommits {
} }
} else if (replace.cmd.getResult() == OK) { } else if (replace.cmd.getResult() == OK) {
try { try {
if (replace.insertPatchSet() != null) { if (replace.insertPatchSet().checkedGet() != null) {
replace.inputCommand.setResult(OK); replace.inputCommand.setResult(OK);
} }
} catch (IOException err) { } catch (IOException err) {
@@ -635,14 +658,19 @@ public class ReceiveCommits {
} }
try { try {
List<CheckedFuture<?, OrmException>> futures = Lists.newArrayList();
for (ReplaceRequest replace : replaceByChange.values()) { for (ReplaceRequest replace : replaceByChange.values()) {
if (replace.inputCommand == newChange) { if (replace.inputCommand == newChange) {
replace.insertPatchSet(); futures.add(replace.insertPatchSet());
} }
} }
for (CreateRequest create : newChanges) { for (CreateRequest create : newChanges) {
create.insertChange(); futures.add(create.insertChange());
}
for (CheckedFuture<?, OrmException> f : futures) {
f.checkedGet();
} }
newChange.setResult(OK); newChange.setResult(OK);
} catch (OrmException err) { } catch (OrmException err) {
@@ -1226,10 +1254,35 @@ public class ReceiveCommits {
cmd = new ReceiveCommand(ObjectId.zeroId(), c, ps.getRefName()); cmd = new ReceiveCommand(ObjectId.zeroId(), c, ps.getRefName());
} }
void insertChange() throws IOException, OrmException { CheckedFuture<Void, OrmException> insertChange() throws IOException {
rp.getRevWalk().parseBody(commit); rp.getRevWalk().parseBody(commit);
warnMalformedMessage(commit); warnMalformedMessage(commit);
final Thread caller = Thread.currentThread();
ListenableFuture<Void> future = changeUpdateExector.submit(
requestScopePropagator.wrap(new Callable<Void>() {
@Override
public Void call() throws OrmException {
if (caller == Thread.currentThread()) {
insertChange(db);
} else {
ReviewDb db = schemaFactory.open();
try {
insertChange(db);
} finally {
db.close();
}
}
synchronized (newProgress) {
newProgress.update(1);
}
return null;
}
}));
return Futures.makeChecked(future, ORM_EXCEPTION);
}
private void insertChange(ReviewDb db) throws OrmException {
final Account.Id me = currentUser.getAccountId(); final Account.Id me = currentUser.getAccountId();
final Set<Account.Id> reviewers = new HashSet<Account.Id>(reviewerId); final Set<Account.Id> reviewers = new HashSet<Account.Id>(reviewerId);
final Set<Account.Id> cc = new HashSet<Account.Id>(ccId); final Set<Account.Id> cc = new HashSet<Account.Id>(ccId);
@@ -1251,11 +1304,12 @@ public class ReceiveCommits {
db.changes().beginTransaction(change.getId()); db.changes().beginTransaction(change.getId());
try { try {
insertAncestors(ps.getId(), commit); insertAncestors(db, ps.getId(), commit);
db.patchSets().insert(Collections.singleton(ps)); db.patchSets().insert(Collections.singleton(ps));
db.changes().insert(Collections.singleton(change)); db.changes().insert(Collections.singleton(change));
ChangeUtil.updateTrackingIds(db, change, trackingFooters, footerLines); ChangeUtil.updateTrackingIds(db, change, trackingFooters, footerLines);
approvalsUtil.addReviewers(change, ps, info, reviewers); approvalsUtil.addReviewers(db, change, ps, info,
reviewers, Collections.<Account.Id> emptySet());
db.commit(); db.commit();
} finally { } finally {
db.rollback(); db.rollback();
@@ -1264,7 +1318,6 @@ public class ReceiveCommits {
created = true; created = true;
replication.fire(project.getNameKey(), ps.getRefName()); replication.fire(project.getNameKey(), ps.getRefName());
hooks.doPatchsetCreatedHook(change, ps, db); hooks.doPatchsetCreatedHook(change, ps, db);
newProgress.update(1);
workQueue.getDefaultQueue() workQueue.getDefaultQueue()
.submit(requestScopePropagator.wrap(new Runnable() { .submit(requestScopePropagator.wrap(new Runnable() {
@Override @Override
@@ -1509,10 +1562,38 @@ public class ReceiveCommits {
return true; return true;
} }
PatchSet.Id insertPatchSet() throws IOException, OrmException { CheckedFuture<PatchSet.Id, OrmException> insertPatchSet()
throws IOException {
rp.getRevWalk().parseBody(newCommit); rp.getRevWalk().parseBody(newCommit);
warnMalformedMessage(newCommit); warnMalformedMessage(newCommit);
final Thread caller = Thread.currentThread();
ListenableFuture<PatchSet.Id> future = changeUpdateExector.submit(
requestScopePropagator.wrap(new Callable<PatchSet.Id>() {
@Override
public PatchSet.Id call() throws OrmException {
try {
if (caller == Thread.currentThread()) {
return insertPatchSet(db);
} else {
ReviewDb db = schemaFactory.open();
try {
return insertPatchSet(db);
} finally {
db.close();
}
}
} finally {
synchronized (newProgress) {
replaceProgress.update(1);
}
}
}
}));
return Futures.makeChecked(future, ORM_EXCEPTION);
}
PatchSet.Id insertPatchSet(ReviewDb db) throws OrmException {
final Account.Id me = currentUser.getAccountId(); final Account.Id me = currentUser.getAccountId();
final Set<Account.Id> reviewers = new HashSet<Account.Id>(reviewerId); final Set<Account.Id> reviewers = new HashSet<Account.Id>(reviewerId);
final Set<Account.Id> cc = new HashSet<Account.Id>(ccId); final Set<Account.Id> cc = new HashSet<Account.Id>(ccId);
@@ -1554,7 +1635,7 @@ public class ReceiveCommits {
return null; return null;
} }
insertAncestors(newPatchSet.getId(), newCommit); insertAncestors(db, newPatchSet.getId(), newCommit);
db.patchSets().insert(Collections.singleton(newPatchSet)); db.patchSets().insert(Collections.singleton(newPatchSet));
if (checkMergedInto) { if (checkMergedInto) {
@@ -1562,7 +1643,8 @@ public class ReceiveCommits {
mergedIntoRef = mergedInto != null ? mergedInto.getName() : null; mergedIntoRef = mergedInto != null ? mergedInto.getName() : null;
} }
List<PatchSetApproval> patchSetApprovals = approvalsUtil.copyVetosToLatestPatchSet(change); List<PatchSetApproval> patchSetApprovals =
approvalsUtil.copyVetosToLatestPatchSet(db, change);
final Set<Account.Id> haveApprovals = new HashSet<Account.Id>(); final Set<Account.Id> haveApprovals = new HashSet<Account.Id>();
oldReviewers.clear(); oldReviewers.clear();
@@ -1577,7 +1659,8 @@ public class ReceiveCommits {
} }
} }
approvalsUtil.addReviewers(change, newPatchSet, info, reviewers, haveApprovals); approvalsUtil.addReviewers(db, change, newPatchSet, info,
reviewers, haveApprovals);
msg = msg =
new ChangeMessage(new ChangeMessage.Key(change.getId(), ChangeUtil new ChangeMessage(new ChangeMessage.Key(change.getId(), ChangeUtil
@@ -1638,7 +1721,6 @@ public class ReceiveCommits {
replication.fire(project.getNameKey(), newPatchSet.getRefName()); replication.fire(project.getNameKey(), newPatchSet.getRefName());
hooks.doPatchsetCreatedHook(change, newPatchSet, db); hooks.doPatchsetCreatedHook(change, newPatchSet, db);
replaceProgress.update(1);
if (mergedIntoRef != null) { if (mergedIntoRef != null) {
hooks.doChangeMergedHook( hooks.doChangeMergedHook(
change, currentUser.getAccount(), newPatchSet, db); change, currentUser.getAccount(), newPatchSet, db);
@@ -2042,7 +2124,9 @@ public class ReceiveCommits {
} }
for (final ReplaceRequest req : toClose) { for (final ReplaceRequest req : toClose) {
final PatchSet.Id psi = req.validate(true) ? req.insertPatchSet() : null; final PatchSet.Id psi = req.validate(true)
? req.insertPatchSet().checkedGet()
: null;
if (psi != null) { if (psi != null) {
closeChange(req.inputCommand, psi, req.newCommit); closeChange(req.inputCommand, psi, req.newCommit);
closeProgress.update(1); closeProgress.update(1);
@@ -2189,7 +2273,7 @@ public class ReceiveCommits {
})); }));
} }
private void insertAncestors(PatchSet.Id id, RevCommit src) private void insertAncestors(ReviewDb db, PatchSet.Id id, RevCommit src)
throws OrmException { throws OrmException {
final int cnt = src.getParentCount(); final int cnt = src.getParentCount();
List<PatchSetAncestor> toInsert = new ArrayList<PatchSetAncestor>(cnt); List<PatchSetAncestor> toInsert = new ArrayList<PatchSetAncestor>(cnt);

View File

@@ -14,15 +14,20 @@
package com.google.gerrit.server.git; package com.google.gerrit.server.git;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.server.config.GerritServerConfig; import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.git.WorkQueue.Executor;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import org.eclipse.jgit.lib.Config; import org.eclipse.jgit.lib.Config;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** Module providing the {@link ReceiveCommitsExecutor}. */ /** Module providing the {@link ReceiveCommitsExecutor}. */
public class ReceiveCommitsExecutorModule extends AbstractModule { public class ReceiveCommitsExecutorModule extends AbstractModule {
@Override @Override
@@ -32,10 +37,31 @@ public class ReceiveCommitsExecutorModule extends AbstractModule {
@Provides @Provides
@Singleton @Singleton
@ReceiveCommitsExecutor @ReceiveCommitsExecutor
public Executor getReceiveCommitsExecutor(@GerritServerConfig Config config, public WorkQueue.Executor createReceiveCommitsExecutor(
@GerritServerConfig Config config,
WorkQueue queues) { WorkQueue queues) {
int poolSize = config.getInt("receive", null, "threadPoolSize", int poolSize = config.getInt("receive", null, "threadPoolSize",
Runtime.getRuntime().availableProcessors()); Runtime.getRuntime().availableProcessors());
return queues.createQueue(poolSize, "ReceiveCommits"); return queues.createQueue(poolSize, "ReceiveCommits");
} }
@Provides
@Singleton
@ChangeUpdateExecutor
public ListeningExecutorService createChangeUpdateExecutor(@GerritServerConfig Config config) {
int poolSize = config.getInt("receive", null, "changeUpdateThreads", 1);
if (poolSize <= 1) {
return MoreExecutors.sameThreadExecutor();
}
return MoreExecutors.listeningDecorator(
MoreExecutors.getExitingExecutorService(
new ThreadPoolExecutor(1, poolSize,
10, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(poolSize),
new ThreadFactoryBuilder()
.setNameFormat("ChangeUpdate-%d")
.setDaemon(true)
.build(),
new ThreadPoolExecutor.CallerRunsPolicy())));
}
} }