Avoid referencing gerrit specific Executor.

WorkQueue.Executor is a thin layer on top of
ScheduledThreadPoolExecutor. Since it is a class, rather than an
implementation, it is impossible to inject a wrapper around an
existing implementation.

This change is one step towards enabling this.

Change-Id: Id6b1d0fd10b5f8549cbc76a4867b3afb750aca02
This commit is contained in:
Han-Wen Nienhuys
2017-06-12 14:54:59 +02:00
parent 0c8a4ddebd
commit 40270c4bef
17 changed files with 56 additions and 45 deletions

View File

@@ -22,7 +22,6 @@ import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.QueueProvider;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
import com.google.gerrit.sshd.CommandExecutorQueueProvider;
import com.google.inject.Inject;
@@ -30,6 +29,7 @@ import com.google.inject.Provider;
import com.google.inject.Singleton;
import com.google.inject.servlet.ServletModule;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.servlet.Filter;
@@ -102,7 +102,7 @@ public class ProjectQoSFilter implements Filter {
final HttpServletResponse rsp = (HttpServletResponse) response;
final Continuation cont = ContinuationSupport.getContinuation(req);
WorkQueue.Executor executor = getExecutor();
ScheduledThreadPoolExecutor executor = getExecutor();
if (cont.isInitial()) {
TaskThunk task = new TaskThunk(executor, cont, req);
@@ -136,7 +136,7 @@ public class ProjectQoSFilter implements Filter {
}
}
private WorkQueue.Executor getExecutor() {
private ScheduledThreadPoolExecutor getExecutor() {
return queue.getQueue(user.get().getCapabilities().getQueueType());
}
@@ -148,14 +148,17 @@ public class ProjectQoSFilter implements Filter {
private final class TaskThunk implements CancelableRunnable, ContinuationListener {
private final WorkQueue.Executor executor;
private final ScheduledThreadPoolExecutor executor;
private final Continuation cont;
private final String name;
private final Object lock = new Object();
private boolean done;
private Thread worker;
TaskThunk(WorkQueue.Executor executor, Continuation cont, HttpServletRequest req) {
TaskThunk(
ScheduledThreadPoolExecutor executor,
Continuation cont,
HttpServletRequest req) {
this.executor = executor;
this.cont = cont;
this.name = generateName(req);

View File

@@ -17,7 +17,6 @@ package com.google.gerrit.server.git;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.config.ConfigUtil;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue.Executor;
import com.google.gerrit.server.project.ProjectControl;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.inject.Inject;
@@ -30,6 +29,7 @@ import com.google.inject.name.Named;
import java.io.OutputStream;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.lib.Config;
import org.eclipse.jgit.lib.Repository;
@@ -125,7 +125,7 @@ public class AsyncReceiveCommits implements PreReceiveHook {
}
private final ReceiveCommits rc;
private final Executor executor;
private final ScheduledThreadPoolExecutor executor;
private final RequestScopePropagator scopePropagator;
private final MultiProgressMonitor progress;
private final long timeoutMillis;
@@ -133,7 +133,7 @@ public class AsyncReceiveCommits implements PreReceiveHook {
@Inject
AsyncReceiveCommits(
ReceiveCommits.Factory factory,
@ReceiveCommitsExecutor Executor executor,
@ReceiveCommitsExecutor ScheduledThreadPoolExecutor executor,
RequestScopePropagator scopePropagator,
@Named(TIMEOUT_NAME) long timeoutMillis,
@Assisted ProjectControl projectControl,

View File

@@ -14,11 +14,13 @@
package com.google.gerrit.server.git;
import java.util.concurrent.ScheduledThreadPoolExecutor;
public interface QueueProvider {
enum QueueType {
INTERACTIVE,
BATCH
}
WorkQueue.Executor getQueue(QueueType type);
ScheduledThreadPoolExecutor getQueue(QueueType type);
}

View File

@@ -18,8 +18,9 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/** Marker on the global {@link WorkQueue.Executor} used by {@link ReceiveCommits}. */
/** Marker on the global {@link ScheduledThreadPoolExecutor} used by {@link ReceiveCommits}. */
@Retention(RUNTIME)
@BindingAnnotation
public @interface ReceiveCommitsExecutor {}

View File

@@ -24,6 +24,7 @@ import com.google.inject.Provides;
import com.google.inject.Singleton;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.lib.Config;
@@ -36,7 +37,7 @@ public class ReceiveCommitsExecutorModule extends AbstractModule {
@Provides
@Singleton
@ReceiveCommitsExecutor
public WorkQueue.Executor createReceiveCommitsExecutor(
public ScheduledThreadPoolExecutor createReceiveCommitsExecutor(
@GerritServerConfig Config config, WorkQueue queues) {
int poolSize =
config.getInt(

View File

@@ -18,8 +18,9 @@ import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/** Marker on the global {@link WorkQueue.Executor} used to send email. */
/** Marker on the global {@link ScheduledThreadPoolExecutor} used to send email. */
@Retention(RUNTIME)
@BindingAnnotation
public @interface SendEmailExecutor {}

View File

@@ -82,7 +82,7 @@ public class WorkQueue {
}
};
private Executor defaultQueue;
private ScheduledThreadPoolExecutor defaultQueue;
private int defaultQueueSize;
private final IdGenerator idGenerator;
private final CopyOnWriteArrayList<Executor> queues;
@@ -99,7 +99,7 @@ public class WorkQueue {
}
/** Get the default work queue, for miscellaneous tasks. */
public synchronized Executor getDefaultQueue() {
public synchronized ScheduledThreadPoolExecutor getDefaultQueue() {
if (defaultQueue == null) {
defaultQueue = createQueue(defaultQueueSize, "WorkQueue");
}
@@ -107,7 +107,7 @@ public class WorkQueue {
}
/** Create a new executor queue. */
public Executor createQueue(int poolsize, String prefix) {
public ScheduledThreadPoolExecutor createQueue(int poolsize, String prefix) {
final Executor r = new Executor(poolsize, prefix);
r.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
r.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
@@ -150,7 +150,7 @@ public class WorkQueue {
return result;
}
public Executor getExecutor(String queueName) {
public ScheduledThreadPoolExecutor getExecutor(String queueName) {
for (Executor e : queues) {
if (e.queueName.equals(queueName)) {
return e;
@@ -175,7 +175,7 @@ public class WorkQueue {
}
/** An isolated queue. */
public class Executor extends ScheduledThreadPoolExecutor {
private class Executor extends ScheduledThreadPoolExecutor {
private final ConcurrentHashMap<Integer, Task<?>> all;
private final String queueName;
@@ -430,8 +430,8 @@ public class WorkQueue {
@Override
public String toString() {
//This is a workaround to be able to print a proper name when the task
//is wrapped into a TrustedListenableFutureTask.
// This is a workaround to be able to print a proper name when the task
// is wrapped into a TrustedListenableFutureTask.
try {
if (runnable
.getClass()

View File

@@ -29,6 +29,7 @@ import com.google.inject.Scope;
import com.google.inject.servlet.ServletScopes;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/**
* Base class for propagating request-scoped data between threads.
@@ -64,10 +65,9 @@ public abstract class RequestScopePropagator {
* request state when the returned Callable is invoked. The method must be called in a request
* scope and the returned Callable may only be invoked in a thread that is not already in a
* request scope or is in the same request scope. The returned Callable will inherit toString()
* from the passed in Callable. A {@link com.google.gerrit.server.git.WorkQueue.Executor} does not
* accept a Callable, so there is no ProjectCallable implementation. Implementations of this
* method must be consistent with Guice's {@link ServletScopes#continueRequest(Callable,
* java.util.Map)}.
* from the passed in Callable. A {@link ScheduledThreadPoolExecutor} does not accept a Callable,
* so there is no ProjectCallable implementation. Implementations of this method must be
* consistent with Guice's {@link ServletScopes#continueRequest(Callable, java.util.Map)}.
*
* <p>There are some limitations:
*

View File

@@ -28,7 +28,6 @@ import com.google.gerrit.server.DynamicOptions;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.RequestCleanup;
import com.google.gerrit.server.git.ProjectRunnable;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
import com.google.gerrit.server.permissions.GlobalPermission;
import com.google.gerrit.server.permissions.PermissionBackend;
@@ -50,6 +49,7 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.nio.charset.Charset;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.sshd.common.SshException;
import org.apache.sshd.server.Command;
@@ -85,7 +85,7 @@ public abstract class BaseCommand implements Command {
@Inject private RequestCleanup cleanup;
@Inject @CommandExecutor private WorkQueue.Executor executor;
@Inject @CommandExecutor private ScheduledThreadPoolExecutor executor;
@Inject private PermissionBackend permissionBackend;
@Inject private CurrentUser user;

View File

@@ -16,11 +16,11 @@ package com.google.gerrit.sshd;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.gerrit.server.git.WorkQueue.Executor;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/** Marker on {@link Executor} used by SSH threads. */
/** Marker on {@link ScheduledThreadPoolExecutor} used by SSH threads. */
@Retention(RUNTIME)
@BindingAnnotation
public @interface CommandExecutor {}

View File

@@ -16,11 +16,11 @@ package com.google.gerrit.sshd;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.git.QueueProvider;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.util.concurrent.ScheduledThreadPoolExecutor;
class CommandExecutorProvider implements Provider<WorkQueue.Executor> {
class CommandExecutorProvider implements Provider<ScheduledThreadPoolExecutor> {
private final QueueProvider queues;
private final CurrentUser user;
@@ -32,7 +32,7 @@ class CommandExecutorProvider implements Provider<WorkQueue.Executor> {
}
@Override
public WorkQueue.Executor get() {
public ScheduledThreadPoolExecutor get() {
return queues.getQueue(user.getCapabilities().getQueueType());
}
}

View File

@@ -19,6 +19,7 @@ import com.google.gerrit.server.config.ThreadSettingsConfig;
import com.google.gerrit.server.git.QueueProvider;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import org.eclipse.jgit.lib.Config;
@@ -26,8 +27,8 @@ public class CommandExecutorQueueProvider implements QueueProvider {
private int poolSize;
private final int batchThreads;
private final WorkQueue.Executor interactiveExecutor;
private final WorkQueue.Executor batchExecutor;
private final ScheduledThreadPoolExecutor interactiveExecutor;
private final ScheduledThreadPoolExecutor batchExecutor;
@Inject
public CommandExecutorQueueProvider(
@@ -51,7 +52,7 @@ public class CommandExecutorQueueProvider implements QueueProvider {
setThreadFactory(interactiveExecutor);
}
private void setThreadFactory(WorkQueue.Executor executor) {
private void setThreadFactory(ScheduledThreadPoolExecutor executor) {
final ThreadFactory parent = executor.getThreadFactory();
executor.setThreadFactory(
new ThreadFactory() {
@@ -65,7 +66,7 @@ public class CommandExecutorQueueProvider implements QueueProvider {
}
@Override
public WorkQueue.Executor getQueue(QueueType type) {
public ScheduledThreadPoolExecutor getQueue(QueueType type) {
switch (type) {
case INTERACTIVE:
return interactiveExecutor;

View File

@@ -26,7 +26,6 @@ import com.google.gerrit.server.config.GerritRequestModule;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.AsyncReceiveCommits;
import com.google.gerrit.server.git.QueueProvider;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.plugins.ModuleGenerator;
import com.google.gerrit.server.plugins.ReloadPluginListener;
import com.google.gerrit.server.plugins.StartPluginListener;
@@ -39,6 +38,7 @@ import com.google.inject.servlet.RequestScoped;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.sshd.server.CommandFactory;
import org.apache.sshd.server.auth.gss.GSSAuthenticator;
import org.apache.sshd.server.auth.pubkey.PublickeyAuthenticator;
@@ -77,7 +77,7 @@ public class SshModule extends LifecycleModule {
.toInstance(new DispatchCommandProvider(Commands.CMD_ROOT));
bind(CommandFactoryProvider.class);
bind(CommandFactory.class).toProvider(CommandFactoryProvider.class);
bind(WorkQueue.Executor.class)
bind(ScheduledThreadPoolExecutor.class)
.annotatedWith(StreamCommandExecutor.class)
.toProvider(StreamCommandExecutorProvider.class)
.in(SINGLETON);
@@ -126,7 +126,7 @@ public class SshModule extends LifecycleModule {
.toProvider(SshRemotePeerProvider.class)
.in(SshScope.REQUEST);
bind(WorkQueue.Executor.class)
bind(ScheduledThreadPoolExecutor.class)
.annotatedWith(CommandExecutor.class)
.toProvider(CommandExecutorProvider.class)
.in(SshScope.REQUEST);

View File

@@ -16,11 +16,11 @@ package com.google.gerrit.sshd;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.gerrit.server.git.WorkQueue.Executor;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
import java.util.concurrent.ScheduledThreadPoolExecutor;
/** Marker on {@link Executor} used by delayed event streaming. */
/** Marker on {@link ScheduledThreadPoolExecutor} used by delayed event streaming. */
@Retention(RUNTIME)
@BindingAnnotation
public @interface StreamCommandExecutor {}

View File

@@ -18,10 +18,11 @@ import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import org.eclipse.jgit.lib.Config;
class StreamCommandExecutorProvider implements Provider<WorkQueue.Executor> {
class StreamCommandExecutorProvider implements Provider<ScheduledThreadPoolExecutor> {
private final int poolSize;
private final WorkQueue queues;
@@ -33,8 +34,8 @@ class StreamCommandExecutorProvider implements Provider<WorkQueue.Executor> {
}
@Override
public WorkQueue.Executor get() {
final WorkQueue.Executor executor;
public ScheduledThreadPoolExecutor get() {
final ScheduledThreadPoolExecutor executor;
executor = queues.createQueue(poolSize, "SSH-Stream-Worker");

View File

@@ -38,6 +38,7 @@ import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.sshd.server.Environment;
import org.kohsuke.args4j.Option;
@@ -108,7 +109,7 @@ final class ShowQueue extends SshCommand {
if (groupByQueue) {
ListMultimap<String, TaskInfo> byQueue = byQueue(tasks);
for (String queueName : byQueue.keySet()) {
WorkQueue.Executor e = workQueue.getExecutor(queueName);
ScheduledThreadPoolExecutor e = workQueue.getExecutor(queueName);
stdout.print(String.format("Queue: %s\n", queueName));
print(byQueue.get(queueName), now, viewAll, e.getCorePoolSize());
}

View File

@@ -29,7 +29,6 @@ import com.google.gerrit.server.events.Event;
import com.google.gerrit.server.events.EventTypes;
import com.google.gerrit.server.events.ProjectNameKeySerializer;
import com.google.gerrit.server.events.SupplierSerializer;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
import com.google.gerrit.sshd.BaseCommand;
import com.google.gerrit.sshd.CommandMetaData;
@@ -43,6 +42,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.sshd.server.Environment;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
@@ -71,7 +71,7 @@ final class StreamEvents extends BaseCommand {
@Inject private DynamicSet<UserScopedEventListener> eventListeners;
@Inject @StreamCommandExecutor private WorkQueue.Executor pool;
@Inject @StreamCommandExecutor private ScheduledThreadPoolExecutor pool;
/** Queue of events to stream to the connected user. */
private final LinkedBlockingQueue<Event> queue = new LinkedBlockingQueue<>(MAX_EVENTS);