Run ReceiveCommits in a shared thread pool
Since the work ReceiveCommits may take a long, potentially unbounded amount of time, we would like to have it run in the background so it can be monitored for timeouts and cancelled, and have stalls reported to the user from the main thread. Wraps ReceiveCommits in a new PreReceiveHook, AsyncReceiveCommits, that runs its delegated ReceiveCommits in a WorkQueue.Executor. The default implementation sizes the thread pool to the number of available processors, under the assumption that most database and pack operations are essentially CPU-bound. This is after reading the pack from the wire, indexing it, and storing it locally, so there is no I/O to the client involved, and available CPUs is a pretty good estimate. Since it may not be in some cases, for example if threads block on slow database I/O, the pool size is configurable. Change-Id: I5769944b49ead4224a1855ba72ee359de26e7bf8
This commit is contained in:
@@ -1675,7 +1675,8 @@ By default, 1.
|
||||
This section is used to set who can execute the 'receive-pack' and
|
||||
to limit the maximum Git object size that 'receive-pack' will accept.
|
||||
'receive-pack' is what runs on the server during a user's push or
|
||||
repo upload command.
|
||||
repo upload command. It also contains some advanced options for tuning the
|
||||
behavior of Gerrit's 'receive-pack' mechanism.
|
||||
|
||||
----
|
||||
[receive]
|
||||
@@ -1706,6 +1707,13 @@ Default is zero.
|
||||
+
|
||||
Common unit suffixes of 'k', 'm', or 'g' are supported.
|
||||
|
||||
[[receive.threadPoolSize]]receive.threadPoolSize::
|
||||
+
|
||||
Maximum size of the thread pool in which the change data in received packs is
|
||||
processed.
|
||||
+
|
||||
Defaults to the number of available CPUs according to the Java runtime.
|
||||
|
||||
|
||||
[[repository]]Section repository
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
@@ -22,6 +22,7 @@ import com.google.gerrit.server.AnonymousUser;
|
||||
import com.google.gerrit.server.IdentifiedUser;
|
||||
import com.google.gerrit.server.cache.Cache;
|
||||
import com.google.gerrit.server.cache.CacheModule;
|
||||
import com.google.gerrit.server.git.AsyncReceiveCommits;
|
||||
import com.google.gerrit.server.git.GitRepositoryManager;
|
||||
import com.google.gerrit.server.git.ReceiveCommits;
|
||||
import com.google.gerrit.server.git.TagCache;
|
||||
@@ -231,11 +232,12 @@ public class GitOverHttpServlet extends GitServlet {
|
||||
}
|
||||
|
||||
static class ReceiveFactory implements ReceivePackFactory<HttpServletRequest> {
|
||||
private final ReceiveCommits.Factory factory;
|
||||
private final AsyncReceiveCommits.Factory factory;
|
||||
private final Provider<WebSession> session;
|
||||
|
||||
@Inject
|
||||
ReceiveFactory(ReceiveCommits.Factory factory, Provider<WebSession> session) {
|
||||
ReceiveFactory(AsyncReceiveCommits.Factory factory,
|
||||
Provider<WebSession> session) {
|
||||
this.factory = factory;
|
||||
this.session = session;
|
||||
}
|
||||
@@ -251,7 +253,7 @@ public class GitOverHttpServlet extends GitServlet {
|
||||
}
|
||||
|
||||
final IdentifiedUser user = (IdentifiedUser) pc.getCurrentUser();
|
||||
final ReceiveCommits rc = factory.create(pc, db);
|
||||
final ReceiveCommits rc = factory.create(pc, db).getReceiveCommits();
|
||||
rc.getReceivePack().setRefLogIdent(user.newRefLogIdent());
|
||||
req.setAttribute(ATT_RC, rc);
|
||||
session.get().setAccessPath(AccessPath.GIT);
|
||||
|
@@ -42,6 +42,7 @@ import com.google.gerrit.server.config.GerritGlobalModule;
|
||||
import com.google.gerrit.server.config.MasterNodeStartup;
|
||||
import com.google.gerrit.server.contact.HttpContactStoreConnection;
|
||||
import com.google.gerrit.server.git.PushReplication;
|
||||
import com.google.gerrit.server.git.ReceiveCommitsExecutorModule;
|
||||
import com.google.gerrit.server.git.WorkQueue;
|
||||
import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
|
||||
import com.google.gerrit.server.mail.SmtpEmailSender;
|
||||
@@ -201,6 +202,7 @@ public class Daemon extends SiteProgram {
|
||||
modules.add(new LogFileCompressor.Module());
|
||||
modules.add(new WorkQueue.Module());
|
||||
modules.add(new ChangeHookRunner.Module());
|
||||
modules.add(new ReceiveCommitsExecutorModule());
|
||||
modules.add(cfgInjector.getInstance(GerritGlobalModule.class));
|
||||
modules.add(new EhcachePoolImpl.Module());
|
||||
modules.add(new SmtpEmailSender.Module());
|
||||
|
@@ -33,10 +33,10 @@ import com.google.gerrit.server.changedetail.DeleteDraftPatchSet;
|
||||
import com.google.gerrit.server.changedetail.PublishDraft;
|
||||
import com.google.gerrit.server.changedetail.RestoreChange;
|
||||
import com.google.gerrit.server.changedetail.Submit;
|
||||
import com.google.gerrit.server.git.AsyncReceiveCommits;
|
||||
import com.google.gerrit.server.git.CreateCodeReviewNotes;
|
||||
import com.google.gerrit.server.git.MergeOp;
|
||||
import com.google.gerrit.server.git.MetaDataUpdate;
|
||||
import com.google.gerrit.server.git.ReceiveCommits;
|
||||
import com.google.gerrit.server.git.SubmoduleOp;
|
||||
import com.google.gerrit.server.mail.AbandonedSender;
|
||||
import com.google.gerrit.server.mail.AddReviewerSender;
|
||||
@@ -79,10 +79,10 @@ public class GerritRequestModule extends FactoryModule {
|
||||
bind(AccountControl.Factory.class).in(SINGLETON);
|
||||
|
||||
factory(ChangeQueryBuilder.Factory.class);
|
||||
factory(ReceiveCommits.Factory.class);
|
||||
factory(SubmoduleOp.Factory.class);
|
||||
factory(MergeOp.Factory.class);
|
||||
factory(CreateCodeReviewNotes.Factory.class);
|
||||
install(new AsyncReceiveCommits.Module());
|
||||
|
||||
// Not really per-request, but dammit, I don't know where else to
|
||||
// easily park this stuff.
|
||||
|
@@ -0,0 +1,138 @@
|
||||
// Copyright (C) 2012 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.google.gerrit.server.git;
|
||||
|
||||
import com.google.gerrit.reviewdb.client.Project;
|
||||
import com.google.gerrit.server.project.ProjectControl;
|
||||
import com.google.gerrit.server.util.RequestScopePropagator;
|
||||
import com.google.gerrit.server.git.WorkQueue.Executor;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.google.inject.assistedinject.FactoryModuleBuilder;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.PrivateModule;
|
||||
|
||||
import org.eclipse.jgit.lib.Repository;
|
||||
import org.eclipse.jgit.transport.PreReceiveHook;
|
||||
import org.eclipse.jgit.transport.ReceiveCommand;
|
||||
import org.eclipse.jgit.transport.ReceivePack;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
/** Hook that delegates to {@link ReceiveCommits} in a worker thread. */
|
||||
public class AsyncReceiveCommits implements PreReceiveHook {
|
||||
private static final Logger log =
|
||||
LoggerFactory.getLogger(AsyncReceiveCommits.class);
|
||||
|
||||
private final ReceiveCommits rc;
|
||||
private final Executor executor;
|
||||
private final RequestScopePropagator scopePropagator;
|
||||
|
||||
public interface Factory {
|
||||
AsyncReceiveCommits create(ProjectControl projectControl,
|
||||
Repository repository);
|
||||
}
|
||||
|
||||
public static class Module extends PrivateModule {
|
||||
@Override
|
||||
public void configure() {
|
||||
install(new FactoryModuleBuilder()
|
||||
.build(AsyncReceiveCommits.Factory.class));
|
||||
expose(AsyncReceiveCommits.Factory.class);
|
||||
// Don't expose the binding for ReceiveCommits.Factory. All callers should
|
||||
// be using AsyncReceiveCommits.Factory instead.
|
||||
install(new FactoryModuleBuilder()
|
||||
.build(ReceiveCommits.Factory.class));
|
||||
}
|
||||
}
|
||||
|
||||
private class Worker implements ProjectRunnable {
|
||||
private final Collection<ReceiveCommand> commands;
|
||||
|
||||
private Worker(final Collection<ReceiveCommand> commands) {
|
||||
this.commands = commands;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
rc.processCommands(commands);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Project.NameKey getProjectNameKey() {
|
||||
return rc.getProject().getNameKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRemoteName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasCustomizedPrint() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "receive-commits";
|
||||
}
|
||||
}
|
||||
|
||||
@Inject
|
||||
AsyncReceiveCommits(final ReceiveCommits.Factory factory,
|
||||
@ReceiveCommitsExecutor final Executor executor,
|
||||
final RequestScopePropagator scopePropagator,
|
||||
@Assisted final ProjectControl projectControl,
|
||||
@Assisted final Repository repo) {
|
||||
this.executor = executor;
|
||||
this.scopePropagator = scopePropagator;
|
||||
rc = factory.create(projectControl, repo);
|
||||
rc.getReceivePack().setPreReceiveHook(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPreReceive(final ReceivePack rp,
|
||||
final Collection<ReceiveCommand> commands) {
|
||||
Future<?> workerFuture = executor.submit(
|
||||
scopePropagator.wrap(new Worker(commands)));
|
||||
Exception err = null;
|
||||
try {
|
||||
workerFuture.get();
|
||||
} catch (ExecutionException e) {
|
||||
err = e;
|
||||
} catch (InterruptedException e) {
|
||||
err = e;
|
||||
}
|
||||
if (err != null) {
|
||||
log.warn("Error in ReceiveCommits", err);
|
||||
rc.getMessageSender().sendError("internal error while processing changes");
|
||||
// ReceiveCommits has tried its best to catch errors, so anything at this
|
||||
// point is very bad.
|
||||
for (final ReceiveCommand c : commands) {
|
||||
if (c.getResult() == ReceiveCommand.Result.NOT_ATTEMPTED) {
|
||||
ReceiveCommits.reject(c, "internal error");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public ReceiveCommits getReceiveCommits() {
|
||||
return rc;
|
||||
}
|
||||
}
|
@@ -77,7 +77,6 @@ import org.eclipse.jgit.revwalk.RevWalk;
|
||||
import org.eclipse.jgit.revwalk.filter.RevFilter;
|
||||
import org.eclipse.jgit.transport.AdvertiseRefsHook;
|
||||
import org.eclipse.jgit.transport.AdvertiseRefsHookChain;
|
||||
import org.eclipse.jgit.transport.PreReceiveHook;
|
||||
import org.eclipse.jgit.transport.ReceiveCommand;
|
||||
import org.eclipse.jgit.transport.ReceiveCommand.Result;
|
||||
import org.eclipse.jgit.transport.ReceivePack;
|
||||
@@ -100,7 +99,7 @@ import java.util.regex.Pattern;
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/** Receives change upload using the Git receive-pack protocol. */
|
||||
public class ReceiveCommits implements PreReceiveHook {
|
||||
public class ReceiveCommits {
|
||||
private static final Logger log =
|
||||
LoggerFactory.getLogger(ReceiveCommits.class);
|
||||
|
||||
@@ -111,7 +110,7 @@ public class ReceiveCommits implements PreReceiveHook {
|
||||
private static final FooterKey TESTED_BY = new FooterKey("Tested-by");
|
||||
private static final FooterKey CHANGE_ID = new FooterKey("Change-Id");
|
||||
|
||||
public interface Factory {
|
||||
interface Factory {
|
||||
ReceiveCommits create(ProjectControl projectControl, Repository repository);
|
||||
}
|
||||
|
||||
@@ -261,8 +260,6 @@ public class ReceiveCommits implements PreReceiveHook {
|
||||
advHooks.add(rp.getAdvertiseRefsHook());
|
||||
advHooks.add(new ReceiveCommitsAdvertiseRefsHook());
|
||||
rp.setAdvertiseRefsHook(AdvertiseRefsHookChain.newChain(advHooks));
|
||||
|
||||
rp.setPreReceiveHook(this);
|
||||
}
|
||||
|
||||
/** Add reviewers for new (or updated) changes. */
|
||||
@@ -280,6 +277,17 @@ public class ReceiveCommits implements PreReceiveHook {
|
||||
messageSender = ms != null ? ms : new ReceivePackMessageSender();
|
||||
}
|
||||
|
||||
MessageSender getMessageSender() {
|
||||
if (messageSender == null) {
|
||||
setMessageSender(null);
|
||||
}
|
||||
return messageSender;
|
||||
}
|
||||
|
||||
Project getProject() {
|
||||
return project;
|
||||
}
|
||||
|
||||
/** @return the ReceivePack instance to speak the native Git protocol. */
|
||||
public ReceivePack getReceivePack() {
|
||||
return rp;
|
||||
@@ -369,9 +377,7 @@ public class ReceiveCommits implements PreReceiveHook {
|
||||
return MagicBranch.checkMagicBranchRefs(repo, project);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPreReceive(final ReceivePack arg0,
|
||||
final Collection<ReceiveCommand> commands) {
|
||||
void processCommands(final Collection<ReceiveCommand> commands) {
|
||||
parseCommands(commands);
|
||||
if (newChange != null
|
||||
&& newChange.getResult() == ReceiveCommand.Result.NOT_ATTEMPTED) {
|
||||
@@ -1991,7 +1997,7 @@ public class ReceiveCommits implements PreReceiveHook {
|
||||
reject(cmd, "prohibited by Gerrit");
|
||||
}
|
||||
|
||||
private static void reject(final ReceiveCommand cmd, final String why) {
|
||||
static void reject(final ReceiveCommand cmd, final String why) {
|
||||
cmd.setResult(ReceiveCommand.Result.REJECTED_OTHER_REASON, why);
|
||||
}
|
||||
|
||||
|
@@ -0,0 +1,30 @@
|
||||
// Copyright (C) 2012 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.google.gerrit.server.git;
|
||||
|
||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
|
||||
import com.google.inject.BindingAnnotation;
|
||||
|
||||
import java.lang.annotation.Retention;
|
||||
|
||||
/**
|
||||
* Marker on the global {@link WorkQueue.Executor} used by
|
||||
* {@link ReceiveCommits}.
|
||||
*/
|
||||
@Retention(RUNTIME)
|
||||
@BindingAnnotation
|
||||
public @interface ReceiveCommitsExecutor {
|
||||
}
|
@@ -0,0 +1,41 @@
|
||||
// Copyright (C) 2012 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.google.gerrit.server.git;
|
||||
|
||||
import com.google.gerrit.server.config.GerritServerConfig;
|
||||
import com.google.gerrit.server.git.WorkQueue;
|
||||
import com.google.gerrit.server.git.WorkQueue.Executor;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
import org.eclipse.jgit.lib.Config;
|
||||
|
||||
/** Module providing the {@link ReceiveCommitsExecutor}. */
|
||||
public class ReceiveCommitsExecutorModule extends AbstractModule {
|
||||
@Override
|
||||
protected void configure() {
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@ReceiveCommitsExecutor
|
||||
public Executor getReceiveCommitsExecutor(@GerritServerConfig Config config,
|
||||
WorkQueue queues) {
|
||||
int poolSize = config.getInt("receive", null, "threadPoolSize",
|
||||
Runtime.getRuntime().availableProcessors());
|
||||
return queues.createQueue(poolSize, "ReceiveCommits");
|
||||
}
|
||||
}
|
@@ -17,6 +17,7 @@ package com.google.gerrit.sshd.commands;
|
||||
import com.google.gerrit.common.data.Capable;
|
||||
import com.google.gerrit.reviewdb.client.Account;
|
||||
import com.google.gerrit.server.IdentifiedUser;
|
||||
import com.google.gerrit.server.git.AsyncReceiveCommits;
|
||||
import com.google.gerrit.server.git.ReceiveCommits;
|
||||
import com.google.gerrit.server.git.TransferConfig;
|
||||
import com.google.gerrit.server.git.VisibleRefFilter;
|
||||
@@ -39,7 +40,7 @@ import java.util.Set;
|
||||
/** Receives change upload over SSH using the Git receive-pack protocol. */
|
||||
final class Receive extends AbstractGitCommand {
|
||||
@Inject
|
||||
private ReceiveCommits.Factory factory;
|
||||
private AsyncReceiveCommits.Factory factory;
|
||||
|
||||
@Inject
|
||||
private IdentifiedUser currentUser;
|
||||
@@ -69,7 +70,8 @@ final class Receive extends AbstractGitCommand {
|
||||
throw new Failure(1, "fatal: receive-pack not permitted on this server");
|
||||
}
|
||||
|
||||
final ReceiveCommits receive = factory.create(projectControl, repo);
|
||||
final ReceiveCommits receive = factory.create(projectControl, repo)
|
||||
.getReceiveCommits();
|
||||
|
||||
Capable r = receive.canUpload();
|
||||
if (r != Capable.OK) {
|
||||
|
@@ -33,6 +33,7 @@ import com.google.gerrit.server.config.SitePath;
|
||||
import com.google.gerrit.server.contact.HttpContactStoreConnection;
|
||||
import com.google.gerrit.server.git.LocalDiskRepositoryManager;
|
||||
import com.google.gerrit.server.git.PushReplication;
|
||||
import com.google.gerrit.server.git.ReceiveCommitsExecutorModule;
|
||||
import com.google.gerrit.server.git.WorkQueue;
|
||||
import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
|
||||
import com.google.gerrit.server.mail.SmtpEmailSender;
|
||||
@@ -190,6 +191,7 @@ public class WebAppInitializer extends GuiceServletContextListener {
|
||||
final List<Module> modules = new ArrayList<Module>();
|
||||
modules.add(new WorkQueue.Module());
|
||||
modules.add(new ChangeHookRunner.Module());
|
||||
modules.add(new ReceiveCommitsExecutorModule());
|
||||
modules.add(cfgInjector.getInstance(GerritGlobalModule.class));
|
||||
modules.add(new EhcachePoolImpl.Module());
|
||||
modules.add(new SmtpEmailSender.Module());
|
||||
|
Reference in New Issue
Block a user