From 5797d87d6b2f0be3c27647f85cd59c36ed1769cc Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Wed, 20 Feb 2013 11:20:17 -0800 Subject: [PATCH] Break IntraLineDiff threads into their own module This small refactoring allows me to manually bind the IntraLineWorkerPool within the gerrit-review server build, offering better control over how the threads are managed by this pool. Change-Id: I26c56ef3c57a86189e1c6a1b668b464e4f079995 --- .../java/com/google/gerrit/pgm/Daemon.java | 4 +- .../gerrit/server/patch/IntraLineLoader.java | 152 +-------------- .../server/patch/IntraLineWorkerPool.java | 182 ++++++++++++++++++ .../gerrit/httpd/WebAppInitializer.java | 2 + 4 files changed, 195 insertions(+), 145 deletions(-) create mode 100644 gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineWorkerPool.java diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java index 6e825ef0fc..9654804125 100644 --- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java +++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Daemon.java @@ -51,13 +51,14 @@ import com.google.gerrit.server.git.ReceiveCommitsExecutorModule; import com.google.gerrit.server.git.WorkQueue; import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier; import com.google.gerrit.server.mail.SmtpEmailSender; +import com.google.gerrit.server.patch.IntraLineWorkerPool; import com.google.gerrit.server.plugins.PluginGuiceEnvironment; import com.google.gerrit.server.plugins.PluginModule; import com.google.gerrit.server.schema.SchemaUpdater; import com.google.gerrit.server.schema.SchemaVersionCheck; import com.google.gerrit.server.schema.UpdateUI; -import com.google.gerrit.server.ssh.NoSshModule; import com.google.gerrit.server.ssh.NoSshKeyCache; +import com.google.gerrit.server.ssh.NoSshModule; import com.google.gerrit.sshd.SshKeyCacheImpl; import com.google.gerrit.sshd.SshModule; import com.google.gerrit.sshd.commands.MasterCommandModule; @@ -310,6 +311,7 @@ public class Daemon extends SiteProgram { modules.add(new WorkQueue.Module()); modules.add(new ChangeHookRunner.Module()); modules.add(new ReceiveCommitsExecutorModule()); + modules.add(new IntraLineWorkerPool.Module()); modules.add(cfgInjector.getInstance(GerritGlobalModule.class)); modules.add(new DefaultCacheFactory.Module()); modules.add(new SmtpEmailSender.Module()); diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java index 5b659206f6..b95994fb1a 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineLoader.java @@ -29,10 +29,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; class IntraLineLoader extends CacheLoader { @@ -44,16 +41,12 @@ class IntraLineLoader extends CacheLoader { private static final Pattern CONTROL_BLOCK_START_RE = Pattern .compile("[{:][ \\t]*$"); - private final BlockingQueue workerPool; + private final IntraLineWorkerPool workerPool; private final long timeoutMillis; @Inject - IntraLineLoader(final @GerritServerConfig Config cfg) { - final int workers = - cfg.getInt("cache", PatchListCacheImpl.INTRA_NAME, "maxIdleWorkers", - Runtime.getRuntime().availableProcessors() * 3 / 2); - workerPool = new ArrayBlockingQueue(workers, true /* fair */); - + IntraLineLoader(IntraLineWorkerPool pool, @GerritServerConfig Config cfg) { + workerPool = pool; timeoutMillis = ConfigUtil.getTimeUnit(cfg, "cache", PatchListCacheImpl.INTRA_NAME, "timeout", TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS), @@ -62,26 +55,17 @@ class IntraLineLoader extends CacheLoader { @Override public IntraLineDiff load(IntraLineDiffKey key) throws Exception { - Worker w = workerPool.poll(); - if (w == null) { - w = new Worker(); - } + IntraLineWorkerPool.Worker w = workerPool.acquire(); + IntraLineWorkerPool.Worker.Result r = w.computeWithTimeout(key, timeoutMillis); - Worker.Result r = w.computeWithTimeout(key, timeoutMillis); - - if (r == Worker.Result.TIMEOUT) { + if (r == IntraLineWorkerPool.Worker.Result.TIMEOUT) { // Don't keep this thread. We have to murder it unsafely, which // means its unable to be reused in the future. Return back a // null result, indicating the cache cannot load this key. // return new IntraLineDiff(IntraLineDiff.Status.TIMEOUT); } - - if (!workerPool.offer(w)) { - // If the idle worker pool is full, terminate this thread. - // - w.end(); - } + workerPool.release(w); if (r.error != null) { // If there was an error computing the result, carry it @@ -93,127 +77,7 @@ class IntraLineLoader extends CacheLoader { return r.diff; } - private static class Worker { - private static final AtomicInteger count = new AtomicInteger(1); - - private final ArrayBlockingQueue input; - private final ArrayBlockingQueue result; - private final Thread thread; - - Worker() { - input = new ArrayBlockingQueue(1); - result = new ArrayBlockingQueue(1); - - thread = new Thread(new Runnable() { - public void run() { - workerLoop(); - } - }); - thread.setName("IntraLineDiff-" + count.getAndIncrement()); - thread.setDaemon(true); - thread.start(); - } - - Result computeWithTimeout(IntraLineDiffKey key, long timeoutMillis) - throws Exception { - if (!input.offer(new Input(key))) { - log.error("Cannot enqueue task to thread " + thread.getName()); - return Result.TIMEOUT; - } - - Result r = result.poll(timeoutMillis, TimeUnit.MILLISECONDS); - if (r != null) { - return r; - } else { - log.warn(timeoutMillis + " ms timeout reached for IntraLineDiff" - + " in project " + key.getProject().get() // - + " on commit " + key.getCommit().name() // - + " for path " + key.getPath() // - + " comparing " + key.getBlobA().name() // - + ".." + key.getBlobB().name() // - + ". Killing " + thread.getName()); - forcefullyKillThreadInAnUglyWay(); - return Result.TIMEOUT; - } - } - - @SuppressWarnings("deprecation") - private void forcefullyKillThreadInAnUglyWay() { - try { - thread.stop(); - } catch (Throwable error) { - // Ignore any reason the thread won't stop. - log.error("Cannot stop runaway thread " + thread.getName(), error); - } - } - - void end() { - if (!input.offer(Input.END_THREAD)) { - log.error("Cannot gracefully stop thread " + thread.getName()); - } - } - - private void workerLoop() { - try { - for (;;) { - Input in; - try { - in = input.take(); - } catch (InterruptedException e) { - log.error("Unexpected interrupt on " + thread.getName()); - continue; - } - - if (in == Input.END_THREAD) { - return; - } - - Result r; - try { - r = new Result(IntraLineLoader.compute(in.key)); - } catch (Exception error) { - r = new Result(error); - } - - if (!result.offer(r)) { - log.error("Cannot return result from " + thread.getName()); - } - } - } catch (ThreadDeath iHaveBeenShot) { - // Handle thread death by gracefully returning to the caller, - // allowing the thread to be destroyed. - } - } - - private static class Input { - static final Input END_THREAD = new Input(null); - - final IntraLineDiffKey key; - - Input(IntraLineDiffKey key) { - this.key = key; - } - } - - static class Result { - static final Result TIMEOUT = new Result((IntraLineDiff) null); - - final IntraLineDiff diff; - final Exception error; - - Result(IntraLineDiff diff) { - this.diff = diff; - this.error = null; - } - - Result(Exception error) { - this.diff = null; - this.error = error; - } - } - } - - private static IntraLineDiff compute(IntraLineDiffKey key) throws Exception { + static IntraLineDiff compute(IntraLineDiffKey key) throws Exception { List edits = new ArrayList(key.getEdits()); Text aContent = key.getTextA(); Text bContent = key.getTextB(); diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineWorkerPool.java b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineWorkerPool.java new file mode 100644 index 0000000000..5c6338fea1 --- /dev/null +++ b/gerrit-server/src/main/java/com/google/gerrit/server/patch/IntraLineWorkerPool.java @@ -0,0 +1,182 @@ +// Copyright (C) 2009 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +package com.google.gerrit.server.patch; + +import static com.google.gerrit.server.patch.IntraLineLoader.log; + +import com.google.gerrit.server.config.GerritServerConfig; +import com.google.inject.AbstractModule; +import com.google.inject.Inject; +import com.google.inject.Singleton; + +import org.eclipse.jgit.lib.Config; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Singleton +public class IntraLineWorkerPool { + public static class Module extends AbstractModule { + @Override + protected void configure() { + bind(IntraLineWorkerPool.class); + } + } + + private final BlockingQueue workerPool; + + @Inject + public IntraLineWorkerPool(@GerritServerConfig Config cfg) { + int workers = cfg.getInt( + "cache", PatchListCacheImpl.INTRA_NAME, "maxIdleWorkers", + Runtime.getRuntime().availableProcessors() * 3 / 2); + workerPool = new ArrayBlockingQueue(workers, true /* fair */); + } + + Worker acquire() { + Worker w = workerPool.poll(); + if (w == null) { + // If no worker is immediately available, start a new one. + // Maximum parallelism is controlled by the web server. + w = new Worker(); + w.start(); + } + return w; + } + + void release(Worker w) { + if (!workerPool.offer(w)) { + // If the idle worker pool is full, terminate the worker. + w.shutdownGracefully(); + } + } + + static class Worker extends Thread { + private static final AtomicInteger count = new AtomicInteger(1); + + private final ArrayBlockingQueue input; + private final ArrayBlockingQueue result; + + Worker() { + input = new ArrayBlockingQueue(1); + result = new ArrayBlockingQueue(1); + + setName("IntraLineDiff-" + count.getAndIncrement()); + setDaemon(true); + } + + Result computeWithTimeout(IntraLineDiffKey key, long timeoutMillis) + throws Exception { + if (!input.offer(new Input(key))) { + log.error("Cannot enqueue task to thread " + getName()); + return Result.TIMEOUT; + } + + Result r = result.poll(timeoutMillis, TimeUnit.MILLISECONDS); + if (r != null) { + return r; + } else { + log.warn(timeoutMillis + " ms timeout reached for IntraLineDiff" + + " in project " + key.getProject().get() + + " on commit " + key.getCommit().name() + + " for path " + key.getPath() + + " comparing " + key.getBlobA().name() + + ".." + key.getBlobB().name() + + ". Killing " + getName()); + forcefullyKillThreadInAnUglyWay(); + return Result.TIMEOUT; + } + } + + @SuppressWarnings("deprecation") + private void forcefullyKillThreadInAnUglyWay() { + try { + stop(); + } catch (Throwable error) { + // Ignore any reason the thread won't stop. + log.error("Cannot stop runaway thread " + getName(), error); + } + } + + private void shutdownGracefully() { + if (!input.offer(Input.END_THREAD)) { + log.error("Cannot gracefully stop thread " + getName()); + } + } + + @Override + public void run() { + try { + for (;;) { + Input in; + try { + in = input.take(); + } catch (InterruptedException e) { + log.error("Unexpected interrupt on " + getName()); + continue; + } + + if (in == Input.END_THREAD) { + return; + } + + Result r; + try { + r = new Result(IntraLineLoader.compute(in.key)); + } catch (Exception error) { + r = new Result(error); + } + + if (!result.offer(r)) { + log.error("Cannot return result from " + getName()); + } + } + } catch (ThreadDeath iHaveBeenShot) { + // Handle thread death by gracefully returning to the caller, + // allowing the thread to be destroyed. + } + } + + private static class Input { + static final Input END_THREAD = new Input(null); + + final IntraLineDiffKey key; + + Input(IntraLineDiffKey key) { + this.key = key; + } + } + + static class Result { + static final Result TIMEOUT = new Result((IntraLineDiff) null); + + final IntraLineDiff diff; + final Exception error; + + Result(IntraLineDiff diff) { + this.diff = diff; + this.error = null; + } + + Result(Exception error) { + this.diff = null; + this.error = error; + } + } + } +} diff --git a/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java b/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java index 3059dfac41..d0b8c3843a 100644 --- a/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java +++ b/gerrit-war/src/main/java/com/google/gerrit/httpd/WebAppInitializer.java @@ -39,6 +39,7 @@ import com.google.gerrit.server.git.ReceiveCommitsExecutorModule; import com.google.gerrit.server.git.WorkQueue; import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier; import com.google.gerrit.server.mail.SmtpEmailSender; +import com.google.gerrit.server.patch.IntraLineWorkerPool; import com.google.gerrit.server.plugins.PluginGuiceEnvironment; import com.google.gerrit.server.plugins.PluginModule; import com.google.gerrit.server.schema.DataSourceModule; @@ -229,6 +230,7 @@ public class WebAppInitializer extends GuiceServletContextListener { modules.add(new WorkQueue.Module()); modules.add(new ChangeHookRunner.Module()); modules.add(new ReceiveCommitsExecutorModule()); + modules.add(new IntraLineWorkerPool.Module()); modules.add(cfgInjector.getInstance(GerritGlobalModule.class)); modules.add(new DefaultCacheFactory.Module()); modules.add(new SmtpEmailSender.Module());