Support copying logging context to background threads

The logging context (contains logging tags) is stored in ThreadLocal
variables. When we execute work in background threads we must propagate
the logging context from the current thread to the background threads.
Add a LoggingContextAwareThreadFactory that does this and use it in all
places where we create new threads.

Change-Id: I349f0a8b667266df1684ae5f5d8fb0bcae9aceaa
Signed-off-by: Edwin Kempin <ekempin@google.com>
This commit is contained in:
Edwin Kempin
2018-08-08 11:02:39 +02:00
parent d92f6f754a
commit aa9629dda8
15 changed files with 254 additions and 21 deletions

View File

@@ -39,6 +39,7 @@ import com.google.gerrit.index.query.DataSource;
import com.google.gerrit.index.query.FieldBundle;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.IndexUtils;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.ResultSet;
import java.io.IOException;
@@ -131,6 +132,7 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
new ScheduledThreadPoolExecutor(
1,
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat(index + " Commit-%d")
.setDaemon(true)
.build());
@@ -171,6 +173,7 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
Executors.newFixedThreadPool(
1,
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat(index + " Write-%d")
.setDaemon(true)
.build()));

View File

@@ -28,6 +28,7 @@ import com.google.gerrit.server.cache.h2.H2CacheImpl.SqlStore;
import com.google.gerrit.server.cache.h2.H2CacheImpl.ValueHolder;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
@@ -75,11 +76,16 @@ class H2CacheFactory implements PersistentCacheFactory, LifecycleListener {
if (cacheDir != null) {
executor =
Executors.newFixedThreadPool(
1, new ThreadFactoryBuilder().setNameFormat("DiskCache-Store-%d").build());
1,
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("DiskCache-Store-%d")
.build());
cleanup =
Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("DiskCache-Prune-%d")
.setDaemon(true)
.build());

View File

@@ -19,6 +19,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.server.FanOutExecutor;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
@@ -89,7 +90,11 @@ public class SysExecutorModule extends AbstractModule {
10,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(poolSize),
new ThreadFactoryBuilder().setNameFormat("ChangeUpdate-%d").setDaemon(true).build(),
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("ChangeUpdate-%d")
.setDaemon(true)
.build(),
new ThreadPoolExecutor.CallerRunsPolicy())));
}
}

View File

@@ -24,6 +24,7 @@ import com.google.gerrit.metrics.MetricMaker;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.config.ScheduleConfig.Schedule;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.gerrit.server.util.IdGenerator;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@@ -165,11 +166,12 @@ public class WorkQueue {
if (threadPriority != Thread.NORM_PRIORITY) {
ThreadFactory parent = executor.getThreadFactory();
executor.setThreadFactory(
task -> {
Thread t = parent.newThread(task);
t.setPriority(threadPriority);
return t;
});
new LoggingContextAwareThreadFactory(
task -> {
Thread t = parent.newThread(task);
t.setPriority(threadPriority);
return t;
}));
}
return executor;
@@ -251,18 +253,19 @@ public class WorkQueue {
Executor(int corePoolSize, final String queueName) {
super(
corePoolSize,
new ThreadFactory() {
private final ThreadFactory parent = Executors.defaultThreadFactory();
private final AtomicInteger tid = new AtomicInteger(1);
new LoggingContextAwareThreadFactory(
new ThreadFactory() {
private final ThreadFactory parent = Executors.defaultThreadFactory();
private final AtomicInteger tid = new AtomicInteger(1);
@Override
public Thread newThread(Runnable task) {
final Thread t = parent.newThread(task);
t.setName(queueName + "-" + tid.getAndIncrement());
t.setUncaughtExceptionHandler(LOG_UNCAUGHT_EXCEPTION);
return t;
}
});
@Override
public Thread newThread(Runnable task) {
final Thread t = parent.newThread(task);
t.setName(queueName + "-" + tid.getAndIncrement());
t.setUncaughtExceptionHandler(LOG_UNCAUGHT_EXCEPTION);
return t;
}
}));
all =
new ConcurrentHashMap<>( //

View File

@@ -14,6 +14,7 @@
package com.google.gerrit.server.logging;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.flogger.backend.Tags;
import java.util.logging.Level;
@@ -51,6 +52,11 @@ public class LoggingContext extends com.google.common.flogger.backend.system.Log
return mutableTags != null ? mutableTags.getTags() : Tags.empty();
}
public ImmutableSetMultimap<String, String> getTagsAsMap() {
MutableTags mutableTags = tags.get();
return mutableTags != null ? mutableTags.asMap() : ImmutableSetMultimap.of();
}
boolean addTag(String name, String value) {
return getMutableTags().add(name, value);
}
@@ -63,6 +69,14 @@ public class LoggingContext extends com.google.common.flogger.backend.system.Log
}
}
void setTags(ImmutableSetMultimap<String, String> newTags) {
if (newTags.isEmpty()) {
tags.remove();
return;
}
getMutableTags().set(newTags);
}
void clearTags() {
tags.remove();
}

View File

@@ -0,0 +1,57 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.logging;
import com.google.common.collect.ImmutableSetMultimap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* ThreadFactory that copies the logging context of the current thread to any new thread that is
* created by this ThreadFactory.
*/
public class LoggingContextAwareThreadFactory implements ThreadFactory {
private final ThreadFactory parentThreadFactory;
public LoggingContextAwareThreadFactory() {
this.parentThreadFactory = Executors.defaultThreadFactory();
}
public LoggingContextAwareThreadFactory(ThreadFactory parentThreadFactory) {
this.parentThreadFactory = parentThreadFactory;
}
@Override
public Thread newThread(Runnable r) {
Thread callingThread = Thread.currentThread();
ImmutableSetMultimap<String, String> tags = LoggingContext.getInstance().getTagsAsMap();
return parentThreadFactory.newThread(
() -> {
if (callingThread.equals(Thread.currentThread())) {
// propagation of logging context is not needed
r.run();
return;
}
// propagate logging context
LoggingContext.getInstance().setTags(tags);
try {
r.run();
} finally {
LoggingContext.getInstance().clearTags();
}
});
}
}

View File

@@ -16,6 +16,7 @@ package com.google.gerrit.server.logging;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.MultimapBuilder;
import com.google.common.collect.SetMultimap;
import com.google.common.flogger.backend.Tags;
@@ -76,6 +77,26 @@ public class MutableTags {
tags = Tags.empty();
}
/**
* Returns the tags as Multimap.
*
* @return the tags as Multimap
*/
public ImmutableSetMultimap<String, String> asMap() {
return ImmutableSetMultimap.copyOf(tagMap);
}
/**
* Replaces the existing tags with the provided tags.
*
* @param tags the tags that should be set.
*/
void set(ImmutableSetMultimap<String, String> tags) {
tagMap.clear();
tags.forEach(tagMap::put);
buildTags();
}
private void buildTags() {
if (tagMap.isEmpty()) {
if (tags.isEmpty()) {

View File

@@ -15,6 +15,7 @@
package com.google.gerrit.server.patch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
@@ -32,6 +33,10 @@ public class DiffExecutorModule extends AbstractModule {
@DiffExecutor
public ExecutorService createDiffExecutor() {
return Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("Diff-%d").setDaemon(true).build());
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("Diff-%d")
.setDaemon(true)
.build());
}
}

View File

@@ -18,6 +18,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.server.config.ConfigUtil;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.concurrent.Executors;
@@ -56,6 +57,7 @@ public class ProjectCacheClock implements LifecycleListener {
Executors.newScheduledThreadPool(
1,
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("ProjectCacheClock-%d")
.setDaemon(true)
.setPriority(Thread.MIN_PRIORITY)

View File

@@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -46,7 +47,10 @@ public class ProjectCacheWarmer implements LifecycleListener {
ThreadPoolExecutor pool =
new ScheduledThreadPoolExecutor(
config.getInt("cache", "projects", "loadThreads", cpus),
new ThreadFactoryBuilder().setNameFormat("ProjectCacheLoader-%d").build());
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("ProjectCacheLoader-%d")
.build());
Thread scheduler =
new Thread(
() -> {

View File

@@ -22,6 +22,7 @@ import com.google.gerrit.extensions.registration.DynamicItem;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.logging.LoggingContextAwareThreadFactory;
import com.google.gerrit.sshd.SshScope.Context;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
@@ -79,6 +80,7 @@ class CommandFactoryProvider implements Provider<CommandFactory>, LifecycleListe
destroyExecutor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()
.setThreadFactory(new LoggingContextAwareThreadFactory())
.setNameFormat("SshCommandDestroy-%s")
.setDaemon(true)
.build());

View File

@@ -63,6 +63,7 @@ junit_tests(
"//lib/auto:auto-value",
"//lib/auto:auto-value-annotations",
"//lib/commons:codec",
"//lib/flogger:api",
"//lib/guice",
"//lib/jgit/org.eclipse.jgit:jgit",
"//lib/jgit/org.eclipse.jgit.junit:junit",

View File

@@ -0,0 +1,88 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.logging;
import static com.google.common.truth.Truth.assertThat;
import com.google.common.truth.Expect;
import java.util.SortedMap;
import java.util.SortedSet;
import org.junit.Rule;
import org.junit.Test;
public class LoggingContextAwareThreadFactoryTest {
@Rule public final Expect expect = Expect.create();
@Test
public void loggingContextPropagationToNewThread() throws Exception {
assertThat(LoggingContext.getInstance().getTags().isEmpty()).isTrue();
try (TraceContext traceContext = new TraceContext("foo", "bar")) {
SortedMap<String, SortedSet<Object>> tagMap = LoggingContext.getInstance().getTags().asMap();
assertThat(tagMap.keySet()).containsExactly("foo");
assertThat(tagMap.get("foo")).containsExactly("bar");
Thread thread =
new LoggingContextAwareThreadFactory(r -> new Thread(r, "test-thread"))
.newThread(
() -> {
// Verify that the tags have been propagated to the new thread.
SortedMap<String, SortedSet<Object>> threadTagMap =
LoggingContext.getInstance().getTags().asMap();
expect.that(threadTagMap.keySet()).containsExactly("foo");
expect.that(threadTagMap.get("foo")).containsExactly("bar");
});
// Execute in background.
thread.start();
thread.join();
// Verify that tags in the outer thread are still set.
tagMap = LoggingContext.getInstance().getTags().asMap();
assertThat(tagMap.keySet()).containsExactly("foo");
assertThat(tagMap.get("foo")).containsExactly("bar");
}
assertThat(LoggingContext.getInstance().getTags().isEmpty()).isTrue();
}
@Test
public void loggingContextPropagationToSameThread() throws Exception {
assertThat(LoggingContext.getInstance().getTags().isEmpty()).isTrue();
try (TraceContext traceContext = new TraceContext("foo", "bar")) {
SortedMap<String, SortedSet<Object>> tagMap = LoggingContext.getInstance().getTags().asMap();
assertThat(tagMap.keySet()).containsExactly("foo");
assertThat(tagMap.get("foo")).containsExactly("bar");
Thread thread =
new LoggingContextAwareThreadFactory()
.newThread(
() -> {
// Verify that the tags have been propagated to the new thread.
SortedMap<String, SortedSet<Object>> threadTagMap =
LoggingContext.getInstance().getTags().asMap();
expect.that(threadTagMap.keySet()).containsExactly("foo");
expect.that(threadTagMap.get("foo")).containsExactly("bar");
});
// Execute in the same thread.
thread.run();
// Verify that tags in the outer thread are still set.
tagMap = LoggingContext.getInstance().getTags().asMap();
assertThat(tagMap.keySet()).containsExactly("foo");
assertThat(tagMap.get("foo")).containsExactly("bar");
}
assertThat(LoggingContext.getInstance().getTags().isEmpty()).isTrue();
}
}

View File

@@ -19,6 +19,7 @@ import static com.google.common.truth.Truth.assert_;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import java.util.Map;
import java.util.SortedMap;
import java.util.SortedSet;
@@ -109,6 +110,27 @@ public class MutableTagsTest {
assertTags(ImmutableMap.of("name", ImmutableSet.of("value")));
}
@Test
public void setTags() {
tags.add("name", "value");
assertTags(ImmutableMap.of("name", ImmutableSet.of("value")));
tags.set(ImmutableSetMultimap.of("foo", "bar", "foo", "baz", "bar", "baz"));
assertTags(
ImmutableMap.of("foo", ImmutableSet.of("bar", "baz"), "bar", ImmutableSet.of("baz")));
}
@Test
public void asMap() {
tags.add("name", "value");
assertThat(tags.asMap()).containsExactlyEntriesIn(ImmutableSetMultimap.of("name", "value"));
tags.set(ImmutableSetMultimap.of("foo", "bar", "foo", "baz", "bar", "baz"));
assertThat(tags.asMap())
.containsExactlyEntriesIn(
ImmutableSetMultimap.of("foo", "bar", "foo", "baz", "bar", "baz"));
}
@Test
public void clearTags() {
tags.add("name1", "value1");