Run all Lucene queries on background threads
The NIOFSDirectory type does not handle being interrupted well, as an interrupt delivered by an SSH connection closing will close the file handles used to read the index. Isolate the Lucene reader from the SSH threads by always running a Lucene query on the interactive index thread pool. This may reduce latency for user dashboards on a multi-core system as each query for the different sections can now run on separate threads and return results when ready. Change-Id: Ic830992431c58b57f22e49185f8164995170f737
This commit is contained in:
@@ -25,11 +25,11 @@ import static com.google.gerrit.server.index.change.ChangeIndexRewriter.CLOSED_S
|
|||||||
import static com.google.gerrit.server.index.change.ChangeIndexRewriter.OPEN_STATUSES;
|
import static com.google.gerrit.server.index.change.ChangeIndexRewriter.OPEN_STATUSES;
|
||||||
|
|
||||||
import com.google.common.base.Function;
|
import com.google.common.base.Function;
|
||||||
|
import com.google.common.base.Throwables;
|
||||||
import com.google.common.collect.ArrayListMultimap;
|
import com.google.common.collect.ArrayListMultimap;
|
||||||
import com.google.common.collect.FluentIterable;
|
import com.google.common.collect.FluentIterable;
|
||||||
import com.google.common.collect.ImmutableSet;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import com.google.common.collect.Multimap;
|
import com.google.common.collect.Multimap;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
@@ -59,6 +59,7 @@ import com.google.gerrit.server.query.change.ChangeData;
|
|||||||
import com.google.gerrit.server.query.change.ChangeDataSource;
|
import com.google.gerrit.server.query.change.ChangeDataSource;
|
||||||
import com.google.gwtorm.protobuf.ProtobufCodec;
|
import com.google.gwtorm.protobuf.ProtobufCodec;
|
||||||
import com.google.gwtorm.server.OrmException;
|
import com.google.gwtorm.server.OrmException;
|
||||||
|
import com.google.gwtorm.server.OrmRuntimeException;
|
||||||
import com.google.gwtorm.server.ResultSet;
|
import com.google.gwtorm.server.ResultSet;
|
||||||
import com.google.inject.Provider;
|
import com.google.inject.Provider;
|
||||||
import com.google.inject.assistedinject.Assisted;
|
import com.google.inject.assistedinject.Assisted;
|
||||||
@@ -91,7 +92,9 @@ import java.util.Collections;
|
|||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -242,7 +245,7 @@ public class LuceneChangeIndex implements ChangeIndex {
|
|||||||
public ChangeDataSource getSource(Predicate<ChangeData> p, QueryOptions opts)
|
public ChangeDataSource getSource(Predicate<ChangeData> p, QueryOptions opts)
|
||||||
throws QueryParseException {
|
throws QueryParseException {
|
||||||
Set<Change.Status> statuses = ChangeIndexRewriter.getPossibleStatus(p);
|
Set<Change.Status> statuses = ChangeIndexRewriter.getPossibleStatus(p);
|
||||||
List<ChangeSubIndex> indexes = Lists.newArrayListWithCapacity(2);
|
List<ChangeSubIndex> indexes = new ArrayList<>(2);
|
||||||
if (!Sets.intersection(statuses, OPEN_STATUSES).isEmpty()) {
|
if (!Sets.intersection(statuses, OPEN_STATUSES).isEmpty()) {
|
||||||
indexes.add(openIndex);
|
indexes.add(openIndex);
|
||||||
}
|
}
|
||||||
@@ -300,6 +303,22 @@ public class LuceneChangeIndex implements ChangeIndex {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ResultSet<ChangeData> read() throws OrmException {
|
public ResultSet<ChangeData> read() throws OrmException {
|
||||||
|
if (Thread.interrupted()) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new OrmException("interrupted");
|
||||||
|
}
|
||||||
|
|
||||||
|
final Set<String> fields = fields(opts);
|
||||||
|
return new ChangeDataResults(
|
||||||
|
executor.submit(new Callable<List<Document>>() {
|
||||||
|
@Override
|
||||||
|
public List<Document> call() throws IOException {
|
||||||
|
return doRead(fields);
|
||||||
|
}
|
||||||
|
}), fields);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<Document> doRead(Set<String> fields) throws IOException {
|
||||||
IndexSearcher[] searchers = new IndexSearcher[indexes.size()];
|
IndexSearcher[] searchers = new IndexSearcher[indexes.size()];
|
||||||
try {
|
try {
|
||||||
int realLimit = opts.start() + opts.limit();
|
int realLimit = opts.start() + opts.limit();
|
||||||
@@ -310,35 +329,12 @@ public class LuceneChangeIndex implements ChangeIndex {
|
|||||||
}
|
}
|
||||||
TopDocs docs = TopDocs.merge(sort, realLimit, hits);
|
TopDocs docs = TopDocs.merge(sort, realLimit, hits);
|
||||||
|
|
||||||
List<ChangeData> result =
|
List<Document> result = new ArrayList<>(docs.scoreDocs.length);
|
||||||
Lists.newArrayListWithCapacity(docs.scoreDocs.length);
|
|
||||||
Set<String> fields = fields(opts);
|
|
||||||
String idFieldName = LEGACY_ID.getName();
|
|
||||||
for (int i = opts.start(); i < docs.scoreDocs.length; i++) {
|
for (int i = opts.start(); i < docs.scoreDocs.length; i++) {
|
||||||
ScoreDoc sd = docs.scoreDocs[i];
|
ScoreDoc sd = docs.scoreDocs[i];
|
||||||
Document doc = searchers[sd.shardIndex].doc(sd.doc, fields);
|
result.add(searchers[sd.shardIndex].doc(sd.doc, fields));
|
||||||
result.add(toChangeData(fields(doc, fields), fields, idFieldName));
|
|
||||||
}
|
}
|
||||||
|
return result;
|
||||||
final List<ChangeData> r = Collections.unmodifiableList(result);
|
|
||||||
return new ResultSet<ChangeData>() {
|
|
||||||
@Override
|
|
||||||
public Iterator<ChangeData> iterator() {
|
|
||||||
return r.iterator();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<ChangeData> toList() {
|
|
||||||
return r;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() {
|
|
||||||
// Do nothing.
|
|
||||||
}
|
|
||||||
};
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new OrmException(e);
|
|
||||||
} finally {
|
} finally {
|
||||||
for (int i = 0; i < indexes.size(); i++) {
|
for (int i = 0; i < indexes.size(); i++) {
|
||||||
if (searchers[i] != null) {
|
if (searchers[i] != null) {
|
||||||
@@ -353,6 +349,45 @@ public class LuceneChangeIndex implements ChangeIndex {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class ChangeDataResults implements ResultSet<ChangeData> {
|
||||||
|
private final Future<List<Document>> future;
|
||||||
|
private final Set<String> fields;
|
||||||
|
|
||||||
|
ChangeDataResults(Future<List<Document>> future, Set<String> fields) {
|
||||||
|
this.future = future;
|
||||||
|
this.fields = fields;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<ChangeData> iterator() {
|
||||||
|
return toList().iterator();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<ChangeData> toList() {
|
||||||
|
try {
|
||||||
|
List<Document> docs = future.get();
|
||||||
|
List<ChangeData> result = new ArrayList<>(docs.size());
|
||||||
|
String idFieldName = LEGACY_ID.getName();
|
||||||
|
for (Document doc : docs) {
|
||||||
|
result.add(toChangeData(fields(doc, fields), fields, idFieldName));
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
close();
|
||||||
|
throw new OrmRuntimeException(e);
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
Throwables.propagateIfPossible(e.getCause());
|
||||||
|
throw new OrmRuntimeException(e.getCause());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
future.cancel(false /* do not interrupt Lucene */);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private Set<String> fields(QueryOptions opts) {
|
private Set<String> fields(QueryOptions opts) {
|
||||||
// Ensure we request enough fields to construct a ChangeData.
|
// Ensure we request enough fields to construct a ChangeData.
|
||||||
Set<String> fs = opts.fields();
|
Set<String> fs = opts.fields();
|
||||||
|
@@ -31,6 +31,7 @@ import com.google.gerrit.server.index.IndexRewriter;
|
|||||||
import com.google.gerrit.server.index.QueryOptions;
|
import com.google.gerrit.server.index.QueryOptions;
|
||||||
import com.google.gerrit.server.index.SchemaDefinitions;
|
import com.google.gerrit.server.index.SchemaDefinitions;
|
||||||
import com.google.gwtorm.server.OrmException;
|
import com.google.gwtorm.server.OrmException;
|
||||||
|
import com.google.gwtorm.server.OrmRuntimeException;
|
||||||
import com.google.gwtorm.server.ResultSet;
|
import com.google.gwtorm.server.ResultSet;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Provider;
|
import com.google.inject.Provider;
|
||||||
@@ -136,6 +137,8 @@ public abstract class QueryProcessor<T> {
|
|||||||
throws OrmException, QueryParseException {
|
throws OrmException, QueryParseException {
|
||||||
try {
|
try {
|
||||||
return query(null, queries);
|
return query(null, queries);
|
||||||
|
} catch (OrmRuntimeException e) {
|
||||||
|
throw new OrmException(e.getMessage(), e);
|
||||||
} catch (OrmException e) {
|
} catch (OrmException e) {
|
||||||
Throwables.propagateIfInstanceOf(e.getCause(), QueryParseException.class);
|
Throwables.propagateIfInstanceOf(e.getCause(), QueryParseException.class);
|
||||||
throw e;
|
throw e;
|
||||||
|
Reference in New Issue
Block a user