Extract SubIndex as an abstract Index implementation
The main functionality we want to abstract out is wrapping IndexWriters in the constructor and setting up the NRT handling. We don't implement the core add/update/delete/get methods, since they require per-index logic to convert the Gerrit type to a Lucene Document. Additionally, ChangeSubIndex is a bit of a special case, because ChangeIndex has special logic for combining results from multiple SubIndexes; this implementation throws UnsupportedOperationException. Other implementations will be able to do more useful work directly. Change-Id: I122ac8732ff08daf53ea12b772a97779bb9f5b22
This commit is contained in:
@@ -20,6 +20,9 @@ import com.google.common.collect.Sets;
|
||||
import com.google.common.util.concurrent.AbstractFuture;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.gerrit.server.config.SitePaths;
|
||||
import com.google.gerrit.server.index.Index;
|
||||
import com.google.gerrit.server.index.Schema;
|
||||
|
||||
import org.apache.lucene.document.Document;
|
||||
import org.apache.lucene.index.IndexWriter;
|
||||
@@ -32,12 +35,12 @@ import org.apache.lucene.search.ReferenceManager.RefreshListener;
|
||||
import org.apache.lucene.search.SearcherFactory;
|
||||
import org.apache.lucene.store.AlreadyClosedException;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FSDirectory;
|
||||
import org.eclipse.jgit.errors.ConfigInvalidException;
|
||||
import org.eclipse.jgit.storage.file.FileBasedConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
@@ -45,25 +48,41 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
|
||||
/** Piece of the change index that is implemented as a separate Lucene index. */
|
||||
public class SubIndex {
|
||||
private static final Logger log = LoggerFactory.getLogger(SubIndex.class);
|
||||
/** Basic Lucene index implementation. */
|
||||
public abstract class AbstractLuceneIndex<K, V> implements Index<K, V> {
|
||||
private static final Logger log =
|
||||
LoggerFactory.getLogger(AbstractLuceneIndex.class);
|
||||
|
||||
public static void setReady(SitePaths sitePaths, int version, boolean ready)
|
||||
throws IOException {
|
||||
try {
|
||||
// TODO(dborowitz): Totally broken for non-change indexes.
|
||||
FileBasedConfig cfg =
|
||||
LuceneVersionManager.loadGerritIndexConfig(sitePaths);
|
||||
LuceneVersionManager.setReady(cfg, version, ready);
|
||||
cfg.save();
|
||||
} catch (ConfigInvalidException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private final Schema<V> schema;
|
||||
private final SitePaths sitePaths;
|
||||
private final Directory dir;
|
||||
private final TrackingIndexWriter writer;
|
||||
private final ReferenceManager<IndexSearcher> searcherManager;
|
||||
private final ControlledRealTimeReopenThread<IndexSearcher> reopenThread;
|
||||
private final Set<NrtFuture> notDoneNrtFutures;
|
||||
|
||||
SubIndex(Path path, GerritIndexWriterConfig writerConfig,
|
||||
SearcherFactory searcherFactory) throws IOException {
|
||||
this(FSDirectory.open(path), path.getFileName().toString(), writerConfig,
|
||||
searcherFactory);
|
||||
}
|
||||
|
||||
SubIndex(Directory dir, final String dirName,
|
||||
AbstractLuceneIndex(
|
||||
Schema<V> schema,
|
||||
SitePaths sitePaths,
|
||||
Directory dir,
|
||||
final String name,
|
||||
GerritIndexWriterConfig writerConfig,
|
||||
SearcherFactory searcherFactory) throws IOException {
|
||||
this.schema = schema;
|
||||
this.sitePaths = sitePaths;
|
||||
this.dir = dir;
|
||||
IndexWriter delegateWriter;
|
||||
long commitPeriod = writerConfig.getCommitWithinMs();
|
||||
@@ -79,7 +98,7 @@ public class SubIndex {
|
||||
delegateWriter = autoCommitWriter;
|
||||
|
||||
new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder()
|
||||
.setNameFormat("Commit-%d " + dirName)
|
||||
.setNameFormat("Commit-%d " + name)
|
||||
.setDaemon(true)
|
||||
.build())
|
||||
.scheduleAtFixedRate(new Runnable() {
|
||||
@@ -91,14 +110,14 @@ public class SubIndex {
|
||||
autoCommitWriter.commit();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("Error committing Lucene index " + dirName, e);
|
||||
log.error("Error committing " + name + " Lucene index", e);
|
||||
} catch (OutOfMemoryError e) {
|
||||
log.error("Error committing Lucene index " + dirName, e);
|
||||
log.error("Error committing " + name + " Lucene index", e);
|
||||
try {
|
||||
autoCommitWriter.close();
|
||||
} catch (IOException e2) {
|
||||
log.error("SEVERE: Error closing Lucene index " + dirName
|
||||
+ " after OOM; index may be corrupted.", e);
|
||||
log.error("SEVERE: Error closing " + name
|
||||
+ " Lucene index after OOM; index may be corrupted.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -114,7 +133,7 @@ public class SubIndex {
|
||||
writer, searcherManager,
|
||||
0.500 /* maximum stale age (seconds) */,
|
||||
0.010 /* minimum stale age (seconds) */);
|
||||
reopenThread.setName("NRT " + dirName);
|
||||
reopenThread.setName("NRT " + name);
|
||||
reopenThread.setPriority(Math.min(
|
||||
Thread.currentThread().getPriority() + 2,
|
||||
Thread.MAX_PRIORITY));
|
||||
@@ -144,7 +163,13 @@ public class SubIndex {
|
||||
reopenThread.start();
|
||||
}
|
||||
|
||||
void close() {
|
||||
@Override
|
||||
public void markReady(boolean ready) throws IOException {
|
||||
setReady(sitePaths, schema.getVersion(), ready);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
reopenThread.close();
|
||||
|
||||
// Closing the reopen thread sets its generation to Long.MAX_VALUE, but we
|
||||
@@ -186,7 +211,8 @@ public class SubIndex {
|
||||
return new NrtFuture(writer.deleteDocuments(term));
|
||||
}
|
||||
|
||||
void deleteAll() throws IOException {
|
||||
@Override
|
||||
public void deleteAll() throws IOException {
|
||||
writer.deleteAll();
|
||||
}
|
||||
|
||||
@@ -286,4 +312,9 @@ public class SubIndex {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Schema<V> getSchema() {
|
||||
return schema;
|
||||
}
|
||||
}
|
@@ -0,0 +1,74 @@
|
||||
// Copyright (C) 2016 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.google.gerrit.lucene;
|
||||
|
||||
import com.google.gerrit.reviewdb.client.Change;
|
||||
import com.google.gerrit.server.config.SitePaths;
|
||||
import com.google.gerrit.server.index.QueryOptions;
|
||||
import com.google.gerrit.server.index.Schema;
|
||||
import com.google.gerrit.server.index.change.ChangeIndex;
|
||||
import com.google.gerrit.server.query.DataSource;
|
||||
import com.google.gerrit.server.query.Predicate;
|
||||
import com.google.gerrit.server.query.QueryParseException;
|
||||
import com.google.gerrit.server.query.change.ChangeData;
|
||||
|
||||
import org.apache.lucene.search.SearcherFactory;
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FSDirectory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
|
||||
public class ChangeSubIndex extends AbstractLuceneIndex<Change.Id, ChangeData>
|
||||
implements ChangeIndex {
|
||||
ChangeSubIndex(
|
||||
Schema<ChangeData> schema,
|
||||
SitePaths sitePaths,
|
||||
Path path,
|
||||
GerritIndexWriterConfig writerConfig,
|
||||
SearcherFactory searcherFactory) throws IOException {
|
||||
this(schema, sitePaths, FSDirectory.open(path),
|
||||
path.getFileName().toString(), writerConfig, searcherFactory);
|
||||
}
|
||||
|
||||
ChangeSubIndex(
|
||||
Schema<ChangeData> schema,
|
||||
SitePaths sitePaths,
|
||||
Directory dir,
|
||||
String name,
|
||||
GerritIndexWriterConfig writerConfig,
|
||||
SearcherFactory searcherFactory) throws IOException {
|
||||
super(schema, sitePaths, dir, name, writerConfig, searcherFactory);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void replace(ChangeData obj) throws IOException {
|
||||
throw new UnsupportedOperationException(
|
||||
"don't use ChangeSubIndex directly");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void delete(Change.Id key) throws IOException {
|
||||
throw new UnsupportedOperationException(
|
||||
"don't use ChangeSubIndex directly");
|
||||
}
|
||||
|
||||
@Override
|
||||
public DataSource<ChangeData> getSource(Predicate<ChangeData> p,
|
||||
QueryOptions opts) throws QueryParseException {
|
||||
throw new UnsupportedOperationException(
|
||||
"don't use ChangeSubIndex directly");
|
||||
}
|
||||
}
|
@@ -84,9 +84,7 @@ import org.apache.lucene.search.TopDocs;
|
||||
import org.apache.lucene.search.TopFieldDocs;
|
||||
import org.apache.lucene.store.RAMDirectory;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.eclipse.jgit.errors.ConfigInvalidException;
|
||||
import org.eclipse.jgit.lib.Config;
|
||||
import org.eclipse.jgit.storage.file.FileBasedConfig;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -128,18 +126,6 @@ public class LuceneChangeIndex implements ChangeIndex {
|
||||
private static final String ID_SORT_FIELD =
|
||||
sortFieldName(ChangeField.LEGACY_ID);
|
||||
|
||||
public static void setReady(SitePaths sitePaths, int version, boolean ready)
|
||||
throws IOException {
|
||||
try {
|
||||
FileBasedConfig cfg =
|
||||
LuceneVersionManager.loadGerritIndexConfig(sitePaths);
|
||||
LuceneVersionManager.setReady(cfg, version, ready);
|
||||
cfg.save();
|
||||
} catch (ConfigInvalidException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private static String sortFieldName(FieldDef<?, ?> f) {
|
||||
return f.getName() + "_SORT";
|
||||
}
|
||||
@@ -155,8 +141,8 @@ public class LuceneChangeIndex implements ChangeIndex {
|
||||
private final ChangeData.Factory changeDataFactory;
|
||||
private final Schema<ChangeData> schema;
|
||||
private final QueryBuilder queryBuilder;
|
||||
private final SubIndex openIndex;
|
||||
private final SubIndex closedIndex;
|
||||
private final ChangeSubIndex openIndex;
|
||||
private final ChangeSubIndex closedIndex;
|
||||
|
||||
@AssistedInject
|
||||
LuceneChangeIndex(
|
||||
@@ -183,16 +169,16 @@ public class LuceneChangeIndex implements ChangeIndex {
|
||||
|
||||
SearcherFactory searcherFactory = new SearcherFactory();
|
||||
if (LuceneIndexModule.isInMemoryTest(cfg)) {
|
||||
openIndex = new SubIndex(new RAMDirectory(), "ramOpen", openConfig,
|
||||
searcherFactory);
|
||||
closedIndex = new SubIndex(new RAMDirectory(), "ramClosed", closedConfig,
|
||||
searcherFactory);
|
||||
openIndex = new ChangeSubIndex(schema, sitePaths, new RAMDirectory(),
|
||||
"ramOpen", openConfig, searcherFactory);
|
||||
closedIndex = new ChangeSubIndex(schema, sitePaths, new RAMDirectory(),
|
||||
"ramClosed", closedConfig, searcherFactory);
|
||||
} else {
|
||||
Path dir = LuceneVersionManager.getDir(sitePaths, CHANGES_PREFIX, schema);
|
||||
openIndex = new SubIndex(dir.resolve(CHANGES_OPEN), openConfig,
|
||||
searcherFactory);
|
||||
closedIndex = new SubIndex(dir.resolve(CHANGES_CLOSED), closedConfig,
|
||||
searcherFactory);
|
||||
openIndex = new ChangeSubIndex(schema, sitePaths,
|
||||
dir.resolve(CHANGES_OPEN), openConfig, searcherFactory);
|
||||
closedIndex = new ChangeSubIndex(schema, sitePaths,
|
||||
dir.resolve(CHANGES_CLOSED), closedConfig, searcherFactory);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -260,7 +246,7 @@ public class LuceneChangeIndex implements ChangeIndex {
|
||||
public ChangeDataSource getSource(Predicate<ChangeData> p, QueryOptions opts)
|
||||
throws QueryParseException {
|
||||
Set<Change.Status> statuses = IndexRewriter.getPossibleStatus(p);
|
||||
List<SubIndex> indexes = Lists.newArrayListWithCapacity(2);
|
||||
List<ChangeSubIndex> indexes = Lists.newArrayListWithCapacity(2);
|
||||
if (!Sets.intersection(statuses, OPEN_STATUSES).isEmpty()) {
|
||||
indexes.add(openIndex);
|
||||
}
|
||||
@@ -272,7 +258,9 @@ public class LuceneChangeIndex implements ChangeIndex {
|
||||
|
||||
@Override
|
||||
public void markReady(boolean ready) throws IOException {
|
||||
setReady(sitePaths, schema.getVersion(), ready);
|
||||
// Do not delegate to ChangeSubIndex#markReady, since changes have an
|
||||
// additional level of directory nesting.
|
||||
AbstractLuceneIndex.setReady(sitePaths, schema.getVersion(), ready);
|
||||
}
|
||||
|
||||
private Sort getSort() {
|
||||
@@ -281,21 +269,17 @@ public class LuceneChangeIndex implements ChangeIndex {
|
||||
new SortField(ID_SORT_FIELD, SortField.Type.LONG, true));
|
||||
}
|
||||
|
||||
public SubIndex getOpenChangesIndex() {
|
||||
return openIndex;
|
||||
}
|
||||
|
||||
public SubIndex getClosedChangesIndex() {
|
||||
public ChangeSubIndex getClosedChangesIndex() {
|
||||
return closedIndex;
|
||||
}
|
||||
|
||||
private class QuerySource implements ChangeDataSource {
|
||||
private final List<SubIndex> indexes;
|
||||
private final List<ChangeSubIndex> indexes;
|
||||
private final Query query;
|
||||
private final QueryOptions opts;
|
||||
private final Sort sort;
|
||||
|
||||
private QuerySource(List<SubIndex> indexes, Query query, QueryOptions opts,
|
||||
private QuerySource(List<ChangeSubIndex> indexes, Query query, QueryOptions opts,
|
||||
Sort sort) {
|
||||
this.indexes = indexes;
|
||||
this.query = checkNotNull(query, "null query from Lucene");
|
||||
|
Reference in New Issue
Block a user