Replace Jetty Continuation with AsyncContext

Continuation is deprecated in Jetty 9.4 and removed in Jetty 10. Migrate
ProjectQoSFilter to use javax.servlet.AsyncContext instead.

Change-Id: I398fca0f75b675ad683a6d4c572ffb844c23e364
This commit is contained in:
Nasser Grainawi
2019-11-12 17:25:10 -08:00
parent 12e2ef5963
commit 3b145b7c5c
4 changed files with 121 additions and 61 deletions

View File

@@ -65,7 +65,6 @@ Apache2.0
* httpcomponents:httpcore * httpcomponents:httpcore
* httpcomponents:httpcore-nio * httpcomponents:httpcore-nio
* jackson:jackson-core * jackson:jackson-core
* jetty:continuation
* jetty:http * jetty:http
* jetty:io * jetty:io
* jetty:jmx * jetty:jmx

View File

@@ -953,12 +953,6 @@ maven_jar(
sha1 = "f4c2654db1a55f0780acdfcee8bb98550f56ca70", sha1 = "f4c2654db1a55f0780acdfcee8bb98550f56ca70",
) )
maven_jar(
name = "jetty-continuation",
artifact = "org.eclipse.jetty:jetty-continuation:" + JETTY_VERS,
sha1 = "3c421a3be5be5805e32b1a7f9c6046526524181d",
)
maven_jar( maven_jar(
name = "jetty-http", name = "jetty-http",
artifact = "org.eclipse.jetty:jetty-http:" + JETTY_VERS, artifact = "org.eclipse.jetty:jetty-http:" + JETTY_VERS,

View File

@@ -34,6 +34,10 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.DispatcherType;
import javax.servlet.Filter; import javax.servlet.Filter;
import javax.servlet.FilterChain; import javax.servlet.FilterChain;
import javax.servlet.FilterConfig; import javax.servlet.FilterConfig;
@@ -43,16 +47,13 @@ import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse; import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationListener;
import org.eclipse.jetty.continuation.ContinuationSupport;
import org.eclipse.jgit.lib.Config; import org.eclipse.jgit.lib.Config;
/** /**
* Use Jetty continuations to defer execution until threads are available. * Use AsyncContexts to defer execution until threads are available.
* *
* <p>We actually schedule a task into the same execution queue as the SSH daemon uses for command * <p>We actually schedule a task into the same execution queue as the SSH daemon uses for command
* execution, and then park the web request in a continuation until an execution thread is * execution, and then park the web request in an AsyncContext until an execution thread is
* available. This ensures that the overall JVM process doesn't exceed the configured limit on * available. This ensures that the overall JVM process doesn't exceed the configured limit on
* concurrent Git requests. * concurrent Git requests.
* *
@@ -61,12 +62,10 @@ import org.eclipse.jgit.lib.Config;
* Jetty's HTTP parser to crash, so we instead block the SSH execution queue thread and ask Jetty to * Jetty's HTTP parser to crash, so we instead block the SSH execution queue thread and ask Jetty to
* resume processing on the web service thread. * resume processing on the web service thread.
*/ */
@SuppressWarnings("deprecation")
@Singleton @Singleton
public class ProjectQoSFilter implements Filter { public class ProjectQoSFilter implements Filter {
private static final String ATT_SPACE = ProjectQoSFilter.class.getName(); private static final String ATT_SPACE = ProjectQoSFilter.class.getName() + "/";
private static final String TASK = ATT_SPACE + "/TASK"; private static final String TASK = ATT_SPACE + "TASK";
private static final String CANCEL = ATT_SPACE + "/CANCEL";
private static final String FILTER_RE = "^/(.*)/(git-upload-pack|git-receive-pack)$"; private static final String FILTER_RE = "^/(.*)/(git-upload-pack|git-receive-pack)$";
private static final Pattern URI_PATTERN = Pattern.compile(FILTER_RE); private static final Pattern URI_PATTERN = Pattern.compile(FILTER_RE);
@@ -79,6 +78,59 @@ public class ProjectQoSFilter implements Filter {
} }
} }
public enum RequestState {
INITIAL,
SUSPENDED,
RESUMED,
CANCELED,
UNEXPECTED;
private static final String CANCELED_ATT = ATT_SPACE + CANCELED;
private static final String SUSPENDED_ATT = ATT_SPACE + SUSPENDED;
private static final String RESUMED_ATT = ATT_SPACE + RESUMED;
private void set(ServletRequest req) {
switch (this) {
case SUSPENDED:
req.setAttribute(SUSPENDED_ATT, true);
req.setAttribute(RESUMED_ATT, false);
break;
case CANCELED:
req.setAttribute(CANCELED_ATT, true);
break;
case RESUMED:
req.setAttribute(RESUMED_ATT, true);
break;
case INITIAL:
case UNEXPECTED:
default:
break;
}
}
private static RequestState get(ServletRequest req) {
if (Boolean.FALSE.equals(req.getAttribute(RESUMED_ATT))
&& Boolean.TRUE.equals(req.getAttribute(SUSPENDED_ATT))) {
return SUSPENDED;
}
if (req.getDispatcherType() != DispatcherType.ASYNC) {
return INITIAL;
}
if (Boolean.TRUE.equals(req.getAttribute(RESUMED_ATT))
&& Boolean.TRUE.equals(req.getAttribute(CANCELED_ATT))) {
return CANCELED;
}
if (Boolean.TRUE.equals(req.getAttribute(RESUMED_ATT))) {
return RESUMED;
}
return UNEXPECTED;
}
}
private final AccountLimits.Factory limitsFactory; private final AccountLimits.Factory limitsFactory;
private final Provider<CurrentUser> user; private final Provider<CurrentUser> user;
private final QueueProvider queue; private final QueueProvider queue;
@@ -104,40 +156,50 @@ public class ProjectQoSFilter implements Filter {
throws IOException, ServletException { throws IOException, ServletException {
final HttpServletRequest req = (HttpServletRequest) request; final HttpServletRequest req = (HttpServletRequest) request;
final HttpServletResponse rsp = (HttpServletResponse) response; final HttpServletResponse rsp = (HttpServletResponse) response;
final Continuation cont = ContinuationSupport.getContinuation(req);
if (cont.isInitial()) { final TaskThunk task;
TaskThunk task = new TaskThunk(cont, req);
if (maxWait > 0) {
cont.setTimeout(maxWait);
}
cont.suspend(rsp);
cont.setAttribute(TASK, task);
Future<?> f = getExecutor().submit(task); switch (RequestState.get(request)) {
cont.addContinuationListener(new Listener(f)); case INITIAL:
} else if (cont.isExpired()) { AsyncContext asyncContext = suspend(request);
rsp.sendError(SC_SERVICE_UNAVAILABLE); task = new TaskThunk(asyncContext, req);
if (maxWait > 0) {
asyncContext.setTimeout(maxWait);
}
} else if (cont.isResumed() && cont.getAttribute(CANCEL) == Boolean.TRUE) { request.setAttribute(TASK, task);
rsp.sendError(SC_SERVICE_UNAVAILABLE);
} else if (cont.isResumed()) { Future<?> f = getExecutor().submit(task);
TaskThunk task = (TaskThunk) cont.getAttribute(TASK); asyncContext.addListener(new Listener(f));
try { break;
task.begin(Thread.currentThread()); case CANCELED:
chain.doFilter(req, rsp); rsp.sendError(SC_SERVICE_UNAVAILABLE);
} finally { break;
task.end(); case RESUMED:
Thread.interrupted(); task = (TaskThunk) request.getAttribute(TASK);
} try {
task.begin(Thread.currentThread());
} else { chain.doFilter(req, rsp);
context.log("Unexpected QoS continuation state, aborting request"); } finally {
rsp.sendError(SC_SERVICE_UNAVAILABLE); task.end();
Thread.interrupted();
}
break;
case SUSPENDED:
case UNEXPECTED:
default:
context.log("Unexpected QoS state, aborting request");
rsp.sendError(SC_SERVICE_UNAVAILABLE);
break;
} }
} }
private AsyncContext suspend(ServletRequest request) {
AsyncContext asyncContext = request.startAsync();
RequestState.SUSPENDED.set(request);
return asyncContext;
}
private ScheduledThreadPoolExecutor getExecutor() { private ScheduledThreadPoolExecutor getExecutor() {
QueueProvider.QueueType qt = limitsFactory.create(user.get()).getQueueType(); QueueProvider.QueueType qt = limitsFactory.create(user.get()).getQueueType();
return queue.getQueue(qt); return queue.getQueue(qt);
@@ -149,7 +211,7 @@ public class ProjectQoSFilter implements Filter {
@Override @Override
public void destroy() {} public void destroy() {}
private static final class Listener implements ContinuationListener { private static final class Listener implements AsyncListener {
final Future<?> future; final Future<?> future;
Listener(Future<?> future) { Listener(Future<?> future) {
@@ -157,29 +219,35 @@ public class ProjectQoSFilter implements Filter {
} }
@Override @Override
public void onComplete(Continuation self) {} public void onComplete(AsyncEvent event) throws IOException {}
@Override @Override
public void onTimeout(Continuation self) { public void onTimeout(AsyncEvent event) throws IOException {
future.cancel(true); future.cancel(true);
} }
@Override
public void onError(AsyncEvent event) throws IOException {}
@Override
public void onStartAsync(AsyncEvent event) throws IOException {}
} }
private final class TaskThunk implements CancelableRunnable { private final class TaskThunk implements CancelableRunnable {
private final Continuation cont; private final AsyncContext asyncContext;
private final String name; private final String name;
private final Object lock = new Object(); private final Object lock = new Object();
private boolean done; private boolean done;
private Thread worker; private Thread worker;
TaskThunk(Continuation cont, HttpServletRequest req) { TaskThunk(AsyncContext asyncContext, HttpServletRequest req) {
this.cont = cont; this.asyncContext = asyncContext;
this.name = generateName(req); this.name = generateName(req);
} }
@Override @Override
public void run() { public void run() {
cont.resume(); resume();
synchronized (lock) { synchronized (lock) {
while (!done) { while (!done) {
@@ -212,8 +280,16 @@ public class ProjectQoSFilter implements Filter {
@Override @Override
public void cancel() { public void cancel() {
cont.setAttribute(CANCEL, Boolean.TRUE); RequestState.CANCELED.set(asyncContext.getRequest());
cont.resume(); resume();
}
private void resume() {
ServletRequest req = asyncContext.getRequest();
if (RequestState.SUSPENDED.equals(RequestState.get(req))) {
RequestState.RESUMED.set(req);
asyncContext.dispatch();
}
} }
@Override @Override

View File

@@ -21,7 +21,6 @@ java_library(
data = ["//lib:LICENSE-Apache2.0"], data = ["//lib:LICENSE-Apache2.0"],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
exports = [ exports = [
":continuation",
":http", ":http",
"@jetty-server//jar", "@jetty-server//jar",
], ],
@@ -32,19 +31,11 @@ java_library(
data = ["//lib:LICENSE-Apache2.0"], data = ["//lib:LICENSE-Apache2.0"],
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
exports = [ exports = [
":continuation",
":http", ":http",
"@jetty-jmx//jar", "@jetty-jmx//jar",
], ],
) )
java_library(
name = "continuation",
data = ["//lib:LICENSE-Apache2.0"],
visibility = ["//visibility:public"],
exports = ["@jetty-continuation//jar"],
)
java_library( java_library(
name = "http", name = "http",
data = ["//lib:LICENSE-Apache2.0"], data = ["//lib:LICENSE-Apache2.0"],