Control threadpool behavior for indexing

For the Reindex command, use a new threadpool defaulting to the number
of available processors, under the assumption that the bulk of
indexing time is in CPU-intensive work like tree diffing. Modify
ChangeIndexer to return ListenableFutures so we can bound the size of
the work queue.

For the server, default to using the default work queue, but make this
configurable as index.threads to use a dedicated threadpool, with
non-positive values indicating the default.

Because we now have more non-Lucene-specific Guice logic, split that
out into its own module.

Change-Id: I55181e556a2d43b81c9032f53b74b690342ab62b
This commit is contained in:
Dave Borowitz 2013-05-29 15:03:12 -07:00
parent 20035a4dc9
commit c64399fe93
8 changed files with 244 additions and 69 deletions

View File

@ -15,38 +15,26 @@
package com.google.gerrit.lucene;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.index.ChangeIndex;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.index.ChangeIndexerImpl;
import com.google.gerrit.server.query.change.IndexRewrite;
import com.google.gerrit.server.query.change.IndexRewriteImpl;
import com.google.inject.Injector;
import com.google.inject.Key;
import org.eclipse.jgit.lib.Config;
import com.google.gerrit.server.index.IndexModule;
public class LuceneIndexModule extends LifecycleModule {
public static boolean isEnabled(Injector injector) {
return injector.getInstance(Key.get(Config.class, GerritServerConfig.class))
.getBoolean("index", null, "enabled", false);
}
private final boolean checkVersion;
private final int threads;
public LuceneIndexModule() {
this(true);
this(true, 0);
}
public LuceneIndexModule(boolean checkVersion) {
public LuceneIndexModule(boolean checkVersion, int threads) {
this.checkVersion = checkVersion;
this.threads = threads;
}
@Override
protected void configure() {
install(new IndexModule(threads));
bind(ChangeIndex.Manager.class).to(LuceneChangeIndexManager.class);
bind(ChangeIndexer.class).to(ChangeIndexerImpl.class);
bind(IndexRewrite.class).to(IndexRewriteImpl.class);
listener().to(LuceneChangeIndexManager.class);
if (checkVersion) {
listener().to(IndexVersionCheck.class);

View File

@ -51,6 +51,7 @@ import com.google.gerrit.server.config.MasterNodeStartup;
import com.google.gerrit.server.contact.HttpContactStoreConnection;
import com.google.gerrit.server.git.ReceiveCommitsExecutorModule;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.index.IndexModule;
import com.google.gerrit.server.index.NoIndexModule;
import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
import com.google.gerrit.server.mail.SmtpEmailSender;
@ -321,7 +322,7 @@ public class Daemon extends SiteProgram {
modules.add(new SmtpEmailSender.Module());
modules.add(new SignedTokenEmailTokenVerifier.Module());
modules.add(new PluginModule());
if (LuceneIndexModule.isEnabled(cfgInjector)) {
if (IndexModule.isEnabled(cfgInjector)) {
modules.add(new LuceneIndexModule());
} else {
modules.add(new NoIndexModule());

View File

@ -21,8 +21,12 @@ import static com.google.gerrit.server.schema.DataSourceProvider.Context.SINGLE_
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.lifecycle.LifecycleManager;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.lucene.LuceneIndexModule;
import com.google.gerrit.pgm.util.SiteProgram;
import com.google.gerrit.reviewdb.client.Change;
@ -31,13 +35,16 @@ import com.google.gerrit.server.cache.CacheRemovalListener;
import com.google.gerrit.server.cache.h2.DefaultCacheFactory;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.index.IndexModule;
import com.google.gerrit.server.patch.PatchListCacheImpl;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provider;
import com.google.inject.ProvisionException;
import com.google.inject.TypeLiteral;
import org.apache.lucene.store.Directory;
@ -45,18 +52,26 @@ import org.apache.lucene.store.FSDirectory;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.util.FS;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
public class Reindex extends SiteProgram {
private final LifecycleManager manager = new LifecycleManager();
private final AtomicReference<ReviewDb> dbRef =
new AtomicReference<ReviewDb>();
private static final Logger log = LoggerFactory.getLogger(Reindex.class);
@Option(name = "--threads", usage = "Number of threads to use for indexing")
private int threads = Runtime.getRuntime().availableProcessors();
private Injector dbInjector;
private Injector sysInjector;
private SitePaths sitePaths;
@ -65,53 +80,37 @@ public class Reindex extends SiteProgram {
public int run() throws Exception {
mustHaveValidSite();
dbInjector = createDbInjector(SINGLE_USER);
if (!LuceneIndexModule.isEnabled(dbInjector)) {
if (!IndexModule.isEnabled(dbInjector)) {
throw die("Secondary index not enabled");
}
LifecycleManager dbManager = new LifecycleManager();
dbManager.add(dbInjector);
dbManager.start();
sitePaths = dbInjector.getInstance(SitePaths.class);
deleteAll();
sysInjector = createSysInjector();
manager.add(dbInjector);
manager.add(sysInjector);
manager.start();
LifecycleManager sysManager = new LifecycleManager();
sysManager.add(sysInjector);
sysManager.start();
SchemaFactory<ReviewDb> schema = dbInjector.getInstance(
Key.get(new TypeLiteral<SchemaFactory<ReviewDb>>() {}));
ReviewDb db = schema.open();
dbRef.set(db);
ChangeIndexer indexer = sysInjector.getInstance(ChangeIndexer.class);
Stopwatch sw = new Stopwatch().start();
int i = 0;
for (Change change : db.changes().all()) {
indexer.index(change).get();
i++;
}
double elapsed = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
System.out.format("Reindexed %d changes in %.02fms", i, elapsed);
deleteAll();
int result = indexAll();
writeVersion();
manager.stop();
return 0;
sysManager.stop();
dbManager.stop();
return result;
}
private Injector createSysInjector() {
List<Module> modules = Lists.newArrayList();
modules.add(PatchListCacheImpl.module());
modules.add(new LuceneIndexModule(false));
modules.add(new LuceneIndexModule(false, threads));
modules.add(new ReviewDbModule());
modules.add(new AbstractModule() {
@SuppressWarnings("rawtypes")
@Override
protected void configure() {
bind(ReviewDb.class).toProvider(new Provider<ReviewDb>() {
@Override
public ReviewDb get() {
return dbRef.get();
}
});
// Plugins are not loaded and we're just running through each change
// once, so don't worry about cache removal.
bind(new TypeLiteral<DynamicSet<CacheRemovalListener>>() {})
@ -122,6 +121,47 @@ public class Reindex extends SiteProgram {
return dbInjector.createChildInjector(modules);
}
private class ReviewDbModule extends LifecycleModule {
@Override
protected void configure() {
final SchemaFactory<ReviewDb> schema = dbInjector.getInstance(
Key.get(new TypeLiteral<SchemaFactory<ReviewDb>>() {}));
final List<ReviewDb> dbs = Collections.synchronizedList(
Lists.<ReviewDb> newArrayListWithCapacity(threads + 1));
final ThreadLocal<ReviewDb> localDb = new ThreadLocal<ReviewDb>();
bind(ReviewDb.class).toProvider(new Provider<ReviewDb>() {
@Override
public ReviewDb get() {
ReviewDb db = localDb.get();
if (db == null) {
try {
db = schema.open();
dbs.add(db);
localDb.set(db);
} catch (OrmException e) {
throw new ProvisionException("unable to open ReviewDb", e);
}
}
return db;
}
});
listener().toInstance(new LifecycleListener() {
@Override
public void start() {
// Do nothing.
}
@Override
public void stop() {
for (ReviewDb db : dbs) {
db.close();
}
}
});
}
}
private void deleteAll() throws IOException {
for (String index : SCHEMA_VERSIONS.keySet()) {
File file = new File(sitePaths.index_dir, index);
@ -138,7 +178,44 @@ public class Reindex extends SiteProgram {
}
}
private void writeVersion() throws IOException, ConfigInvalidException {
private int indexAll() throws Exception {
ReviewDb db = sysInjector.getInstance(ReviewDb.class);
ChangeIndexer indexer = sysInjector.getInstance(ChangeIndexer.class);
Stopwatch sw = new Stopwatch().start();
int queueLen = 2 * threads;
final Semaphore sem = new Semaphore(queueLen);
final AtomicBoolean ok = new AtomicBoolean(true);
int i = 0;
for (final Change change : db.changes().all()) {
sem.acquire();
final ListenableFuture<?> future = indexer.index(change);
future.addListener(new Runnable() {
@Override
public void run() {
try {
future.get();
} catch (InterruptedException e) {
log.error("Failed to index change " + change.getId(), e);
ok.set(false);
} catch (ExecutionException e) {
log.error("Failed to index change " + change.getId(), e);
ok.set(false);
} finally {
sem.release();
}
}
}, MoreExecutors.sameThreadExecutor());
i++;
}
sem.acquire(queueLen);
double elapsed = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
System.out.format("Reindexed %d changes in %.02fms", i, elapsed);
return ok.get() ? 0 : 1;
}
private void writeVersion() throws IOException,
ConfigInvalidException {
FileBasedConfig cfg =
new FileBasedConfig(gerritIndexConfig(sitePaths), FS.detect());
cfg.load();

View File

@ -15,11 +15,10 @@
package com.google.gerrit.server.index;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.server.util.RequestScopePropagator;
import java.util.concurrent.Future;
/**
* Helper for (re)indexing a change document.
* <p>
@ -30,12 +29,13 @@ public interface ChangeIndexer {
/** Instance indicating secondary index is disabled. */
public static final ChangeIndexer DISABLED = new ChangeIndexer() {
@Override
public Future<?> index(Change change) {
public ListenableFuture<?> index(Change change) {
return Futures.immediateFuture(null);
}
@Override
public Future<?> index(Change change, RequestScopePropagator prop) {
public ListenableFuture<?> index(Change change,
RequestScopePropagator prop) {
return Futures.immediateFuture(null);
}
};
@ -44,14 +44,16 @@ public interface ChangeIndexer {
* Start indexing a change.
*
* @param change change to index.
* @return future for the indexing task.
*/
public Future<?> index(Change change);
public ListenableFuture<?> index(Change change);
/**
* Start indexing a change.
*
* @param change change to index.
* @param prop propagator to wrap any created runnables in.
* @return future for the indexing task.
*/
public Future<?> index(Change change, RequestScopePropagator prop);
public ListenableFuture<?> index(Change change, RequestScopePropagator prop);
}

View File

@ -14,8 +14,9 @@
package com.google.gerrit.server.index;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.server.util.RequestScopePropagator;
import com.google.inject.Inject;
@ -24,7 +25,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.Future;
/**
* Helper for (re)indexing a change document.
@ -36,30 +36,31 @@ public class ChangeIndexerImpl implements ChangeIndexer {
private static final Logger log =
LoggerFactory.getLogger(ChangeIndexerImpl.class);
private final WorkQueue workQueue;
private final ListeningScheduledExecutorService executor;
private final ChangeIndex openIndex;
private final ChangeIndex closedIndex;
@Inject
ChangeIndexerImpl(WorkQueue workQueue,
ChangeIndexerImpl(@IndexExecutor ListeningScheduledExecutorService executor,
ChangeIndex.Manager indexManager) throws IOException {
this.workQueue = workQueue;
this.executor = executor;
this.openIndex = indexManager.get("changes_open");
this.closedIndex = indexManager.get("changes_closed");
}
@Override
public Future<?> index(Change change) {
public ListenableFuture<?> index(Change change) {
return index(change, null);
}
@Override
public Future<?> index(Change change, RequestScopePropagator prop) {
public ListenableFuture<?> index(Change change,
RequestScopePropagator prop) {
Runnable task = new Task(change);
if (prop != null) {
task = prop.wrap(task);
}
return workQueue.getDefaultQueue().submit(task);
return executor.submit(task);
}
private class Task implements Runnable {

View File

@ -0,0 +1,31 @@
// Copyright (C) 2013 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.package com.google.gerrit.server.git;
package com.google.gerrit.server.index;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
/**
* Marker on {@link ListeningScheduledExecutorService} used by secondary
* indexing threads.
*/
@Retention(RUNTIME)
@BindingAnnotation
@interface IndexExecutor {
}

View File

@ -0,0 +1,74 @@
// Copyright (C) 2013 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.package com.google.gerrit.server.git;
package com.google.gerrit.server.index;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.git.WorkQueue.Executor;
import com.google.gerrit.server.query.change.IndexRewrite;
import com.google.gerrit.server.query.change.IndexRewriteImpl;
import com.google.inject.AbstractModule;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import org.eclipse.jgit.lib.Config;
/**
* Module for non-indexer-specific secondary index setup.
* <p>
* This module should not be used directly except by specific secondary indexer
* implementations (e.g. Lucene).
*/
public class IndexModule extends AbstractModule {
public static boolean isEnabled(Injector injector) {
return injector.getInstance(Key.get(Config.class, GerritServerConfig.class))
.getBoolean("index", null, "enabled", false);
}
private final int threads;
public IndexModule(int threads) {
this.threads = threads;
}
@Override
protected void configure() {
bind(ChangeIndexer.class).to(ChangeIndexerImpl.class);
bind(IndexRewrite.class).to(IndexRewriteImpl.class);
}
@Provides
@Singleton
@IndexExecutor
ListeningScheduledExecutorService getIndexExecutor(
@GerritServerConfig Config config,
WorkQueue workQueue) {
int threads = this.threads;
if (threads <= 0) {
threads = config.getInt("index", null, "threads", 0);
}
Executor executor;
if (threads <= 0) {
executor = workQueue.getDefaultQueue();
} else {
executor = workQueue.createQueue(threads, "index");
}
return MoreExecutors.listeningDecorator(executor);
}
}

View File

@ -37,6 +37,7 @@ import com.google.gerrit.server.contact.HttpContactStoreConnection;
import com.google.gerrit.server.git.LocalDiskRepositoryManager;
import com.google.gerrit.server.git.ReceiveCommitsExecutorModule;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.index.IndexModule;
import com.google.gerrit.server.index.NoIndexModule;
import com.google.gerrit.server.mail.SignedTokenEmailTokenVerifier;
import com.google.gerrit.server.mail.SmtpEmailSender;
@ -237,7 +238,7 @@ public class WebAppInitializer extends GuiceServletContextListener {
modules.add(new SmtpEmailSender.Module());
modules.add(new SignedTokenEmailTokenVerifier.Module());
modules.add(new PluginModule());
if (LuceneIndexModule.isEnabled(cfgInjector)) {
if (IndexModule.isEnabled(cfgInjector)) {
modules.add(new LuceneIndexModule());
} else {
modules.add(new NoIndexModule());