Break IntraLineDiff threads into their own module
This small refactoring allows me to manually bind the IntraLineWorkerPool within the gerrit-review server build, offering better control over how the threads are managed by this pool. Change-Id: I26c56ef3c57a86189e1c6a1b668b464e4f079995
This commit is contained in:
parent
7609714b2c
commit
5797d87d6b
@ -51,13 +51,14 @@ import com.google.gerrit.server.git.ReceiveCommitsExecutorModule;
|
|||||||
import com.google.gerrit.server.git.WorkQueue;
|
import com.google.gerrit.server.git.WorkQueue;
|
||||||
import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
|
import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
|
||||||
import com.google.gerrit.server.mail.SmtpEmailSender;
|
import com.google.gerrit.server.mail.SmtpEmailSender;
|
||||||
|
import com.google.gerrit.server.patch.IntraLineWorkerPool;
|
||||||
import com.google.gerrit.server.plugins.PluginGuiceEnvironment;
|
import com.google.gerrit.server.plugins.PluginGuiceEnvironment;
|
||||||
import com.google.gerrit.server.plugins.PluginModule;
|
import com.google.gerrit.server.plugins.PluginModule;
|
||||||
import com.google.gerrit.server.schema.SchemaUpdater;
|
import com.google.gerrit.server.schema.SchemaUpdater;
|
||||||
import com.google.gerrit.server.schema.SchemaVersionCheck;
|
import com.google.gerrit.server.schema.SchemaVersionCheck;
|
||||||
import com.google.gerrit.server.schema.UpdateUI;
|
import com.google.gerrit.server.schema.UpdateUI;
|
||||||
import com.google.gerrit.server.ssh.NoSshModule;
|
|
||||||
import com.google.gerrit.server.ssh.NoSshKeyCache;
|
import com.google.gerrit.server.ssh.NoSshKeyCache;
|
||||||
|
import com.google.gerrit.server.ssh.NoSshModule;
|
||||||
import com.google.gerrit.sshd.SshKeyCacheImpl;
|
import com.google.gerrit.sshd.SshKeyCacheImpl;
|
||||||
import com.google.gerrit.sshd.SshModule;
|
import com.google.gerrit.sshd.SshModule;
|
||||||
import com.google.gerrit.sshd.commands.MasterCommandModule;
|
import com.google.gerrit.sshd.commands.MasterCommandModule;
|
||||||
@ -310,6 +311,7 @@ public class Daemon extends SiteProgram {
|
|||||||
modules.add(new WorkQueue.Module());
|
modules.add(new WorkQueue.Module());
|
||||||
modules.add(new ChangeHookRunner.Module());
|
modules.add(new ChangeHookRunner.Module());
|
||||||
modules.add(new ReceiveCommitsExecutorModule());
|
modules.add(new ReceiveCommitsExecutorModule());
|
||||||
|
modules.add(new IntraLineWorkerPool.Module());
|
||||||
modules.add(cfgInjector.getInstance(GerritGlobalModule.class));
|
modules.add(cfgInjector.getInstance(GerritGlobalModule.class));
|
||||||
modules.add(new DefaultCacheFactory.Module());
|
modules.add(new DefaultCacheFactory.Module());
|
||||||
modules.add(new SmtpEmailSender.Module());
|
modules.add(new SmtpEmailSender.Module());
|
||||||
|
@ -29,10 +29,7 @@ import org.slf4j.LoggerFactory;
|
|||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
class IntraLineLoader extends CacheLoader<IntraLineDiffKey, IntraLineDiff> {
|
class IntraLineLoader extends CacheLoader<IntraLineDiffKey, IntraLineDiff> {
|
||||||
@ -44,16 +41,12 @@ class IntraLineLoader extends CacheLoader<IntraLineDiffKey, IntraLineDiff> {
|
|||||||
private static final Pattern CONTROL_BLOCK_START_RE = Pattern
|
private static final Pattern CONTROL_BLOCK_START_RE = Pattern
|
||||||
.compile("[{:][ \\t]*$");
|
.compile("[{:][ \\t]*$");
|
||||||
|
|
||||||
private final BlockingQueue<Worker> workerPool;
|
private final IntraLineWorkerPool workerPool;
|
||||||
private final long timeoutMillis;
|
private final long timeoutMillis;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
IntraLineLoader(final @GerritServerConfig Config cfg) {
|
IntraLineLoader(IntraLineWorkerPool pool, @GerritServerConfig Config cfg) {
|
||||||
final int workers =
|
workerPool = pool;
|
||||||
cfg.getInt("cache", PatchListCacheImpl.INTRA_NAME, "maxIdleWorkers",
|
|
||||||
Runtime.getRuntime().availableProcessors() * 3 / 2);
|
|
||||||
workerPool = new ArrayBlockingQueue<Worker>(workers, true /* fair */);
|
|
||||||
|
|
||||||
timeoutMillis =
|
timeoutMillis =
|
||||||
ConfigUtil.getTimeUnit(cfg, "cache", PatchListCacheImpl.INTRA_NAME,
|
ConfigUtil.getTimeUnit(cfg, "cache", PatchListCacheImpl.INTRA_NAME,
|
||||||
"timeout", TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS),
|
"timeout", TimeUnit.MILLISECONDS.convert(5, TimeUnit.SECONDS),
|
||||||
@ -62,26 +55,17 @@ class IntraLineLoader extends CacheLoader<IntraLineDiffKey, IntraLineDiff> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IntraLineDiff load(IntraLineDiffKey key) throws Exception {
|
public IntraLineDiff load(IntraLineDiffKey key) throws Exception {
|
||||||
Worker w = workerPool.poll();
|
IntraLineWorkerPool.Worker w = workerPool.acquire();
|
||||||
if (w == null) {
|
IntraLineWorkerPool.Worker.Result r = w.computeWithTimeout(key, timeoutMillis);
|
||||||
w = new Worker();
|
|
||||||
}
|
|
||||||
|
|
||||||
Worker.Result r = w.computeWithTimeout(key, timeoutMillis);
|
if (r == IntraLineWorkerPool.Worker.Result.TIMEOUT) {
|
||||||
|
|
||||||
if (r == Worker.Result.TIMEOUT) {
|
|
||||||
// Don't keep this thread. We have to murder it unsafely, which
|
// Don't keep this thread. We have to murder it unsafely, which
|
||||||
// means its unable to be reused in the future. Return back a
|
// means its unable to be reused in the future. Return back a
|
||||||
// null result, indicating the cache cannot load this key.
|
// null result, indicating the cache cannot load this key.
|
||||||
//
|
//
|
||||||
return new IntraLineDiff(IntraLineDiff.Status.TIMEOUT);
|
return new IntraLineDiff(IntraLineDiff.Status.TIMEOUT);
|
||||||
}
|
}
|
||||||
|
workerPool.release(w);
|
||||||
if (!workerPool.offer(w)) {
|
|
||||||
// If the idle worker pool is full, terminate this thread.
|
|
||||||
//
|
|
||||||
w.end();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (r.error != null) {
|
if (r.error != null) {
|
||||||
// If there was an error computing the result, carry it
|
// If there was an error computing the result, carry it
|
||||||
@ -93,127 +77,7 @@ class IntraLineLoader extends CacheLoader<IntraLineDiffKey, IntraLineDiff> {
|
|||||||
return r.diff;
|
return r.diff;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Worker {
|
static IntraLineDiff compute(IntraLineDiffKey key) throws Exception {
|
||||||
private static final AtomicInteger count = new AtomicInteger(1);
|
|
||||||
|
|
||||||
private final ArrayBlockingQueue<Input> input;
|
|
||||||
private final ArrayBlockingQueue<Result> result;
|
|
||||||
private final Thread thread;
|
|
||||||
|
|
||||||
Worker() {
|
|
||||||
input = new ArrayBlockingQueue<Input>(1);
|
|
||||||
result = new ArrayBlockingQueue<Result>(1);
|
|
||||||
|
|
||||||
thread = new Thread(new Runnable() {
|
|
||||||
public void run() {
|
|
||||||
workerLoop();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
thread.setName("IntraLineDiff-" + count.getAndIncrement());
|
|
||||||
thread.setDaemon(true);
|
|
||||||
thread.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
Result computeWithTimeout(IntraLineDiffKey key, long timeoutMillis)
|
|
||||||
throws Exception {
|
|
||||||
if (!input.offer(new Input(key))) {
|
|
||||||
log.error("Cannot enqueue task to thread " + thread.getName());
|
|
||||||
return Result.TIMEOUT;
|
|
||||||
}
|
|
||||||
|
|
||||||
Result r = result.poll(timeoutMillis, TimeUnit.MILLISECONDS);
|
|
||||||
if (r != null) {
|
|
||||||
return r;
|
|
||||||
} else {
|
|
||||||
log.warn(timeoutMillis + " ms timeout reached for IntraLineDiff"
|
|
||||||
+ " in project " + key.getProject().get() //
|
|
||||||
+ " on commit " + key.getCommit().name() //
|
|
||||||
+ " for path " + key.getPath() //
|
|
||||||
+ " comparing " + key.getBlobA().name() //
|
|
||||||
+ ".." + key.getBlobB().name() //
|
|
||||||
+ ". Killing " + thread.getName());
|
|
||||||
forcefullyKillThreadInAnUglyWay();
|
|
||||||
return Result.TIMEOUT;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
|
||||||
private void forcefullyKillThreadInAnUglyWay() {
|
|
||||||
try {
|
|
||||||
thread.stop();
|
|
||||||
} catch (Throwable error) {
|
|
||||||
// Ignore any reason the thread won't stop.
|
|
||||||
log.error("Cannot stop runaway thread " + thread.getName(), error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void end() {
|
|
||||||
if (!input.offer(Input.END_THREAD)) {
|
|
||||||
log.error("Cannot gracefully stop thread " + thread.getName());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void workerLoop() {
|
|
||||||
try {
|
|
||||||
for (;;) {
|
|
||||||
Input in;
|
|
||||||
try {
|
|
||||||
in = input.take();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
log.error("Unexpected interrupt on " + thread.getName());
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (in == Input.END_THREAD) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Result r;
|
|
||||||
try {
|
|
||||||
r = new Result(IntraLineLoader.compute(in.key));
|
|
||||||
} catch (Exception error) {
|
|
||||||
r = new Result(error);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!result.offer(r)) {
|
|
||||||
log.error("Cannot return result from " + thread.getName());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (ThreadDeath iHaveBeenShot) {
|
|
||||||
// Handle thread death by gracefully returning to the caller,
|
|
||||||
// allowing the thread to be destroyed.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static class Input {
|
|
||||||
static final Input END_THREAD = new Input(null);
|
|
||||||
|
|
||||||
final IntraLineDiffKey key;
|
|
||||||
|
|
||||||
Input(IntraLineDiffKey key) {
|
|
||||||
this.key = key;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static class Result {
|
|
||||||
static final Result TIMEOUT = new Result((IntraLineDiff) null);
|
|
||||||
|
|
||||||
final IntraLineDiff diff;
|
|
||||||
final Exception error;
|
|
||||||
|
|
||||||
Result(IntraLineDiff diff) {
|
|
||||||
this.diff = diff;
|
|
||||||
this.error = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
Result(Exception error) {
|
|
||||||
this.diff = null;
|
|
||||||
this.error = error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static IntraLineDiff compute(IntraLineDiffKey key) throws Exception {
|
|
||||||
List<Edit> edits = new ArrayList<Edit>(key.getEdits());
|
List<Edit> edits = new ArrayList<Edit>(key.getEdits());
|
||||||
Text aContent = key.getTextA();
|
Text aContent = key.getTextA();
|
||||||
Text bContent = key.getTextB();
|
Text bContent = key.getTextB();
|
||||||
|
@ -0,0 +1,182 @@
|
|||||||
|
// Copyright (C) 2009 The Android Open Source Project
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
|
||||||
|
package com.google.gerrit.server.patch;
|
||||||
|
|
||||||
|
import static com.google.gerrit.server.patch.IntraLineLoader.log;
|
||||||
|
|
||||||
|
import com.google.gerrit.server.config.GerritServerConfig;
|
||||||
|
import com.google.inject.AbstractModule;
|
||||||
|
import com.google.inject.Inject;
|
||||||
|
import com.google.inject.Singleton;
|
||||||
|
|
||||||
|
import org.eclipse.jgit.lib.Config;
|
||||||
|
|
||||||
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
public class IntraLineWorkerPool {
|
||||||
|
public static class Module extends AbstractModule {
|
||||||
|
@Override
|
||||||
|
protected void configure() {
|
||||||
|
bind(IntraLineWorkerPool.class);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final BlockingQueue<Worker> workerPool;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public IntraLineWorkerPool(@GerritServerConfig Config cfg) {
|
||||||
|
int workers = cfg.getInt(
|
||||||
|
"cache", PatchListCacheImpl.INTRA_NAME, "maxIdleWorkers",
|
||||||
|
Runtime.getRuntime().availableProcessors() * 3 / 2);
|
||||||
|
workerPool = new ArrayBlockingQueue<Worker>(workers, true /* fair */);
|
||||||
|
}
|
||||||
|
|
||||||
|
Worker acquire() {
|
||||||
|
Worker w = workerPool.poll();
|
||||||
|
if (w == null) {
|
||||||
|
// If no worker is immediately available, start a new one.
|
||||||
|
// Maximum parallelism is controlled by the web server.
|
||||||
|
w = new Worker();
|
||||||
|
w.start();
|
||||||
|
}
|
||||||
|
return w;
|
||||||
|
}
|
||||||
|
|
||||||
|
void release(Worker w) {
|
||||||
|
if (!workerPool.offer(w)) {
|
||||||
|
// If the idle worker pool is full, terminate the worker.
|
||||||
|
w.shutdownGracefully();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class Worker extends Thread {
|
||||||
|
private static final AtomicInteger count = new AtomicInteger(1);
|
||||||
|
|
||||||
|
private final ArrayBlockingQueue<Input> input;
|
||||||
|
private final ArrayBlockingQueue<Result> result;
|
||||||
|
|
||||||
|
Worker() {
|
||||||
|
input = new ArrayBlockingQueue<Input>(1);
|
||||||
|
result = new ArrayBlockingQueue<Result>(1);
|
||||||
|
|
||||||
|
setName("IntraLineDiff-" + count.getAndIncrement());
|
||||||
|
setDaemon(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
Result computeWithTimeout(IntraLineDiffKey key, long timeoutMillis)
|
||||||
|
throws Exception {
|
||||||
|
if (!input.offer(new Input(key))) {
|
||||||
|
log.error("Cannot enqueue task to thread " + getName());
|
||||||
|
return Result.TIMEOUT;
|
||||||
|
}
|
||||||
|
|
||||||
|
Result r = result.poll(timeoutMillis, TimeUnit.MILLISECONDS);
|
||||||
|
if (r != null) {
|
||||||
|
return r;
|
||||||
|
} else {
|
||||||
|
log.warn(timeoutMillis + " ms timeout reached for IntraLineDiff"
|
||||||
|
+ " in project " + key.getProject().get()
|
||||||
|
+ " on commit " + key.getCommit().name()
|
||||||
|
+ " for path " + key.getPath()
|
||||||
|
+ " comparing " + key.getBlobA().name()
|
||||||
|
+ ".." + key.getBlobB().name()
|
||||||
|
+ ". Killing " + getName());
|
||||||
|
forcefullyKillThreadInAnUglyWay();
|
||||||
|
return Result.TIMEOUT;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
private void forcefullyKillThreadInAnUglyWay() {
|
||||||
|
try {
|
||||||
|
stop();
|
||||||
|
} catch (Throwable error) {
|
||||||
|
// Ignore any reason the thread won't stop.
|
||||||
|
log.error("Cannot stop runaway thread " + getName(), error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void shutdownGracefully() {
|
||||||
|
if (!input.offer(Input.END_THREAD)) {
|
||||||
|
log.error("Cannot gracefully stop thread " + getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
for (;;) {
|
||||||
|
Input in;
|
||||||
|
try {
|
||||||
|
in = input.take();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
log.error("Unexpected interrupt on " + getName());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (in == Input.END_THREAD) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Result r;
|
||||||
|
try {
|
||||||
|
r = new Result(IntraLineLoader.compute(in.key));
|
||||||
|
} catch (Exception error) {
|
||||||
|
r = new Result(error);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!result.offer(r)) {
|
||||||
|
log.error("Cannot return result from " + getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (ThreadDeath iHaveBeenShot) {
|
||||||
|
// Handle thread death by gracefully returning to the caller,
|
||||||
|
// allowing the thread to be destroyed.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class Input {
|
||||||
|
static final Input END_THREAD = new Input(null);
|
||||||
|
|
||||||
|
final IntraLineDiffKey key;
|
||||||
|
|
||||||
|
Input(IntraLineDiffKey key) {
|
||||||
|
this.key = key;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class Result {
|
||||||
|
static final Result TIMEOUT = new Result((IntraLineDiff) null);
|
||||||
|
|
||||||
|
final IntraLineDiff diff;
|
||||||
|
final Exception error;
|
||||||
|
|
||||||
|
Result(IntraLineDiff diff) {
|
||||||
|
this.diff = diff;
|
||||||
|
this.error = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Result(Exception error) {
|
||||||
|
this.diff = null;
|
||||||
|
this.error = error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -39,6 +39,7 @@ import com.google.gerrit.server.git.ReceiveCommitsExecutorModule;
|
|||||||
import com.google.gerrit.server.git.WorkQueue;
|
import com.google.gerrit.server.git.WorkQueue;
|
||||||
import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
|
import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
|
||||||
import com.google.gerrit.server.mail.SmtpEmailSender;
|
import com.google.gerrit.server.mail.SmtpEmailSender;
|
||||||
|
import com.google.gerrit.server.patch.IntraLineWorkerPool;
|
||||||
import com.google.gerrit.server.plugins.PluginGuiceEnvironment;
|
import com.google.gerrit.server.plugins.PluginGuiceEnvironment;
|
||||||
import com.google.gerrit.server.plugins.PluginModule;
|
import com.google.gerrit.server.plugins.PluginModule;
|
||||||
import com.google.gerrit.server.schema.DataSourceModule;
|
import com.google.gerrit.server.schema.DataSourceModule;
|
||||||
@ -229,6 +230,7 @@ public class WebAppInitializer extends GuiceServletContextListener {
|
|||||||
modules.add(new WorkQueue.Module());
|
modules.add(new WorkQueue.Module());
|
||||||
modules.add(new ChangeHookRunner.Module());
|
modules.add(new ChangeHookRunner.Module());
|
||||||
modules.add(new ReceiveCommitsExecutorModule());
|
modules.add(new ReceiveCommitsExecutorModule());
|
||||||
|
modules.add(new IntraLineWorkerPool.Module());
|
||||||
modules.add(cfgInjector.getInstance(GerritGlobalModule.class));
|
modules.add(cfgInjector.getInstance(GerritGlobalModule.class));
|
||||||
modules.add(new DefaultCacheFactory.Module());
|
modules.add(new DefaultCacheFactory.Module());
|
||||||
modules.add(new SmtpEmailSender.Module());
|
modules.add(new SmtpEmailSender.Module());
|
||||||
|
Loading…
Reference in New Issue
Block a user