Replace SearcherManager with NRTManager

The NRTManager allows Gerrit to wait for a specific document mutation
to be visible to searchers before trying to run a new search.  The
NRTManager comes with its own background thread to manage reopens,
replacing the thread that reopened the index every 100 ms.

The change index API now returns a ListenableFuture the caller can
wait on to learn when new queries will return the updates.

Change-Id: I1b3c5ba036241ffd54c88a16ee8b2ffb3d3bf5f2
This commit is contained in:
Shawn Pearce 2013-06-26 10:21:42 -06:00
parent e39b40a265
commit 75ac1f1b99
4 changed files with 175 additions and 87 deletions

View File

@ -20,10 +20,12 @@ import static org.apache.lucene.search.BooleanClause.Occur.MUST;
import static org.apache.lucene.search.BooleanClause.Occur.MUST_NOT;
import static org.apache.lucene.search.BooleanClause.Occur.SHOULD;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.reviewdb.client.Change;
@ -118,7 +120,6 @@ public class LuceneChangeIndex implements ChangeIndex, LifecycleListener {
return writerConfig;
}
private final RefreshThread refreshThread;
private final FillArgs fillArgs;
private final ExecutorService executor;
private final boolean readOnly;
@ -128,7 +129,6 @@ public class LuceneChangeIndex implements ChangeIndex, LifecycleListener {
LuceneChangeIndex(Config cfg, SitePaths sitePaths,
ListeningScheduledExecutorService executor, FillArgs fillArgs,
boolean readOnly) throws IOException {
this.refreshThread = new RefreshThread();
this.fillArgs = fillArgs;
this.executor = executor;
this.readOnly = readOnly;
@ -140,12 +140,10 @@ public class LuceneChangeIndex implements ChangeIndex, LifecycleListener {
@Override
public void start() {
refreshThread.start();
}
@Override
public void stop() {
refreshThread.halt();
List<Future<?>> closeFutures = Lists.newArrayListWithCapacity(2);
closeFutures.add(executor.submit(new Runnable() {
@Override
@ -164,49 +162,66 @@ public class LuceneChangeIndex implements ChangeIndex, LifecycleListener {
}
}
@SuppressWarnings("unchecked")
@Override
public void insert(ChangeData cd) throws IOException {
public ListenableFuture<Void> insert(ChangeData cd) throws IOException {
Term id = idTerm(cd);
Document doc = toDocument(cd);
if (readOnly) {
return;
return Futures.immediateFuture(null);
}
if (cd.getChange().getStatus().isOpen()) {
closedIndex.delete(id);
openIndex.insert(doc);
return allOf(
closedIndex.delete(id),
openIndex.insert(doc));
} else {
openIndex.delete(id);
closedIndex.insert(doc);
return allOf(
openIndex.delete(id),
closedIndex.insert(doc));
}
}
@SuppressWarnings("unchecked")
@Override
public void replace(ChangeData cd) throws IOException {
public ListenableFuture<Void> replace(ChangeData cd) throws IOException {
Term id = idTerm(cd);
Document doc = toDocument(cd);
if (readOnly) {
return;
return Futures.immediateFuture(null);
}
if (cd.getChange().getStatus().isOpen()) {
closedIndex.delete(id);
openIndex.replace(id, doc);
return allOf(
closedIndex.delete(id),
openIndex.replace(id, doc));
} else {
openIndex.delete(id);
closedIndex.replace(id, doc);
return allOf(
openIndex.delete(id),
closedIndex.replace(id, doc));
}
}
@SuppressWarnings("unchecked")
@Override
public void delete(ChangeData cd) throws IOException {
public ListenableFuture<Void> delete(ChangeData cd) throws IOException {
Term id = idTerm(cd);
if (readOnly) {
return;
}
if (cd.getChange().getStatus().isOpen()) {
openIndex.delete(id);
} else {
closedIndex.delete(id);
return Futures.immediateFuture(null);
}
return allOf(
openIndex.delete(id),
closedIndex.delete(id));
}
private static <V> ListenableFuture<Void> allOf(ListenableFuture<V>... f) {
return Futures.transform(
Futures.allAsList(f),
new Function<List<V>, Void>() {
@Override
public Void apply(List<V> input) {
return null;
}
});
}
@Override
@ -485,35 +500,4 @@ public class LuceneChangeIndex implements ChangeIndex, LifecycleListener {
private static IllegalArgumentException badFieldType(FieldType<?> t) {
return new IllegalArgumentException("unknown index field type " + t);
}
private class RefreshThread extends Thread {
private boolean stop;
@Override
public void run() {
while (!stop) {
openIndex.maybeRefresh();
closedIndex.maybeRefresh();
synchronized (this) {
try {
wait(100);
} catch (InterruptedException e) {
log.warn("error refreshing index searchers", e);
}
}
}
}
void halt() {
synchronized (this) {
stop = true;
notify();
}
try {
join();
} catch (InterruptedException e) {
log.warn("error stopping refresh thread", e);
}
}
}
}

View File

@ -14,12 +14,20 @@
package com.google.gerrit.lucene;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gwt.thirdparty.guava.common.collect.Maps;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.SearcherManager;
import org.apache.lucene.search.NRTManager;
import org.apache.lucene.search.NRTManager.TrackingIndexWriter;
import org.apache.lucene.search.NRTManagerReopenThread;
import org.apache.lucene.search.ReferenceManager.RefreshListener;
import org.apache.lucene.search.SearcherFactory;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.slf4j.Logger;
@ -27,29 +35,63 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
/** Piece of the change index that is implemented as a separate Lucene index. */
class SubIndex {
private static final Logger log = LoggerFactory.getLogger(SubIndex.class);
private final Directory dir;
private final IndexWriter writer;
private final SearcherManager searcherManager;
private final TrackingIndexWriter writer;
private final NRTManager nrtManager;
private final NRTManagerReopenThread reopenThread;
private final ConcurrentMap<RefreshListener, Boolean> refreshListeners;
SubIndex(File file, IndexWriterConfig writerConfig) throws IOException {
dir = FSDirectory.open(file);
writer = new IndexWriter(dir, writerConfig);
searcherManager = new SearcherManager(writer, true, null);
writer = new NRTManager.TrackingIndexWriter(new IndexWriter(dir, writerConfig));
nrtManager = new NRTManager(writer, new SearcherFactory());
refreshListeners = Maps.newConcurrentMap();
nrtManager.addListener(new RefreshListener() {
@Override
public void beforeRefresh() throws IOException {
}
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
for (RefreshListener l : refreshListeners.keySet()) {
l.afterRefresh(didRefresh);
}
}
});
reopenThread = new NRTManagerReopenThread(
nrtManager,
0.500 /* maximum stale age (seconds) */,
0.010 /* minimum stale age (seconds) */);
reopenThread.setName("NRT " + file.getName());
reopenThread.setPriority(Math.min(
Thread.currentThread().getPriority() + 2,
Thread.MAX_PRIORITY));
reopenThread.setDaemon(true);
reopenThread.start();
}
void close() {
reopenThread.close();
try {
searcherManager.close();
nrtManager.close();
} catch (IOException e) {
log.warn("error closing Lucene searcher", e);
}
try {
writer.close();
writer.getIndexWriter().close();
} catch (IOException e) {
log.warn("error closing Lucene writer", e);
}
@ -60,31 +102,91 @@ class SubIndex {
}
}
void insert(Document doc) throws IOException {
writer.addDocument(doc);
ListenableFuture<Void> insert(Document doc) throws IOException {
return new NrtFuture(writer.addDocument(doc));
}
void replace(Term term, Document doc) throws IOException {
writer.updateDocument(term, doc);
ListenableFuture<Void> replace(Term term, Document doc) throws IOException {
return new NrtFuture(writer.updateDocument(term, doc));
}
void delete(Term term) throws IOException {
writer.deleteDocuments(term);
ListenableFuture<Void> delete(Term term) throws IOException {
return new NrtFuture(writer.deleteDocuments(term));
}
IndexSearcher acquire() throws IOException {
return searcherManager.acquire();
return nrtManager.acquire();
}
void release(IndexSearcher searcher) throws IOException {
searcherManager.release(searcher);
nrtManager.release(searcher);
}
void maybeRefresh() {
try {
searcherManager.maybeRefresh();
} catch (IOException e) {
log.warn("error refreshing indexer", e);
private final class NrtFuture extends AbstractFuture<Void>
implements RefreshListener {
private final long gen;
private final AtomicBoolean hasListeners = new AtomicBoolean();
NrtFuture(long gen) {
this.gen = gen;
}
@Override
public Void get() throws InterruptedException, ExecutionException {
if (!isDone()) {
nrtManager.waitForGeneration(gen);
set(null);
}
return super.get();
}
@Override
public Void get(long timeout, TimeUnit unit) throws InterruptedException,
TimeoutException, ExecutionException {
if (!isDone()) {
nrtManager.waitForGeneration(gen, timeout, unit);
set(null);
}
return super.get(timeout, unit);
}
@Override
public boolean isDone() {
if (super.isDone()) {
return true;
} else if (gen <= nrtManager.getCurrentSearchingGen()) {
set(null);
return true;
}
return false;
}
@Override
public void addListener(Runnable listener, Executor executor) {
if (hasListeners.compareAndSet(false, true) && !isDone()) {
nrtManager.addListener(this);
}
super.addListener(listener, executor);
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (hasListeners.get()) {
refreshListeners.put(this, true);
}
return super.cancel(mayInterruptIfRunning);
}
@Override
public void beforeRefresh() throws IOException {
}
@Override
public void afterRefresh(boolean didRefresh) throws IOException {
if (gen <= nrtManager.getCurrentSearchingGen()) {
refreshListeners.remove(this);
set(null);
}
}
}
}

View File

@ -14,6 +14,8 @@
package com.google.gerrit.server.index;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.server.query.Predicate;
import com.google.gerrit.server.query.QueryParseException;
import com.google.gerrit.server.query.change.ChangeData;
@ -35,18 +37,18 @@ public interface ChangeIndex {
/** Instance indicating secondary index is disabled. */
public static final ChangeIndex DISABLED = new ChangeIndex() {
@Override
public void insert(ChangeData cd) throws IOException {
// Do nothing.
public ListenableFuture<Void> insert(ChangeData cd) throws IOException {
return Futures.immediateFuture(null);
}
@Override
public void replace(ChangeData cd) throws IOException {
// Do nothing.
public ListenableFuture<Void> replace(ChangeData cd) throws IOException {
return Futures.immediateFuture(null);
}
@Override
public void delete(ChangeData cd) throws IOException {
// Do nothing.
public ListenableFuture<Void> delete(ChangeData cd) throws IOException {
return Futures.immediateFuture(null);
}
@Override
@ -67,7 +69,7 @@ public interface ChangeIndex {
*
* @throws IOException if the change could not be inserted.
*/
public void insert(ChangeData cd) throws IOException;
public ListenableFuture<Void> insert(ChangeData cd) throws IOException;
/**
* Update a change document in the index.
@ -81,7 +83,7 @@ public interface ChangeIndex {
*
* @throws IOException
*/
public void replace(ChangeData cd) throws IOException;
public ListenableFuture<Void> replace(ChangeData cd) throws IOException;
/**
* Delete a change document from the index.
@ -90,7 +92,7 @@ public interface ChangeIndex {
*
* @throws IOException
*/
public void delete(ChangeData cd) throws IOException;
public ListenableFuture<Void> delete(ChangeData cd) throws IOException;
/**
* Convert the given operator predicate into a source searching the index and

View File

@ -21,6 +21,7 @@ import static com.google.gerrit.reviewdb.client.Change.Status.NEW;
import static com.google.gerrit.reviewdb.client.Change.Status.SUBMITTED;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.server.index.ChangeIndex;
import com.google.gerrit.server.index.PredicateWrapper;
@ -34,7 +35,6 @@ import com.google.gwtorm.server.ResultSet;
import junit.framework.TestCase;
import java.io.IOException;
import java.util.EnumSet;
import java.util.Set;
@ -42,17 +42,17 @@ import java.util.Set;
public class IndexRewriteTest extends TestCase {
private static class DummyIndex implements ChangeIndex {
@Override
public void insert(ChangeData cd) throws IOException {
public ListenableFuture<Void> insert(ChangeData cd) {
throw new UnsupportedOperationException();
}
@Override
public void replace(ChangeData cd) throws IOException {
public ListenableFuture<Void> replace(ChangeData cd) {
throw new UnsupportedOperationException();
}
@Override
public void delete(ChangeData cd) throws IOException {
public ListenableFuture<Void> delete(ChangeData cd) {
throw new UnsupportedOperationException();
}