From 2797e9ac1b12e195f922c862a7c958bc15b0ab73 Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Sun, 19 Jun 2016 23:45:33 -0700 Subject: [PATCH] Write each Lucene index using a dedicated background thread Like searches, it is not safe to interrupt the IndexWriter. It also reads from the NIOFSDirectory which closes file handles if the thread is interrupted (such as by SSH command being Ctrl-C'd). Although IndexWriter is thread safe it is essentially single threaded. Each of these methods acquires a lock on entry, manipulates the index, and releases the lock. There isn't a lot of value in allowing these to be running on parallel threads borrowed from Gerrit. Background (and serialize) all writes onto a single thread to prevent an interrupt on the application thread from passing into Lucene code. Change-Id: I54296d62fd9206b2ed2bbcbd5bbcc941890206a3 --- .../gerrit/lucene/AbstractLuceneIndex.java | 77 +++++++++++++++---- 1 file changed, 62 insertions(+), 15 deletions(-) diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/AbstractLuceneIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/AbstractLuceneIndex.java index 1bbc75d195..eb0dfaacb7 100644 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/AbstractLuceneIndex.java +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/AbstractLuceneIndex.java @@ -19,7 +19,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.google.common.base.Joiner; import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.gerrit.server.config.SitePaths; import com.google.gerrit.server.index.FieldDef; @@ -54,8 +58,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.sql.Timestamp; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -84,6 +90,7 @@ public abstract class AbstractLuceneIndex implements Index { private final SitePaths sitePaths; private final Directory dir; private final String name; + private final ListeningExecutorService writerThread; private final TrackingIndexWriter writer; private final ReferenceManager searcherManager; private final ControlledRealTimeReopenThread reopenThread; @@ -117,7 +124,7 @@ public abstract class AbstractLuceneIndex implements Index { delegateWriter = autoCommitWriter; autoCommitExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder() - .setNameFormat("Commit-%d " + index) + .setNameFormat(index + " Commit-%d") .setDaemon(true) .build()); autoCommitExecutor.scheduleAtFixedRate(new Runnable() { @@ -148,11 +155,18 @@ public abstract class AbstractLuceneIndex implements Index { notDoneNrtFutures = Sets.newConcurrentHashSet(); + writerThread = MoreExecutors.listeningDecorator( + Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder() + .setNameFormat(index + " Write-%d") + .setDaemon(true) + .build())); + reopenThread = new ControlledRealTimeReopenThread<>( writer, searcherManager, 0.500 /* maximum stale age (seconds) */, 0.010 /* minimum stale age (seconds) */); - reopenThread.setName("NRT " + name); + reopenThread.setName(index + " NRT"); reopenThread.setPriority(Math.min( Thread.currentThread().getPriority() + 2, Thread.MAX_PRIORITY)); @@ -193,6 +207,15 @@ public abstract class AbstractLuceneIndex implements Index { autoCommitExecutor.shutdown(); } + writerThread.shutdown(); + try { + if (!writerThread.awaitTermination(5, TimeUnit.SECONDS)) { + log.warn("shutting down " + name + " index with pending Lucene writes"); + } + } catch (InterruptedException e) { + log.warn("interrupted waiting for pending Lucene writes of " + name + + " index", e); + } reopenThread.close(); // Closing the reopen thread sets its generation to Long.MAX_VALUE, but we @@ -222,16 +245,45 @@ public abstract class AbstractLuceneIndex implements Index { } } - ListenableFuture insert(Document doc) throws IOException { - return new NrtFuture(writer.addDocument(doc)); + ListenableFuture insert(final Document doc) { + return submit(new Callable() { + @Override + public Long call() throws IOException, InterruptedException { + return writer.addDocument(doc); + } + }); } - ListenableFuture replace(Term term, Document doc) throws IOException { - return new NrtFuture(writer.updateDocument(term, doc)); + ListenableFuture replace(final Term term, final Document doc) { + return submit(new Callable() { + @Override + public Long call() throws IOException, InterruptedException { + return writer.updateDocument(term, doc); + } + }); } - ListenableFuture delete(Term term) throws IOException { - return new NrtFuture(writer.deleteDocuments(term)); + ListenableFuture delete(final Term term) { + return submit(new Callable() { + @Override + public Long call() throws IOException, InterruptedException { + return writer.deleteDocuments(term); + } + }); + } + + private ListenableFuture submit(Callable task) { + ListenableFuture future = + Futures.nonCancellationPropagating(writerThread.submit(task)); + return Futures.transformAsync(future, new AsyncFunction() { + @Override + public ListenableFuture apply(Long gen) throws InterruptedException { + // Tell the reopen thread a future is waiting on this + // generation so it uses the min stale time when refreshing. + reopenThread.waitForGeneration(gen, 0); + return new NrtFuture(gen); + } + }); } @Override @@ -305,9 +357,6 @@ public abstract class AbstractLuceneIndex implements Index { NrtFuture(long gen) { this.gen = gen; - // Tell the reopen thread we are waiting on this generation so it uses the - // min stale time when refreshing. - isGenAvailableNowForCurrentSearcher(); } @Override @@ -323,12 +372,10 @@ public abstract class AbstractLuceneIndex implements Index { public Void get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException { if (!isDone()) { - if (reopenThread.waitForGeneration(gen, - (int) MILLISECONDS.convert(timeout, unit))) { - set(null); - } else { + if (!reopenThread.waitForGeneration(gen, (int) unit.toMillis(timeout))) { throw new TimeoutException(); } + set(null); } return super.get(timeout, unit); }