Add a class for propagating request scoping to different threads
Request scopes are all implemented with ThreadLocals, which meant that RequestScoped data would not be available in any new threads created from the main request thread. Add a class to wrap Runnables and Callables so tasks that depend on request scoped objects can be inserted in a WorkQueue. Include ScopePropagator implementations for all request scope types. The GuiceRequestScopePropagator will forward materialized instances of CurrentUser, RemotePeer SocketAddress, and the CannonicalWebUrl. To support this, CanonicalWebUrlModule is not bound as a SINGLETON anymore and HttpIdentifiedUserProvider implementation now relies on CurrentUser, instead of the WebSession. This was needed because some fields in the HttpRequest object are cleared after the request is finished, but possibly before the propagated request context has been processed. The non guice request scopes do not have this problem, so they can reconstruct the needed objects in the propagated context. Change-Id: I586cd1c91727e2cb8abb166f23fc504e9949a944
This commit is contained in:

committed by
Colby Ranger

parent
2e66220190
commit
3e7ef3cc5f
@@ -22,16 +22,16 @@ import com.google.inject.Provider;
|
||||
import com.google.inject.ProvisionException;
|
||||
|
||||
class HttpIdentifiedUserProvider implements Provider<IdentifiedUser> {
|
||||
private final WebSession session;
|
||||
private final Provider<CurrentUser> currUserProvider;
|
||||
|
||||
@Inject
|
||||
HttpIdentifiedUserProvider(WebSession session) {
|
||||
this.session = session;
|
||||
HttpIdentifiedUserProvider(Provider<CurrentUser> currUserProvider) {
|
||||
this.currUserProvider = currUserProvider;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IdentifiedUser get() {
|
||||
CurrentUser user = session.getCurrentUser();
|
||||
CurrentUser user = currUserProvider.get();
|
||||
if (user instanceof IdentifiedUser) {
|
||||
return (IdentifiedUser) user;
|
||||
}
|
||||
|
@@ -36,6 +36,8 @@ import com.google.gerrit.server.config.FactoryModule;
|
||||
import com.google.gerrit.server.config.GerritRequestModule;
|
||||
import com.google.gerrit.server.contact.ContactStore;
|
||||
import com.google.gerrit.server.contact.ContactStoreProvider;
|
||||
import com.google.gerrit.server.util.GuiceRequestScopePropagator;
|
||||
import com.google.gerrit.server.util.RequestScopePropagator;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
@@ -76,6 +78,7 @@ public class WebModule extends FactoryModule {
|
||||
filter("/*").through(RequestCleanupFilter.class);
|
||||
}
|
||||
});
|
||||
bind(RequestScopePropagator.class).to(GuiceRequestScopePropagator.class);
|
||||
|
||||
if (wantSSL) {
|
||||
install(new RequireSslFilter.Module());
|
||||
|
@@ -14,8 +14,6 @@
|
||||
|
||||
package com.google.gerrit.server.config;
|
||||
|
||||
import static com.google.inject.Scopes.SINGLETON;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Provider;
|
||||
|
||||
@@ -31,7 +29,6 @@ public abstract class CanonicalWebUrlModule extends AbstractModule {
|
||||
// running in an HTTP environment.
|
||||
//
|
||||
final Class<? extends Provider<String>> provider = provider();
|
||||
bind(provider).in(SINGLETON);
|
||||
bind(String.class).annotatedWith(CanonicalWebUrl.class)
|
||||
.toProvider(provider);
|
||||
}
|
||||
|
@@ -24,6 +24,7 @@ import com.google.gerrit.server.IdentifiedUser;
|
||||
import com.google.gerrit.server.RemotePeer;
|
||||
import com.google.gerrit.server.config.GerritRequestModule;
|
||||
import com.google.gerrit.server.ssh.SshInfo;
|
||||
import com.google.gerrit.server.util.RequestScopePropagator;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
@@ -65,6 +66,8 @@ public class ChangeMergeQueue implements MergeQueue {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindScope(RequestScoped.class, PerThreadRequestScope.REQUEST);
|
||||
bind(RequestScopePropagator.class)
|
||||
.to(PerThreadRequestScope.Propagator.class);
|
||||
install(new GerritRequestModule());
|
||||
|
||||
bind(CurrentUser.class).to(IdentifiedUser.class);
|
||||
|
@@ -15,6 +15,7 @@
|
||||
package com.google.gerrit.server.git;
|
||||
|
||||
import com.google.gerrit.server.RequestCleanup;
|
||||
import com.google.gerrit.server.util.ThreadLocalRequestScopePropagator;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.OutOfScopeException;
|
||||
import com.google.inject.Provider;
|
||||
@@ -24,10 +25,23 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
class PerThreadRequestScope {
|
||||
static class Propagator
|
||||
extends ThreadLocalRequestScopePropagator<PerThreadRequestScope> {
|
||||
Propagator() {
|
||||
super(REQUEST, current);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PerThreadRequestScope continuingContext(
|
||||
PerThreadRequestScope ctx) {
|
||||
return new PerThreadRequestScope();
|
||||
}
|
||||
}
|
||||
|
||||
private static final ThreadLocal<PerThreadRequestScope> current =
|
||||
new ThreadLocal<PerThreadRequestScope>();
|
||||
|
||||
private static PerThreadRequestScope getContext() {
|
||||
private static PerThreadRequestScope requireContext() {
|
||||
final PerThreadRequestScope ctx = current.get();
|
||||
if (ctx == null) {
|
||||
throw new OutOfScopeException("Not in command/request");
|
||||
@@ -45,7 +59,7 @@ class PerThreadRequestScope {
|
||||
public <T> Provider<T> scope(final Key<T> key, final Provider<T> creator) {
|
||||
return new Provider<T>() {
|
||||
public T get() {
|
||||
return getContext().get(key, creator);
|
||||
return requireContext().get(key, creator);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@@ -0,0 +1,84 @@
|
||||
// Copyright (C) 2012 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.google.gerrit.server.util;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.gerrit.server.CurrentUser;
|
||||
import com.google.gerrit.server.RemotePeer;
|
||||
import com.google.gerrit.server.config.CanonicalWebUrl;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.Provider;
|
||||
import com.google.inject.servlet.ServletScopes;
|
||||
import com.google.inject.util.Providers;
|
||||
import com.google.inject.util.Types;
|
||||
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.lang.reflect.Type;
|
||||
import java.net.SocketAddress;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/** Propagator for Guice's built-in servlet scope. */
|
||||
public class GuiceRequestScopePropagator extends RequestScopePropagator {
|
||||
|
||||
private final Provider<String> urlProvider;
|
||||
private final Provider<SocketAddress> remotePeerProvider;
|
||||
private final Provider<CurrentUser> currentUserProvider;
|
||||
|
||||
@Inject
|
||||
GuiceRequestScopePropagator(
|
||||
@CanonicalWebUrl @Nullable Provider<String> urlProvider,
|
||||
@RemotePeer Provider<SocketAddress> remotePeerProvider,
|
||||
Provider<CurrentUser> currentUserProvider) {
|
||||
super(ServletScopes.REQUEST);
|
||||
this.urlProvider = urlProvider;
|
||||
this.remotePeerProvider = remotePeerProvider;
|
||||
this.currentUserProvider = currentUserProvider;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see RequestScopePropagator#wrap(Callable)
|
||||
*/
|
||||
@Override
|
||||
protected <T> Callable<T> wrapImpl(Callable<T> callable) {
|
||||
Map<Key<?>, Object> seedMap = Maps.newHashMap();
|
||||
|
||||
// Request scopes appear to use specific keys in their map, instead of only
|
||||
// providers. Add bindings for both the key to the instance directly and the
|
||||
// provider to the instance to be safe.
|
||||
String url = urlProvider.get();
|
||||
seedMap.put(Key.get(typeOfProvider(String.class), CanonicalWebUrl.class),
|
||||
Providers.of(url));
|
||||
seedMap.put(Key.get(String.class, CanonicalWebUrl.class), url);
|
||||
|
||||
SocketAddress peer = remotePeerProvider.get();
|
||||
seedMap.put(Key.get(typeOfProvider(SocketAddress.class), RemotePeer.class),
|
||||
Providers.of(peer));
|
||||
seedMap.put(Key.get(SocketAddress.class, RemotePeer.class), peer);
|
||||
|
||||
CurrentUser user = currentUserProvider.get();
|
||||
seedMap.put(Key.get(typeOfProvider(CurrentUser.class)), Providers.of(user));
|
||||
seedMap.put(Key.get(CurrentUser.class), user);
|
||||
|
||||
return ServletScopes.continueRequest(callable, seedMap);
|
||||
}
|
||||
|
||||
private ParameterizedType typeOfProvider(Type type) {
|
||||
return Types.newParameterizedType(Provider.class, type);
|
||||
}
|
||||
}
|
@@ -0,0 +1,181 @@
|
||||
// Copyright (C) 2012 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.google.gerrit.server.util;
|
||||
|
||||
import com.google.gerrit.reviewdb.client.Project.NameKey;
|
||||
import com.google.gerrit.server.RequestCleanup;
|
||||
import com.google.gerrit.server.git.ProjectRunnable;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.Provider;
|
||||
import com.google.inject.Scope;
|
||||
import com.google.inject.servlet.ServletScopes;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.Executors;
|
||||
|
||||
/**
|
||||
* Base class for propagating request-scoped data between threads.
|
||||
* <p>
|
||||
* Request scopes are typically linked to a {@link ThreadLocal}, which is only
|
||||
* available to the current thread. In order to allow background work involving
|
||||
* RequestScoped data, the ThreadLocal data must be copied from the request thread to
|
||||
* the new background thread.
|
||||
* <p>
|
||||
* Every type of RequestScope must provide an implementation of
|
||||
* RequestScopePropagator. See {@link #wrap(Callable)} for details on the
|
||||
* implementation, usage, and restrictions.
|
||||
*
|
||||
* @see ThreadLocalRequestScopePropagator
|
||||
*/
|
||||
public abstract class RequestScopePropagator {
|
||||
|
||||
private final Scope scope;
|
||||
|
||||
protected RequestScopePropagator(Scope scope) {
|
||||
this.scope = scope;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps callable in a new {@link Callable} that propagates the current
|
||||
* request state when the callable is invoked. The method must be called in a
|
||||
* request scope and the returned Callable may only be invoked in a thread
|
||||
* that is not already in a request scope. The returned Callable will inherit
|
||||
* toString() from the passed in Callable. A
|
||||
* {@link com.google.gerrit.server.git.WorkQueue.Executor} does not accept a
|
||||
* Callable, so there is no ProjectCallable implementation. Implementations of
|
||||
* this method must be consistent with Guice's
|
||||
* {@link ServletScopes#continueRequest(Callable, java.util.Map)}.
|
||||
* <p>
|
||||
* There are some limitations:
|
||||
* <ul>
|
||||
* <li>Derived objects (i.e. anything marked created in a request scope) will
|
||||
* not be transported.</li>
|
||||
* <li>State changes to the request scoped context after this method is called
|
||||
* will not be seen in the continued thread.</li>
|
||||
* </ul>
|
||||
*
|
||||
* @param callable the Callable to wrap.
|
||||
* @return a new Callable which will execute in the current request scope.
|
||||
*/
|
||||
public final <T> Callable<T> wrap(final Callable<T> callable) {
|
||||
final Callable<T> wrapped = wrapImpl(new Callable<T>() {
|
||||
@Override
|
||||
public T call() throws Exception {
|
||||
RequestCleanup cleanup = scope.scope(
|
||||
Key.get(RequestCleanup.class),
|
||||
new Provider<RequestCleanup>() {
|
||||
@Override
|
||||
public RequestCleanup get() {
|
||||
return new RequestCleanup();
|
||||
}
|
||||
}).get();
|
||||
|
||||
try {
|
||||
return callable.call();
|
||||
} finally {
|
||||
cleanup.run();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return new Callable<T>() {
|
||||
@Override
|
||||
public T call() throws Exception {
|
||||
return wrapped.call();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return callable.toString();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps runnable in a new {@link Runnable} that propagates the current
|
||||
* request state when the runnable is invoked. The method must be called in a
|
||||
* request scope and the returned Runnable may only be invoked in a thread
|
||||
* that is not already in a request scope. The returned Runnable will inherit
|
||||
* toString() from the passed in Runnable. Furthermore, if the passed runnable
|
||||
* is of type {@link ProjectRunnable}, the returned runnable will be of the
|
||||
* same type with the methods delegated.
|
||||
*
|
||||
* See {@link #wrap(Callable)} for details on implementation and usage.
|
||||
*
|
||||
* @param runnable the Runnable to wrap.
|
||||
* @return a new Runnable which will execute in the current request scope.
|
||||
*/
|
||||
public final Runnable wrap(final Runnable runnable) {
|
||||
final Callable<Object> wrapped = wrap(Executors.callable(runnable));
|
||||
|
||||
if (runnable instanceof ProjectRunnable) {
|
||||
return new ProjectRunnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
wrapped.call();
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e); // Not possible.
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public NameKey getProjectNameKey() {
|
||||
return ((ProjectRunnable) runnable).getProjectNameKey();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRemoteName() {
|
||||
return ((ProjectRunnable) runnable).getRemoteName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasCustomizedPrint() {
|
||||
return ((ProjectRunnable) runnable).hasCustomizedPrint();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return runnable.toString();
|
||||
}
|
||||
};
|
||||
} else {
|
||||
return new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
wrapped.call();
|
||||
} catch (RuntimeException e) {
|
||||
throw e;
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e); // Not possible.
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return runnable.toString();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @see #wrap(Callable)
|
||||
*/
|
||||
protected abstract <T> Callable<T> wrapImpl(final Callable<T> callable);
|
||||
}
|
@@ -0,0 +1,82 @@
|
||||
// Copyright (C) 2012 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.google.gerrit.server.util;
|
||||
|
||||
import com.google.inject.OutOfScopeException;
|
||||
import com.google.inject.Scope;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
/**
|
||||
* {@link RequestScopePropagator} implementation for request scopes based on
|
||||
* a {@link ThreadLocal} context.
|
||||
*
|
||||
* @param <C> "context" type stored in the {@link ThreadLocal}.
|
||||
*/
|
||||
public abstract class ThreadLocalRequestScopePropagator<C>
|
||||
extends RequestScopePropagator {
|
||||
|
||||
private final ThreadLocal<C> threadLocal;
|
||||
|
||||
protected ThreadLocalRequestScopePropagator(Scope scope,
|
||||
ThreadLocal<C> threadLocal) {
|
||||
super(scope);
|
||||
this.threadLocal = threadLocal;
|
||||
}
|
||||
|
||||
/**
|
||||
* @see RequestScopePropagator#wrap(Callable)
|
||||
*/
|
||||
@Override
|
||||
protected final <T> Callable<T> wrapImpl(final Callable<T> callable) {
|
||||
final C ctx = continuingContext(requireContext());
|
||||
return new Callable<T>() {
|
||||
@Override
|
||||
public T call() throws Exception {
|
||||
if (threadLocal.get() != null) {
|
||||
// This is consistent with the Guice ServletScopes.continueRequest()
|
||||
// behavior.
|
||||
throw new IllegalStateException("Cannot continue request, "
|
||||
+ "thread already has request in progress. A new thread must "
|
||||
+ "be used to propagate the request scope context.");
|
||||
}
|
||||
|
||||
threadLocal.set(ctx);
|
||||
try {
|
||||
return callable.call();
|
||||
} finally {
|
||||
threadLocal.remove();
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private C requireContext() {
|
||||
C context = threadLocal.get();
|
||||
if (context == null) {
|
||||
throw new OutOfScopeException("Cannot access scoped object");
|
||||
}
|
||||
return context;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a new context object based on the passed in context that has no
|
||||
* request scoped objects initialized.
|
||||
*
|
||||
* @param ctx the context to continue.
|
||||
* @return a new context.
|
||||
*/
|
||||
protected abstract C continuingContext(C ctx);
|
||||
}
|
@@ -32,6 +32,7 @@ import com.google.gerrit.server.git.QueueProvider;
|
||||
import com.google.gerrit.server.git.WorkQueue;
|
||||
import com.google.gerrit.server.project.ProjectControl;
|
||||
import com.google.gerrit.server.ssh.SshInfo;
|
||||
import com.google.gerrit.server.util.RequestScopePropagator;
|
||||
import com.google.gerrit.sshd.args4j.AccountGroupIdHandler;
|
||||
import com.google.gerrit.sshd.args4j.AccountGroupUUIDHandler;
|
||||
import com.google.gerrit.sshd.args4j.AccountIdHandler;
|
||||
@@ -57,6 +58,7 @@ public class SshModule extends FactoryModule {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindScope(RequestScoped.class, SshScope.REQUEST);
|
||||
bind(RequestScopePropagator.class).to(SshScope.Propagator.class);
|
||||
|
||||
configureRequestScope();
|
||||
configureCmdLineParser();
|
||||
|
@@ -15,6 +15,7 @@
|
||||
package com.google.gerrit.sshd;
|
||||
|
||||
import com.google.gerrit.server.RequestCleanup;
|
||||
import com.google.gerrit.server.util.ThreadLocalRequestScopePropagator;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.OutOfScopeException;
|
||||
import com.google.inject.Provider;
|
||||
@@ -93,21 +94,32 @@ class SshScope {
|
||||
static class ContextProvider implements Provider<Context> {
|
||||
@Override
|
||||
public Context get() {
|
||||
return getContext();
|
||||
return requireContext();
|
||||
}
|
||||
}
|
||||
|
||||
static class SshSessionProvider implements Provider<SshSession> {
|
||||
@Override
|
||||
public SshSession get() {
|
||||
return getContext().getSession();
|
||||
return requireContext().getSession();
|
||||
}
|
||||
}
|
||||
|
||||
static class Propagator extends ThreadLocalRequestScopePropagator<Context> {
|
||||
Propagator() {
|
||||
super(REQUEST, current);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Context continuingContext(Context ctx) {
|
||||
return ctx.subContext(ctx.getSession(), ctx.getCommandLine());
|
||||
}
|
||||
}
|
||||
|
||||
private static final ThreadLocal<Context> current =
|
||||
new ThreadLocal<Context>();
|
||||
|
||||
private static Context getContext() {
|
||||
private static Context requireContext() {
|
||||
final Context ctx = current.get();
|
||||
if (ctx == null) {
|
||||
throw new OutOfScopeException("Not in command/request");
|
||||
@@ -126,7 +138,7 @@ class SshScope {
|
||||
public <T> Provider<T> scope(final Key<T> key, final Provider<T> creator) {
|
||||
return new Provider<T>() {
|
||||
public T get() {
|
||||
return getContext().get(key, creator);
|
||||
return requireContext().get(key, creator);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
Reference in New Issue
Block a user