Queue smart HTTP requests alongside SSH requests
Because the server has a finite amount of memory available to service incoming requests we can't handle smart HTTP requests when they are immediately received. Instead park them using the Jetty continuation features and resume processing only when we have reserved space in the SSH command queue. By reusing the same queue as the SSH daemon we ensure that the two daemons will share the same resources and avoid doubling our memory footprint. Change-Id: I4734b1e36315b82157609e93345e301eda9b8a8f Signed-off-by: Shawn O. Pearce <sop@google.com>
This commit is contained in:
@@ -879,6 +879,29 @@ thread pool waiting for a worker thread to become available.
|
||||
+
|
||||
By default 50.
|
||||
|
||||
[[httpd.maxWait]]httpd.maxWait::
|
||||
+
|
||||
Maximum amount of time a client will wait to for an available
|
||||
thread to handle a project clone, fetch or push request over the
|
||||
smart HTTP transport.
|
||||
+
|
||||
Values should use common unit suffixes to express their setting:
|
||||
+
|
||||
* s, sec, second, seconds
|
||||
* m, min, minute, minutes
|
||||
* h, hr, hour, hours
|
||||
* d, day, days
|
||||
* w, week, weeks (`1 week` is treated as `7 days`)
|
||||
* mon, month, months (`1 month` is treated as `30 days`)
|
||||
* y, year, years (`1 year` is treated as `365 days`)
|
||||
|
||||
+
|
||||
If a unit suffix is not specified, `minutes` is assumed. If 0
|
||||
is supplied, the maximum age is infinite and connections will not
|
||||
abort until the client disconnects.
|
||||
+
|
||||
By default, 5 minutes.
|
||||
|
||||
|
||||
[[ldap]]Section ldap
|
||||
~~~~~~~~~~~~~~~~~~~~
|
||||
|
@@ -21,8 +21,9 @@ import com.google.gerrit.httpd.WebModule;
|
||||
import com.google.gerrit.lifecycle.LifecycleManager;
|
||||
import com.google.gerrit.pgm.http.jetty.JettyEnv;
|
||||
import com.google.gerrit.pgm.http.jetty.JettyModule;
|
||||
import com.google.gerrit.pgm.util.LogFileCompressor;
|
||||
import com.google.gerrit.pgm.http.jetty.ProjectQoSFilter;
|
||||
import com.google.gerrit.pgm.util.ErrorLogFile;
|
||||
import com.google.gerrit.pgm.util.LogFileCompressor;
|
||||
import com.google.gerrit.pgm.util.RuntimeShutdown;
|
||||
import com.google.gerrit.pgm.util.SiteProgram;
|
||||
import com.google.gerrit.server.config.AuthConfigModule;
|
||||
@@ -193,6 +194,7 @@ public class Daemon extends SiteProgram {
|
||||
|
||||
private Injector createWebInjector() {
|
||||
final List<Module> modules = new ArrayList<Module>();
|
||||
modules.add(sshInjector.getInstance(ProjectQoSFilter.Module.class));
|
||||
modules.add(sshInjector.getInstance(WebModule.class));
|
||||
return sysInjector.createChildInjector(modules);
|
||||
}
|
||||
|
@@ -0,0 +1,243 @@
|
||||
// Copyright (C) 2010 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.google.gerrit.pgm.http.jetty;
|
||||
|
||||
import static com.google.gerrit.server.config.ConfigUtil.getTimeUnit;
|
||||
import static java.util.concurrent.TimeUnit.MINUTES;
|
||||
import static javax.servlet.http.HttpServletResponse.SC_SERVICE_UNAVAILABLE;
|
||||
|
||||
import com.google.gerrit.server.CurrentUser;
|
||||
import com.google.gerrit.server.IdentifiedUser;
|
||||
import com.google.gerrit.server.config.GerritServerConfig;
|
||||
import com.google.gerrit.server.git.WorkQueue;
|
||||
import com.google.gerrit.server.git.WorkQueue.CancelableRunnable;
|
||||
import com.google.gerrit.sshd.CommandExecutor;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Provider;
|
||||
import com.google.inject.Singleton;
|
||||
import com.google.inject.servlet.ServletModule;
|
||||
|
||||
import org.eclipse.jetty.continuation.Continuation;
|
||||
import org.eclipse.jetty.continuation.ContinuationListener;
|
||||
import org.eclipse.jetty.continuation.ContinuationSupport;
|
||||
import org.eclipse.jgit.lib.Config;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import javax.servlet.Filter;
|
||||
import javax.servlet.FilterChain;
|
||||
import javax.servlet.FilterConfig;
|
||||
import javax.servlet.ServletContext;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.ServletRequest;
|
||||
import javax.servlet.ServletResponse;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
/**
|
||||
* Use Jetty continuations to defer execution until threads are available.
|
||||
* <p>
|
||||
* We actually schedule a task into the same execution queue as the SSH daemon
|
||||
* uses for command execution, and then park the web request in a continuation
|
||||
* until an execution thread is available. This ensures that the overall JVM
|
||||
* process doesn't exceed the configured limit on concurrent Git requests.
|
||||
* <p>
|
||||
* During Git request execution however we have to use the Jetty service thread,
|
||||
* not the thread from the SSH execution queue. Trying to complete the request
|
||||
* on the SSH execution queue caused Jetty's HTTP parser to crash, so we instead
|
||||
* block the SSH execution queue thread and ask Jetty to resume processing on
|
||||
* the web service thread.
|
||||
*/
|
||||
@Singleton
|
||||
public class ProjectQoSFilter implements Filter {
|
||||
private static final String ATT_SPACE = ProjectQoSFilter.class.getName();
|
||||
private static final String TASK = ATT_SPACE + "/TASK";
|
||||
private static final String CANCEL = ATT_SPACE + "/CANCEL";
|
||||
|
||||
private static final String FILTER_RE =
|
||||
"^/p/(.*)/(git-upload-pack|git-receive-pack)$";
|
||||
private static final Pattern URI_PATTERN = Pattern.compile(FILTER_RE);
|
||||
|
||||
public static class Module extends ServletModule {
|
||||
private final WorkQueue.Executor executor;
|
||||
|
||||
@Inject
|
||||
Module(@CommandExecutor final WorkQueue.Executor executor) {
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureServlets() {
|
||||
bind(WorkQueue.Executor.class).annotatedWith(CommandExecutor.class)
|
||||
.toInstance(executor);
|
||||
filterRegex(FILTER_RE).through(ProjectQoSFilter.class);
|
||||
}
|
||||
}
|
||||
|
||||
private final Provider<CurrentUser> userProvider;
|
||||
private final WorkQueue.Executor executor;
|
||||
private final ServletContext context;
|
||||
private final long maxWait;
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Inject
|
||||
ProjectQoSFilter(final Provider<CurrentUser> userProvider,
|
||||
@CommandExecutor final WorkQueue.Executor executor,
|
||||
final ServletContext context, @GerritServerConfig final Config cfg) {
|
||||
this.userProvider = userProvider;
|
||||
this.executor = executor;
|
||||
this.context = context;
|
||||
this.maxWait = getTimeUnit(cfg, "httpd", null, "maxwait", 5, MINUTES);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doFilter(ServletRequest request, ServletResponse response,
|
||||
FilterChain chain) throws IOException, ServletException {
|
||||
final HttpServletRequest req = (HttpServletRequest) request;
|
||||
final HttpServletResponse rsp = (HttpServletResponse) response;
|
||||
final Continuation cont = ContinuationSupport.getContinuation(req);
|
||||
|
||||
if (cont.isInitial()) {
|
||||
TaskThunk task = new TaskThunk(cont, req);
|
||||
if (maxWait > 0) {
|
||||
cont.setTimeout(maxWait);
|
||||
}
|
||||
cont.suspend(rsp);
|
||||
cont.addContinuationListener(task);
|
||||
cont.setAttribute(TASK, task);
|
||||
executor.submit(task);
|
||||
|
||||
} else if (cont.isExpired()) {
|
||||
rsp.sendError(SC_SERVICE_UNAVAILABLE);
|
||||
|
||||
} else if (cont.isResumed() && cont.getAttribute(CANCEL) == Boolean.TRUE) {
|
||||
rsp.sendError(SC_SERVICE_UNAVAILABLE);
|
||||
|
||||
} else if (cont.isResumed()) {
|
||||
TaskThunk task = (TaskThunk) cont.getAttribute(TASK);
|
||||
try {
|
||||
task.begin(Thread.currentThread());
|
||||
chain.doFilter(req, rsp);
|
||||
} finally {
|
||||
task.end();
|
||||
Thread.interrupted();
|
||||
}
|
||||
|
||||
} else {
|
||||
context.log("Unexpected QoS continuation state, aborting request");
|
||||
rsp.sendError(SC_SERVICE_UNAVAILABLE);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(FilterConfig config) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
}
|
||||
|
||||
private final class TaskThunk implements CancelableRunnable,
|
||||
ContinuationListener {
|
||||
|
||||
private final Continuation cont;
|
||||
private final String name;
|
||||
private final Object lock = new Object();
|
||||
private boolean done;
|
||||
private Thread worker;
|
||||
|
||||
TaskThunk(final Continuation cont, final HttpServletRequest req) {
|
||||
this.cont = cont;
|
||||
this.name = generateName(req);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
cont.resume();
|
||||
|
||||
synchronized (lock) {
|
||||
while (!done) {
|
||||
try {
|
||||
lock.wait();
|
||||
} catch (InterruptedException e) {
|
||||
if (worker != null) {
|
||||
worker.interrupt();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void begin(Thread thread) {
|
||||
synchronized (lock) {
|
||||
worker = thread;
|
||||
}
|
||||
}
|
||||
|
||||
void end() {
|
||||
synchronized (lock) {
|
||||
worker = null;
|
||||
done = true;
|
||||
lock.notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
cont.setAttribute(CANCEL, Boolean.TRUE);
|
||||
cont.resume();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(Continuation self) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onTimeout(Continuation self) {
|
||||
executor.remove(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name;
|
||||
}
|
||||
|
||||
private String generateName(HttpServletRequest req) {
|
||||
String userName = "";
|
||||
|
||||
CurrentUser who = userProvider.get();
|
||||
if (who instanceof IdentifiedUser) {
|
||||
String name = ((IdentifiedUser) who).getUserName();
|
||||
if (name != null && !name.isEmpty()) {
|
||||
userName = " (" + name + ")";
|
||||
}
|
||||
}
|
||||
|
||||
String uri = req.getServletPath();
|
||||
Matcher m = URI_PATTERN.matcher(uri);
|
||||
if (m.matches()) {
|
||||
String path = m.group(1);
|
||||
String cmd = m.group(2);
|
||||
return cmd + " " + path + userName;
|
||||
} else {
|
||||
return req.getMethod() + " " + uri + userName;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user