From b518e69abc4d081c550372f575869c1bc0bf2af0 Mon Sep 17 00:00:00 2001 From: Dave Borowitz Date: Wed, 16 Mar 2016 15:07:58 +0100 Subject: [PATCH] Generify OnlineReindexer and friends Change-Id: I0730547a0ff8e1c001639e06488c8b2f54a106ac --- .../gerrit/lucene/LuceneIndexModule.java | 1 - .../gerrit/lucene/LuceneVersionManager.java | 14 +- .../java/com/google/gerrit/pgm/Reindex.java | 9 +- .../gerrit/server/index}/OnlineReindexer.java | 44 +- .../gerrit/server/index/SiteIndexer.java | 381 +---------------- .../index/change/AllChangesIndexer.java | 404 ++++++++++++++++++ 6 files changed, 437 insertions(+), 416 deletions(-) rename {gerrit-lucene/src/main/java/com/google/gerrit/lucene => gerrit-server/src/main/java/com/google/gerrit/server/index}/OnlineReindexer.java (71%) create mode 100644 gerrit-server/src/main/java/com/google/gerrit/server/index/change/AllChangesIndexer.java diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneIndexModule.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneIndexModule.java index 33eebe4fba..1f9270bfcc 100644 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneIndexModule.java +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneIndexModule.java @@ -50,7 +50,6 @@ public class LuceneIndexModule extends LifecycleModule { @Override protected void configure() { factory(LuceneChangeIndex.Factory.class); - factory(OnlineReindexer.Factory.class); install(new IndexModule(threads)); if (singleVersion == null) { install(new MultiVersionModule()); diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneVersionManager.java b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneVersionManager.java index 2f53eec965..42959e358f 100644 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneVersionManager.java +++ b/gerrit-lucene/src/main/java/com/google/gerrit/lucene/LuceneVersionManager.java @@ -20,9 +20,13 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; import com.google.gerrit.extensions.events.LifecycleListener; +import com.google.gerrit.reviewdb.client.Change; import com.google.gerrit.server.config.GerritServerConfig; import com.google.gerrit.server.config.SitePaths; +import com.google.gerrit.server.index.OnlineReindexer; import com.google.gerrit.server.index.Schema; +import com.google.gerrit.server.index.change.AllChangesIndexer; +import com.google.gerrit.server.index.change.ChangeIndex; import com.google.gerrit.server.index.change.ChangeIndexCollection; import com.google.gerrit.server.index.change.ChangeSchemas; import com.google.gerrit.server.query.change.ChangeData; @@ -93,9 +97,9 @@ public class LuceneVersionManager implements LifecycleListener { private final SitePaths sitePaths; private final LuceneChangeIndex.Factory indexFactory; private final ChangeIndexCollection indexes; - private final OnlineReindexer.Factory reindexerFactory; + private final AllChangesIndexer allChangesIndexer; private final boolean onlineUpgrade; - private OnlineReindexer reindexer; + private OnlineReindexer reindexer; @Inject LuceneVersionManager( @@ -103,11 +107,11 @@ public class LuceneVersionManager implements LifecycleListener { SitePaths sitePaths, LuceneChangeIndex.Factory indexFactory, ChangeIndexCollection indexes, - OnlineReindexer.Factory reindexerFactory) { + AllChangesIndexer allChangesIndexer) { this.sitePaths = sitePaths; this.indexFactory = indexFactory; this.indexes = indexes; - this.reindexerFactory = reindexerFactory; + this.allChangesIndexer = allChangesIndexer; this.onlineUpgrade = cfg.getBoolean("index", null, "onlineUpgrade", true); } @@ -171,7 +175,7 @@ public class LuceneVersionManager implements LifecycleListener { int latest = write.get(0).version; if (onlineUpgrade && latest != search.version) { - reindexer = reindexerFactory.create(latest); + reindexer = new OnlineReindexer<>(indexes, allChangesIndexer, latest); reindexer.start(); } } diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Reindex.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Reindex.java index 06e6003c2f..420318001f 100644 --- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Reindex.java +++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Reindex.java @@ -29,8 +29,9 @@ import com.google.gerrit.server.config.GerritServerConfig; import com.google.gerrit.server.git.GitRepositoryManager; import com.google.gerrit.server.git.ScanningChangeCacheImpl; import com.google.gerrit.server.index.IndexModule; -import com.google.gerrit.server.index.IndexModule.IndexType; import com.google.gerrit.server.index.SiteIndexer; +import com.google.gerrit.server.index.IndexModule.IndexType; +import com.google.gerrit.server.index.change.AllChangesIndexer; import com.google.gerrit.server.index.change.ChangeIndex; import com.google.gerrit.server.index.change.ChangeIndexCollection; import com.google.gerrit.server.index.change.ChangeSchemas; @@ -164,9 +165,9 @@ public class Reindex extends SiteProgram { } pm.endTask(); - SiteIndexer batchIndexer = - sysInjector.getInstance(SiteIndexer.class); - SiteIndexer.Result result = batchIndexer.setNumChanges(changeCount) + AllChangesIndexer batchIndexer = + sysInjector.getInstance(AllChangesIndexer.class); + SiteIndexer.Result result = batchIndexer.setTotalWork(changeCount) .setProgressOut(System.err) .setVerboseOut(verbose ? System.out : NullOutputStream.INSTANCE) .indexAll(index, projects); diff --git a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/OnlineReindexer.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/OnlineReindexer.java similarity index 71% rename from gerrit-lucene/src/main/java/com/google/gerrit/lucene/OnlineReindexer.java rename to gerrit-server/src/main/java/com/google/gerrit/server/index/OnlineReindexer.java index 89aa57cabc..2c75af1364 100644 --- a/gerrit-lucene/src/main/java/com/google/gerrit/lucene/OnlineReindexer.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/OnlineReindexer.java @@ -12,18 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -package com.google.gerrit.lucene; +package com.google.gerrit.server.index; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.collect.Lists; -import com.google.gerrit.server.index.Index; -import com.google.gerrit.server.index.SiteIndexer; -import com.google.gerrit.server.index.change.ChangeIndex; -import com.google.gerrit.server.index.change.ChangeIndexCollection; -import com.google.gerrit.server.project.ProjectCache; -import com.google.inject.Inject; -import com.google.inject.assistedinject.Assisted; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,30 +25,22 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -public class OnlineReindexer { +public class OnlineReindexer> { private static final Logger log = LoggerFactory .getLogger(OnlineReindexer.class); - public interface Factory { - OnlineReindexer create(int version); - } - - private final ChangeIndexCollection indexes; - private final SiteIndexer batchIndexer; - private final ProjectCache projectCache; + private final IndexCollection indexes; + private final SiteIndexer batchIndexer; private final int version; - private ChangeIndex index; + private I index; private final AtomicBoolean running = new AtomicBoolean(); - @Inject - OnlineReindexer( - ChangeIndexCollection indexes, - SiteIndexer batchIndexer, - ProjectCache projectCache, - @Assisted int version) { + public OnlineReindexer( + IndexCollection indexes, + SiteIndexer batchIndexer, + int version) { this.indexes = indexes; this.batchIndexer = batchIndexer; - this.projectCache = projectCache; this.version = version; } @@ -94,8 +79,7 @@ public class OnlineReindexer { "not an active write schema version: %s", version); log.info("Starting online reindex from schema version {} to {}", version(indexes.getSearchIndex()), version(index)); - SiteIndexer.Result result = - batchIndexer.indexAll(index, projectCache.all()); + SiteIndexer.Result result = batchIndexer.indexAll(index); if (!result.success()) { log.error("Online reindex of schema version {} failed. Successfully" + " indexed {} changes, failed to index {} changes", @@ -106,7 +90,7 @@ public class OnlineReindexer { activateIndex(); } - void activateIndex() { + public void activateIndex() { indexes.setSearchIndex(index); log.info("Using schema version {}", version(index)); try { @@ -115,13 +99,13 @@ public class OnlineReindexer { log.warn("Error activating new schema version {}", version(index)); } - List toRemove = Lists.newArrayListWithExpectedSize(1); - for (ChangeIndex i : indexes.getWriteIndexes()) { + List toRemove = Lists.newArrayListWithExpectedSize(1); + for (I i : indexes.getWriteIndexes()) { if (version(i) != version(index)) { toRemove.add(i); } } - for (ChangeIndex i : toRemove) { + for (I i : toRemove) { try { i.markReady(false); indexes.removeWriteIndex(version(i)); diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/SiteIndexer.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/SiteIndexer.java index a60c2a839e..46a2b7dbf8 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/index/SiteIndexer.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/SiteIndexer.java @@ -1,4 +1,4 @@ -// Copyright (C) 2013 The Android Open Source Project +// Copyright (C) 2016 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,81 +14,18 @@ package com.google.gerrit.server.index; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH; -import static org.eclipse.jgit.lib.RefDatabase.ALL; - import com.google.common.base.Stopwatch; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; -import com.google.common.util.concurrent.AsyncFunction; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.gerrit.reviewdb.client.Project; -import com.google.gerrit.reviewdb.server.ReviewDb; -import com.google.gerrit.server.config.GerritServerConfig; -import com.google.gerrit.server.git.GitRepositoryManager; -import com.google.gerrit.server.git.MergeUtil; -import com.google.gerrit.server.git.MultiProgressMonitor; -import com.google.gerrit.server.git.MultiProgressMonitor.Task; -import com.google.gerrit.server.index.change.ChangeIndex; -import com.google.gerrit.server.index.change.ChangeIndexer; -import com.google.gerrit.server.notedb.ChangeNotes; -import com.google.gerrit.server.patch.PatchListLoader; -import com.google.gerrit.server.query.change.ChangeData; -import com.google.gwtorm.server.SchemaFactory; -import com.google.inject.Inject; -import org.eclipse.jgit.diff.DiffEntry; -import org.eclipse.jgit.diff.DiffFormatter; -import org.eclipse.jgit.errors.RepositoryNotFoundException; -import org.eclipse.jgit.lib.Config; -import org.eclipse.jgit.lib.Constants; -import org.eclipse.jgit.lib.ObjectId; -import org.eclipse.jgit.lib.ObjectInserter; -import org.eclipse.jgit.lib.ProgressMonitor; -import org.eclipse.jgit.lib.Ref; -import org.eclipse.jgit.lib.Repository; -import org.eclipse.jgit.merge.ThreeWayMergeStrategy; -import org.eclipse.jgit.revwalk.RevCommit; -import org.eclipse.jgit.revwalk.RevObject; -import org.eclipse.jgit.revwalk.RevTree; -import org.eclipse.jgit.revwalk.RevWalk; -import org.eclipse.jgit.util.io.DisabledOutputStream; -import org.eclipse.jgit.util.io.NullOutputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.PrintWriter; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -public class SiteIndexer { - private static final Logger log = - LoggerFactory.getLogger(SiteIndexer.class); - - public static class Result { +public interface SiteIndexer> { + public class Result { private final long elapsedNanos; private final boolean success; private final int done; private final int failed; - private Result(Stopwatch sw, boolean success, int done, int failed) { + public Result(Stopwatch sw, boolean success, int done, int failed) { this.elapsedNanos = sw.elapsed(TimeUnit.NANOSECONDS); this.success = success; this.done = done; @@ -112,313 +49,5 @@ public class SiteIndexer { } } - private final SchemaFactory schemaFactory; - private final ChangeData.Factory changeDataFactory; - private final GitRepositoryManager repoManager; - private final ListeningExecutorService executor; - private final ChangeIndexer.Factory indexerFactory; - private final ChangeNotes.Factory notesFactory; - private final ThreeWayMergeStrategy mergeStrategy; - - private int numChanges = -1; - private OutputStream progressOut = NullOutputStream.INSTANCE; - private PrintWriter verboseWriter = - new PrintWriter(NullOutputStream.INSTANCE); - - @Inject - SiteIndexer(SchemaFactory schemaFactory, - ChangeData.Factory changeDataFactory, - GitRepositoryManager repoManager, - @IndexExecutor(BATCH) ListeningExecutorService executor, - ChangeIndexer.Factory indexerFactory, - ChangeNotes.Factory notesFactory, - @GerritServerConfig Config config) { - this.schemaFactory = schemaFactory; - this.changeDataFactory = changeDataFactory; - this.repoManager = repoManager; - this.executor = executor; - this.indexerFactory = indexerFactory; - this.notesFactory = notesFactory; - this.mergeStrategy = MergeUtil.getMergeStrategy(config); - } - - public SiteIndexer setNumChanges(int num) { - numChanges = num; - return this; - } - - public SiteIndexer setProgressOut(OutputStream out) { - progressOut = checkNotNull(out); - return this; - } - - public SiteIndexer setVerboseOut(OutputStream out) { - verboseWriter = new PrintWriter(checkNotNull(out)); - return this; - } - - public Result indexAll(ChangeIndex index, - Iterable projects) { - Stopwatch sw = Stopwatch.createStarted(); - final MultiProgressMonitor mpm = - new MultiProgressMonitor(progressOut, "Reindexing changes"); - final Task projTask = mpm.beginSubTask("projects", - (projects instanceof Collection) - ? ((Collection) projects).size() - : MultiProgressMonitor.UNKNOWN); - final Task doneTask = mpm.beginSubTask(null, - numChanges >= 0 ? numChanges : MultiProgressMonitor.UNKNOWN); - final Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN); - - final List> futures = Lists.newArrayList(); - final AtomicBoolean ok = new AtomicBoolean(true); - - for (final Project.NameKey project : projects) { - final ListenableFuture future = executor.submit(reindexProject( - indexerFactory.create(executor, index), project, doneTask, failedTask, - verboseWriter)); - futures.add(future); - future.addListener(new Runnable() { - @Override - public void run() { - try { - future.get(); - } catch (ExecutionException | InterruptedException e) { - fail(project, e); - } catch (RuntimeException e) { - failAndThrow(project, e); - } catch (Error e) { - // Can't join with RuntimeException because "RuntimeException | - // Error" becomes Throwable, which messes with signatures. - failAndThrow(project, e); - } finally { - projTask.update(1); - } - } - - private void fail(Project.NameKey project, Throwable t) { - log.error("Failed to index project " + project, t); - ok.set(false); - } - - private void failAndThrow(Project.NameKey project, RuntimeException e) { - fail(project, e); - throw e; - } - - private void failAndThrow(Project.NameKey project, Error e) { - fail(project, e); - throw e; - } - }, MoreExecutors.directExecutor()); - } - - try { - mpm.waitFor(Futures.transformAsync(Futures.successfulAsList(futures), - new AsyncFunction, Void>() { - @Override - public ListenableFuture apply(List input) { - mpm.end(); - return Futures.immediateFuture(null); - } - })); - } catch (ExecutionException e) { - log.error("Error in batch indexer", e); - ok.set(false); - } - // If too many changes failed, maybe there was a bug in the indexer. Don't - // trust the results. This is not an exact percentage since we bump the same - // failure counter if a project can't be read, but close enough. - int nFailed = failedTask.getCount(); - int nTotal = nFailed + doneTask.getCount(); - double pctFailed = ((double) nFailed) / nTotal * 100; - if (pctFailed > 10) { - log.error("Failed {}/{} changes ({}%); not marking new index as ready", - nFailed, nTotal, Math.round(pctFailed)); - ok.set(false); - } - return new Result(sw, ok.get(), doneTask.getCount(), failedTask.getCount()); - } - - private Callable reindexProject(final ChangeIndexer indexer, - final Project.NameKey project, final Task done, final Task failed, - final PrintWriter verboseWriter) { - return new Callable() { - @Override - public Void call() throws Exception { - Multimap byId = ArrayListMultimap.create(); - // TODO(dborowitz): Opening all repositories in a live server may be - // wasteful; see if we can determine which ones it is safe to close - // with RepositoryCache.close(repo). - try (Repository repo = repoManager.openRepository(project); - ReviewDb db = schemaFactory.open()) { - Map refs = repo.getRefDatabase().getRefs(ALL); - for (ChangeNotes cn : notesFactory.scan(repo, db, project)) { - Ref r = refs.get(cn.getChange().currentPatchSetId().toRefName()); - if (r != null) { - byId.put(r.getObjectId(), changeDataFactory.create(db, cn)); - } - } - new ProjectIndexer(indexer, - mergeStrategy, - byId, - repo, - done, - failed, - verboseWriter).call(); - } catch (RepositoryNotFoundException rnfe) { - log.error(rnfe.getMessage()); - } - return null; - } - - @Override - public String toString() { - return "Index all changes of project " + project.get(); - } - }; - } - - private static class ProjectIndexer implements Callable { - private final ChangeIndexer indexer; - private final ThreeWayMergeStrategy mergeStrategy; - private final Multimap byId; - private final ProgressMonitor done; - private final ProgressMonitor failed; - private final PrintWriter verboseWriter; - private final Repository repo; - private RevWalk walk; - - private ProjectIndexer(ChangeIndexer indexer, - ThreeWayMergeStrategy mergeStrategy, - Multimap changesByCommitId, - Repository repo, - ProgressMonitor done, - ProgressMonitor failed, - PrintWriter verboseWriter) { - this.indexer = indexer; - this.mergeStrategy = mergeStrategy; - this.byId = changesByCommitId; - this.repo = repo; - this.done = done; - this.failed = failed; - this.verboseWriter = verboseWriter; - } - - @Override - public Void call() throws Exception { - walk = new RevWalk(repo); - try { - // Walk only refs first to cover as many changes as we can without having - // to mark every single change. - for (Ref ref : repo.getRefDatabase().getRefs(Constants.R_HEADS).values()) { - RevObject o = walk.parseAny(ref.getObjectId()); - if (o instanceof RevCommit) { - walk.markStart((RevCommit) o); - } - } - - RevCommit bCommit; - while ((bCommit = walk.next()) != null && !byId.isEmpty()) { - if (byId.containsKey(bCommit)) { - getPathsAndIndex(bCommit); - byId.removeAll(bCommit); - } - } - - for (ObjectId id : byId.keySet()) { - getPathsAndIndex(id); - } - } finally { - walk.close(); - } - return null; - } - - private void getPathsAndIndex(ObjectId b) throws Exception { - List cds = Lists.newArrayList(byId.get(b)); - try (DiffFormatter df = new DiffFormatter(DisabledOutputStream.INSTANCE)) { - RevCommit bCommit = walk.parseCommit(b); - RevTree bTree = bCommit.getTree(); - RevTree aTree = aFor(bCommit, walk); - df.setRepository(repo); - if (!cds.isEmpty()) { - List paths = (aTree != null) - ? getPaths(df.scan(aTree, bTree)) - : Collections.emptyList(); - Iterator cdit = cds.iterator(); - for (ChangeData cd ; cdit.hasNext(); cdit.remove()) { - cd = cdit.next(); - try { - cd.setCurrentFilePaths(paths); - indexer.index(cd); - done.update(1); - if (verboseWriter != null) { - verboseWriter.println("Reindexed change " + cd.getId()); - } - } catch (Exception e) { - fail("Failed to index change " + cd.getId(), true, e); - } - } - } - } catch (Exception e) { - fail("Failed to index commit " + b.name(), false, e); - for (ChangeData cd : cds) { - fail("Failed to index change " + cd.getId(), true, null); - } - } - } - - private List getPaths(List filenames) { - Set paths = Sets.newTreeSet(); - for (DiffEntry e : filenames) { - if (e.getOldPath() != null) { - paths.add(e.getOldPath()); - } - if (e.getNewPath() != null) { - paths.add(e.getNewPath()); - } - } - return ImmutableList.copyOf(paths); - } - - private RevTree aFor(RevCommit b, RevWalk walk) throws IOException { - switch (b.getParentCount()) { - case 0: - return walk.parseTree(emptyTree()); - case 1: - RevCommit a = b.getParent(0); - walk.parseBody(a); - return walk.parseTree(a.getTree()); - case 2: - return PatchListLoader.automerge(repo, walk, b, mergeStrategy); - default: - return null; - } - } - - private ObjectId emptyTree() throws IOException { - try (ObjectInserter oi = repo.newObjectInserter()) { - ObjectId id = oi.insert(Constants.OBJ_TREE, new byte[] {}); - oi.flush(); - return id; - } - } - - private void fail(String error, boolean failed, Exception e) { - if (failed) { - this.failed.update(1); - } - - if (e != null) { - log.warn(error, e); - } else { - log.warn(error); - } - - if (verboseWriter != null) { - verboseWriter.println(error); - } - } - } + Result indexAll(I index); } diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/change/AllChangesIndexer.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/change/AllChangesIndexer.java new file mode 100644 index 0000000000..4fe589c43e --- /dev/null +++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/change/AllChangesIndexer.java @@ -0,0 +1,404 @@ +// Copyright (C) 2013 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.gerrit.server.index.change; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.gerrit.server.git.QueueProvider.QueueType.BATCH; +import static org.eclipse.jgit.lib.RefDatabase.ALL; + +import com.google.common.base.Stopwatch; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.gerrit.reviewdb.client.Change; +import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.reviewdb.server.ReviewDb; +import com.google.gerrit.server.config.GerritServerConfig; +import com.google.gerrit.server.git.GitRepositoryManager; +import com.google.gerrit.server.git.MergeUtil; +import com.google.gerrit.server.git.MultiProgressMonitor; +import com.google.gerrit.server.git.MultiProgressMonitor.Task; +import com.google.gerrit.server.index.IndexExecutor; +import com.google.gerrit.server.index.SiteIndexer; +import com.google.gerrit.server.notedb.ChangeNotes; +import com.google.gerrit.server.patch.PatchListLoader; +import com.google.gerrit.server.project.ProjectCache; +import com.google.gerrit.server.query.change.ChangeData; +import com.google.gwtorm.server.SchemaFactory; +import com.google.inject.Inject; + +import org.eclipse.jgit.diff.DiffEntry; +import org.eclipse.jgit.diff.DiffFormatter; +import org.eclipse.jgit.errors.RepositoryNotFoundException; +import org.eclipse.jgit.lib.Config; +import org.eclipse.jgit.lib.Constants; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectInserter; +import org.eclipse.jgit.lib.ProgressMonitor; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.merge.ThreeWayMergeStrategy; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevObject; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.revwalk.RevWalk; +import org.eclipse.jgit.util.io.DisabledOutputStream; +import org.eclipse.jgit.util.io.NullOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +public class AllChangesIndexer + implements SiteIndexer { + private static final Logger log = + LoggerFactory.getLogger(AllChangesIndexer.class); + + private final SchemaFactory schemaFactory; + private final ChangeData.Factory changeDataFactory; + private final GitRepositoryManager repoManager; + private final ListeningExecutorService executor; + private final ChangeIndexer.Factory indexerFactory; + private final ChangeNotes.Factory notesFactory; + private final ProjectCache projectCache; + private final ThreeWayMergeStrategy mergeStrategy; + + private int totalWork = -1; + private OutputStream progressOut = NullOutputStream.INSTANCE; + private PrintWriter verboseWriter = + new PrintWriter(NullOutputStream.INSTANCE); + + @Inject + AllChangesIndexer(SchemaFactory schemaFactory, + ChangeData.Factory changeDataFactory, + GitRepositoryManager repoManager, + @IndexExecutor(BATCH) ListeningExecutorService executor, + ChangeIndexer.Factory indexerFactory, + ChangeNotes.Factory notesFactory, + @GerritServerConfig Config config, + ProjectCache projectCache) { + this.schemaFactory = schemaFactory; + this.changeDataFactory = changeDataFactory; + this.repoManager = repoManager; + this.executor = executor; + this.indexerFactory = indexerFactory; + this.notesFactory = notesFactory; + this.projectCache = projectCache; + this.mergeStrategy = MergeUtil.getMergeStrategy(config); + } + + public AllChangesIndexer setTotalWork(int num) { + totalWork = num; + return this; + } + + public AllChangesIndexer setProgressOut(OutputStream out) { + progressOut = checkNotNull(out); + return this; + } + + public AllChangesIndexer setVerboseOut(OutputStream out) { + verboseWriter = new PrintWriter(checkNotNull(out)); + return this; + } + + @Override + public Result indexAll(ChangeIndex index) { + return indexAll(index, projectCache.all()); + } + + public SiteIndexer.Result indexAll(ChangeIndex index, + Iterable projects) { + Stopwatch sw = Stopwatch.createStarted(); + final MultiProgressMonitor mpm = + new MultiProgressMonitor(progressOut, "Reindexing changes"); + final Task projTask = mpm.beginSubTask("projects", + (projects instanceof Collection) + ? ((Collection) projects).size() + : MultiProgressMonitor.UNKNOWN); + final Task doneTask = mpm.beginSubTask(null, + totalWork >= 0 ? totalWork : MultiProgressMonitor.UNKNOWN); + final Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN); + + final List> futures = Lists.newArrayList(); + final AtomicBoolean ok = new AtomicBoolean(true); + + for (final Project.NameKey project : projects) { + final ListenableFuture future = executor.submit(reindexProject( + indexerFactory.create(executor, index), project, doneTask, failedTask, + verboseWriter)); + futures.add(future); + future.addListener(new Runnable() { + @Override + public void run() { + try { + future.get(); + } catch (ExecutionException | InterruptedException e) { + fail(project, e); + } catch (RuntimeException e) { + failAndThrow(project, e); + } catch (Error e) { + // Can't join with RuntimeException because "RuntimeException | + // Error" becomes Throwable, which messes with signatures. + failAndThrow(project, e); + } finally { + projTask.update(1); + } + } + + private void fail(Project.NameKey project, Throwable t) { + log.error("Failed to index project " + project, t); + ok.set(false); + } + + private void failAndThrow(Project.NameKey project, RuntimeException e) { + fail(project, e); + throw e; + } + + private void failAndThrow(Project.NameKey project, Error e) { + fail(project, e); + throw e; + } + }, MoreExecutors.directExecutor()); + } + + try { + mpm.waitFor(Futures.transformAsync(Futures.successfulAsList(futures), + new AsyncFunction, Void>() { + @Override + public ListenableFuture apply(List input) { + mpm.end(); + return Futures.immediateFuture(null); + } + })); + } catch (ExecutionException e) { + log.error("Error in batch indexer", e); + ok.set(false); + } + // If too many changes failed, maybe there was a bug in the indexer. Don't + // trust the results. This is not an exact percentage since we bump the same + // failure counter if a project can't be read, but close enough. + int nFailed = failedTask.getCount(); + int nTotal = nFailed + doneTask.getCount(); + double pctFailed = ((double) nFailed) / nTotal * 100; + if (pctFailed > 10) { + log.error("Failed {}/{} changes ({}%); not marking new index as ready", + nFailed, nTotal, Math.round(pctFailed)); + ok.set(false); + } + return new Result(sw, ok.get(), doneTask.getCount(), failedTask.getCount()); + } + + private Callable reindexProject(final ChangeIndexer indexer, + final Project.NameKey project, final Task done, final Task failed, + final PrintWriter verboseWriter) { + return new Callable() { + @Override + public Void call() throws Exception { + Multimap byId = ArrayListMultimap.create(); + // TODO(dborowitz): Opening all repositories in a live server may be + // wasteful; see if we can determine which ones it is safe to close + // with RepositoryCache.close(repo). + try (Repository repo = repoManager.openRepository(project); + ReviewDb db = schemaFactory.open()) { + Map refs = repo.getRefDatabase().getRefs(ALL); + for (ChangeNotes cn : notesFactory.scan(repo, db, project)) { + Ref r = refs.get(cn.getChange().currentPatchSetId().toRefName()); + if (r != null) { + byId.put(r.getObjectId(), changeDataFactory.create(db, cn)); + } + } + new ProjectIndexer(indexer, + mergeStrategy, + byId, + repo, + done, + failed, + verboseWriter).call(); + } catch (RepositoryNotFoundException rnfe) { + log.error(rnfe.getMessage()); + } + return null; + } + + @Override + public String toString() { + return "Index all changes of project " + project.get(); + } + }; + } + + private static class ProjectIndexer implements Callable { + private final ChangeIndexer indexer; + private final ThreeWayMergeStrategy mergeStrategy; + private final Multimap byId; + private final ProgressMonitor done; + private final ProgressMonitor failed; + private final PrintWriter verboseWriter; + private final Repository repo; + private RevWalk walk; + + private ProjectIndexer(ChangeIndexer indexer, + ThreeWayMergeStrategy mergeStrategy, + Multimap changesByCommitId, + Repository repo, + ProgressMonitor done, + ProgressMonitor failed, + PrintWriter verboseWriter) { + this.indexer = indexer; + this.mergeStrategy = mergeStrategy; + this.byId = changesByCommitId; + this.repo = repo; + this.done = done; + this.failed = failed; + this.verboseWriter = verboseWriter; + } + + @Override + public Void call() throws Exception { + walk = new RevWalk(repo); + try { + // Walk only refs first to cover as many changes as we can without having + // to mark every single change. + for (Ref ref : repo.getRefDatabase().getRefs(Constants.R_HEADS).values()) { + RevObject o = walk.parseAny(ref.getObjectId()); + if (o instanceof RevCommit) { + walk.markStart((RevCommit) o); + } + } + + RevCommit bCommit; + while ((bCommit = walk.next()) != null && !byId.isEmpty()) { + if (byId.containsKey(bCommit)) { + getPathsAndIndex(bCommit); + byId.removeAll(bCommit); + } + } + + for (ObjectId id : byId.keySet()) { + getPathsAndIndex(id); + } + } finally { + walk.close(); + } + return null; + } + + private void getPathsAndIndex(ObjectId b) throws Exception { + List cds = Lists.newArrayList(byId.get(b)); + try (DiffFormatter df = new DiffFormatter(DisabledOutputStream.INSTANCE)) { + RevCommit bCommit = walk.parseCommit(b); + RevTree bTree = bCommit.getTree(); + RevTree aTree = aFor(bCommit, walk); + df.setRepository(repo); + if (!cds.isEmpty()) { + List paths = (aTree != null) + ? getPaths(df.scan(aTree, bTree)) + : Collections.emptyList(); + Iterator cdit = cds.iterator(); + for (ChangeData cd ; cdit.hasNext(); cdit.remove()) { + cd = cdit.next(); + try { + cd.setCurrentFilePaths(paths); + indexer.index(cd); + done.update(1); + if (verboseWriter != null) { + verboseWriter.println("Reindexed change " + cd.getId()); + } + } catch (Exception e) { + fail("Failed to index change " + cd.getId(), true, e); + } + } + } + } catch (Exception e) { + fail("Failed to index commit " + b.name(), false, e); + for (ChangeData cd : cds) { + fail("Failed to index change " + cd.getId(), true, null); + } + } + } + + private List getPaths(List filenames) { + Set paths = Sets.newTreeSet(); + for (DiffEntry e : filenames) { + if (e.getOldPath() != null) { + paths.add(e.getOldPath()); + } + if (e.getNewPath() != null) { + paths.add(e.getNewPath()); + } + } + return ImmutableList.copyOf(paths); + } + + private RevTree aFor(RevCommit b, RevWalk walk) throws IOException { + switch (b.getParentCount()) { + case 0: + return walk.parseTree(emptyTree()); + case 1: + RevCommit a = b.getParent(0); + walk.parseBody(a); + return walk.parseTree(a.getTree()); + case 2: + return PatchListLoader.automerge(repo, walk, b, mergeStrategy); + default: + return null; + } + } + + private ObjectId emptyTree() throws IOException { + try (ObjectInserter oi = repo.newObjectInserter()) { + ObjectId id = oi.insert(Constants.OBJ_TREE, new byte[] {}); + oi.flush(); + return id; + } + } + + private void fail(String error, boolean failed, Exception e) { + if (failed) { + this.failed.update(1); + } + + if (e != null) { + log.warn(error, e); + } else { + log.warn(error); + } + + if (verboseWriter != null) { + verboseWriter.println(error); + } + } + } +}