Fix copying of LoggingContext to background threads

We used a custom ThreadFactory to copy the LoggingContext to newly
created threads, but this is not sufficient since the new thread may be
cached in a thread pool and then be reused to execute further tasks. We
must copy the LoggingContext each time a task is executed in a
background thread, not only when the background thread is created.

To copy the LoggingContext when a Runnable or Callable is executed in
the background we implement LoggingContext aware wrappers for them.

We use 2 ways for executing tasks in background threads:

1. ExecutorService/ScheduledExecutorService
2. WorkQueue.Executor (custom executor)

To ensure the copying of the LoggingContext when a task is executed in
the background we now do:

1. Wrap each ExecutorService/ScheduledExecutorService with a wrapper
   that wraps each Runnable/Callable that is passed in with a
   LoggingContext aware wrapper.

2. Wrap each Runnable/Callable that is passed into WorkQueue.Executor
   with a LoggingContext aware wrapper.

For WorkQueue.Executor we would ideally wrap the Runnable/Callable in
decorateTask but the Runnable/Callable that is being executed is
contained in a RunnableScheduledFuture and we cannot access it
(decorateTask in ScheduledThreadPoolExecutor from which we inherit
ignores the passed in Runnable and only the passed in
RunnableScheduledFuture is relevant). Also overriding the
newTaskFor(Runnable)/newTaskFor(Callable) methods does not work since
ScheduledThreadPoolExecutor is creating tasks directly without invoking
any newTaskFor method.

Change-Id: I106dcdf6478c58dfa6fe1d7952a87aa16ead1a93
Signed-off-by: Edwin Kempin <ekempin@google.com>
This commit is contained in:
Edwin Kempin
2018-08-31 21:54:13 +02:00
parent 7f0127a3bd
commit f59c09b924
19 changed files with 629 additions and 268 deletions

View File

@@ -39,7 +39,8 @@ import com.google.gerrit.index.query.DataSource;
import com.google.gerrit.index.query.FieldBundle;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.IndexUtils;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.gerrit.server.logging.LoggingContextAwareExecutorService;
import com.google.gerrit.server.logging.LoggingContextAwareScheduledExecutorService;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.ResultSet;
import java.io.IOException;
@@ -55,6 +56,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -100,7 +102,7 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
private final ReferenceManager<IndexSearcher> searcherManager;
private final ControlledRealTimeReopenThread<IndexSearcher> reopenThread;
private final Set<NrtFuture> notDoneNrtFutures;
private ScheduledThreadPoolExecutor autoCommitExecutor;
private ScheduledExecutorService autoCommitExecutor;
AbstractLuceneIndex(
Schema<V> schema,
@@ -129,13 +131,13 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
delegateWriter = autoCommitWriter;
autoCommitExecutor =
new ScheduledThreadPoolExecutor(
1,
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat(index + " Commit-%d")
.setDaemon(true)
.build());
new LoggingContextAwareScheduledExecutorService(
new ScheduledThreadPoolExecutor(
1,
new ThreadFactoryBuilder()
.setNameFormat(index + " Commit-%d")
.setDaemon(true)
.build()));
@SuppressWarnings("unused") // Error handling within Runnable.
Future<?> possiblyIgnoredError =
autoCommitExecutor.scheduleAtFixedRate(
@@ -170,13 +172,13 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
writerThread =
MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat(index + " Write-%d")
.setDaemon(true)
.build()));
new LoggingContextAwareExecutorService(
Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder()
.setNameFormat(index + " Write-%d")
.setDaemon(true)
.build())));
reopenThread =
new ControlledRealTimeReopenThread<>(

View File

@@ -28,7 +28,8 @@ import com.google.gerrit.server.cache.h2.H2CacheImpl.SqlStore;
import com.google.gerrit.server.cache.h2.H2CacheImpl.ValueHolder;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.gerrit.server.logging.LoggingContextAwareExecutorService;
import com.google.gerrit.server.logging.LoggingContextAwareScheduledExecutorService;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
@@ -75,20 +76,17 @@ class H2CacheFactory implements PersistentCacheFactory, LifecycleListener {
if (cacheDir != null) {
executor =
Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("DiskCache-Store-%d")
.build());
new LoggingContextAwareExecutorService(
Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setNameFormat("DiskCache-Store-%d").build()));
cleanup =
Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("DiskCache-Prune-%d")
.setDaemon(true)
.build());
new LoggingContextAwareScheduledExecutorService(
Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setNameFormat("DiskCache-Prune-%d")
.setDaemon(true)
.build()));
} else {
executor = null;
cleanup = null;

View File

@@ -19,7 +19,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.server.FanOutExecutor;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.gerrit.server.logging.LoggingContextAwareExecutorService;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
@@ -83,18 +83,18 @@ public class SysExecutorModule extends AbstractModule {
return MoreExecutors.newDirectExecutorService();
}
return MoreExecutors.listeningDecorator(
MoreExecutors.getExitingExecutorService(
new ThreadPoolExecutor(
1,
poolSize,
10,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(poolSize),
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("ChangeUpdate-%d")
.setDaemon(true)
.build(),
new ThreadPoolExecutor.CallerRunsPolicy())));
new LoggingContextAwareExecutorService(
MoreExecutors.getExitingExecutorService(
new ThreadPoolExecutor(
1,
poolSize,
10,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(poolSize),
new ThreadFactoryBuilder()
.setNameFormat("ChangeUpdate-%d")
.setDaemon(true)
.build(),
new ThreadPoolExecutor.CallerRunsPolicy()))));
}
}

View File

@@ -14,6 +14,8 @@
package com.google.gerrit.server.git;
import static java.util.stream.Collectors.toList;
import com.google.common.base.CaseFormat;
import com.google.common.base.Supplier;
import com.google.common.flogger.FluentLogger;
@@ -24,7 +26,8 @@ import com.google.gerrit.metrics.MetricMaker;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.ScheduleConfig.Schedule;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.gerrit.server.logging.LoggingContext;
import com.google.gerrit.server.logging.LoggingContextAwareRunnable;
import com.google.gerrit.server.util.IdGenerator;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -43,6 +46,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -166,12 +170,11 @@ public class WorkQueue {
if (threadPriority != Thread.NORM_PRIORITY) {
ThreadFactory parent = executor.getThreadFactory();
executor.setThreadFactory(
new LoggingContextAwareThreadFactory(
task -> {
Thread t = parent.newThread(task);
t.setPriority(threadPriority);
return t;
}));
task -> {
Thread t = parent.newThread(task);
t.setPriority(threadPriority);
return t;
});
}
return executor;
@@ -253,19 +256,18 @@ public class WorkQueue {
Executor(int corePoolSize, final String queueName) {
super(
corePoolSize,
new LoggingContextAwareThreadFactory(
new ThreadFactory() {
private final ThreadFactory parent = Executors.defaultThreadFactory();
private final AtomicInteger tid = new AtomicInteger(1);
new ThreadFactory() {
private final ThreadFactory parent = Executors.defaultThreadFactory();
private final AtomicInteger tid = new AtomicInteger(1);
@Override
public Thread newThread(Runnable task) {
final Thread t = parent.newThread(task);
t.setName(queueName + "-" + tid.getAndIncrement());
t.setUncaughtExceptionHandler(LOG_UNCAUGHT_EXCEPTION);
return t;
}
}));
@Override
public Thread newThread(Runnable task) {
final Thread t = parent.newThread(task);
t.setName(queueName + "-" + tid.getAndIncrement());
t.setUncaughtExceptionHandler(LOG_UNCAUGHT_EXCEPTION);
return t;
}
});
all =
new ConcurrentHashMap<>( //
@@ -276,6 +278,75 @@ public class WorkQueue {
this.queueName = queueName;
}
@Override
public void execute(Runnable command) {
super.execute(LoggingContext.copy(command));
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return super.submit(LoggingContext.copy(task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return super.submit(LoggingContext.copy(task), result);
}
@Override
public Future<?> submit(Runnable task) {
return super.submit(LoggingContext.copy(task));
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return super.invokeAll(tasks.stream().map(LoggingContext::copy).collect(toList()));
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return super.invokeAll(
tasks.stream().map(LoggingContext::copy).collect(toList()), timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return super.invokeAny(tasks.stream().map(LoggingContext::copy).collect(toList()));
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return super.invokeAny(
tasks.stream().map(LoggingContext::copy).collect(toList()), timeout, unit);
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return super.schedule(LoggingContext.copy(command), delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return super.schedule(LoggingContext.copy(callable), delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return super.scheduleAtFixedRate(LoggingContext.copy(command), initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return super.scheduleWithFixedDelay(LoggingContext.copy(command), initialDelay, delay, unit);
}
@Override
protected void terminated() {
super.terminated();
@@ -370,6 +441,10 @@ public class WorkQueue {
Task<V> task;
if (runnable instanceof LoggingContextAwareRunnable) {
runnable = ((LoggingContextAwareRunnable) runnable).unwrap();
}
if (runnable instanceof ProjectRunnable) {
task = new ProjectTask<>((ProjectRunnable) runnable, r, this, id);
} else {

View File

@@ -16,6 +16,7 @@ package com.google.gerrit.server.logging;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.flogger.backend.Tags;
import java.util.concurrent.Callable;
import java.util.logging.Level;
/**
@@ -42,6 +43,20 @@ public class LoggingContext extends com.google.common.flogger.backend.system.Log
return INSTANCE;
}
public static Runnable copy(Runnable runnable) {
if (runnable instanceof LoggingContextAwareRunnable) {
return runnable;
}
return new LoggingContextAwareRunnable(runnable);
}
public static <T> Callable<T> copy(Callable<T> callable) {
if (callable instanceof LoggingContextAwareCallable) {
return callable;
}
return new LoggingContextAwareCallable<>(callable);
}
@Override
public boolean shouldForceLogging(String loggerName, Level level, boolean isEnabled) {
return isLoggingForced();

View File

@@ -0,0 +1,66 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.logging;
import com.google.common.collect.ImmutableSetMultimap;
import java.util.concurrent.Callable;
/**
* Wrapper for a {@link Callable} that copies the {@link LoggingContext} from the current thread to
* the thread that executes the callable.
*
* <p>The state of the logging context that is copied to the thread that executes the callable is
* fixed at the creation time of this wrapper. If the callable is submitted to an executor and is
* executed later this means that changes that are done to the logging context in between creating
* and executing the callable do not apply.
*
* <p>See {@link LoggingContextAwareRunnable} for an example.
*
* @see LoggingContextAwareRunnable
*/
class LoggingContextAwareCallable<T> implements Callable<T> {
private final Callable<T> callable;
private final Thread callingThread;
private final ImmutableSetMultimap<String, String> tags;
private final boolean forceLogging;
LoggingContextAwareCallable(Callable<T> callable) {
this.callable = callable;
this.callingThread = Thread.currentThread();
this.tags = LoggingContext.getInstance().getTagsAsMap();
this.forceLogging = LoggingContext.getInstance().isLoggingForced();
}
@Override
public T call() throws Exception {
if (callingThread.equals(Thread.currentThread())) {
// propagation of logging context is not needed
return callable.call();
}
// propagate logging context
LoggingContext loggingCtx = LoggingContext.getInstance();
ImmutableSetMultimap<String, String> oldTags = loggingCtx.getTagsAsMap();
boolean oldForceLogging = loggingCtx.isLoggingForced();
loggingCtx.setTags(tags);
loggingCtx.forceLogging(forceLogging);
try {
return callable.call();
} finally {
loggingCtx.setTags(oldTags);
loggingCtx.forceLogging(oldForceLogging);
}
}
}

View File

@@ -0,0 +1,110 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.logging;
import static java.util.stream.Collectors.toList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* An {@link ExecutorService} that copies the {@link LoggingContext} on executing a {@link Runnable}
* to the executing thread.
*/
public class LoggingContextAwareExecutorService implements ExecutorService {
private final ExecutorService executorService;
public LoggingContextAwareExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public void execute(Runnable command) {
executorService.execute(LoggingContext.copy(command));
}
@Override
public void shutdown() {
executorService.shutdown();
}
@Override
public List<Runnable> shutdownNow() {
return executorService.shutdownNow();
}
@Override
public boolean isShutdown() {
return executorService.isShutdown();
}
@Override
public boolean isTerminated() {
return executorService.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executorService.awaitTermination(timeout, unit);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return executorService.submit(LoggingContext.copy(task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return executorService.submit(LoggingContext.copy(task), result);
}
@Override
public Future<?> submit(Runnable task) {
return executorService.submit(LoggingContext.copy(task));
}
@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return executorService.invokeAll(tasks.stream().map(LoggingContext::copy).collect(toList()));
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return executorService.invokeAll(
tasks.stream().map(LoggingContext::copy).collect(toList()), timeout, unit);
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return executorService.invokeAny(tasks.stream().map(LoggingContext::copy).collect(toList()));
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return executorService.invokeAny(
tasks.stream().map(LoggingContext::copy).collect(toList()), timeout, unit);
}
}

View File

@@ -0,0 +1,89 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.logging;
import com.google.common.collect.ImmutableSetMultimap;
/**
* Wrapper for a {@link Runnable} that copies the {@link LoggingContext} from the current thread to
* the thread that executes the runnable.
*
* <p>The state of the logging context that is copied to the thread that executes the runnable is
* fixed at the creation time of this wrapper. If the runnable is submitted to an executor and is
* executed later this means that changes that are done to the logging context in between creating
* and executing the runnable do not apply.
*
* <p>Example:
*
* <pre>
* try (TraceContext traceContext = TraceContext.newTrace(true, ...)) {
* executor
* .submit(new LoggingContextAwareRunnable(
* () -> {
* // Tracing is enabled since the runnable is created within the TraceContext.
* // Tracing is even enabled if the executor runs the runnable only after the
* // TraceContext was closed.
*
* // The tag "foo=bar" is not set, since it was added to the logging context only
* // after this runnable was created.
*
* // do stuff
* }))
* .get();
* traceContext.addTag("foo", "bar");
* }
* </pre>
*
* @see LoggingContextAwareCallable
*/
public class LoggingContextAwareRunnable implements Runnable {
private final Runnable runnable;
private final Thread callingThread;
private final ImmutableSetMultimap<String, String> tags;
private final boolean forceLogging;
LoggingContextAwareRunnable(Runnable runnable) {
this.runnable = runnable;
this.callingThread = Thread.currentThread();
this.tags = LoggingContext.getInstance().getTagsAsMap();
this.forceLogging = LoggingContext.getInstance().isLoggingForced();
}
public Runnable unwrap() {
return runnable;
}
@Override
public void run() {
if (callingThread.equals(Thread.currentThread())) {
// propagation of logging context is not needed
runnable.run();
return;
}
// propagate logging context
LoggingContext loggingCtx = LoggingContext.getInstance();
ImmutableSetMultimap<String, String> oldTags = loggingCtx.getTagsAsMap();
boolean oldForceLogging = loggingCtx.isLoggingForced();
loggingCtx.setTags(tags);
loggingCtx.forceLogging(forceLogging);
try {
runnable.run();
} finally {
loggingCtx.setTags(oldTags);
loggingCtx.forceLogging(oldForceLogging);
}
}
}

View File

@@ -0,0 +1,59 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.logging;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* A {@link ScheduledExecutorService} that copies the {@link LoggingContext} on executing a {@link
* Runnable} to the executing thread.
*/
public class LoggingContextAwareScheduledExecutorService extends LoggingContextAwareExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService scheduledExecutorService;
public LoggingContextAwareScheduledExecutorService(
ScheduledExecutorService scheduledExecutorService) {
super(scheduledExecutorService);
this.scheduledExecutorService = scheduledExecutorService;
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return scheduledExecutorService.schedule(LoggingContext.copy(command), delay, unit);
}
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return scheduledExecutorService.schedule(LoggingContext.copy(callable), delay, unit);
}
@Override
public ScheduledFuture<?> scheduleAtFixedRate(
Runnable command, long initialDelay, long period, TimeUnit unit) {
return scheduledExecutorService.scheduleAtFixedRate(
LoggingContext.copy(command), initialDelay, period, unit);
}
@Override
public ScheduledFuture<?> scheduleWithFixedDelay(
Runnable command, long initialDelay, long delay, TimeUnit unit) {
return scheduledExecutorService.scheduleWithFixedDelay(
LoggingContext.copy(command), initialDelay, delay, unit);
}
}

View File

@@ -1,61 +0,0 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.logging;
import com.google.common.collect.ImmutableSetMultimap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* ThreadFactory that copies the logging context of the current thread to any new thread that is
* created by this ThreadFactory.
*/
public class LoggingContextAwareThreadFactory implements ThreadFactory {
private final ThreadFactory parentThreadFactory;
public LoggingContextAwareThreadFactory() {
this.parentThreadFactory = Executors.defaultThreadFactory();
}
public LoggingContextAwareThreadFactory(ThreadFactory parentThreadFactory) {
this.parentThreadFactory = parentThreadFactory;
}
@Override
public Thread newThread(Runnable r) {
Thread callingThread = Thread.currentThread();
ImmutableSetMultimap<String, String> tags = LoggingContext.getInstance().getTagsAsMap();
boolean forceLogging = LoggingContext.getInstance().isLoggingForced();
return parentThreadFactory.newThread(
() -> {
if (callingThread.equals(Thread.currentThread())) {
// propagation of logging context is not needed
r.run();
return;
}
// propagate logging context
LoggingContext loggingCtx = LoggingContext.getInstance();
loggingCtx.setTags(tags);
loggingCtx.forceLogging(forceLogging);
try {
r.run();
} finally {
loggingCtx.clearTags();
loggingCtx.forceLogging(false);
}
});
}
}

View File

@@ -43,8 +43,9 @@ import java.util.Optional;
* </pre>
*
* <p>The logging tags and the force logging flag are stored in the {@link LoggingContext}. {@link
* LoggingContextAwareThreadFactory} ensures that the logging context is automatically copied to
* background threads.
* LoggingContextAwareExecutorService}, {@link LoggingContextAwareScheduledExecutorService} and the
* executor in {@link com.google.gerrit.server.git.WorkQueue} ensure that the logging context is
* automatically copied to background threads.
*
* <p>On close of the trace context newly set tags are unset. Force logging is disabled on close if
* it got enabled while the trace context was open.

View File

@@ -15,7 +15,7 @@
package com.google.gerrit.server.patch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.gerrit.server.logging.LoggingContextAwareExecutorService;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
@@ -32,11 +32,8 @@ public class DiffExecutorModule extends AbstractModule {
@Singleton
@DiffExecutor
public ExecutorService createDiffExecutor() {
return Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("Diff-%d")
.setDaemon(true)
.build());
return new LoggingContextAwareExecutorService(
Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("Diff-%d").setDaemon(true).build()));
}
}

View File

@@ -18,7 +18,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.config.ConfigUtil;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.gerrit.server.logging.LoggingContextAwareScheduledExecutorService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.concurrent.Executors;
@@ -54,14 +54,14 @@ public class ProjectCacheClock implements LifecycleListener {
// Start with generation 1 (to avoid magic 0 below).
generation.set(1);
executor =
Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("ProjectCacheClock-%d")
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)
.build());
new LoggingContextAwareScheduledExecutorService(
Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setNameFormat("ProjectCacheClock-%d")
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)
.build()));
@SuppressWarnings("unused") // Runnable already handles errors
Future<?> possiblyIgnoredError =
executor.scheduleAtFixedRate(

View File

@@ -19,11 +19,11 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.gerrit.server.logging.LoggingContextAwareExecutorService;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.lib.Config;
@@ -44,13 +44,11 @@ public class ProjectCacheWarmer implements LifecycleListener {
public void start() {
int cpus = Runtime.getRuntime().availableProcessors();
if (config.getBoolean("cache", "projects", "loadOnStartup", false)) {
ThreadPoolExecutor pool =
new ScheduledThreadPoolExecutor(
config.getInt("cache", "projects", "loadThreads", cpus),
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("ProjectCacheLoader-%d")
.build());
ExecutorService pool =
new LoggingContextAwareExecutorService(
new ScheduledThreadPoolExecutor(
config.getInt("cache", "projects", "loadThreads", cpus),
new ThreadFactoryBuilder().setNameFormat("ProjectCacheLoader-%d").build()));
Thread scheduler =
new Thread(
() -> {

View File

@@ -22,7 +22,7 @@ import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.gerrit.server.logging.LoggingContextAwareExecutorService;
import com.google.gerrit.sshd.SshScope.Context;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
@@ -78,12 +78,12 @@ class CommandFactoryProvider implements Provider<CommandFactory>, LifecycleListe
int threads = cfg.getInt("sshd", "commandStartThreads", 2);
startExecutor = workQueue.createQueue(threads, "SshCommandStart", true);
destroyExecutor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("SshCommandDestroy-%s")
.setDaemon(true)
.build());
new LoggingContextAwareExecutorService(
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setNameFormat("SshCommandDestroy-%s")
.setDaemon(true)
.build()));
}
@Override

View File

@@ -20,6 +20,7 @@ import static org.apache.http.HttpStatus.SC_CREATED;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.truth.Expect;
import com.google.gerrit.acceptance.AbstractDaemonTest;
import com.google.gerrit.acceptance.PushOneCommit;
import com.google.gerrit.acceptance.RestResponse;
@@ -28,23 +29,31 @@ import com.google.gerrit.extensions.registration.RegistrationHandle;
import com.google.gerrit.httpd.restapi.ParameterParser;
import com.google.gerrit.httpd.restapi.RestApiServlet;
import com.google.gerrit.server.events.CommitReceivedEvent;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.git.validators.CommitValidationException;
import com.google.gerrit.server.git.validators.CommitValidationListener;
import com.google.gerrit.server.git.validators.CommitValidationMessage;
import com.google.gerrit.server.logging.LoggingContext;
import com.google.gerrit.server.logging.TraceContext;
import com.google.gerrit.server.project.CreateProjectArgs;
import com.google.gerrit.server.validators.ProjectCreationValidationListener;
import com.google.gerrit.server.validators.ValidationException;
import com.google.inject.Inject;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
import org.apache.http.message.BasicHeader;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
public class TraceIT extends AbstractDaemonTest {
@Rule public final Expect expect = Expect.create();
@Inject private DynamicSet<ProjectCreationValidationListener> projectCreationValidationListeners;
@Inject private DynamicSet<CommitValidationListener> commitValidationListeners;
@Inject private WorkQueue workQueue;
private TraceValidatingProjectCreationValidationListener projectCreationListener;
private RegistrationHandle projectCreationListenerRegistrationHandle;
@@ -218,6 +227,47 @@ public class TraceIT extends AbstractDaemonTest {
assertThat(commitValidationListener.isLoggingForced).isTrue();
}
@Test
public void workQueueCopyLoggingContext() throws Exception {
assertThat(LoggingContext.getInstance().getTags().isEmpty()).isTrue();
assertForceLogging(false);
try (TraceContext traceContext = TraceContext.open().forceLogging().addTag("foo", "bar")) {
SortedMap<String, SortedSet<Object>> tagMap = LoggingContext.getInstance().getTags().asMap();
assertThat(tagMap.keySet()).containsExactly("foo");
assertThat(tagMap.get("foo")).containsExactly("bar");
assertForceLogging(true);
workQueue
.createQueue(1, "test-queue")
.submit(
() -> {
// Verify that the tags and force logging flag have been propagated to the new
// thread.
SortedMap<String, SortedSet<Object>> threadTagMap =
LoggingContext.getInstance().getTags().asMap();
expect.that(threadTagMap.keySet()).containsExactly("foo");
expect.that(threadTagMap.get("foo")).containsExactly("bar");
expect
.that(LoggingContext.getInstance().shouldForceLogging(null, null, false))
.isTrue();
})
.get();
// Verify that tags and force logging flag in the outer thread are still set.
tagMap = LoggingContext.getInstance().getTags().asMap();
assertThat(tagMap.keySet()).containsExactly("foo");
assertThat(tagMap.get("foo")).containsExactly("bar");
assertForceLogging(true);
}
assertThat(LoggingContext.getInstance().getTags().isEmpty()).isTrue();
assertForceLogging(false);
}
private void assertForceLogging(boolean expected) {
assertThat(LoggingContext.getInstance().shouldForceLogging(null, null, false))
.isEqualTo(expected);
}
private static class TraceValidatingProjectCreationValidationListener
implements ProjectCreationValidationListener {
String traceId;

View File

@@ -0,0 +1,71 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.logging;
import static com.google.common.truth.Truth.assertThat;
import com.google.common.truth.Expect;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.junit.Rule;
import org.junit.Test;
public class LoggingContextAwareExecutorServiceTest {
@Rule public final Expect expect = Expect.create();
@Test
public void loggingContextPropagationToBackgroundThread() throws Exception {
assertThat(LoggingContext.getInstance().getTags().isEmpty()).isTrue();
assertForceLogging(false);
try (TraceContext traceContext = TraceContext.open().forceLogging().addTag("foo", "bar")) {
SortedMap<String, SortedSet<Object>> tagMap = LoggingContext.getInstance().getTags().asMap();
assertThat(tagMap.keySet()).containsExactly("foo");
assertThat(tagMap.get("foo")).containsExactly("bar");
assertForceLogging(true);
ExecutorService executor =
new LoggingContextAwareExecutorService(Executors.newFixedThreadPool(1));
executor
.submit(
() -> {
// Verify that the tags and force logging flag have been propagated to the new
// thread.
SortedMap<String, SortedSet<Object>> threadTagMap =
LoggingContext.getInstance().getTags().asMap();
expect.that(threadTagMap.keySet()).containsExactly("foo");
expect.that(threadTagMap.get("foo")).containsExactly("bar");
expect
.that(LoggingContext.getInstance().shouldForceLogging(null, null, false))
.isTrue();
})
.get();
// Verify that tags and force logging flag in the outer thread are still set.
tagMap = LoggingContext.getInstance().getTags().asMap();
assertThat(tagMap.keySet()).containsExactly("foo");
assertThat(tagMap.get("foo")).containsExactly("bar");
assertForceLogging(true);
}
assertThat(LoggingContext.getInstance().getTags().isEmpty()).isTrue();
assertForceLogging(false);
}
private void assertForceLogging(boolean expected) {
assertThat(LoggingContext.getInstance().shouldForceLogging(null, null, false))
.isEqualTo(expected);
}
}

View File

@@ -1,109 +0,0 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.logging;
import static com.google.common.truth.Truth.assertThat;
import com.google.common.truth.Expect;
import java.util.SortedMap;
import java.util.SortedSet;
import org.junit.Rule;
import org.junit.Test;
public class LoggingContextAwareThreadFactoryTest {
@Rule public final Expect expect = Expect.create();
@Test
public void loggingContextPropagationToNewThread() throws Exception {
assertThat(LoggingContext.getInstance().getTags().isEmpty()).isTrue();
assertForceLogging(false);
try (TraceContext traceContext = TraceContext.open().forceLogging().addTag("foo", "bar")) {
SortedMap<String, SortedSet<Object>> tagMap = LoggingContext.getInstance().getTags().asMap();
assertThat(tagMap.keySet()).containsExactly("foo");
assertThat(tagMap.get("foo")).containsExactly("bar");
assertForceLogging(true);
Thread thread =
new LoggingContextAwareThreadFactory(r -> new Thread(r, "test-thread"))
.newThread(
() -> {
// Verify that the tags and force logging flag have been propagated to the new
// thread.
SortedMap<String, SortedSet<Object>> threadTagMap =
LoggingContext.getInstance().getTags().asMap();
expect.that(threadTagMap.keySet()).containsExactly("foo");
expect.that(threadTagMap.get("foo")).containsExactly("bar");
expect
.that(LoggingContext.getInstance().shouldForceLogging(null, null, false))
.isTrue();
});
// Execute in background.
thread.start();
thread.join();
// Verify that tags and force logging flag in the outer thread are still set.
tagMap = LoggingContext.getInstance().getTags().asMap();
assertThat(tagMap.keySet()).containsExactly("foo");
assertThat(tagMap.get("foo")).containsExactly("bar");
assertForceLogging(true);
}
assertThat(LoggingContext.getInstance().getTags().isEmpty()).isTrue();
assertForceLogging(false);
}
@Test
public void loggingContextPropagationToSameThread() throws Exception {
assertThat(LoggingContext.getInstance().getTags().isEmpty()).isTrue();
assertForceLogging(false);
try (TraceContext traceContext = TraceContext.open().forceLogging().addTag("foo", "bar")) {
SortedMap<String, SortedSet<Object>> tagMap = LoggingContext.getInstance().getTags().asMap();
assertThat(tagMap.keySet()).containsExactly("foo");
assertThat(tagMap.get("foo")).containsExactly("bar");
assertForceLogging(true);
Thread thread =
new LoggingContextAwareThreadFactory()
.newThread(
() -> {
// Verify that the tags and force logging flag have been propagated to the new
// thread.
SortedMap<String, SortedSet<Object>> threadTagMap =
LoggingContext.getInstance().getTags().asMap();
expect.that(threadTagMap.keySet()).containsExactly("foo");
expect.that(threadTagMap.get("foo")).containsExactly("bar");
expect
.that(LoggingContext.getInstance().shouldForceLogging(null, null, false))
.isTrue();
});
// Execute in the same thread.
thread.run();
// Verify that tags and force logging flag in the outer thread are still set.
tagMap = LoggingContext.getInstance().getTags().asMap();
assertThat(tagMap.keySet()).containsExactly("foo");
assertThat(tagMap.get("foo")).containsExactly("bar");
assertForceLogging(true);
}
assertThat(LoggingContext.getInstance().getTags().isEmpty()).isTrue();
assertForceLogging(false);
}
private void assertForceLogging(boolean expected) {
assertThat(LoggingContext.getInstance().shouldForceLogging(null, null, false))
.isEqualTo(expected);
}
}