Merge "Cancel outstanding tasks with Future.cancel()"
This commit is contained in:
@@ -30,6 +30,8 @@ import com.google.inject.Provider;
|
||||
import com.google.inject.Singleton;
|
||||
import com.google.inject.servlet.ServletModule;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
@@ -104,18 +106,16 @@ public class ProjectQoSFilter implements Filter {
|
||||
final HttpServletResponse rsp = (HttpServletResponse) response;
|
||||
final Continuation cont = ContinuationSupport.getContinuation(req);
|
||||
|
||||
ScheduledThreadPoolExecutor executor = getExecutor();
|
||||
|
||||
if (cont.isInitial()) {
|
||||
TaskThunk task = new TaskThunk(executor, cont, req);
|
||||
TaskThunk task = new TaskThunk(cont, req);
|
||||
if (maxWait > 0) {
|
||||
cont.setTimeout(maxWait);
|
||||
}
|
||||
cont.suspend(rsp);
|
||||
cont.addContinuationListener(task);
|
||||
cont.setAttribute(TASK, task);
|
||||
executor.submit(task);
|
||||
|
||||
Future f = getExecutor().submit(task);
|
||||
cont.addContinuationListener(new Listener(f));
|
||||
} else if (cont.isExpired()) {
|
||||
rsp.sendError(SC_SERVICE_UNAVAILABLE);
|
||||
|
||||
@@ -149,17 +149,31 @@ public class ProjectQoSFilter implements Filter {
|
||||
@Override
|
||||
public void destroy() {}
|
||||
|
||||
private final class TaskThunk implements CancelableRunnable, ContinuationListener {
|
||||
private final class Listener implements ContinuationListener {
|
||||
final Future future;
|
||||
|
||||
private final ScheduledThreadPoolExecutor executor;
|
||||
Listener(Future future) {
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(Continuation self) {}
|
||||
|
||||
@Override
|
||||
public void onTimeout(Continuation self) {
|
||||
future.cancel(true);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private final class TaskThunk implements CancelableRunnable {
|
||||
private final Continuation cont;
|
||||
private final String name;
|
||||
private final Object lock = new Object();
|
||||
private boolean done;
|
||||
private Thread worker;
|
||||
|
||||
TaskThunk(ScheduledThreadPoolExecutor executor, Continuation cont, HttpServletRequest req) {
|
||||
this.executor = executor;
|
||||
TaskThunk(Continuation cont, HttpServletRequest req) {
|
||||
this.cont = cont;
|
||||
this.name = generateName(req);
|
||||
}
|
||||
@@ -203,14 +217,6 @@ public class ProjectQoSFilter implements Filter {
|
||||
cont.resume();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onComplete(Continuation self) {}
|
||||
|
||||
@Override
|
||||
public void onTimeout(Continuation self) {
|
||||
executor.remove(this);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name;
|
||||
|
||||
Reference in New Issue
Block a user