Separate change indexer threads into interactive/batch pools
This mirrors the behavior for mergeability checks, and allows isolation of online reindexing threads from interactive threads used by the current index version. Change-Id: Id699fdadb325ac7ee7ad12cbf3dffc1a96afc97e
This commit is contained in:
@@ -2036,10 +2036,18 @@ By default, `LUCENE`.
|
||||
|
||||
[[index.threads]]index.threads::
|
||||
+
|
||||
Determines the number of threads to use for indexing.
|
||||
Number of threads to use for indexing in normal interactive operations.
|
||||
+
|
||||
Defaults to 1 if not set, or set to a negative value.
|
||||
|
||||
[[index.batchThreads]]index.batchThreads::
|
||||
+
|
||||
Number of threads to use for indexing in background operations, such as
|
||||
online schema upgrades.
|
||||
+
|
||||
If not set or set to a negative value, defaults to using the same
|
||||
thread pool as interactive operations.
|
||||
|
||||
==== Lucene configuration
|
||||
|
||||
Open and closed changes are indexed in separate indexes named
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
package com.google.gerrit.lucene;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.gerrit.server.git.QueueProvider.QueueType.INTERACTIVE;
|
||||
import static com.google.gerrit.server.index.IndexRewriteImpl.CLOSED_STATUSES;
|
||||
import static com.google.gerrit.server.index.IndexRewriteImpl.OPEN_STATUSES;
|
||||
import static java.util.concurrent.TimeUnit.MILLISECONDS;
|
||||
@@ -229,7 +230,7 @@ public class LuceneChangeIndex implements ChangeIndex {
|
||||
LuceneChangeIndex(
|
||||
@GerritServerConfig Config cfg,
|
||||
SitePaths sitePaths,
|
||||
@IndexExecutor ListeningExecutorService executor,
|
||||
@IndexExecutor(INTERACTIVE) ListeningExecutorService executor,
|
||||
Provider<ReviewDb> db,
|
||||
ChangeData.Factory changeDataFactory,
|
||||
FillArgs fillArgs,
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
package com.google.gerrit.server.index;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
|
||||
import static org.eclipse.jgit.lib.RefDatabase.ALL;
|
||||
|
||||
import com.google.common.base.Stopwatch;
|
||||
@@ -127,7 +128,7 @@ public class ChangeBatchIndexer {
|
||||
ChangeBatchIndexer(SchemaFactory<ReviewDb> schemaFactory,
|
||||
ChangeData.Factory changeDataFactory,
|
||||
GitRepositoryManager repoManager,
|
||||
@IndexExecutor ListeningExecutorService executor,
|
||||
@IndexExecutor(BATCH) ListeningExecutorService executor,
|
||||
ChangeIndexer.Factory indexerFactory,
|
||||
@GerritServerConfig Config config) {
|
||||
this.schemaFactory = schemaFactory;
|
||||
@@ -180,7 +181,7 @@ public class ChangeBatchIndexer {
|
||||
ok.set(false);
|
||||
}
|
||||
final ListenableFuture<?> future = executor.submit(reindexProject(
|
||||
indexerFactory.create(index), project, doneTask, failedTask,
|
||||
indexerFactory.create(executor, index), project, doneTask, failedTask,
|
||||
verboseWriter));
|
||||
futures.add(future);
|
||||
future.addListener(new Runnable() {
|
||||
|
||||
@@ -55,8 +55,9 @@ public class ChangeIndexer {
|
||||
LoggerFactory.getLogger(ChangeIndexer.class);
|
||||
|
||||
public interface Factory {
|
||||
ChangeIndexer create(ChangeIndex index);
|
||||
ChangeIndexer create(IndexCollection indexes);
|
||||
ChangeIndexer create(ListeningExecutorService executor, ChangeIndex index);
|
||||
ChangeIndexer create(ListeningExecutorService executor,
|
||||
IndexCollection indexes);
|
||||
}
|
||||
|
||||
private static final Function<Exception, IOException> MAPPER =
|
||||
@@ -82,10 +83,10 @@ public class ChangeIndexer {
|
||||
private final ListeningExecutorService executor;
|
||||
|
||||
@AssistedInject
|
||||
ChangeIndexer(@IndexExecutor ListeningExecutorService executor,
|
||||
SchemaFactory<ReviewDb> schemaFactory,
|
||||
ChangeIndexer(SchemaFactory<ReviewDb> schemaFactory,
|
||||
ChangeData.Factory changeDataFactory,
|
||||
ThreadLocalRequestContext context,
|
||||
@Assisted ListeningExecutorService executor,
|
||||
@Assisted ChangeIndex index) {
|
||||
this.executor = executor;
|
||||
this.schemaFactory = schemaFactory;
|
||||
@@ -96,10 +97,10 @@ public class ChangeIndexer {
|
||||
}
|
||||
|
||||
@AssistedInject
|
||||
ChangeIndexer(@IndexExecutor ListeningExecutorService executor,
|
||||
SchemaFactory<ReviewDb> schemaFactory,
|
||||
ChangeIndexer(SchemaFactory<ReviewDb> schemaFactory,
|
||||
ChangeData.Factory changeDataFactory,
|
||||
ThreadLocalRequestContext context,
|
||||
@Assisted ListeningExecutorService executor,
|
||||
@Assisted IndexCollection indexes) {
|
||||
this.executor = executor;
|
||||
this.schemaFactory = schemaFactory;
|
||||
|
||||
@@ -17,6 +17,7 @@ package com.google.gerrit.server.index;
|
||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.gerrit.server.git.QueueProvider.QueueType;
|
||||
import com.google.inject.BindingAnnotation;
|
||||
|
||||
import java.lang.annotation.Retention;
|
||||
@@ -28,4 +29,5 @@ import java.lang.annotation.Retention;
|
||||
@Retention(RUNTIME)
|
||||
@BindingAnnotation
|
||||
public @interface IndexExecutor {
|
||||
QueueType value();
|
||||
}
|
||||
|
||||
@@ -14,6 +14,9 @@
|
||||
|
||||
package com.google.gerrit.server.index;
|
||||
|
||||
import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH;
|
||||
import static com.google.gerrit.server.git.QueueProvider.QueueType.INTERACTIVE;
|
||||
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.gerrit.lifecycle.LifecycleModule;
|
||||
@@ -21,7 +24,6 @@ import com.google.gerrit.server.config.GerritServerConfig;
|
||||
import com.google.gerrit.server.git.WorkQueue;
|
||||
import com.google.gerrit.server.query.change.BasicChangeRewrites;
|
||||
import com.google.gerrit.server.query.change.ChangeQueryRewriter;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Injector;
|
||||
import com.google.inject.Key;
|
||||
import com.google.inject.Provides;
|
||||
@@ -48,16 +50,20 @@ public class IndexModule extends LifecycleModule {
|
||||
}
|
||||
|
||||
private final int threads;
|
||||
private final ListeningExecutorService indexExecutor;
|
||||
private final ListeningExecutorService interactiveExecutor;
|
||||
private final ListeningExecutorService batchExecutor;
|
||||
|
||||
public IndexModule(int threads) {
|
||||
this.threads = threads;
|
||||
this.indexExecutor = null;
|
||||
this.interactiveExecutor = null;
|
||||
this.batchExecutor = null;
|
||||
}
|
||||
|
||||
public IndexModule(ListeningExecutorService indexExecutor) {
|
||||
public IndexModule(ListeningExecutorService interactiveExecutor,
|
||||
ListeningExecutorService batchExecutor) {
|
||||
this.threads = -1;
|
||||
this.indexExecutor = indexExecutor;
|
||||
this.interactiveExecutor = interactiveExecutor;
|
||||
this.batchExecutor = batchExecutor;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -67,40 +73,28 @@ public class IndexModule extends LifecycleModule {
|
||||
bind(IndexCollection.class);
|
||||
listener().to(IndexCollection.class);
|
||||
factory(ChangeIndexer.Factory.class);
|
||||
|
||||
if (indexExecutor != null) {
|
||||
bind(ListeningExecutorService.class)
|
||||
.annotatedWith(IndexExecutor.class)
|
||||
.toInstance(indexExecutor);
|
||||
} else {
|
||||
install(new IndexExecutorModule(threads));
|
||||
}
|
||||
}
|
||||
|
||||
@Provides
|
||||
ChangeIndexer getChangeIndexer(
|
||||
ChangeIndexer.Factory factory,
|
||||
IndexCollection indexes) {
|
||||
return factory.create(indexes);
|
||||
}
|
||||
|
||||
private static class IndexExecutorModule extends AbstractModule {
|
||||
private final int threads;
|
||||
|
||||
private IndexExecutorModule(int threads) {
|
||||
this.threads = threads;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void configure() {
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@IndexExecutor
|
||||
ListeningExecutorService getIndexExecutor(
|
||||
ChangeIndexer getChangeIndexer(
|
||||
@IndexExecutor(INTERACTIVE) ListeningExecutorService executor,
|
||||
ChangeIndexer.Factory factory,
|
||||
IndexCollection indexes) {
|
||||
// Bind default indexer to interactive executor; callers who need a
|
||||
// different executor can use the factory directly.
|
||||
return factory.create(executor, indexes);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@IndexExecutor(INTERACTIVE)
|
||||
ListeningExecutorService getInteractiveIndexExecutor(
|
||||
@GerritServerConfig Config config,
|
||||
WorkQueue workQueue) {
|
||||
if (interactiveExecutor != null) {
|
||||
return interactiveExecutor;
|
||||
}
|
||||
int threads = this.threads;
|
||||
if (threads <= 0) {
|
||||
threads = config.getInt("index", null, "threads", 0);
|
||||
@@ -109,7 +103,24 @@ public class IndexModule extends LifecycleModule {
|
||||
return MoreExecutors.newDirectExecutorService();
|
||||
}
|
||||
return MoreExecutors.listeningDecorator(
|
||||
workQueue.createQueue(threads, "index"));
|
||||
workQueue.createQueue(threads, "Index-Interactive"));
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@IndexExecutor(BATCH)
|
||||
ListeningExecutorService getBatchIndexExecutor(
|
||||
@IndexExecutor(INTERACTIVE) ListeningExecutorService interactive,
|
||||
@GerritServerConfig Config config,
|
||||
WorkQueue workQueue) {
|
||||
if (batchExecutor != null) {
|
||||
return batchExecutor;
|
||||
}
|
||||
int threads = config.getInt("index", null, "batchThreads", 0);
|
||||
if (threads <= 0) {
|
||||
return interactive;
|
||||
}
|
||||
return MoreExecutors.listeningDecorator(
|
||||
workQueue.createQueue(threads, "Index-Batch"));
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user