Write each Lucene index using a dedicated background thread

Like searches, it is not safe to interrupt the IndexWriter.  It also
reads from the NIOFSDirectory which closes file handles if the thread
is interrupted (such as by SSH command being Ctrl-C'd).

Although IndexWriter is thread safe it is essentially single threaded.
Each of these methods acquires a lock on entry, manipulates the index,
and releases the lock. There isn't a lot of value in allowing these to
be running on parallel threads borrowed from Gerrit.

Background (and serialize) all writes onto a single thread to prevent
an interrupt on the application thread from passing into Lucene code.

Change-Id: I54296d62fd9206b2ed2bbcbd5bbcc941890206a3
This commit is contained in:
Shawn Pearce 2016-06-19 23:45:33 -07:00 committed by Hugo Arès
parent 685a66ab1e
commit 2797e9ac1b

View File

@ -19,7 +19,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.FieldDef;
@ -54,8 +58,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -84,6 +90,7 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
private final SitePaths sitePaths;
private final Directory dir;
private final String name;
private final ListeningExecutorService writerThread;
private final TrackingIndexWriter writer;
private final ReferenceManager<IndexSearcher> searcherManager;
private final ControlledRealTimeReopenThread<IndexSearcher> reopenThread;
@ -117,7 +124,7 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
delegateWriter = autoCommitWriter;
autoCommitExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
.setNameFormat("Commit-%d " + index)
.setNameFormat(index + " Commit-%d")
.setDaemon(true)
.build());
autoCommitExecutor.scheduleAtFixedRate(new Runnable() {
@ -148,11 +155,18 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
notDoneNrtFutures = Sets.newConcurrentHashSet();
writerThread = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat(index + " Write-%d")
.setDaemon(true)
.build()));
reopenThread = new ControlledRealTimeReopenThread<>(
writer, searcherManager,
0.500 /* maximum stale age (seconds) */,
0.010 /* minimum stale age (seconds) */);
reopenThread.setName("NRT " + name);
reopenThread.setName(index + " NRT");
reopenThread.setPriority(Math.min(
Thread.currentThread().getPriority() + 2,
Thread.MAX_PRIORITY));
@ -193,6 +207,15 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
autoCommitExecutor.shutdown();
}
writerThread.shutdown();
try {
if (!writerThread.awaitTermination(5, TimeUnit.SECONDS)) {
log.warn("shutting down " + name + " index with pending Lucene writes");
}
} catch (InterruptedException e) {
log.warn("interrupted waiting for pending Lucene writes of " + name +
" index", e);
}
reopenThread.close();
// Closing the reopen thread sets its generation to Long.MAX_VALUE, but we
@ -222,16 +245,45 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
}
}
ListenableFuture<?> insert(Document doc) throws IOException {
return new NrtFuture(writer.addDocument(doc));
ListenableFuture<?> insert(final Document doc) {
return submit(new Callable<Long>() {
@Override
public Long call() throws IOException, InterruptedException {
return writer.addDocument(doc);
}
});
}
ListenableFuture<?> replace(Term term, Document doc) throws IOException {
return new NrtFuture(writer.updateDocument(term, doc));
ListenableFuture<?> replace(final Term term, final Document doc) {
return submit(new Callable<Long>() {
@Override
public Long call() throws IOException, InterruptedException {
return writer.updateDocument(term, doc);
}
});
}
ListenableFuture<?> delete(Term term) throws IOException {
return new NrtFuture(writer.deleteDocuments(term));
ListenableFuture<?> delete(final Term term) {
return submit(new Callable<Long>() {
@Override
public Long call() throws IOException, InterruptedException {
return writer.deleteDocuments(term);
}
});
}
private ListenableFuture<?> submit(Callable<Long> task) {
ListenableFuture<Long> future =
Futures.nonCancellationPropagating(writerThread.submit(task));
return Futures.transformAsync(future, new AsyncFunction<Long, Void>() {
@Override
public ListenableFuture<Void> apply(Long gen) throws InterruptedException {
// Tell the reopen thread a future is waiting on this
// generation so it uses the min stale time when refreshing.
reopenThread.waitForGeneration(gen, 0);
return new NrtFuture(gen);
}
});
}
@Override
@ -305,9 +357,6 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
NrtFuture(long gen) {
this.gen = gen;
// Tell the reopen thread we are waiting on this generation so it uses the
// min stale time when refreshing.
isGenAvailableNowForCurrentSearcher();
}
@Override
@ -323,12 +372,10 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
public Void get(long timeout, TimeUnit unit) throws InterruptedException,
TimeoutException, ExecutionException {
if (!isDone()) {
if (reopenThread.waitForGeneration(gen,
(int) MILLISECONDS.convert(timeout, unit))) {
set(null);
} else {
if (!reopenThread.waitForGeneration(gen, (int) unit.toMillis(timeout))) {
throw new TimeoutException();
}
set(null);
}
return super.get(timeout, unit);
}