From 234734a5f57eabdb7b64ca8428690de346959232 Mon Sep 17 00:00:00 2001 From: Dave Borowitz Date: Thu, 1 Mar 2012 14:22:29 -0800 Subject: [PATCH] Run ReceiveCommits in a shared thread pool Since the work ReceiveCommits may take a long, potentially unbounded amount of time, we would like to have it run in the background so it can be monitored for timeouts and cancelled, and have stalls reported to the user from the main thread. Wraps ReceiveCommits in a new PreReceiveHook, AsyncReceiveCommits, that runs its delegated ReceiveCommits in a WorkQueue.Executor. The default implementation sizes the thread pool to the number of available processors, under the assumption that most database and pack operations are essentially CPU-bound. This is after reading the pack from the wire, indexing it, and storing it locally, so there is no I/O to the client involved, and available CPUs is a pretty good estimate. Since it may not be in some cases, for example if threads block on slow database I/O, the pool size is configurable. Change-Id: I5769944b49ead4224a1855ba72ee359de26e7bf8 --- Documentation/config-gerrit.txt | 10 +- .../gerrit/httpd/GitOverHttpServlet.java | 8 +- .../java/com/google/gerrit/pgm/Daemon.java | 2 + .../server/config/GerritRequestModule.java | 4 +- .../server/git/AsyncReceiveCommits.java | 138 ++++++++++++++++++ .../gerrit/server/git/ReceiveCommits.java | 24 +-- .../server/git/ReceiveCommitsExecutor.java | 30 ++++ .../git/ReceiveCommitsExecutorModule.java | 41 ++++++ .../google/gerrit/sshd/commands/Receive.java | 6 +- .../gerrit/httpd/WebAppInitializer.java | 2 + 10 files changed, 248 insertions(+), 17 deletions(-) create mode 100644 gerrit-server/src/main/java/com/google/gerrit/server/git/AsyncReceiveCommits.java create mode 100644 gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutor.java create mode 100644 gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java diff --git a/Documentation/config-gerrit.txt b/Documentation/config-gerrit.txt index 964defd38f..870e5774d1 100644 --- a/Documentation/config-gerrit.txt +++ b/Documentation/config-gerrit.txt @@ -1675,7 +1675,8 @@ By default, 1. This section is used to set who can execute the 'receive-pack' and to limit the maximum Git object size that 'receive-pack' will accept. 'receive-pack' is what runs on the server during a user's push or -repo upload command. +repo upload command. It also contains some advanced options for tuning the +behavior of Gerrit's 'receive-pack' mechanism. ---- [receive] @@ -1706,6 +1707,13 @@ Default is zero. + Common unit suffixes of 'k', 'm', or 'g' are supported. +[[receive.threadPoolSize]]receive.threadPoolSize:: ++ +Maximum size of the thread pool in which the change data in received packs is +processed. ++ +Defaults to the number of available CPUs according to the Java runtime. + [[repository]]Section repository ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/gerrit-httpd/src/main/java/com/google/gerrit/httpd/GitOverHttpServlet.java b/gerrit-httpd/src/main/java/com/google/gerrit/httpd/GitOverHttpServlet.java index 7de1f84065..c36df04a68 100644 --- a/gerrit-httpd/src/main/java/com/google/gerrit/httpd/GitOverHttpServlet.java +++ b/gerrit-httpd/src/main/java/com/google/gerrit/httpd/GitOverHttpServlet.java @@ -22,6 +22,7 @@ import com.google.gerrit.server.AnonymousUser; import com.google.gerrit.server.IdentifiedUser; import com.google.gerrit.server.cache.Cache; import com.google.gerrit.server.cache.CacheModule; +import com.google.gerrit.server.git.AsyncReceiveCommits; import com.google.gerrit.server.git.GitRepositoryManager; import com.google.gerrit.server.git.ReceiveCommits; import com.google.gerrit.server.git.TagCache; @@ -231,11 +232,12 @@ public class GitOverHttpServlet extends GitServlet { } static class ReceiveFactory implements ReceivePackFactory { - private final ReceiveCommits.Factory factory; + private final AsyncReceiveCommits.Factory factory; private final Provider session; @Inject - ReceiveFactory(ReceiveCommits.Factory factory, Provider session) { + ReceiveFactory(AsyncReceiveCommits.Factory factory, + Provider session) { this.factory = factory; this.session = session; } @@ -251,7 +253,7 @@ public class GitOverHttpServlet extends GitServlet { } final IdentifiedUser user = (IdentifiedUser) pc.getCurrentUser(); - final ReceiveCommits rc = factory.create(pc, db); + final ReceiveCommits rc = factory.create(pc, db).getReceiveCommits(); rc.getReceivePack().setRefLogIdent(user.newRefLogIdent()); req.setAttribute(ATT_RC, rc); session.get().setAccessPath(AccessPath.GIT); diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java index 14619f25f1..25b7699a68 100644 --- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java +++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java @@ -42,6 +42,7 @@ import com.google.gerrit.server.config.GerritGlobalModule; import com.google.gerrit.server.config.MasterNodeStartup; import com.google.gerrit.server.contact.HttpContactStoreConnection; import com.google.gerrit.server.git.PushReplication; +import com.google.gerrit.server.git.ReceiveCommitsExecutorModule; import com.google.gerrit.server.git.WorkQueue; import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier; import com.google.gerrit.server.mail.SmtpEmailSender; @@ -201,6 +202,7 @@ public class Daemon extends SiteProgram { modules.add(new LogFileCompressor.Module()); modules.add(new WorkQueue.Module()); modules.add(new ChangeHookRunner.Module()); + modules.add(new ReceiveCommitsExecutorModule()); modules.add(cfgInjector.getInstance(GerritGlobalModule.class)); modules.add(new EhcachePoolImpl.Module()); modules.add(new SmtpEmailSender.Module()); diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/config/GerritRequestModule.java b/gerrit-server/src/main/java/com/google/gerrit/server/config/GerritRequestModule.java index 611687cdcc..bf741a4b90 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/config/GerritRequestModule.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/config/GerritRequestModule.java @@ -33,10 +33,10 @@ import com.google.gerrit.server.changedetail.DeleteDraftPatchSet; import com.google.gerrit.server.changedetail.PublishDraft; import com.google.gerrit.server.changedetail.RestoreChange; import com.google.gerrit.server.changedetail.Submit; +import com.google.gerrit.server.git.AsyncReceiveCommits; import com.google.gerrit.server.git.CreateCodeReviewNotes; import com.google.gerrit.server.git.MergeOp; import com.google.gerrit.server.git.MetaDataUpdate; -import com.google.gerrit.server.git.ReceiveCommits; import com.google.gerrit.server.git.SubmoduleOp; import com.google.gerrit.server.mail.AbandonedSender; import com.google.gerrit.server.mail.AddReviewerSender; @@ -79,10 +79,10 @@ public class GerritRequestModule extends FactoryModule { bind(AccountControl.Factory.class).in(SINGLETON); factory(ChangeQueryBuilder.Factory.class); - factory(ReceiveCommits.Factory.class); factory(SubmoduleOp.Factory.class); factory(MergeOp.Factory.class); factory(CreateCodeReviewNotes.Factory.class); + install(new AsyncReceiveCommits.Module()); // Not really per-request, but dammit, I don't know where else to // easily park this stuff. diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/AsyncReceiveCommits.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/AsyncReceiveCommits.java new file mode 100644 index 0000000000..c991cb08db --- /dev/null +++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/AsyncReceiveCommits.java @@ -0,0 +1,138 @@ +// Copyright (C) 2012 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.gerrit.server.git; + +import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.server.project.ProjectControl; +import com.google.gerrit.server.util.RequestScopePropagator; +import com.google.gerrit.server.git.WorkQueue.Executor; +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.FactoryModuleBuilder; +import com.google.inject.Inject; +import com.google.inject.PrivateModule; + +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.transport.PreReceiveHook; +import org.eclipse.jgit.transport.ReceiveCommand; +import org.eclipse.jgit.transport.ReceivePack; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** Hook that delegates to {@link ReceiveCommits} in a worker thread. */ +public class AsyncReceiveCommits implements PreReceiveHook { + private static final Logger log = + LoggerFactory.getLogger(AsyncReceiveCommits.class); + + private final ReceiveCommits rc; + private final Executor executor; + private final RequestScopePropagator scopePropagator; + + public interface Factory { + AsyncReceiveCommits create(ProjectControl projectControl, + Repository repository); + } + + public static class Module extends PrivateModule { + @Override + public void configure() { + install(new FactoryModuleBuilder() + .build(AsyncReceiveCommits.Factory.class)); + expose(AsyncReceiveCommits.Factory.class); + // Don't expose the binding for ReceiveCommits.Factory. All callers should + // be using AsyncReceiveCommits.Factory instead. + install(new FactoryModuleBuilder() + .build(ReceiveCommits.Factory.class)); + } + } + + private class Worker implements ProjectRunnable { + private final Collection commands; + + private Worker(final Collection commands) { + this.commands = commands; + } + + @Override + public void run() { + rc.processCommands(commands); + } + + @Override + public Project.NameKey getProjectNameKey() { + return rc.getProject().getNameKey(); + } + + @Override + public String getRemoteName() { + return null; + } + + @Override + public boolean hasCustomizedPrint() { + return true; + } + + @Override + public String toString() { + return "receive-commits"; + } + } + + @Inject + AsyncReceiveCommits(final ReceiveCommits.Factory factory, + @ReceiveCommitsExecutor final Executor executor, + final RequestScopePropagator scopePropagator, + @Assisted final ProjectControl projectControl, + @Assisted final Repository repo) { + this.executor = executor; + this.scopePropagator = scopePropagator; + rc = factory.create(projectControl, repo); + rc.getReceivePack().setPreReceiveHook(this); + } + + @Override + public void onPreReceive(final ReceivePack rp, + final Collection commands) { + Future workerFuture = executor.submit( + scopePropagator.wrap(new Worker(commands))); + Exception err = null; + try { + workerFuture.get(); + } catch (ExecutionException e) { + err = e; + } catch (InterruptedException e) { + err = e; + } + if (err != null) { + log.warn("Error in ReceiveCommits", err); + rc.getMessageSender().sendError("internal error while processing changes"); + // ReceiveCommits has tried its best to catch errors, so anything at this + // point is very bad. + for (final ReceiveCommand c : commands) { + if (c.getResult() == ReceiveCommand.Result.NOT_ATTEMPTED) { + ReceiveCommits.reject(c, "internal error"); + } + } + } + } + + public ReceiveCommits getReceiveCommits() { + return rc; + } +} diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java index 1ec7457345..5242e2e1af 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommits.java @@ -77,7 +77,6 @@ import org.eclipse.jgit.revwalk.RevWalk; import org.eclipse.jgit.revwalk.filter.RevFilter; import org.eclipse.jgit.transport.AdvertiseRefsHook; import org.eclipse.jgit.transport.AdvertiseRefsHookChain; -import org.eclipse.jgit.transport.PreReceiveHook; import org.eclipse.jgit.transport.ReceiveCommand; import org.eclipse.jgit.transport.ReceiveCommand.Result; import org.eclipse.jgit.transport.ReceivePack; @@ -100,7 +99,7 @@ import java.util.regex.Pattern; import javax.annotation.Nullable; /** Receives change upload using the Git receive-pack protocol. */ -public class ReceiveCommits implements PreReceiveHook { +public class ReceiveCommits { private static final Logger log = LoggerFactory.getLogger(ReceiveCommits.class); @@ -111,7 +110,7 @@ public class ReceiveCommits implements PreReceiveHook { private static final FooterKey TESTED_BY = new FooterKey("Tested-by"); private static final FooterKey CHANGE_ID = new FooterKey("Change-Id"); - public interface Factory { + interface Factory { ReceiveCommits create(ProjectControl projectControl, Repository repository); } @@ -261,8 +260,6 @@ public class ReceiveCommits implements PreReceiveHook { advHooks.add(rp.getAdvertiseRefsHook()); advHooks.add(new ReceiveCommitsAdvertiseRefsHook()); rp.setAdvertiseRefsHook(AdvertiseRefsHookChain.newChain(advHooks)); - - rp.setPreReceiveHook(this); } /** Add reviewers for new (or updated) changes. */ @@ -280,6 +277,17 @@ public class ReceiveCommits implements PreReceiveHook { messageSender = ms != null ? ms : new ReceivePackMessageSender(); } + MessageSender getMessageSender() { + if (messageSender == null) { + setMessageSender(null); + } + return messageSender; + } + + Project getProject() { + return project; + } + /** @return the ReceivePack instance to speak the native Git protocol. */ public ReceivePack getReceivePack() { return rp; @@ -369,9 +377,7 @@ public class ReceiveCommits implements PreReceiveHook { return MagicBranch.checkMagicBranchRefs(repo, project); } - @Override - public void onPreReceive(final ReceivePack arg0, - final Collection commands) { + void processCommands(final Collection commands) { parseCommands(commands); if (newChange != null && newChange.getResult() == ReceiveCommand.Result.NOT_ATTEMPTED) { @@ -1991,7 +1997,7 @@ public class ReceiveCommits implements PreReceiveHook { reject(cmd, "prohibited by Gerrit"); } - private static void reject(final ReceiveCommand cmd, final String why) { + static void reject(final ReceiveCommand cmd, final String why) { cmd.setResult(ReceiveCommand.Result.REJECTED_OTHER_REASON, why); } diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutor.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutor.java new file mode 100644 index 0000000000..2049ceada5 --- /dev/null +++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutor.java @@ -0,0 +1,30 @@ +// Copyright (C) 2012 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.gerrit.server.git; + +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.Retention; + +/** + * Marker on the global {@link WorkQueue.Executor} used by + * {@link ReceiveCommits}. + */ +@Retention(RUNTIME) +@BindingAnnotation +public @interface ReceiveCommitsExecutor { +} diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java new file mode 100644 index 0000000000..063db2d841 --- /dev/null +++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/ReceiveCommitsExecutorModule.java @@ -0,0 +1,41 @@ +// Copyright (C) 2012 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.gerrit.server.git; + +import com.google.gerrit.server.config.GerritServerConfig; +import com.google.gerrit.server.git.WorkQueue; +import com.google.gerrit.server.git.WorkQueue.Executor; +import com.google.inject.AbstractModule; +import com.google.inject.Provides; +import com.google.inject.Singleton; + +import org.eclipse.jgit.lib.Config; + +/** Module providing the {@link ReceiveCommitsExecutor}. */ +public class ReceiveCommitsExecutorModule extends AbstractModule { + @Override + protected void configure() { + } + + @Provides + @Singleton + @ReceiveCommitsExecutor + public Executor getReceiveCommitsExecutor(@GerritServerConfig Config config, + WorkQueue queues) { + int poolSize = config.getInt("receive", null, "threadPoolSize", + Runtime.getRuntime().availableProcessors()); + return queues.createQueue(poolSize, "ReceiveCommits"); + } +} diff --git a/gerrit-sshd/src/main/java/com/google/gerrit/sshd/commands/Receive.java b/gerrit-sshd/src/main/java/com/google/gerrit/sshd/commands/Receive.java index 0322f0b829..0e7ff83f53 100644 --- a/gerrit-sshd/src/main/java/com/google/gerrit/sshd/commands/Receive.java +++ b/gerrit-sshd/src/main/java/com/google/gerrit/sshd/commands/Receive.java @@ -17,6 +17,7 @@ package com.google.gerrit.sshd.commands; import com.google.gerrit.common.data.Capable; import com.google.gerrit.reviewdb.client.Account; import com.google.gerrit.server.IdentifiedUser; +import com.google.gerrit.server.git.AsyncReceiveCommits; import com.google.gerrit.server.git.ReceiveCommits; import com.google.gerrit.server.git.TransferConfig; import com.google.gerrit.server.git.VisibleRefFilter; @@ -39,7 +40,7 @@ import java.util.Set; /** Receives change upload over SSH using the Git receive-pack protocol. */ final class Receive extends AbstractGitCommand { @Inject - private ReceiveCommits.Factory factory; + private AsyncReceiveCommits.Factory factory; @Inject private IdentifiedUser currentUser; @@ -69,7 +70,8 @@ final class Receive extends AbstractGitCommand { throw new Failure(1, "fatal: receive-pack not permitted on this server"); } - final ReceiveCommits receive = factory.create(projectControl, repo); + final ReceiveCommits receive = factory.create(projectControl, repo) + .getReceiveCommits(); Capable r = receive.canUpload(); if (r != Capable.OK) { diff --git a/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java b/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java index 2901bc7495..01b4a44b2e 100644 --- a/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java +++ b/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java @@ -33,6 +33,7 @@ import com.google.gerrit.server.config.SitePath; import com.google.gerrit.server.contact.HttpContactStoreConnection; import com.google.gerrit.server.git.LocalDiskRepositoryManager; import com.google.gerrit.server.git.PushReplication; +import com.google.gerrit.server.git.ReceiveCommitsExecutorModule; import com.google.gerrit.server.git.WorkQueue; import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier; import com.google.gerrit.server.mail.SmtpEmailSender; @@ -190,6 +191,7 @@ public class WebAppInitializer extends GuiceServletContextListener { final List modules = new ArrayList(); modules.add(new WorkQueue.Module()); modules.add(new ChangeHookRunner.Module()); + modules.add(new ReceiveCommitsExecutorModule()); modules.add(cfgInjector.getInstance(GerritGlobalModule.class)); modules.add(new EhcachePoolImpl.Module()); modules.add(new SmtpEmailSender.Module());