Add threadPriority argument to createQueue.

This stops ScheduledThreadPoolExecutor API usage from leaking beyond WorkQueue.

Change-Id: I9c7b141272a6d547c4f2c39c28a5ea60ada944d7
This commit is contained in:
Han-Wen Nienhuys 2017-06-13 17:48:36 +02:00
parent 008eb7c482
commit 4daf68c8b8
3 changed files with 23 additions and 39 deletions

View File

@ -108,11 +108,25 @@ public class WorkQueue {
/** Create a new executor queue. */
public ScheduledThreadPoolExecutor createQueue(int poolsize, String prefix) {
final Executor r = new Executor(poolsize, prefix);
r.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
r.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
queues.add(r);
return r;
return createQueue(poolsize, prefix, Thread.NORM_PRIORITY);
}
public ScheduledThreadPoolExecutor createQueue(int poolsize, String prefix, int threadPriority) {
Executor executor = new Executor(poolsize, prefix);
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
queues.add(executor);
if (threadPriority != Thread.NORM_PRIORITY) {
ThreadFactory parent = executor.getThreadFactory();
executor.setThreadFactory(
task -> {
Thread t = parent.newThread(task);
t.setPriority(threadPriority);
return t;
});
}
return executor;
}
/** Get all of the tasks currently scheduled in any work queue. */

View File

@ -20,7 +20,6 @@ import com.google.gerrit.server.git.QueueProvider;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import org.eclipse.jgit.lib.Config;
public class CommandExecutorQueueProvider implements QueueProvider {
@ -42,27 +41,13 @@ public class CommandExecutorQueueProvider implements QueueProvider {
poolSize += batchThreads;
}
int interactiveThreads = Math.max(1, poolSize - batchThreads);
interactiveExecutor = queues.createQueue(interactiveThreads, "SSH-Interactive-Worker");
interactiveExecutor =
queues.createQueue(interactiveThreads, "SSH-Interactive-Worker", Thread.MIN_PRIORITY);
if (batchThreads != 0) {
batchExecutor = queues.createQueue(batchThreads, "SSH-Batch-Worker");
setThreadFactory(batchExecutor);
batchExecutor = queues.createQueue(batchThreads, "SSH-Batch-Worker", Thread.MIN_PRIORITY);
} else {
batchExecutor = interactiveExecutor;
}
setThreadFactory(interactiveExecutor);
}
private void setThreadFactory(ScheduledThreadPoolExecutor executor) {
final ThreadFactory parent = executor.getThreadFactory();
executor.setThreadFactory(
new ThreadFactory() {
@Override
public Thread newThread(Runnable task) {
final Thread t = parent.newThread(task);
t.setPriority(Thread.MIN_PRIORITY);
return t;
}
});
}
@Override

View File

@ -19,7 +19,6 @@ import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import org.eclipse.jgit.lib.Config;
class StreamCommandExecutorProvider implements Provider<ScheduledThreadPoolExecutor> {
@ -35,20 +34,6 @@ class StreamCommandExecutorProvider implements Provider<ScheduledThreadPoolExecu
@Override
public ScheduledThreadPoolExecutor get() {
final ScheduledThreadPoolExecutor executor;
executor = queues.createQueue(poolSize, "SSH-Stream-Worker");
final ThreadFactory parent = executor.getThreadFactory();
executor.setThreadFactory(
new ThreadFactory() {
@Override
public Thread newThread(Runnable task) {
final Thread t = parent.newThread(task);
t.setPriority(Thread.MIN_PRIORITY);
return t;
}
});
return executor;
return queues.createQueue(poolSize, "SSH-Stream-Worker", Thread.MIN_PRIORITY);
}
}