From 3b145b7c5cc66c320d2d69e7a970557cb7739fe3 Mon Sep 17 00:00:00 2001 From: Nasser Grainawi Date: Tue, 12 Nov 2019 17:25:10 -0800 Subject: [PATCH] Replace Jetty Continuation with AsyncContext Continuation is deprecated in Jetty 9.4 and removed in Jetty 10. Migrate ProjectQoSFilter to use javax.servlet.AsyncContext instead. Change-Id: I398fca0f75b675ad683a6d4c572ffb844c23e364 --- Documentation/licenses.txt | 1 - WORKSPACE | 6 - .../pgm/http/jetty/ProjectQoSFilter.java | 166 +++++++++++++----- lib/jetty/BUILD | 9 - 4 files changed, 121 insertions(+), 61 deletions(-) diff --git a/Documentation/licenses.txt b/Documentation/licenses.txt index ac25816441..18501b3e27 100644 --- a/Documentation/licenses.txt +++ b/Documentation/licenses.txt @@ -65,7 +65,6 @@ Apache2.0 * httpcomponents:httpcore * httpcomponents:httpcore-nio * jackson:jackson-core -* jetty:continuation * jetty:http * jetty:io * jetty:jmx diff --git a/WORKSPACE b/WORKSPACE index ac3d5e0afc..18a78a0770 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -953,12 +953,6 @@ maven_jar( sha1 = "f4c2654db1a55f0780acdfcee8bb98550f56ca70", ) -maven_jar( - name = "jetty-continuation", - artifact = "org.eclipse.jetty:jetty-continuation:" + JETTY_VERS, - sha1 = "3c421a3be5be5805e32b1a7f9c6046526524181d", -) - maven_jar( name = "jetty-http", artifact = "org.eclipse.jetty:jetty-http:" + JETTY_VERS, diff --git a/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java b/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java index 93542099f3..4f9d7e73e5 100644 --- a/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java +++ b/java/com/google/gerrit/pgm/http/jetty/ProjectQoSFilter.java @@ -34,6 +34,10 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.servlet.AsyncContext; +import javax.servlet.AsyncEvent; +import javax.servlet.AsyncListener; +import javax.servlet.DispatcherType; import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; @@ -43,16 +47,13 @@ import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.continuation.Continuation; -import org.eclipse.jetty.continuation.ContinuationListener; -import org.eclipse.jetty.continuation.ContinuationSupport; import org.eclipse.jgit.lib.Config; /** - * Use Jetty continuations to defer execution until threads are available. + * Use AsyncContexts to defer execution until threads are available. * *

We actually schedule a task into the same execution queue as the SSH daemon uses for command - * execution, and then park the web request in a continuation until an execution thread is + * execution, and then park the web request in an AsyncContext until an execution thread is * available. This ensures that the overall JVM process doesn't exceed the configured limit on * concurrent Git requests. * @@ -61,12 +62,10 @@ import org.eclipse.jgit.lib.Config; * Jetty's HTTP parser to crash, so we instead block the SSH execution queue thread and ask Jetty to * resume processing on the web service thread. */ -@SuppressWarnings("deprecation") @Singleton public class ProjectQoSFilter implements Filter { - private static final String ATT_SPACE = ProjectQoSFilter.class.getName(); - private static final String TASK = ATT_SPACE + "/TASK"; - private static final String CANCEL = ATT_SPACE + "/CANCEL"; + private static final String ATT_SPACE = ProjectQoSFilter.class.getName() + "/"; + private static final String TASK = ATT_SPACE + "TASK"; private static final String FILTER_RE = "^/(.*)/(git-upload-pack|git-receive-pack)$"; private static final Pattern URI_PATTERN = Pattern.compile(FILTER_RE); @@ -79,6 +78,59 @@ public class ProjectQoSFilter implements Filter { } } + public enum RequestState { + INITIAL, + SUSPENDED, + RESUMED, + CANCELED, + UNEXPECTED; + + private static final String CANCELED_ATT = ATT_SPACE + CANCELED; + private static final String SUSPENDED_ATT = ATT_SPACE + SUSPENDED; + private static final String RESUMED_ATT = ATT_SPACE + RESUMED; + + private void set(ServletRequest req) { + switch (this) { + case SUSPENDED: + req.setAttribute(SUSPENDED_ATT, true); + req.setAttribute(RESUMED_ATT, false); + break; + case CANCELED: + req.setAttribute(CANCELED_ATT, true); + break; + case RESUMED: + req.setAttribute(RESUMED_ATT, true); + break; + case INITIAL: + case UNEXPECTED: + default: + break; + } + } + + private static RequestState get(ServletRequest req) { + if (Boolean.FALSE.equals(req.getAttribute(RESUMED_ATT)) + && Boolean.TRUE.equals(req.getAttribute(SUSPENDED_ATT))) { + return SUSPENDED; + } + + if (req.getDispatcherType() != DispatcherType.ASYNC) { + return INITIAL; + } + + if (Boolean.TRUE.equals(req.getAttribute(RESUMED_ATT)) + && Boolean.TRUE.equals(req.getAttribute(CANCELED_ATT))) { + return CANCELED; + } + + if (Boolean.TRUE.equals(req.getAttribute(RESUMED_ATT))) { + return RESUMED; + } + + return UNEXPECTED; + } + } + private final AccountLimits.Factory limitsFactory; private final Provider user; private final QueueProvider queue; @@ -104,40 +156,50 @@ public class ProjectQoSFilter implements Filter { throws IOException, ServletException { final HttpServletRequest req = (HttpServletRequest) request; final HttpServletResponse rsp = (HttpServletResponse) response; - final Continuation cont = ContinuationSupport.getContinuation(req); - if (cont.isInitial()) { - TaskThunk task = new TaskThunk(cont, req); - if (maxWait > 0) { - cont.setTimeout(maxWait); - } - cont.suspend(rsp); - cont.setAttribute(TASK, task); + final TaskThunk task; - Future f = getExecutor().submit(task); - cont.addContinuationListener(new Listener(f)); - } else if (cont.isExpired()) { - rsp.sendError(SC_SERVICE_UNAVAILABLE); + switch (RequestState.get(request)) { + case INITIAL: + AsyncContext asyncContext = suspend(request); + task = new TaskThunk(asyncContext, req); + if (maxWait > 0) { + asyncContext.setTimeout(maxWait); + } - } else if (cont.isResumed() && cont.getAttribute(CANCEL) == Boolean.TRUE) { - rsp.sendError(SC_SERVICE_UNAVAILABLE); + request.setAttribute(TASK, task); - } else if (cont.isResumed()) { - TaskThunk task = (TaskThunk) cont.getAttribute(TASK); - try { - task.begin(Thread.currentThread()); - chain.doFilter(req, rsp); - } finally { - task.end(); - Thread.interrupted(); - } - - } else { - context.log("Unexpected QoS continuation state, aborting request"); - rsp.sendError(SC_SERVICE_UNAVAILABLE); + Future f = getExecutor().submit(task); + asyncContext.addListener(new Listener(f)); + break; + case CANCELED: + rsp.sendError(SC_SERVICE_UNAVAILABLE); + break; + case RESUMED: + task = (TaskThunk) request.getAttribute(TASK); + try { + task.begin(Thread.currentThread()); + chain.doFilter(req, rsp); + } finally { + task.end(); + Thread.interrupted(); + } + break; + case SUSPENDED: + case UNEXPECTED: + default: + context.log("Unexpected QoS state, aborting request"); + rsp.sendError(SC_SERVICE_UNAVAILABLE); + break; } } + private AsyncContext suspend(ServletRequest request) { + AsyncContext asyncContext = request.startAsync(); + RequestState.SUSPENDED.set(request); + return asyncContext; + } + private ScheduledThreadPoolExecutor getExecutor() { QueueProvider.QueueType qt = limitsFactory.create(user.get()).getQueueType(); return queue.getQueue(qt); @@ -149,7 +211,7 @@ public class ProjectQoSFilter implements Filter { @Override public void destroy() {} - private static final class Listener implements ContinuationListener { + private static final class Listener implements AsyncListener { final Future future; Listener(Future future) { @@ -157,29 +219,35 @@ public class ProjectQoSFilter implements Filter { } @Override - public void onComplete(Continuation self) {} + public void onComplete(AsyncEvent event) throws IOException {} @Override - public void onTimeout(Continuation self) { + public void onTimeout(AsyncEvent event) throws IOException { future.cancel(true); } + + @Override + public void onError(AsyncEvent event) throws IOException {} + + @Override + public void onStartAsync(AsyncEvent event) throws IOException {} } private final class TaskThunk implements CancelableRunnable { - private final Continuation cont; + private final AsyncContext asyncContext; private final String name; private final Object lock = new Object(); private boolean done; private Thread worker; - TaskThunk(Continuation cont, HttpServletRequest req) { - this.cont = cont; + TaskThunk(AsyncContext asyncContext, HttpServletRequest req) { + this.asyncContext = asyncContext; this.name = generateName(req); } @Override public void run() { - cont.resume(); + resume(); synchronized (lock) { while (!done) { @@ -212,8 +280,16 @@ public class ProjectQoSFilter implements Filter { @Override public void cancel() { - cont.setAttribute(CANCEL, Boolean.TRUE); - cont.resume(); + RequestState.CANCELED.set(asyncContext.getRequest()); + resume(); + } + + private void resume() { + ServletRequest req = asyncContext.getRequest(); + if (RequestState.SUSPENDED.equals(RequestState.get(req))) { + RequestState.RESUMED.set(req); + asyncContext.dispatch(); + } } @Override diff --git a/lib/jetty/BUILD b/lib/jetty/BUILD index 641738597d..fe07794b87 100644 --- a/lib/jetty/BUILD +++ b/lib/jetty/BUILD @@ -21,7 +21,6 @@ java_library( data = ["//lib:LICENSE-Apache2.0"], visibility = ["//visibility:public"], exports = [ - ":continuation", ":http", "@jetty-server//jar", ], @@ -32,19 +31,11 @@ java_library( data = ["//lib:LICENSE-Apache2.0"], visibility = ["//visibility:public"], exports = [ - ":continuation", ":http", "@jetty-jmx//jar", ], ) -java_library( - name = "continuation", - data = ["//lib:LICENSE-Apache2.0"], - visibility = ["//visibility:public"], - exports = ["@jetty-continuation//jar"], -) - java_library( name = "http", data = ["//lib:LICENSE-Apache2.0"],