Properly await termination of all index executor threads on shutdown

When Gerrit is shutdown we must wait for all currently running index
threads to finish because otherwise these index events are lost and the
index becomes stale.

There are 2 executors for indexing, one for interactive indexing and one
for batch indexing.

A first attempt to fix this was done by change I7bda130585, but it broke
the site after online reindexing (see issue 4618) and hence it was
reverted by change I6ec90eb733.

Change Id332ec0215 fixed the issue partly. In addition to the
Index#close() method which closes the index it added a Index#stop()
method that stops the (interactive) index executor and awaits the
termination of all running index threads. The Index#stop() method is
only called when Gerrit is shutdown, while Index#close() is also invoked
when online reindexing is done. There are several issues with this fix:

1. It only cared about the interactive index executor, but not about the
   batch index executor.
2. The IndexCollection#stop() method which is called on Gerrit shutdown
   missed to call Index#stop() on the write index if it was the same
   Index instance as the read index.
3. Although documented developers were confused about Index having both
   a close() and a stop() method and there was a danger of calling
   stop() accidentally and thus break all further indexing.
4. Closing the interactive index executor when stopping the change index
   assumed that the inactive index executor is only used by the change
   index. This assumption is currently true, but only because the other
   indexes (account, group, project) missed to use this executor.
5. The interactive index executor was only stopped when Lucene was used,
   but not when ElasticSearch was used.

Due to 1. and 2. all query tests (e.g. LuceneQueryChangesTest) were
leaking threads. Each test case accumulated more running (interactive
and batch) index threads. These threads consumed memory and in the end
this possibly resulted in an OutOfMemoryError (only for running
LuceneQueryChangesTest on Mac, see issue 7611).

To fix this IndexModule adds a new LifecycleListener that is responsible
for shutting down both index executors:

- Both index executors are now shutdown (interactive and batch).
- The stop() method of LifecycleListeners is only invoked when Gerrit is
  shutdown and it's unlikely to be triggered during normal Gerrit
  runtime (e.g. when online reindexing is done).
- There is exactly one place that shuts down the index executors (as
  opposed to n indexes trying to take care of it). This makes sure that
  the executors can be shared across indexes.
- The stop() method from Index is removed so that there is less
  confusion about stop() vs. close().

One important thing is that the executors are shutdown before closing
the indexes. This ensures that all writes have been performed in-memory
before the final flush to disk is done. This means that the
LifecycleListener to stop the executors must be registered after the
LifecycleListeners that close the indexes (on Gerrit start the
LifecycleListeners are invoked in the order in which they are
registered, on shutdown of Gerrit the order is reversed).

IndexModule also allows to provide external executors for the
interactive and batch indexing (instead of creating new executors). In
this case IndexModule doesn't register a LifecycleListener to shutdown
the executors since they are externally managed. Providing external
executors is only used by Google for their multi-tenant setup. In this
setup the executors are global singletons in the process that are shared
by many sites and shutting down one site must not shut down the global
executors.

This fixes the OutOfMemoryError that was observed on Mac when running
the LuceneQueryChangesTest (see issue 7611).

Bug: Issue 7611
Change-Id: I093fb19b56aba90fbb7579f0e59da435c4c1b702
Signed-off-by: Edwin Kempin <ekempin@google.com>
This commit is contained in:
Edwin Kempin 2017-11-03 10:16:43 +01:00
parent 7ba0e151e4
commit 4d295284e2
4 changed files with 41 additions and 11 deletions

View File

@ -36,9 +36,6 @@ public interface Index<K, V> {
/** @return the schema version used by this index. */
Schema<V> getSchema();
/** Stop and await termination of all executor threads */
default void stop() {}
/** Close this index. */
void close();

View File

@ -95,7 +95,6 @@ public abstract class IndexCollection<K, V, I extends Index<K, V>> implements Li
}
for (I write : writeIndexes) {
if (write != read) {
write.stop();
write.close();
}
}

View File

@ -35,7 +35,6 @@ import com.google.common.collect.MultimapBuilder;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.index.QueryOptions;
import com.google.gerrit.index.Schema;
import com.google.gerrit.index.query.Predicate;
@ -74,7 +73,6 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexableField;
@ -187,11 +185,6 @@ public class LuceneChangeIndex implements ChangeIndex {
}
}
@Override
public void stop() {
MoreExecutors.shutdownAndAwaitTermination(executor, Long.MAX_VALUE, TimeUnit.SECONDS);
}
@Override
public void close() {
try {

View File

@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
import com.google.gerrit.index.IndexDefinition;
import com.google.gerrit.index.SchemaDefinitions;
@ -45,6 +46,7 @@ import com.google.gerrit.server.index.group.GroupIndexRewriter;
import com.google.gerrit.server.index.group.GroupIndexer;
import com.google.gerrit.server.index.group.GroupIndexerImpl;
import com.google.gerrit.server.index.group.GroupSchemaDefinitions;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Provides;
@ -52,6 +54,7 @@ import com.google.inject.ProvisionException;
import com.google.inject.Singleton;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.eclipse.jgit.lib.Config;
/**
@ -81,11 +84,13 @@ public class IndexModule extends LifecycleModule {
private final int threads;
private final ListeningExecutorService interactiveExecutor;
private final ListeningExecutorService batchExecutor;
private final boolean closeExecutorsOnShutdown;
public IndexModule(int threads) {
this.threads = threads;
this.interactiveExecutor = null;
this.batchExecutor = null;
this.closeExecutorsOnShutdown = true;
}
public IndexModule(
@ -93,6 +98,7 @@ public class IndexModule extends LifecycleModule {
this.threads = -1;
this.interactiveExecutor = interactiveExecutor;
this.batchExecutor = batchExecutor;
this.closeExecutorsOnShutdown = false;
}
@Override
@ -112,6 +118,17 @@ public class IndexModule extends LifecycleModule {
listener().to(GroupIndexCollection.class);
factory(GroupIndexerImpl.Factory.class);
if (closeExecutorsOnShutdown) {
// The executors must be shutdown _before_ closing the indexes.
// On Gerrit start the LifecycleListeners are invoked in the order in which they are
// registered, but on shutdown of Gerrit the order is reversed. This means the
// LifecycleListener to shutdown the executors must be registered _after_ the
// LifecycleListeners that close the indexes. The closing of the indexes is done by
// *IndexCollection which have been registered as LifecycleListener above. The
// registration of the ShutdownIndexExecutors LifecycleListener must happen afterwards.
listener().to(ShutdownIndexExecutors.class);
}
DynamicSet.setOf(binder(), OnlineUpgradeListener.class);
}
@ -186,4 +203,28 @@ public class IndexModule extends LifecycleModule {
}
return MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "Index-Batch"));
}
@Singleton
private static class ShutdownIndexExecutors implements LifecycleListener {
private final ListeningExecutorService interactiveExecutor;
private final ListeningExecutorService batchExecutor;
@Inject
ShutdownIndexExecutors(
@IndexExecutor(INTERACTIVE) ListeningExecutorService interactiveExecutor,
@IndexExecutor(BATCH) ListeningExecutorService batchExecutor) {
this.interactiveExecutor = interactiveExecutor;
this.batchExecutor = batchExecutor;
}
@Override
public void start() {}
@Override
public void stop() {
MoreExecutors.shutdownAndAwaitTermination(
interactiveExecutor, Long.MAX_VALUE, TimeUnit.SECONDS);
MoreExecutors.shutdownAndAwaitTermination(batchExecutor, Long.MAX_VALUE, TimeUnit.SECONDS);
}
}
}