From 75ac1f1b99c4dd098db8565240f01522e66e0dff Mon Sep 17 00:00:00 2001 From: Shawn Pearce Date: Wed, 26 Jun 2013 10:21:42 -0600 Subject: [PATCH] Replace SearcherManager with NRTManager The NRTManager allows Gerrit to wait for a specific document mutation to be visible to searchers before trying to run a new search. The NRTManager comes with its own background thread to manage reopens, replacing the thread that reopened the index every 100 ms. The change index API now returns a ListenableFuture the caller can wait on to learn when new queries will return the updates. Change-Id: I1b3c5ba036241ffd54c88a16ee8b2ffb3d3bf5f2 --- .../gerrit/lucene/LuceneChangeIndex.java | 92 +++++------- .../com/google/gerrit/lucene/SubIndex.java | 142 +++++++++++++++--- .../gerrit/server/index/ChangeIndex.java | 20 +-- .../server/query/change/IndexRewriteTest.java | 8 +- 4 files changed, 175 insertions(+), 87 deletions(-) diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java index b9e03b3cdb..4aced04bed 100644 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneChangeIndex.java @@ -20,10 +20,12 @@ import static org.apache.lucene.search.BooleanClause.Occur.MUST; import static org.apache.lucene.search.BooleanClause.Occur.MUST_NOT; import static org.apache.lucene.search.BooleanClause.Occur.SHOULD; +import com.google.common.base.Function; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.reviewdb.client.Change; @@ -118,7 +120,6 @@ public class LuceneChangeIndex implements ChangeIndex, LifecycleListener { return writerConfig; } - private final RefreshThread refreshThread; private final FillArgs fillArgs; private final ExecutorService executor; private final boolean readOnly; @@ -128,7 +129,6 @@ public class LuceneChangeIndex implements ChangeIndex, LifecycleListener { LuceneChangeIndex(Config cfg, SitePaths sitePaths, ListeningScheduledExecutorService executor, FillArgs fillArgs, boolean readOnly) throws IOException { - this.refreshThread = new RefreshThread(); this.fillArgs = fillArgs; this.executor = executor; this.readOnly = readOnly; @@ -140,12 +140,10 @@ public class LuceneChangeIndex implements ChangeIndex, LifecycleListener { @Override public void start() { - refreshThread.start(); } @Override public void stop() { - refreshThread.halt(); List> closeFutures = Lists.newArrayListWithCapacity(2); closeFutures.add(executor.submit(new Runnable() { @Override @@ -164,49 +162,66 @@ public class LuceneChangeIndex implements ChangeIndex, LifecycleListener { } } + @SuppressWarnings("unchecked") @Override - public void insert(ChangeData cd) throws IOException { + public ListenableFuture insert(ChangeData cd) throws IOException { Term id = idTerm(cd); Document doc = toDocument(cd); if (readOnly) { - return; + return Futures.immediateFuture(null); } + if (cd.getChange().getStatus().isOpen()) { - closedIndex.delete(id); - openIndex.insert(doc); + return allOf( + closedIndex.delete(id), + openIndex.insert(doc)); } else { - openIndex.delete(id); - closedIndex.insert(doc); + return allOf( + openIndex.delete(id), + closedIndex.insert(doc)); } } + @SuppressWarnings("unchecked") @Override - public void replace(ChangeData cd) throws IOException { + public ListenableFuture replace(ChangeData cd) throws IOException { Term id = idTerm(cd); Document doc = toDocument(cd); if (readOnly) { - return; + return Futures.immediateFuture(null); } if (cd.getChange().getStatus().isOpen()) { - closedIndex.delete(id); - openIndex.replace(id, doc); + return allOf( + closedIndex.delete(id), + openIndex.replace(id, doc)); } else { - openIndex.delete(id); - closedIndex.replace(id, doc); + return allOf( + openIndex.delete(id), + closedIndex.replace(id, doc)); } } + @SuppressWarnings("unchecked") @Override - public void delete(ChangeData cd) throws IOException { + public ListenableFuture delete(ChangeData cd) throws IOException { Term id = idTerm(cd); if (readOnly) { - return; - } - if (cd.getChange().getStatus().isOpen()) { - openIndex.delete(id); - } else { - closedIndex.delete(id); + return Futures.immediateFuture(null); } + return allOf( + openIndex.delete(id), + closedIndex.delete(id)); + } + + private static ListenableFuture allOf(ListenableFuture... f) { + return Futures.transform( + Futures.allAsList(f), + new Function, Void>() { + @Override + public Void apply(List input) { + return null; + } + }); } @Override @@ -485,35 +500,4 @@ public class LuceneChangeIndex implements ChangeIndex, LifecycleListener { private static IllegalArgumentException badFieldType(FieldType t) { return new IllegalArgumentException("unknown index field type " + t); } - - private class RefreshThread extends Thread { - private boolean stop; - - @Override - public void run() { - while (!stop) { - openIndex.maybeRefresh(); - closedIndex.maybeRefresh(); - synchronized (this) { - try { - wait(100); - } catch (InterruptedException e) { - log.warn("error refreshing index searchers", e); - } - } - } - } - - void halt() { - synchronized (this) { - stop = true; - notify(); - } - try { - join(); - } catch (InterruptedException e) { - log.warn("error stopping refresh thread", e); - } - } - } } diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java index d909f7a2e9..2ae8d3a274 100644 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/SubIndex.java @@ -14,12 +14,20 @@ package com.google.gerrit.lucene; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.gwt.thirdparty.guava.common.collect.Maps; + import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriterConfig; import org.apache.lucene.index.Term; import org.apache.lucene.search.IndexSearcher; -import org.apache.lucene.search.SearcherManager; +import org.apache.lucene.search.NRTManager; +import org.apache.lucene.search.NRTManager.TrackingIndexWriter; +import org.apache.lucene.search.NRTManagerReopenThread; +import org.apache.lucene.search.ReferenceManager.RefreshListener; +import org.apache.lucene.search.SearcherFactory; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.slf4j.Logger; @@ -27,29 +35,63 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; /** Piece of the change index that is implemented as a separate Lucene index. */ class SubIndex { private static final Logger log = LoggerFactory.getLogger(SubIndex.class); private final Directory dir; - private final IndexWriter writer; - private final SearcherManager searcherManager; + private final TrackingIndexWriter writer; + private final NRTManager nrtManager; + private final NRTManagerReopenThread reopenThread; + private final ConcurrentMap refreshListeners; SubIndex(File file, IndexWriterConfig writerConfig) throws IOException { dir = FSDirectory.open(file); - writer = new IndexWriter(dir, writerConfig); - searcherManager = new SearcherManager(writer, true, null); + writer = new NRTManager.TrackingIndexWriter(new IndexWriter(dir, writerConfig)); + nrtManager = new NRTManager(writer, new SearcherFactory()); + + refreshListeners = Maps.newConcurrentMap(); + nrtManager.addListener(new RefreshListener() { + @Override + public void beforeRefresh() throws IOException { + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + for (RefreshListener l : refreshListeners.keySet()) { + l.afterRefresh(didRefresh); + } + } + }); + + reopenThread = new NRTManagerReopenThread( + nrtManager, + 0.500 /* maximum stale age (seconds) */, + 0.010 /* minimum stale age (seconds) */); + reopenThread.setName("NRT " + file.getName()); + reopenThread.setPriority(Math.min( + Thread.currentThread().getPriority() + 2, + Thread.MAX_PRIORITY)); + reopenThread.setDaemon(true); + reopenThread.start(); } void close() { + reopenThread.close(); try { - searcherManager.close(); + nrtManager.close(); } catch (IOException e) { log.warn("error closing Lucene searcher", e); } try { - writer.close(); + writer.getIndexWriter().close(); } catch (IOException e) { log.warn("error closing Lucene writer", e); } @@ -60,31 +102,91 @@ class SubIndex { } } - void insert(Document doc) throws IOException { - writer.addDocument(doc); + ListenableFuture insert(Document doc) throws IOException { + return new NrtFuture(writer.addDocument(doc)); } - void replace(Term term, Document doc) throws IOException { - writer.updateDocument(term, doc); + ListenableFuture replace(Term term, Document doc) throws IOException { + return new NrtFuture(writer.updateDocument(term, doc)); } - void delete(Term term) throws IOException { - writer.deleteDocuments(term); + ListenableFuture delete(Term term) throws IOException { + return new NrtFuture(writer.deleteDocuments(term)); } IndexSearcher acquire() throws IOException { - return searcherManager.acquire(); + return nrtManager.acquire(); } void release(IndexSearcher searcher) throws IOException { - searcherManager.release(searcher); + nrtManager.release(searcher); } - void maybeRefresh() { - try { - searcherManager.maybeRefresh(); - } catch (IOException e) { - log.warn("error refreshing indexer", e); + private final class NrtFuture extends AbstractFuture + implements RefreshListener { + private final long gen; + private final AtomicBoolean hasListeners = new AtomicBoolean(); + + NrtFuture(long gen) { + this.gen = gen; + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + if (!isDone()) { + nrtManager.waitForGeneration(gen); + set(null); + } + return super.get(); + } + + @Override + public Void get(long timeout, TimeUnit unit) throws InterruptedException, + TimeoutException, ExecutionException { + if (!isDone()) { + nrtManager.waitForGeneration(gen, timeout, unit); + set(null); + } + return super.get(timeout, unit); + } + + @Override + public boolean isDone() { + if (super.isDone()) { + return true; + } else if (gen <= nrtManager.getCurrentSearchingGen()) { + set(null); + return true; + } + return false; + } + + @Override + public void addListener(Runnable listener, Executor executor) { + if (hasListeners.compareAndSet(false, true) && !isDone()) { + nrtManager.addListener(this); + } + super.addListener(listener, executor); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (hasListeners.get()) { + refreshListeners.put(this, true); + } + return super.cancel(mayInterruptIfRunning); + } + + @Override + public void beforeRefresh() throws IOException { + } + + @Override + public void afterRefresh(boolean didRefresh) throws IOException { + if (gen <= nrtManager.getCurrentSearchingGen()) { + refreshListeners.remove(this); + set(null); + } } } } diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndex.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndex.java index 1b1eeadb6d..cae5d7d8e0 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndex.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndex.java @@ -14,6 +14,8 @@ package com.google.gerrit.server.index; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.gerrit.server.query.Predicate; import com.google.gerrit.server.query.QueryParseException; import com.google.gerrit.server.query.change.ChangeData; @@ -35,18 +37,18 @@ public interface ChangeIndex { /** Instance indicating secondary index is disabled. */ public static final ChangeIndex DISABLED = new ChangeIndex() { @Override - public void insert(ChangeData cd) throws IOException { - // Do nothing. + public ListenableFuture insert(ChangeData cd) throws IOException { + return Futures.immediateFuture(null); } @Override - public void replace(ChangeData cd) throws IOException { - // Do nothing. + public ListenableFuture replace(ChangeData cd) throws IOException { + return Futures.immediateFuture(null); } @Override - public void delete(ChangeData cd) throws IOException { - // Do nothing. + public ListenableFuture delete(ChangeData cd) throws IOException { + return Futures.immediateFuture(null); } @Override @@ -67,7 +69,7 @@ public interface ChangeIndex { * * @throws IOException if the change could not be inserted. */ - public void insert(ChangeData cd) throws IOException; + public ListenableFuture insert(ChangeData cd) throws IOException; /** * Update a change document in the index. @@ -81,7 +83,7 @@ public interface ChangeIndex { * * @throws IOException */ - public void replace(ChangeData cd) throws IOException; + public ListenableFuture replace(ChangeData cd) throws IOException; /** * Delete a change document from the index. @@ -90,7 +92,7 @@ public interface ChangeIndex { * * @throws IOException */ - public void delete(ChangeData cd) throws IOException; + public ListenableFuture delete(ChangeData cd) throws IOException; /** * Convert the given operator predicate into a source searching the index and diff --git a/gerrit-server/src/test/java/com/google/gerrit/server/query/change/IndexRewriteTest.java b/gerrit-server/src/test/java/com/google/gerrit/server/query/change/IndexRewriteTest.java index 295c1cb420..5e605332bf 100644 --- a/gerrit-server/src/test/java/com/google/gerrit/server/query/change/IndexRewriteTest.java +++ b/gerrit-server/src/test/java/com/google/gerrit/server/query/change/IndexRewriteTest.java @@ -21,6 +21,7 @@ import static com.google.gerrit.reviewdb.client.Change.Status.NEW; import static com.google.gerrit.reviewdb.client.Change.Status.SUBMITTED; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; import com.google.gerrit.reviewdb.client.Change; import com.google.gerrit.server.index.ChangeIndex; import com.google.gerrit.server.index.PredicateWrapper; @@ -34,7 +35,6 @@ import com.google.gwtorm.server.ResultSet; import junit.framework.TestCase; -import java.io.IOException; import java.util.EnumSet; import java.util.Set; @@ -42,17 +42,17 @@ import java.util.Set; public class IndexRewriteTest extends TestCase { private static class DummyIndex implements ChangeIndex { @Override - public void insert(ChangeData cd) throws IOException { + public ListenableFuture insert(ChangeData cd) { throw new UnsupportedOperationException(); } @Override - public void replace(ChangeData cd) throws IOException { + public ListenableFuture replace(ChangeData cd) { throw new UnsupportedOperationException(); } @Override - public void delete(ChangeData cd) throws IOException { + public ListenableFuture delete(ChangeData cd) { throw new UnsupportedOperationException(); }