From 4d295284e2fff0b2a4ba194b7fcec9928687c1c3 Mon Sep 17 00:00:00 2001 From: Edwin Kempin Date: Fri, 3 Nov 2017 10:16:43 +0100 Subject: [PATCH] Properly await termination of all index executor threads on shutdown When Gerrit is shutdown we must wait for all currently running index threads to finish because otherwise these index events are lost and the index becomes stale. There are 2 executors for indexing, one for interactive indexing and one for batch indexing. A first attempt to fix this was done by change I7bda130585, but it broke the site after online reindexing (see issue 4618) and hence it was reverted by change I6ec90eb733. Change Id332ec0215 fixed the issue partly. In addition to the Index#close() method which closes the index it added a Index#stop() method that stops the (interactive) index executor and awaits the termination of all running index threads. The Index#stop() method is only called when Gerrit is shutdown, while Index#close() is also invoked when online reindexing is done. There are several issues with this fix: 1. It only cared about the interactive index executor, but not about the batch index executor. 2. The IndexCollection#stop() method which is called on Gerrit shutdown missed to call Index#stop() on the write index if it was the same Index instance as the read index. 3. Although documented developers were confused about Index having both a close() and a stop() method and there was a danger of calling stop() accidentally and thus break all further indexing. 4. Closing the interactive index executor when stopping the change index assumed that the inactive index executor is only used by the change index. This assumption is currently true, but only because the other indexes (account, group, project) missed to use this executor. 5. The interactive index executor was only stopped when Lucene was used, but not when ElasticSearch was used. Due to 1. and 2. all query tests (e.g. LuceneQueryChangesTest) were leaking threads. Each test case accumulated more running (interactive and batch) index threads. These threads consumed memory and in the end this possibly resulted in an OutOfMemoryError (only for running LuceneQueryChangesTest on Mac, see issue 7611). To fix this IndexModule adds a new LifecycleListener that is responsible for shutting down both index executors: - Both index executors are now shutdown (interactive and batch). - The stop() method of LifecycleListeners is only invoked when Gerrit is shutdown and it's unlikely to be triggered during normal Gerrit runtime (e.g. when online reindexing is done). - There is exactly one place that shuts down the index executors (as opposed to n indexes trying to take care of it). This makes sure that the executors can be shared across indexes. - The stop() method from Index is removed so that there is less confusion about stop() vs. close(). One important thing is that the executors are shutdown before closing the indexes. This ensures that all writes have been performed in-memory before the final flush to disk is done. This means that the LifecycleListener to stop the executors must be registered after the LifecycleListeners that close the indexes (on Gerrit start the LifecycleListeners are invoked in the order in which they are registered, on shutdown of Gerrit the order is reversed). IndexModule also allows to provide external executors for the interactive and batch indexing (instead of creating new executors). In this case IndexModule doesn't register a LifecycleListener to shutdown the executors since they are externally managed. Providing external executors is only used by Google for their multi-tenant setup. In this setup the executors are global singletons in the process that are shared by many sites and shutting down one site must not shut down the global executors. This fixes the OutOfMemoryError that was observed on Mac when running the LuceneQueryChangesTest (see issue 7611). Bug: Issue 7611 Change-Id: I093fb19b56aba90fbb7579f0e59da435c4c1b702 Signed-off-by: Edwin Kempin --- .../java/com/google/gerrit/index/Index.java | 3 -- .../google/gerrit/index/IndexCollection.java | 1 - .../gerrit/lucene/LuceneChangeIndex.java | 7 ---- .../gerrit/server/index/IndexModule.java | 41 +++++++++++++++++++ 4 files changed, 41 insertions(+), 11 deletions(-) diff --git a/gerrit-index/src/main/java/com/google/gerrit/index/Index.java b/gerrit-index/src/main/java/com/google/gerrit/index/Index.java index 739c358a99..34f7d33b7c 100644 --- a/gerrit-index/src/main/java/com/google/gerrit/index/Index.java +++ b/gerrit-index/src/main/java/com/google/gerrit/index/Index.java @@ -36,9 +36,6 @@ public interface Index { /** @return the schema version used by this index. */ Schema getSchema(); - /** Stop and await termination of all executor threads */ - default void stop() {} - /** Close this index. */ void close(); diff --git a/gerrit-index/src/main/java/com/google/gerrit/index/IndexCollection.java b/gerrit-index/src/main/java/com/google/gerrit/index/IndexCollection.java index 77cabd19e0..2837f7e4d1 100644 --- a/gerrit-index/src/main/java/com/google/gerrit/index/IndexCollection.java +++ b/gerrit-index/src/main/java/com/google/gerrit/index/IndexCollection.java @@ -95,7 +95,6 @@ public abstract class IndexCollection> implements Li } for (I write : writeIndexes) { if (write != read) { - write.stop(); write.close(); } } diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java index 80078dcaa4..2912733d48 100644 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java @@ -35,7 +35,6 @@ import com.google.common.collect.MultimapBuilder; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; import com.google.gerrit.index.QueryOptions; import com.google.gerrit.index.Schema; import com.google.gerrit.index.query.Predicate; @@ -74,7 +73,6 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexableField; @@ -187,11 +185,6 @@ public class LuceneChangeIndex implements ChangeIndex { } } - @Override - public void stop() { - MoreExecutors.shutdownAndAwaitTermination(executor, Long.MAX_VALUE, TimeUnit.SECONDS); - } - @Override public void close() { try { diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java index 6854a8772f..c14f4db264 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableCollection; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.extensions.registration.DynamicSet; import com.google.gerrit.index.IndexDefinition; import com.google.gerrit.index.SchemaDefinitions; @@ -45,6 +46,7 @@ import com.google.gerrit.server.index.group.GroupIndexRewriter; import com.google.gerrit.server.index.group.GroupIndexer; import com.google.gerrit.server.index.group.GroupIndexerImpl; import com.google.gerrit.server.index.group.GroupSchemaDefinitions; +import com.google.inject.Inject; import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Provides; @@ -52,6 +54,7 @@ import com.google.inject.ProvisionException; import com.google.inject.Singleton; import java.util.Collection; import java.util.Set; +import java.util.concurrent.TimeUnit; import org.eclipse.jgit.lib.Config; /** @@ -81,11 +84,13 @@ public class IndexModule extends LifecycleModule { private final int threads; private final ListeningExecutorService interactiveExecutor; private final ListeningExecutorService batchExecutor; + private final boolean closeExecutorsOnShutdown; public IndexModule(int threads) { this.threads = threads; this.interactiveExecutor = null; this.batchExecutor = null; + this.closeExecutorsOnShutdown = true; } public IndexModule( @@ -93,6 +98,7 @@ public class IndexModule extends LifecycleModule { this.threads = -1; this.interactiveExecutor = interactiveExecutor; this.batchExecutor = batchExecutor; + this.closeExecutorsOnShutdown = false; } @Override @@ -112,6 +118,17 @@ public class IndexModule extends LifecycleModule { listener().to(GroupIndexCollection.class); factory(GroupIndexerImpl.Factory.class); + if (closeExecutorsOnShutdown) { + // The executors must be shutdown _before_ closing the indexes. + // On Gerrit start the LifecycleListeners are invoked in the order in which they are + // registered, but on shutdown of Gerrit the order is reversed. This means the + // LifecycleListener to shutdown the executors must be registered _after_ the + // LifecycleListeners that close the indexes. The closing of the indexes is done by + // *IndexCollection which have been registered as LifecycleListener above. The + // registration of the ShutdownIndexExecutors LifecycleListener must happen afterwards. + listener().to(ShutdownIndexExecutors.class); + } + DynamicSet.setOf(binder(), OnlineUpgradeListener.class); } @@ -186,4 +203,28 @@ public class IndexModule extends LifecycleModule { } return MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "Index-Batch")); } + + @Singleton + private static class ShutdownIndexExecutors implements LifecycleListener { + private final ListeningExecutorService interactiveExecutor; + private final ListeningExecutorService batchExecutor; + + @Inject + ShutdownIndexExecutors( + @IndexExecutor(INTERACTIVE) ListeningExecutorService interactiveExecutor, + @IndexExecutor(BATCH) ListeningExecutorService batchExecutor) { + this.interactiveExecutor = interactiveExecutor; + this.batchExecutor = batchExecutor; + } + + @Override + public void start() {} + + @Override + public void stop() { + MoreExecutors.shutdownAndAwaitTermination( + interactiveExecutor, Long.MAX_VALUE, TimeUnit.SECONDS); + MoreExecutors.shutdownAndAwaitTermination(batchExecutor, Long.MAX_VALUE, TimeUnit.SECONDS); + } + } }