Merge changes from topic "issue-7611" into stable-2.14
* changes: Properly await termination of all index executor threads on shutdown Stop ProjectCacheClock thread on server shutdown CleanUp after query tests
This commit is contained in:
@@ -35,7 +35,6 @@ import com.google.common.collect.MultimapBuilder;
|
|||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
|
||||||
import com.google.gerrit.reviewdb.client.Account;
|
import com.google.gerrit.reviewdb.client.Account;
|
||||||
import com.google.gerrit.reviewdb.client.Change;
|
import com.google.gerrit.reviewdb.client.Change;
|
||||||
import com.google.gerrit.reviewdb.client.PatchSet;
|
import com.google.gerrit.reviewdb.client.PatchSet;
|
||||||
@@ -75,7 +74,6 @@ import java.util.Set;
|
|||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import org.apache.lucene.document.Document;
|
import org.apache.lucene.document.Document;
|
||||||
import org.apache.lucene.index.IndexWriter;
|
import org.apache.lucene.index.IndexWriter;
|
||||||
import org.apache.lucene.index.IndexableField;
|
import org.apache.lucene.index.IndexableField;
|
||||||
@@ -187,11 +185,6 @@ public class LuceneChangeIndex implements ChangeIndex {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void stop() {
|
|
||||||
MoreExecutors.shutdownAndAwaitTermination(executor, Long.MAX_VALUE, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
try {
|
try {
|
||||||
|
@@ -35,9 +35,6 @@ public interface Index<K, V> {
|
|||||||
/** @return the schema version used by this index. */
|
/** @return the schema version used by this index. */
|
||||||
Schema<V> getSchema();
|
Schema<V> getSchema();
|
||||||
|
|
||||||
/** Stop and await termination of all executor threads */
|
|
||||||
default void stop() {}
|
|
||||||
|
|
||||||
/** Close this index. */
|
/** Close this index. */
|
||||||
void close();
|
void close();
|
||||||
|
|
||||||
|
@@ -95,7 +95,6 @@ public abstract class IndexCollection<K, V, I extends Index<K, V>> implements Li
|
|||||||
}
|
}
|
||||||
for (I write : writeIndexes) {
|
for (I write : writeIndexes) {
|
||||||
if (write != read) {
|
if (write != read) {
|
||||||
write.stop();
|
|
||||||
write.close();
|
write.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableCollection;
|
|||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
import com.google.gerrit.extensions.events.LifecycleListener;
|
||||||
import com.google.gerrit.lifecycle.LifecycleModule;
|
import com.google.gerrit.lifecycle.LifecycleModule;
|
||||||
import com.google.gerrit.server.config.GerritServerConfig;
|
import com.google.gerrit.server.config.GerritServerConfig;
|
||||||
import com.google.gerrit.server.git.WorkQueue;
|
import com.google.gerrit.server.git.WorkQueue;
|
||||||
@@ -42,6 +43,7 @@ import com.google.gerrit.server.index.group.GroupIndexRewriter;
|
|||||||
import com.google.gerrit.server.index.group.GroupIndexer;
|
import com.google.gerrit.server.index.group.GroupIndexer;
|
||||||
import com.google.gerrit.server.index.group.GroupIndexerImpl;
|
import com.google.gerrit.server.index.group.GroupIndexerImpl;
|
||||||
import com.google.gerrit.server.index.group.GroupSchemaDefinitions;
|
import com.google.gerrit.server.index.group.GroupSchemaDefinitions;
|
||||||
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Injector;
|
import com.google.inject.Injector;
|
||||||
import com.google.inject.Key;
|
import com.google.inject.Key;
|
||||||
import com.google.inject.Provides;
|
import com.google.inject.Provides;
|
||||||
@@ -49,6 +51,7 @@ import com.google.inject.ProvisionException;
|
|||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.eclipse.jgit.lib.Config;
|
import org.eclipse.jgit.lib.Config;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -78,11 +81,13 @@ public class IndexModule extends LifecycleModule {
|
|||||||
private final int threads;
|
private final int threads;
|
||||||
private final ListeningExecutorService interactiveExecutor;
|
private final ListeningExecutorService interactiveExecutor;
|
||||||
private final ListeningExecutorService batchExecutor;
|
private final ListeningExecutorService batchExecutor;
|
||||||
|
private final boolean closeExecutorsOnShutdown;
|
||||||
|
|
||||||
public IndexModule(int threads) {
|
public IndexModule(int threads) {
|
||||||
this.threads = threads;
|
this.threads = threads;
|
||||||
this.interactiveExecutor = null;
|
this.interactiveExecutor = null;
|
||||||
this.batchExecutor = null;
|
this.batchExecutor = null;
|
||||||
|
this.closeExecutorsOnShutdown = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public IndexModule(
|
public IndexModule(
|
||||||
@@ -90,6 +95,7 @@ public class IndexModule extends LifecycleModule {
|
|||||||
this.threads = -1;
|
this.threads = -1;
|
||||||
this.interactiveExecutor = interactiveExecutor;
|
this.interactiveExecutor = interactiveExecutor;
|
||||||
this.batchExecutor = batchExecutor;
|
this.batchExecutor = batchExecutor;
|
||||||
|
this.closeExecutorsOnShutdown = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -108,6 +114,17 @@ public class IndexModule extends LifecycleModule {
|
|||||||
bind(GroupIndexCollection.class);
|
bind(GroupIndexCollection.class);
|
||||||
listener().to(GroupIndexCollection.class);
|
listener().to(GroupIndexCollection.class);
|
||||||
factory(GroupIndexerImpl.Factory.class);
|
factory(GroupIndexerImpl.Factory.class);
|
||||||
|
|
||||||
|
if (closeExecutorsOnShutdown) {
|
||||||
|
// The executors must be shutdown _before_ closing the indexes.
|
||||||
|
// On Gerrit start the LifecycleListeners are invoked in the order in which they are
|
||||||
|
// registered, but on shutdown of Gerrit the order is reversed. This means the
|
||||||
|
// LifecycleListener to shutdown the executors must be registered _after_ the
|
||||||
|
// LifecycleListeners that close the indexes. The closing of the indexes is done by
|
||||||
|
// *IndexCollection which have been registered as LifecycleListener above. The
|
||||||
|
// registration of the ShutdownIndexExecutors LifecycleListener must happen afterwards.
|
||||||
|
listener().to(ShutdownIndexExecutors.class);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
@@ -181,4 +198,28 @@ public class IndexModule extends LifecycleModule {
|
|||||||
}
|
}
|
||||||
return MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "Index-Batch"));
|
return MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "Index-Batch"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Singleton
|
||||||
|
private static class ShutdownIndexExecutors implements LifecycleListener {
|
||||||
|
private final ListeningExecutorService interactiveExecutor;
|
||||||
|
private final ListeningExecutorService batchExecutor;
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
ShutdownIndexExecutors(
|
||||||
|
@IndexExecutor(INTERACTIVE) ListeningExecutorService interactiveExecutor,
|
||||||
|
@IndexExecutor(BATCH) ListeningExecutorService batchExecutor) {
|
||||||
|
this.interactiveExecutor = interactiveExecutor;
|
||||||
|
this.batchExecutor = batchExecutor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
MoreExecutors.shutdownAndAwaitTermination(
|
||||||
|
interactiveExecutor, Long.MAX_VALUE, TimeUnit.SECONDS);
|
||||||
|
MoreExecutors.shutdownAndAwaitTermination(batchExecutor, Long.MAX_VALUE, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@@ -15,6 +15,7 @@
|
|||||||
package com.google.gerrit.server.project;
|
package com.google.gerrit.server.project;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import com.google.gerrit.extensions.events.LifecycleListener;
|
||||||
import com.google.gerrit.server.config.ConfigUtil;
|
import com.google.gerrit.server.config.ConfigUtil;
|
||||||
import com.google.gerrit.server.config.GerritServerConfig;
|
import com.google.gerrit.server.config.GerritServerConfig;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
@@ -28,15 +29,22 @@ import org.eclipse.jgit.lib.Config;
|
|||||||
|
|
||||||
/** Ticks periodically to force refresh events for {@link ProjectCacheImpl}. */
|
/** Ticks periodically to force refresh events for {@link ProjectCacheImpl}. */
|
||||||
@Singleton
|
@Singleton
|
||||||
public class ProjectCacheClock {
|
public class ProjectCacheClock implements LifecycleListener {
|
||||||
|
private final Config serverConfig;
|
||||||
|
|
||||||
private final AtomicLong generation = new AtomicLong();
|
private final AtomicLong generation = new AtomicLong();
|
||||||
|
|
||||||
|
private ScheduledExecutorService executor;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ProjectCacheClock(@GerritServerConfig Config serverConfig) {
|
public ProjectCacheClock(@GerritServerConfig Config serverConfig) {
|
||||||
this(checkFrequency(serverConfig));
|
this.serverConfig = serverConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ProjectCacheClock(long checkFrequencyMillis) {
|
@Override
|
||||||
|
public void start() {
|
||||||
|
long checkFrequencyMillis = checkFrequency(serverConfig);
|
||||||
|
|
||||||
if (checkFrequencyMillis == Long.MAX_VALUE) {
|
if (checkFrequencyMillis == Long.MAX_VALUE) {
|
||||||
// Start with generation 1 (to avoid magic 0 below).
|
// Start with generation 1 (to avoid magic 0 below).
|
||||||
// Do not begin background thread, disabling the clock.
|
// Do not begin background thread, disabling the clock.
|
||||||
@@ -44,7 +52,7 @@ public class ProjectCacheClock {
|
|||||||
} else if (10 < checkFrequencyMillis) {
|
} else if (10 < checkFrequencyMillis) {
|
||||||
// Start with generation 1 (to avoid magic 0 below).
|
// Start with generation 1 (to avoid magic 0 below).
|
||||||
generation.set(1);
|
generation.set(1);
|
||||||
ScheduledExecutorService executor =
|
executor =
|
||||||
Executors.newScheduledThreadPool(
|
Executors.newScheduledThreadPool(
|
||||||
1,
|
1,
|
||||||
new ThreadFactoryBuilder()
|
new ThreadFactoryBuilder()
|
||||||
@@ -68,6 +76,13 @@ public class ProjectCacheClock {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stop() {
|
||||||
|
if (executor != null) {
|
||||||
|
executor.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
long read() {
|
long read() {
|
||||||
return generation.get();
|
return generation.get();
|
||||||
}
|
}
|
||||||
|
@@ -20,7 +20,7 @@ import com.google.common.base.Throwables;
|
|||||||
import com.google.common.cache.CacheLoader;
|
import com.google.common.cache.CacheLoader;
|
||||||
import com.google.common.cache.LoadingCache;
|
import com.google.common.cache.LoadingCache;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.gerrit.extensions.events.LifecycleListener;
|
import com.google.gerrit.lifecycle.LifecycleModule;
|
||||||
import com.google.gerrit.reviewdb.client.AccountGroup;
|
import com.google.gerrit.reviewdb.client.AccountGroup;
|
||||||
import com.google.gerrit.reviewdb.client.Project;
|
import com.google.gerrit.reviewdb.client.Project;
|
||||||
import com.google.gerrit.server.cache.CacheModule;
|
import com.google.gerrit.server.cache.CacheModule;
|
||||||
@@ -32,7 +32,6 @@ import com.google.inject.Inject;
|
|||||||
import com.google.inject.Module;
|
import com.google.inject.Module;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import com.google.inject.TypeLiteral;
|
import com.google.inject.TypeLiteral;
|
||||||
import com.google.inject.internal.UniqueAnnotations;
|
|
||||||
import com.google.inject.name.Named;
|
import com.google.inject.name.Named;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@@ -69,9 +68,15 @@ public class ProjectCacheImpl implements ProjectCache {
|
|||||||
|
|
||||||
bind(ProjectCacheImpl.class);
|
bind(ProjectCacheImpl.class);
|
||||||
bind(ProjectCache.class).to(ProjectCacheImpl.class);
|
bind(ProjectCache.class).to(ProjectCacheImpl.class);
|
||||||
bind(LifecycleListener.class)
|
|
||||||
.annotatedWith(UniqueAnnotations.create())
|
install(
|
||||||
.to(ProjectCacheWarmer.class);
|
new LifecycleModule() {
|
||||||
|
@Override
|
||||||
|
protected void configure() {
|
||||||
|
listener().to(ProjectCacheWarmer.class);
|
||||||
|
listener().to(ProjectCacheClock.class);
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@@ -121,6 +121,12 @@ public abstract class AbstractQueryAccountsTest extends GerritServerTests {
|
|||||||
currentUserInfo = gApi.accounts().id(userId.get()).get();
|
currentUserInfo = gApi.accounts().id(userId.get()).get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanUp() {
|
||||||
|
lifecycle.stop();
|
||||||
|
db.close();
|
||||||
|
}
|
||||||
|
|
||||||
protected RequestContext newRequestContext(Account.Id requestUserId) {
|
protected RequestContext newRequestContext(Account.Id requestUserId) {
|
||||||
final CurrentUser requestUser = userFactory.create(requestUserId);
|
final CurrentUser requestUser = userFactory.create(requestUserId);
|
||||||
return new RequestContext() {
|
return new RequestContext() {
|
||||||
|
@@ -172,6 +172,12 @@ public abstract class AbstractQueryChangesTest extends GerritServerTests {
|
|||||||
setUpDatabase();
|
setUpDatabase();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanUp() {
|
||||||
|
lifecycle.stop();
|
||||||
|
db.close();
|
||||||
|
}
|
||||||
|
|
||||||
protected void setUpDatabase() throws Exception {
|
protected void setUpDatabase() throws Exception {
|
||||||
db = schemaFactory.open();
|
db = schemaFactory.open();
|
||||||
schemaCreator.create(db);
|
schemaCreator.create(db);
|
||||||
|
@@ -121,6 +121,12 @@ public abstract class AbstractQueryGroupsTest extends GerritServerTests {
|
|||||||
currentUserInfo = gApi.accounts().id(userId.get()).get();
|
currentUserInfo = gApi.accounts().id(userId.get()).get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void cleanUp() {
|
||||||
|
lifecycle.stop();
|
||||||
|
db.close();
|
||||||
|
}
|
||||||
|
|
||||||
protected RequestContext newRequestContext(Account.Id requestUserId) {
|
protected RequestContext newRequestContext(Account.Id requestUserId) {
|
||||||
final CurrentUser requestUser = userFactory.create(requestUserId);
|
final CurrentUser requestUser = userFactory.create(requestUserId);
|
||||||
return new RequestContext() {
|
return new RequestContext() {
|
||||||
|
Reference in New Issue
Block a user