Merge changes from topic 'lucene-stability'

* changes:
  Write each Lucene index using a dedicated background thread
  Do not allow index futures to be cancelled
  Run all Lucene queries on background threads
  Optimize loading change from Lucene index
This commit is contained in:
Hugo Arès 2016-08-02 17:23:08 +00:00 committed by Gerrit Code Review
commit 1b5c1b1bbe
4 changed files with 190 additions and 83 deletions

View File

@ -19,7 +19,11 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.index.FieldDef;
@ -54,8 +58,10 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -84,6 +90,7 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
private final SitePaths sitePaths;
private final Directory dir;
private final String name;
private final ListeningExecutorService writerThread;
private final TrackingIndexWriter writer;
private final ReferenceManager<IndexSearcher> searcherManager;
private final ControlledRealTimeReopenThread<IndexSearcher> reopenThread;
@ -117,7 +124,7 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
delegateWriter = autoCommitWriter;
autoCommitExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
.setNameFormat("Commit-%d " + index)
.setNameFormat(index + " Commit-%d")
.setDaemon(true)
.build());
autoCommitExecutor.scheduleAtFixedRate(new Runnable() {
@ -148,11 +155,18 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
notDoneNrtFutures = Sets.newConcurrentHashSet();
writerThread = MoreExecutors.listeningDecorator(
Executors.newFixedThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat(index + " Write-%d")
.setDaemon(true)
.build()));
reopenThread = new ControlledRealTimeReopenThread<>(
writer, searcherManager,
0.500 /* maximum stale age (seconds) */,
0.010 /* minimum stale age (seconds) */);
reopenThread.setName("NRT " + name);
reopenThread.setName(index + " NRT");
reopenThread.setPriority(Math.min(
Thread.currentThread().getPriority() + 2,
Thread.MAX_PRIORITY));
@ -193,6 +207,15 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
autoCommitExecutor.shutdown();
}
writerThread.shutdown();
try {
if (!writerThread.awaitTermination(5, TimeUnit.SECONDS)) {
log.warn("shutting down " + name + " index with pending Lucene writes");
}
} catch (InterruptedException e) {
log.warn("interrupted waiting for pending Lucene writes of " + name +
" index", e);
}
reopenThread.close();
// Closing the reopen thread sets its generation to Long.MAX_VALUE, but we
@ -222,16 +245,45 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
}
}
ListenableFuture<?> insert(Document doc) throws IOException {
return new NrtFuture(writer.addDocument(doc));
/**
 * Queues the addition of {@code doc} to the index writer.
 *
 * @param doc document to add.
 * @return the future returned by {@code submit} for this write.
 */
ListenableFuture<?> insert(final Document doc) {
  Callable<Long> addTask = new Callable<Long>() {
    @Override
    public Long call() throws IOException, InterruptedException {
      return writer.addDocument(doc);
    }
  };
  return submit(addTask);
}
ListenableFuture<?> replace(Term term, Document doc) throws IOException {
return new NrtFuture(writer.updateDocument(term, doc));
/**
 * Queues an update of the document(s) matching {@code term} with {@code doc}.
 *
 * @param term term passed to the writer's {@code updateDocument} call.
 * @param doc replacement document.
 * @return the future returned by {@code submit} for this write.
 */
ListenableFuture<?> replace(final Term term, final Document doc) {
  Callable<Long> updateTask = new Callable<Long>() {
    @Override
    public Long call() throws IOException, InterruptedException {
      return writer.updateDocument(term, doc);
    }
  };
  return submit(updateTask);
}
ListenableFuture<?> delete(Term term) throws IOException {
return new NrtFuture(writer.deleteDocuments(term));
/**
 * Queues deletion of the documents matching {@code term}.
 *
 * @param term term passed to the writer's {@code deleteDocuments} call.
 * @return the future returned by {@code submit} for this write.
 */
ListenableFuture<?> delete(final Term term) {
  Callable<Long> deleteTask = new Callable<Long>() {
    @Override
    public Long call() throws IOException, InterruptedException {
      return writer.deleteDocuments(term);
    }
  };
  return submit(deleteTask);
}
/**
 * Submits {@code task} to the writer thread and chains an {@link NrtFuture}
 * for the generation the task returns.
 *
 * <p>The submitted future is wrapped with
 * {@code Futures.nonCancellationPropagating} before it is transformed.
 *
 * @param task write operation returning the generation it produced.
 * @return future built from the {@link NrtFuture} for that generation.
 */
private ListenableFuture<?> submit(Callable<Long> task) {
  ListenableFuture<Long> pendingWrite = writerThread.submit(task);
  ListenableFuture<Long> generation =
      Futures.nonCancellationPropagating(pendingWrite);

  AsyncFunction<Long, Void> awaitGeneration = new AsyncFunction<Long, Void>() {
    @Override
    public ListenableFuture<Void> apply(Long gen) throws InterruptedException {
      // Register interest in this generation with the reopen thread so
      // that it refreshes using the minimum stale time.
      reopenThread.waitForGeneration(gen, 0);
      return new NrtFuture(gen);
    }
  };
  return Futures.transformAsync(generation, awaitGeneration);
}
@Override
@ -305,9 +357,6 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
NrtFuture(long gen) {
this.gen = gen;
// Tell the reopen thread we are waiting on this generation so it uses the
// min stale time when refreshing.
isGenAvailableNowForCurrentSearcher();
}
@Override
@ -323,12 +372,10 @@ public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
public Void get(long timeout, TimeUnit unit) throws InterruptedException,
TimeoutException, ExecutionException {
if (!isDone()) {
if (reopenThread.waitForGeneration(gen,
(int) MILLISECONDS.convert(timeout, unit))) {
set(null);
} else {
if (!reopenThread.waitForGeneration(gen, (int) unit.toMillis(timeout))) {
throw new TimeoutException();
}
set(null);
}
return super.get(timeout, unit);
}

View File

@ -25,10 +25,11 @@ import static com.google.gerrit.server.index.change.ChangeIndexRewriter.CLOSED_S
import static com.google.gerrit.server.index.change.ChangeIndexRewriter.OPEN_STATUSES;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
@ -58,6 +59,7 @@ import com.google.gerrit.server.query.change.ChangeData;
import com.google.gerrit.server.query.change.ChangeDataSource;
import com.google.gwtorm.protobuf.ProtobufCodec;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.OrmRuntimeException;
import com.google.gwtorm.server.ResultSet;
import com.google.inject.Provider;
import com.google.inject.assistedinject.Assisted;
@ -85,11 +87,14 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
/**
@ -240,7 +245,7 @@ public class LuceneChangeIndex implements ChangeIndex {
public ChangeDataSource getSource(Predicate<ChangeData> p, QueryOptions opts)
throws QueryParseException {
Set<Change.Status> statuses = ChangeIndexRewriter.getPossibleStatus(p);
List<ChangeSubIndex> indexes = Lists.newArrayListWithCapacity(2);
List<ChangeSubIndex> indexes = new ArrayList<>(2);
if (!Sets.intersection(statuses, OPEN_STATUSES).isEmpty()) {
indexes.add(openIndex);
}
@ -298,6 +303,22 @@ public class LuceneChangeIndex implements ChangeIndex {
/**
 * Submits the search to {@code executor} and returns a result set backed
 * by the pending future.
 *
 * @throws OrmException if the calling thread is already interrupted.
 */
@Override
public ResultSet<ChangeData> read() throws OrmException {
  // Thread.interrupted() clears the flag; set it again before failing so
  // the caller can still observe the interruption.
  boolean wasInterrupted = Thread.interrupted();
  if (wasInterrupted) {
    Thread.currentThread().interrupt();
    throw new OrmException("interrupted");
  }

  final Set<String> wanted = fields(opts);
  Callable<List<Document>> search = new Callable<List<Document>>() {
    @Override
    public List<Document> call() throws IOException {
      return doRead(wanted);
    }
  };
  return new ChangeDataResults(executor.submit(search), wanted);
}
private List<Document> doRead(Set<String> fields) throws IOException {
IndexSearcher[] searchers = new IndexSearcher[indexes.size()];
try {
int realLimit = opts.start() + opts.limit();
@ -308,35 +329,12 @@ public class LuceneChangeIndex implements ChangeIndex {
}
TopDocs docs = TopDocs.merge(sort, realLimit, hits);
List<ChangeData> result =
Lists.newArrayListWithCapacity(docs.scoreDocs.length);
Set<String> fields = fields(opts);
String idFieldName = LEGACY_ID.getName();
List<Document> result = new ArrayList<>(docs.scoreDocs.length);
for (int i = opts.start(); i < docs.scoreDocs.length; i++) {
ScoreDoc sd = docs.scoreDocs[i];
Document doc = searchers[sd.shardIndex].doc(sd.doc, fields);
result.add(toChangeData(doc, fields, idFieldName));
result.add(searchers[sd.shardIndex].doc(sd.doc, fields));
}
final List<ChangeData> r = Collections.unmodifiableList(result);
return new ResultSet<ChangeData>() {
@Override
public Iterator<ChangeData> iterator() {
return r.iterator();
}
@Override
public List<ChangeData> toList() {
return r;
}
@Override
public void close() {
// Do nothing.
}
};
} catch (IOException e) {
throw new OrmException(e);
return result;
} finally {
for (int i = 0; i < indexes.size(); i++) {
if (searchers[i] != null) {
@ -351,6 +349,45 @@ public class LuceneChangeIndex implements ChangeIndex {
}
}
/**
 * Result set backed by a pending search future. The stored documents are
 * converted to {@link ChangeData} each time {@link #toList()} is called.
 */
private class ChangeDataResults implements ResultSet<ChangeData> {
  private final Future<List<Document>> future;
  private final Set<String> fields;

  /**
   * @param future pending search producing the matching documents.
   * @param fields names of the stored fields to keep from each document.
   */
  ChangeDataResults(Future<List<Document>> future, Set<String> fields) {
    this.future = future;
    this.fields = fields;
  }

  /** Returns an iterator over a freshly built {@link #toList()} result. */
  @Override
  public Iterator<ChangeData> iterator() {
    return toList().iterator();
  }

  /**
   * Waits for the search to finish and converts every document.
   *
   * @return a new list holding one {@link ChangeData} per document.
   * @throws OrmRuntimeException if the wait is interrupted or the search
   *     failed with a checked exception; unchecked failures and errors are
   *     rethrown as-is.
   */
  @Override
  public List<ChangeData> toList() {
    try {
      List<Document> docs = future.get();
      List<ChangeData> result = new ArrayList<>(docs.size());
      String idFieldName = LEGACY_ID.getName();
      for (Document doc : docs) {
        result.add(toChangeData(fields(doc, fields), fields, idFieldName));
      }
      return result;
    } catch (InterruptedException e) {
      // Catching InterruptedException clears the interrupt flag; restore it
      // so callers further up the stack can still see the interruption.
      Thread.currentThread().interrupt();
      close();
      throw new OrmRuntimeException(e);
    } catch (ExecutionException e) {
      Throwables.propagateIfPossible(e.getCause());
      throw new OrmRuntimeException(e.getCause());
    }
  }

  /** Cancels the pending search without interrupting a running task. */
  @Override
  public void close() {
    future.cancel(false /* do not interrupt Lucene */);
  }
}
private Set<String> fields(QueryOptions opts) {
// Ensure we request enough fields to construct a ChangeData.
Set<String> fs = opts.fields();
@ -376,19 +413,33 @@ public class LuceneChangeIndex implements ChangeIndex {
ImmutableSet.of(LEGACY_ID.getName(), PROJECT.getName()));
}
private ChangeData toChangeData(Document doc, Set<String> fields,
String idFieldName) {
/**
 * Groups the fields of {@code doc} by name, keeping only those whose name
 * is contained in {@code fields}.
 *
 * @param doc document whose fields are iterated.
 * @param fields names of the fields to keep.
 * @return multimap from field name to the matching fields, in iteration
 *     order.
 */
private static Multimap<String, IndexableField> fields(Document doc,
    Set<String> fields) {
  Multimap<String, IndexableField> byName =
      ArrayListMultimap.create(fields.size(), 4);
  for (IndexableField field : doc) {
    String fieldName = field.name();
    if (!fields.contains(fieldName)) {
      continue;
    }
    byName.put(fieldName, field);
  }
  return byName;
}
private ChangeData toChangeData(Multimap<String, IndexableField> doc,
Set<String> fields, String idFieldName) {
ChangeData cd;
// Either change or the ID field was guaranteed to be included in the call
// to fields() above.
BytesRef cb = doc.getBinaryValue(CHANGE_FIELD);
IndexableField cb = Iterables.getFirst(doc.get(CHANGE_FIELD), null);
if (cb != null) {
BytesRef proto = cb.binaryValue();
cd = changeDataFactory.create(db.get(),
ChangeProtoField.CODEC.decode(cb.bytes, cb.offset, cb.length));
ChangeProtoField.CODEC.decode(proto.bytes, proto.offset, proto.length));
} else {
Change.Id id =
new Change.Id(doc.getField(idFieldName).numericValue().intValue());
IndexableField project = doc.getField(PROJECT.getName());
IndexableField f = Iterables.getFirst(doc.get(idFieldName), null);
Change.Id id = new Change.Id(f.numericValue().intValue());
IndexableField project = Iterables.getFirst(doc.get(PROJECT.getName()), null);
if (project == null) {
// Old schema without project field: we can safely assume NoteDb is
// disabled.
@ -429,7 +480,7 @@ public class LuceneChangeIndex implements ChangeIndex {
return cd;
}
private void decodePatchSets(Document doc, ChangeData cd) {
private void decodePatchSets(Multimap<String, IndexableField> doc, ChangeData cd) {
List<PatchSet> patchSets =
decodeProtos(doc, PATCH_SET_FIELD, PatchSetProtoField.CODEC);
if (!patchSets.isEmpty()) {
@ -439,14 +490,14 @@ public class LuceneChangeIndex implements ChangeIndex {
}
}
private void decodeApprovals(Document doc, ChangeData cd) {
/**
 * Decodes every APPROVAL_FIELD value in {@code doc} with
 * PatchSetApprovalProtoField.CODEC and stores the result on {@code cd} as
 * its current approvals.
 *
 * @param doc stored fields of one index document, keyed by field name.
 * @param cd change data to populate.
 */
private void decodeApprovals(Multimap<String, IndexableField> doc, ChangeData cd) {
cd.setCurrentApprovals(
decodeProtos(doc, APPROVAL_FIELD, PatchSetApprovalProtoField.CODEC));
}
private void decodeChangedLines(Document doc, ChangeData cd) {
IndexableField added = doc.getField(ADDED_FIELD);
IndexableField deleted = doc.getField(DELETED_FIELD);
private void decodeChangedLines(Multimap<String, IndexableField> doc, ChangeData cd) {
IndexableField added = Iterables.getFirst(doc.get(ADDED_FIELD), null);
IndexableField deleted = Iterables.getFirst(doc.get(DELETED_FIELD), null);
if (added != null && deleted != null) {
cd.setChangedLines(
added.numericValue().intValue(),
@ -460,23 +511,26 @@ public class LuceneChangeIndex implements ChangeIndex {
}
}
private void decodeMergeable(Document doc, ChangeData cd) {
String mergeable = doc.get(MERGEABLE_FIELD);
if ("1".equals(mergeable)) {
cd.setMergeable(true);
} else if ("0".equals(mergeable)) {
cd.setMergeable(false);
/**
 * Sets the mergeable flag on {@code cd} from the first MERGEABLE_FIELD
 * value in {@code doc}: "1" means mergeable, "0" means not mergeable.
 * Leaves {@code cd} untouched if the field is absent or holds any other
 * value.
 *
 * @param doc stored fields of one index document, keyed by field name.
 * @param cd change data to populate.
 */
private void decodeMergeable(Multimap<String, IndexableField> doc, ChangeData cd) {
  IndexableField field = Iterables.getFirst(doc.get(MERGEABLE_FIELD), null);
  if (field == null) {
    return;
  }

  String value = field.stringValue();
  if ("1".equals(value)) {
    cd.setMergeable(true);
  } else if ("0".equals(value)) {
    cd.setMergeable(false);
  }
}
private void decodeReviewedBy(Document doc, ChangeData cd) {
IndexableField[] reviewedBy = doc.getFields(REVIEWEDBY_FIELD);
if (reviewedBy.length > 0) {
private void decodeReviewedBy(Multimap<String, IndexableField> doc, ChangeData cd) {
Collection<IndexableField> reviewedBy = doc.get(REVIEWEDBY_FIELD);
if (reviewedBy.size() > 0) {
Set<Account.Id> accounts =
Sets.newHashSetWithExpectedSize(reviewedBy.length);
Sets.newHashSetWithExpectedSize(reviewedBy.size());
for (IndexableField r : reviewedBy) {
int id = r.numericValue().intValue();
if (reviewedBy.length == 1 && id == ChangeField.NOT_REVIEWED) {
if (reviewedBy.size() == 1 && id == ChangeField.NOT_REVIEWED) {
break;
}
accounts.add(new Account.Id(id));
@ -485,9 +539,9 @@ public class LuceneChangeIndex implements ChangeIndex {
}
}
private void decodeHashtags(Document doc, ChangeData cd) {
IndexableField[] hashtag = doc.getFields(HASHTAG_FIELD);
Set<String> hashtags = Sets.newHashSetWithExpectedSize(hashtag.length);
private void decodeHashtags(Multimap<String, IndexableField> doc, ChangeData cd) {
Collection<IndexableField> hashtag = doc.get(HASHTAG_FIELD);
Set<String> hashtags = Sets.newHashSetWithExpectedSize(hashtag.size());
for (IndexableField r : hashtag) {
hashtags.add(r.binaryValue().utf8ToString());
}
@ -495,18 +549,18 @@ public class LuceneChangeIndex implements ChangeIndex {
}
@Deprecated
private void decodeStarredBy(Document doc, ChangeData cd) {
IndexableField[] starredBy = doc.getFields(STARREDBY_FIELD);
private void decodeStarredBy(Multimap<String, IndexableField> doc, ChangeData cd) {
Collection<IndexableField> starredBy = doc.get(STARREDBY_FIELD);
Set<Account.Id> accounts =
Sets.newHashSetWithExpectedSize(starredBy.length);
Sets.newHashSetWithExpectedSize(starredBy.size());
for (IndexableField r : starredBy) {
accounts.add(new Account.Id(r.numericValue().intValue()));
}
cd.setStarredBy(accounts);
}
private void decodeStar(Document doc, ChangeData cd) {
IndexableField[] star = doc.getFields(STAR_FIELD);
private void decodeStar(Multimap<String, IndexableField> doc, ChangeData cd) {
Collection<IndexableField> star = doc.get(STAR_FIELD);
Multimap<Account.Id, String> stars = ArrayListMultimap.create();
for (IndexableField r : star) {
StarredChangesUtil.StarField starField =
@ -518,10 +572,10 @@ public class LuceneChangeIndex implements ChangeIndex {
cd.setStars(stars);
}
private void decodeReviewers(Document doc, ChangeData cd) {
private void decodeReviewers(Multimap<String, IndexableField> doc, ChangeData cd) {
cd.setReviewers(
ChangeField.parseReviewerFieldValues(
FluentIterable.of(doc.getFields(REVIEWER_FIELD))
FluentIterable.from(doc.get(REVIEWER_FIELD))
.transform(
new Function<IndexableField, String>() {
@Override
@ -531,14 +585,16 @@ public class LuceneChangeIndex implements ChangeIndex {
})));
}
private static <T> List<T> decodeProtos(Document doc, String fieldName,
ProtobufCodec<T> codec) {
BytesRef[] bytesRefs = doc.getBinaryValues(fieldName);
if (bytesRefs.length == 0) {
private static <T> List<T> decodeProtos(Multimap<String, IndexableField> doc,
String fieldName, ProtobufCodec<T> codec) {
Collection<IndexableField> fields = doc.get(fieldName);
if (fields.isEmpty()) {
return Collections.emptyList();
}
List<T> result = new ArrayList<>(bytesRefs.length);
for (BytesRef r : bytesRefs) {
List<T> result = new ArrayList<>(fields.size());
for (IndexableField f : fields) {
BytesRef r = f.binaryValue();
result.add(codec.decode(r.bytes, r.offset, r.length));
}
return result;

View File

@ -246,7 +246,8 @@ public class ChangeIndexer {
}
private CheckedFuture<?, IOException> submit(Callable<?> task) {
return Futures.makeChecked(executor.submit(task), MAPPER);
return Futures.makeChecked(
Futures.nonCancellationPropagating(executor.submit(task)), MAPPER);
}
private class IndexTask implements Callable<Void> {

View File

@ -31,6 +31,7 @@ import com.google.gerrit.server.index.IndexRewriter;
import com.google.gerrit.server.index.QueryOptions;
import com.google.gerrit.server.index.SchemaDefinitions;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.OrmRuntimeException;
import com.google.gwtorm.server.ResultSet;
import com.google.inject.Inject;
import com.google.inject.Provider;
@ -136,6 +137,8 @@ public abstract class QueryProcessor<T> {
throws OrmException, QueryParseException {
try {
return query(null, queries);
} catch (OrmRuntimeException e) {
throw new OrmException(e.getMessage(), e);
} catch (OrmException e) {
Throwables.propagateIfInstanceOf(e.getCause(), QueryParseException.class);
throw e;