Add online Lucene reindexing support

At startup time, if the most recent write version is not ready for
searching, kick off a background thread to do a batch reindex. When
that finishes, swap the new search version into the IndexCollection.

Solr continues to use a single version determined at startup for now.

Change-Id: I99c53b7df1c0bec401aac8184071e04e75e8a972
This commit is contained in:
Dave Borowitz 2013-06-27 15:15:00 -06:00
parent 2361cefe17
commit b9b38b2664
5 changed files with 169 additions and 22 deletions

View File

@ -53,12 +53,25 @@ public class LuceneIndexModule extends LifecycleModule {
});
install(new IndexModule(threads));
if (singleVersion == null && base == null) {
listener().to(LuceneVersionManager.class);
install(new MultiVersionModule());
} else {
install(new SingleVersionModule());
}
}
private class MultiVersionModule extends LifecycleModule {
@Override
public void configure() {
install(new FactoryModule() {
@Override
public void configure() {
factory(OnlineReindexer.Factory.class);
}
});
listener().to(LuceneVersionManager.class);
}
}
private class SingleVersionModule extends LifecycleModule {
@Override
public void configure() {

View File

@ -89,15 +89,18 @@ class LuceneVersionManager implements LifecycleListener {
private final SitePaths sitePaths;
private final LuceneChangeIndex.Factory indexFactory;
private final IndexCollection indexes;
private final OnlineReindexer.Factory reindexerFactory;
@Inject
LuceneVersionManager(
SitePaths sitePaths,
LuceneChangeIndex.Factory indexFactory,
IndexCollection indexes) {
IndexCollection indexes,
OnlineReindexer.Factory reindexerFactory) {
this.sitePaths = sitePaths;
this.indexFactory = indexFactory;
this.indexes = indexes;
this.reindexerFactory = reindexerFactory;
}
@Override
@ -147,6 +150,11 @@ class LuceneVersionManager implements LifecycleListener {
}
}
}
int latest = write.get(0).version;
if (latest != search.version) {
reindexerFactory.create(latest).start();
}
}
private TreeMap<Integer, Version> scanVersions(Config cfg) {

View File

@ -0,0 +1,109 @@
// Copyright (C) 2013 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.package com.google.gerrit.server.git;
package com.google.gerrit.lucene;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.Lists;
import com.google.gerrit.server.index.ChangeBatchIndexer;
import com.google.gerrit.server.index.ChangeIndex;
import com.google.gerrit.server.index.IndexCollection;
import com.google.gerrit.server.project.ProjectCache;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
public class OnlineReindexer {
private static final Logger log = LoggerFactory
.getLogger(OnlineReindexer.class);
public interface Factory {
OnlineReindexer create(int version);
}
private final IndexCollection indexes;
private final ChangeBatchIndexer batchIndexer;
private final ProjectCache projectCache;
private final int version;
@Inject
OnlineReindexer(
IndexCollection indexes,
ChangeBatchIndexer batchIndexer,
ProjectCache projectCache,
@Assisted int version) {
this.indexes = indexes;
this.batchIndexer = batchIndexer;
this.projectCache = projectCache;
this.version = version;
}
public void start() {
Thread t = new Thread() {
@Override
public void run() {
reindex();
}
};
t.setName(String.format("Reindex v%d-v%d",
version(indexes.getSearchIndex()), version));
t.start();
}
private static int version(ChangeIndex i) {
return i.getSchema().getVersion();
}
private void reindex() {
ChangeIndex index = checkNotNull(indexes.getWriteIndex(version),
"not an active write schema version: %s", version);
log.info("Starting online reindex from schema version {} to {}",
version(indexes.getSearchIndex()), version(index));
ChangeBatchIndexer.Result result = batchIndexer.indexAll(
index, projectCache.all(), -1, -1, null, null);
if (!result.success()) {
log.error("Online reindex of schema version {} failed", version(index));
return;
}
indexes.setSearchIndex(index);
log.info("Reindex complete, using schema version {}", version(index));
try {
index.markReady(true);
} catch (IOException e) {
log.warn("Error activating new schema version {}", version(index));
}
List<ChangeIndex> toRemove = Lists.newArrayListWithExpectedSize(1);
for (ChangeIndex i : indexes.getWriteIndexes()) {
if (version(i) != version(index)) {
toRemove.add(i);
}
}
for (ChangeIndex i : toRemove) {
try {
i.markReady(false);
indexes.removeWriteIndex(version(i));
} catch (IOException e) {
log.warn("Error deactivating old schema version {}", version(i));
}
}
}
}

View File

@ -33,8 +33,8 @@ import com.google.gerrit.server.git.MultiProgressMonitor;
import com.google.gerrit.server.git.MultiProgressMonitor.Task;
import com.google.gerrit.server.patch.PatchListLoader;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
import com.google.inject.Provider;
import org.eclipse.jgit.diff.DiffEntry;
import org.eclipse.jgit.diff.DiffFormatter;
@ -99,17 +99,17 @@ public class ChangeBatchIndexer {
}
}
private final Provider<ReviewDb> dbProvider;
private final SchemaFactory<ReviewDb> schemaFactory;
private final GitRepositoryManager repoManager;
private final ListeningScheduledExecutorService executor;
private final ChangeIndexer.Factory indexerFactory;
@Inject
ChangeBatchIndexer(Provider<ReviewDb> dbProvider,
ChangeBatchIndexer(SchemaFactory<ReviewDb> schemaFactory,
GitRepositoryManager repoManager,
@IndexExecutor ListeningScheduledExecutorService executor,
ChangeIndexer.Factory indexerFactory) {
this.dbProvider = dbProvider;
this.schemaFactory = schemaFactory;
this.repoManager = repoManager;
this.executor = executor;
this.indexerFactory = indexerFactory;
@ -149,7 +149,7 @@ public class ChangeBatchIndexer {
} catch (InterruptedException e) {
fail(project, e);
} catch (ExecutionException e) {
ok.set(false); // Logged by indexer.
fail(project, e);
} catch (RuntimeException e) {
failAndThrow(project, e);
} catch (Error e) {
@ -214,23 +214,26 @@ public class ChangeBatchIndexer {
@Override
public Void call() throws Exception {
ReviewDb db = dbProvider.get();
repo = repoManager.openRepository(project);
ReviewDb db = schemaFactory.open();
try {
Map<String, Ref> refs = repo.getAllRefs();
for (Change c : db.changes().byProject(project)) {
Ref r = refs.get(c.currentPatchSetId().toRefName());
if (r != null) {
byId.put(r.getObjectId(), new ChangeData(c));
repo = repoManager.openRepository(project);
try {
Map<String, Ref> refs = repo.getAllRefs();
for (Change c : db.changes().byProject(project)) {
Ref r = refs.get(c.currentPatchSetId().toRefName());
if (r != null) {
byId.put(r.getObjectId(), new ChangeData(c));
}
}
}
walk();
} finally {
repo.close();
walk();
} finally {
repo.close();
// TODO(dborowitz): Opening all repositories in a live server may be
// wasteful; see if we can determine which ones it is safe to close with
// RepositoryCache.close(repo).
}
} finally {
db.close();
}
return null;
}

View File

@ -68,12 +68,26 @@ public class IndexCollection implements LifecycleListener {
writeIndexes.add(index);
}
public void removeWriteIndex(int version) {
writeIndexes.remove(version);
public synchronized void removeWriteIndex(int version) {
int removeIndex = -1;
for (int i = 0; i < writeIndexes.size(); i++) {
if (writeIndexes.get(i).getSchema().getVersion() == version) {
removeIndex = i;
break;
}
}
if (removeIndex >= 0) {
writeIndexes.remove(removeIndex);
}
}
public ChangeIndex getWriteIndex(int version) {
return writeIndexes.get(version);
for (ChangeIndex i : writeIndexes) {
if (i.getSchema().getVersion() == version) {
return i;
}
}
return null;
}
@Override