From b9b38b2664c881c10db35b20fc441a5884243073 Mon Sep 17 00:00:00 2001 From: Dave Borowitz Date: Thu, 27 Jun 2013 15:15:00 -0600 Subject: [PATCH] Add online Lucene reindexing support At startup time, if the most recent write version is not ready for searching, kick off a background thread to do a batch reindex. When that finishes, swap the new search version into the IndexCollection. Solr continues to use a single version determined at startup for now. Change-Id: I99c53b7df1c0bec401aac8184071e04e75e8a972 --- .../gerrit/lucene/LuceneIndexModule.java | 15 ++- .../gerrit/lucene/LuceneVersionManager.java | 10 +- .../google/gerrit/lucene/OnlineReindexer.java | 109 ++++++++++++++++++ .../server/index/ChangeBatchIndexer.java | 37 +++--- .../gerrit/server/index/IndexCollection.java | 20 +++- 5 files changed, 169 insertions(+), 22 deletions(-) create mode 100644 gerrit-lucene/src/main/java/com/google/gerrit/lucene/OnlineReindexer.java diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneIndexModule.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneIndexModule.java index 1e1dca75c3..4e8f3f9d8b 100644 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneIndexModule.java +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneIndexModule.java @@ -53,12 +53,25 @@ public class LuceneIndexModule extends LifecycleModule { }); install(new IndexModule(threads)); if (singleVersion == null && base == null) { - listener().to(LuceneVersionManager.class); + install(new MultiVersionModule()); } else { install(new SingleVersionModule()); } } + private class MultiVersionModule extends LifecycleModule { + @Override + public void configure() { + install(new FactoryModule() { + @Override + public void configure() { + factory(OnlineReindexer.Factory.class); + } + }); + listener().to(LuceneVersionManager.class); + } + } + private class SingleVersionModule extends LifecycleModule { @Override public void configure() { diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneVersionManager.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneVersionManager.java index 2f7cfa6e99..91dd01582e 100644 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneVersionManager.java +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneVersionManager.java @@ -89,15 +89,18 @@ class LuceneVersionManager implements LifecycleListener { private final SitePaths sitePaths; private final LuceneChangeIndex.Factory indexFactory; private final IndexCollection indexes; + private final OnlineReindexer.Factory reindexerFactory; @Inject LuceneVersionManager( SitePaths sitePaths, LuceneChangeIndex.Factory indexFactory, - IndexCollection indexes) { + IndexCollection indexes, + OnlineReindexer.Factory reindexerFactory) { this.sitePaths = sitePaths; this.indexFactory = indexFactory; this.indexes = indexes; + this.reindexerFactory = reindexerFactory; } @Override @@ -147,6 +150,11 @@ class LuceneVersionManager implements LifecycleListener { } } } + + int latest = write.get(0).version; + if (latest != search.version) { + reindexerFactory.create(latest).start(); + } } private TreeMap scanVersions(Config cfg) { diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/OnlineReindexer.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/OnlineReindexer.java new file mode 100644 index 0000000000..08338eb7e4 --- /dev/null +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/OnlineReindexer.java @@ -0,0 +1,109 @@ +// Copyright (C) 2013 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.package com.google.gerrit.server.git; + +package com.google.gerrit.lucene; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.collect.Lists; +import com.google.gerrit.server.index.ChangeBatchIndexer; +import com.google.gerrit.server.index.ChangeIndex; +import com.google.gerrit.server.index.IndexCollection; +import com.google.gerrit.server.project.ProjectCache; +import com.google.inject.Inject; +import com.google.inject.assistedinject.Assisted; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; + +public class OnlineReindexer { + private static final Logger log = LoggerFactory + .getLogger(OnlineReindexer.class); + + public interface Factory { + OnlineReindexer create(int version); + } + + private final IndexCollection indexes; + private final ChangeBatchIndexer batchIndexer; + private final ProjectCache projectCache; + private final int version; + + @Inject + OnlineReindexer( + IndexCollection indexes, + ChangeBatchIndexer batchIndexer, + ProjectCache projectCache, + @Assisted int version) { + this.indexes = indexes; + this.batchIndexer = batchIndexer; + this.projectCache = projectCache; + this.version = version; + } + + public void start() { + Thread t = new Thread() { + @Override + public void run() { + reindex(); + } + }; + t.setName(String.format("Reindex v%d-v%d", + version(indexes.getSearchIndex()), version)); + t.start(); + } + + private static int version(ChangeIndex i) { + return i.getSchema().getVersion(); + } + + private void reindex() { + ChangeIndex index = checkNotNull(indexes.getWriteIndex(version), + "not an active write schema version: %s", version); + log.info("Starting online reindex from schema version {} to {}", + version(indexes.getSearchIndex()), version(index)); + ChangeBatchIndexer.Result result = batchIndexer.indexAll( + index, projectCache.all(), -1, -1, null, null); + if (!result.success()) { + log.error("Online reindex of schema version {} failed", version(index)); + return; + } + + indexes.setSearchIndex(index); + log.info("Reindex complete, using schema version {}", version(index)); + try { + index.markReady(true); + } catch (IOException e) { + log.warn("Error activating new schema version {}", version(index)); + } + + List toRemove = Lists.newArrayListWithExpectedSize(1); + for (ChangeIndex i : indexes.getWriteIndexes()) { + if (version(i) != version(index)) { + toRemove.add(i); + } + } + for (ChangeIndex i : toRemove) { + try { + i.markReady(false); + indexes.removeWriteIndex(version(i)); + } catch (IOException e) { + log.warn("Error deactivating old schema version {}", version(i)); + } + } + } +} diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeBatchIndexer.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeBatchIndexer.java index faecee9478..261a439602 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeBatchIndexer.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeBatchIndexer.java @@ -33,8 +33,8 @@ import com.google.gerrit.server.git.MultiProgressMonitor; import com.google.gerrit.server.git.MultiProgressMonitor.Task; import com.google.gerrit.server.patch.PatchListLoader; import com.google.gerrit.server.query.change.ChangeData; +import com.google.gwtorm.server.SchemaFactory; import com.google.inject.Inject; -import com.google.inject.Provider; import org.eclipse.jgit.diff.DiffEntry; import org.eclipse.jgit.diff.DiffFormatter; @@ -99,17 +99,17 @@ public class ChangeBatchIndexer { } } - private final Provider dbProvider; + private final SchemaFactory schemaFactory; private final GitRepositoryManager repoManager; private final ListeningScheduledExecutorService executor; private final ChangeIndexer.Factory indexerFactory; @Inject - ChangeBatchIndexer(Provider dbProvider, + ChangeBatchIndexer(SchemaFactory schemaFactory, GitRepositoryManager repoManager, @IndexExecutor ListeningScheduledExecutorService executor, ChangeIndexer.Factory indexerFactory) { - this.dbProvider = dbProvider; + this.schemaFactory = schemaFactory; this.repoManager = repoManager; this.executor = executor; this.indexerFactory = indexerFactory; @@ -149,7 +149,7 @@ public class ChangeBatchIndexer { } catch (InterruptedException e) { fail(project, e); } catch (ExecutionException e) { - ok.set(false); // Logged by indexer. + fail(project, e); } catch (RuntimeException e) { failAndThrow(project, e); } catch (Error e) { @@ -214,23 +214,26 @@ public class ChangeBatchIndexer { @Override public Void call() throws Exception { - ReviewDb db = dbProvider.get(); - repo = repoManager.openRepository(project); - + ReviewDb db = schemaFactory.open(); try { - Map refs = repo.getAllRefs(); - for (Change c : db.changes().byProject(project)) { - Ref r = refs.get(c.currentPatchSetId().toRefName()); - if (r != null) { - byId.put(r.getObjectId(), new ChangeData(c)); + repo = repoManager.openRepository(project); + try { + Map refs = repo.getAllRefs(); + for (Change c : db.changes().byProject(project)) { + Ref r = refs.get(c.currentPatchSetId().toRefName()); + if (r != null) { + byId.put(r.getObjectId(), new ChangeData(c)); + } } - } - walk(); - } finally { - repo.close(); + walk(); + } finally { + repo.close(); // TODO(dborowitz): Opening all repositories in a live server may be // wasteful; see if we can determine which ones it is safe to close with // RepositoryCache.close(repo). + } + } finally { + db.close(); } return null; } diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexCollection.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexCollection.java index 6a2c99cb6a..0956120862 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexCollection.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexCollection.java @@ -68,12 +68,26 @@ public class IndexCollection implements LifecycleListener { writeIndexes.add(index); } - public void removeWriteIndex(int version) { - writeIndexes.remove(version); + public synchronized void removeWriteIndex(int version) { + int removeIndex = -1; + for (int i = 0; i < writeIndexes.size(); i++) { + if (writeIndexes.get(i).getSchema().getVersion() == version) { + removeIndex = i; + break; + } + } + if (removeIndex >= 0) { + writeIndexes.remove(removeIndex); + } } public ChangeIndex getWriteIndex(int version) { - return writeIndexes.get(version); + for (ChangeIndex i : writeIndexes) { + if (i.getSchema().getVersion() == version) { + return i; + } + } + return null; } @Override