Enqueue SSH commands through a thread pool

We now use a thread pool to queue up and run received SSH commands.
This way the server has a maximum number of threads it will permit
to execute on the CPUs at once, and over-subscription is impossible
as additional work requests will be queued up and serviced only
when a thread becomes available.

We seed the SSH thread pool at 1.5x the number of CPUs in the system,
under the assumption that the threads may wind up blocking for IO
and therefore would be able to yield and share the CPUs.  This can
be changed by the administrator with the sshd.threads parameter.

Threads in the SSH thread pool run at MIN_PRIORITY so they get
pushed into the background relative to HTTP service threads.
The SSH transport tends to be for larger bulk operations with longer
executions, so it should take a back seat to HTTP requests which
we want to service in sub 200 millisecond times.

To aid debugging a worker thread in the SSH thread pool changes
its name to the command it is executing.  This makes stack dumps
easier to inspect when looking for deadlock.

Admin-only commands always bypass the thread pool and execute on
their own unique thread.  These commands tend to be executed a lot
less often then user commands, and may be used by an administrator
to inspect current server state.  We don't want them to block and
wait for a thread to become available.

Bug: issue 320
Change-Id: I4060182d0662594c0caea28b6915df1d3740aa19
Signed-off-by: Shawn O. Pearce <sop@google.com>
This commit is contained in:
Shawn O. Pearce
2009-11-19 17:37:10 -08:00
parent 7029fc15df
commit 1a4580be30
5 changed files with 187 additions and 47 deletions

View File

@@ -16,6 +16,7 @@ package com.google.gerrit.sshd;
import com.google.gerrit.reviewdb.Account;
import com.google.gerrit.server.RequestCleanup;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.project.NoSuchChangeException;
import com.google.gerrit.server.project.NoSuchProjectException;
import com.google.gerrit.sshd.SshScopes.Context;
@@ -43,6 +44,7 @@ import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
public abstract class BaseCommand implements Command {
private static final Logger log = LoggerFactory.getLogger(BaseCommand.class);
@@ -63,6 +65,13 @@ public abstract class BaseCommand implements Command {
@Inject
private RequestCleanup cleanup;
@Inject
@CommandExecutor
private WorkQueue.Executor executor;
/** The task, as scheduled on a worker thread. */
private Future<?> task;
/** Text of the command line which lead up to invoking this instance. */
protected String commandPrefix = "";
@@ -104,6 +113,9 @@ public abstract class BaseCommand implements Command {
@Override
public void destroy() {
if (task != null && !task.isDone()) {
task.cancel(true);
}
}
/**
@@ -243,46 +255,27 @@ public abstract class BaseCommand implements Command {
* @param thunk the runnable to execute on the thread, performing the
* command's logic.
*/
protected void startThread(final CommandRunnable thunk) {
final Context context = SshScopes.getContext();
final List<Command> active = context.session.getAttribute(SshUtil.ACTIVE);
final Command cmd = this;
new Thread(threadName()) {
@Override
public void run() {
int rc = 0;
try {
synchronized (active) {
active.add(cmd);
}
SshScopes.current.set(context);
try {
thunk.run();
} catch (NoSuchProjectException e) {
throw new UnloggedFailure(1, e.getMessage() + " no such project");
} catch (NoSuchChangeException e) {
throw new UnloggedFailure(1, e.getMessage() + " no such change");
}
out.flush();
err.flush();
} catch (Throwable e) {
try {
out.flush();
} catch (Throwable e2) {
}
try {
err.flush();
} catch (Throwable e2) {
}
rc = handleError(e);
} finally {
synchronized (active) {
active.remove(cmd);
}
onExit(rc);
}
}
}.start();
protected synchronized void startThread(final CommandRunnable thunk) {
final List<Command> active =
SshScopes.getContext().session.getAttribute(SshUtil.ACTIVE);
synchronized (active) {
active.add(BaseCommand.this);
}
final TaskThunk tt = new TaskThunk(thunk);
if (isAdminCommand()) {
// Admin commands should not block the main work threads (there
// might be an interactive shell there), nor should they wait
// for the main work threads.
//
new Thread(tt, tt.toString()).start();
} else {
task = executor.submit(tt);
}
}
private final boolean isAdminCommand() {
return getClass().getAnnotation(AdminCommand.class) != null;
}
/**
@@ -311,13 +304,6 @@ public abstract class BaseCommand implements Command {
}
}
private String threadName() {
final ServerSession session = SshScopes.getContext().session;
final String who = session.getUsername();
final Account.Id id = session.getAttribute(SshUtil.CURRENT_ACCOUNT);
return "SSH " + getFullCommandLine() + " / " + who + " " + id;
}
private int handleError(final Throwable e) {
if (e.getClass() == IOException.class
&& "Pipe closed".equals(e.getMessage())) {
@@ -388,6 +374,65 @@ public abstract class BaseCommand implements Command {
return commandPrefix + " " + commandLine;
}
private final class TaskThunk implements Runnable {
private final CommandRunnable thunk;
private final Context context;
private TaskThunk(final CommandRunnable thunk) {
this.thunk = thunk;
this.context = SshScopes.getContext();
}
@Override
public void run() {
final Thread thisThread = Thread.currentThread();
final String thisName = thisThread.getName();
int rc = 0;
try {
thisThread.setName(toString());
SshScopes.current.set(context);
try {
thunk.run();
} catch (NoSuchProjectException e) {
throw new UnloggedFailure(1, e.getMessage() + " no such project");
} catch (NoSuchChangeException e) {
throw new UnloggedFailure(1, e.getMessage() + " no such change");
}
out.flush();
err.flush();
} catch (Throwable e) {
try {
out.flush();
} catch (Throwable e2) {
}
try {
err.flush();
} catch (Throwable e2) {
}
rc = handleError(e);
} finally {
try {
List<Command> active = context.session.getAttribute(SshUtil.ACTIVE);
synchronized (active) {
active.remove(BaseCommand.this);
}
onExit(rc);
} finally {
SshScopes.current.set(null);
thisThread.setName(thisName);
}
}
}
@Override
public String toString() {
final ServerSession session = context.session;
final String who = session.getUsername();
final Account.Id id = session.getAttribute(SshUtil.CURRENT_ACCOUNT);
return "SSH " + getFullCommandLine() + " / " + who + " " + id;
}
}
/** Runnable function which can throw an exception. */
public static interface CommandRunnable {
public void run() throws Exception;