diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Reindex.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Reindex.java index 29d9eb53c1..bc1e2944a7 100644 --- a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Reindex.java +++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/Reindex.java @@ -17,11 +17,18 @@ package com.google.gerrit.pgm; import static com.google.gerrit.lucene.IndexVersionCheck.SCHEMA_VERSIONS; import static com.google.gerrit.lucene.IndexVersionCheck.gerritIndexConfig; import static com.google.gerrit.lucene.LuceneChangeIndex.LUCENE_VERSION; -import static com.google.gerrit.server.schema.DataSourceProvider.Context.SINGLE_USER; +import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER; import com.google.common.base.Stopwatch; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.extensions.registration.DynamicSet; @@ -30,16 +37,20 @@ import com.google.gerrit.lifecycle.LifecycleModule; import com.google.gerrit.lucene.LuceneIndexModule; import com.google.gerrit.pgm.util.SiteProgram; import com.google.gerrit.reviewdb.client.Change; +import com.google.gerrit.reviewdb.client.Project; import com.google.gerrit.reviewdb.server.ReviewDb; import com.google.gerrit.server.cache.CacheRemovalListener; import com.google.gerrit.server.cache.h2.DefaultCacheFactory; import com.google.gerrit.server.config.SitePaths; +import com.google.gerrit.server.git.GitRepositoryManager; import com.google.gerrit.server.git.MultiProgressMonitor; import com.google.gerrit.server.git.MultiProgressMonitor.Task; -import com.google.gerrit.server.git.WorkQueue; import com.google.gerrit.server.index.ChangeIndexer; +import com.google.gerrit.server.index.IndexExecutor; import com.google.gerrit.server.index.IndexModule; import com.google.gerrit.server.patch.PatchListCacheImpl; +import com.google.gerrit.server.patch.PatchListLoader; +import com.google.gerrit.server.query.change.ChangeData; import com.google.gwtorm.server.OrmException; import com.google.gwtorm.server.SchemaFactory; import com.google.inject.AbstractModule; @@ -52,22 +63,43 @@ import com.google.inject.TypeLiteral; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; +import org.eclipse.jgit.diff.DiffEntry; +import org.eclipse.jgit.diff.DiffFormatter; import org.eclipse.jgit.errors.ConfigInvalidException; +import org.eclipse.jgit.lib.Constants; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectInserter; +import org.eclipse.jgit.lib.ProgressMonitor; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.lib.RepositoryCache; +import org.eclipse.jgit.lib.TextProgressMonitor; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.revwalk.RevObject; +import org.eclipse.jgit.revwalk.RevTree; +import org.eclipse.jgit.revwalk.RevWalk; import org.eclipse.jgit.storage.file.FileBasedConfig; import org.eclipse.jgit.util.FS; +import org.eclipse.jgit.util.io.DisabledOutputStream; import org.kohsuke.args4j.Option; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.io.PrintWriter; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class Reindex extends SiteProgram { + private static final Logger log = LoggerFactory.getLogger(Reindex.class); + @Option(name = "--threads", usage = "Number of threads to use for indexing") private int threads = Runtime.getRuntime().availableProcessors(); @@ -81,7 +113,7 @@ public class Reindex extends SiteProgram { @Override public int run() throws Exception { mustHaveValidSite(); - dbInjector = createDbInjector(SINGLE_USER); + dbInjector = createDbInjector(MULTI_USER); if (!IndexModule.isEnabled(dbInjector)) { throw die("Secondary index not enabled"); } @@ -90,7 +122,7 @@ public class Reindex extends SiteProgram { dbManager.start(); sitePaths = dbInjector.getInstance(SitePaths.class); - // Delete before any LuceneChangeIndex may be created. + // Delete before any index may be created depending on this data. deleteAll(); sysInjector = createSysInjector(); @@ -187,77 +219,225 @@ public class Reindex extends SiteProgram { private int indexAll() throws Exception { ReviewDb db = sysInjector.getInstance(ReviewDb.class); - ChangeIndexer indexer = sysInjector.getInstance(ChangeIndexer.class); + ListeningScheduledExecutorService executor = sysInjector.getInstance( + Key.get(ListeningScheduledExecutorService.class, IndexExecutor.class)); + + ProgressMonitor pm = new TextProgressMonitor(new PrintWriter(System.out)); + pm.start(1); + pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN); + Set projects = Sets.newTreeSet(); + try { + for (Change change : db.changes().all()) { + if (projects.add(change.getProject())) { + pm.update(1); + } + } + } finally { + db.close(); + } + pm.endTask(); + + final MultiProgressMonitor mpm = + new MultiProgressMonitor(System.out, "Reindexing changes"); + final Task projTask = mpm.beginSubTask("projects", projects.size()); + final Task doneTask = mpm.beginSubTask(null, MultiProgressMonitor.UNKNOWN); + final Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN); + Stopwatch sw = new Stopwatch().start(); - final int queueLen = 2 * threads; - final Semaphore sem = new Semaphore(queueLen); + final List> futures = + Lists.newArrayListWithCapacity(projects.size()); final AtomicBoolean ok = new AtomicBoolean(true); - final MultiProgressMonitor pm = - new MultiProgressMonitor(System.out, "Reindexing changes"); - final Task done = pm.beginSubTask(null, MultiProgressMonitor.UNKNOWN); - final Task failed = pm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN); - - int i = 0; - for (final Change change : db.changes().all()) { - sem.acquire(); - final ListenableFuture future = indexer.index(change); + for (final Project.NameKey project : projects) { + final ListenableFuture future = executor.submit( + new ReindexProject(project, doneTask, failedTask)); + futures.add(future); future.addListener(new Runnable() { @Override public void run() { try { future.get(); - done.update(1); } catch (InterruptedException e) { - fail(change, e); + fail(project, e); } catch (ExecutionException e) { - fail(change, e); + ok.set(false); // Logged by indexer. } catch (RuntimeException e) { - failAndThrow(change, e); + failAndThrow(project, e); } catch (Error e) { - failAndThrow(change, e); + failAndThrow(project, e); } finally { - sem.release(); + projTask.update(1); } } - private void fail(Change change, Throwable t) { + private void fail(Project.NameKey project, Throwable t) { + log.error("Failed to index project " + project, t); ok.set(false); - failed.update(1); } - private void failAndThrow(Change change, RuntimeException e) { - fail(change, e); + private void failAndThrow(Project.NameKey project, RuntimeException e) { + fail(project, e); throw e; } - private void failAndThrow(Change change, Error e) { - fail(change, e); + private void failAndThrow(Project.NameKey project, Error e) { + fail(project, e); throw e; } }, MoreExecutors.sameThreadExecutor()); - i++; } - pm.waitFor(sysInjector.getInstance(WorkQueue.class).getDefaultQueue() - .submit(new Runnable() { + mpm.waitFor(Futures.transform(Futures.successfulAsList(futures), + new AsyncFunction, Void>() { @Override - public void run() { - try { - sem.acquire(queueLen); - } catch (InterruptedException e) { - e.printStackTrace(); - } - pm.end(); + public ListenableFuture apply(List input) throws Exception { + mpm.end(); + return Futures.immediateFuture(null); } - })); + })); double elapsed = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d; - System.out.format("Reindexed %d changes in %.02fs (%.01f/s)\n", - i, elapsed, i/elapsed); + int n = doneTask.getCount() + failedTask.getCount(); + System.out.format("Reindexed %d changes in %.01fs (%.01f/s)\n", + n, elapsed, n/elapsed); return ok.get() ? 0 : 1; } + private class ReindexProject implements Callable { + private final ChangeIndexer indexer; + private final Project.NameKey project; + private final ListMultimap byId; + private final Task done; + private final Task failed; + private Repository repo; + private RevWalk walk; + + private ReindexProject(Project.NameKey project, Task done, Task failed) { + this.indexer = sysInjector.getInstance(ChangeIndexer.class); + this.project = project; + this.byId = ArrayListMultimap.create(); + this.done = done; + this.failed = failed; + } + + @Override + public Void call() throws Exception { + ReviewDb db = sysInjector.getInstance(ReviewDb.class); + GitRepositoryManager mgr = sysInjector.getInstance(GitRepositoryManager.class); + repo = mgr.openRepository(project); + + try { + Map refs = repo.getAllRefs(); + for (Change c : db.changes().byProject(project)) { + Ref r = refs.get(c.currentPatchSetId().toRefName()); + if (r != null) { + byId.put(r.getObjectId(), new ChangeData(c)); + } + } + walk(); + } finally { + repo.close(); + RepositoryCache.close(repo); // Only used once per Reindex call. + } + return null; + } + + private void walk() throws Exception { + walk = new RevWalk(repo); + try { + // Walk only refs first to cover as many changes as we can without having + // to mark every single change. + for (Ref ref : repo.getRefDatabase().getRefs(Constants.R_HEADS).values()) { + RevObject o = walk.parseAny(ref.getObjectId()); + if (o instanceof RevCommit) { + walk.markStart((RevCommit) o); + } + } + + RevCommit bCommit; + while ((bCommit = walk.next()) != null && !byId.isEmpty()) { + if (byId.containsKey(bCommit)) { + getPathsAndIndex(bCommit); + byId.removeAll(bCommit); + } + } + + for (ObjectId id : byId.keySet()) { + getPathsAndIndex(walk.parseCommit(id)); + } + } finally { + walk.release(); + } + } + + private void getPathsAndIndex(RevCommit bCommit) throws Exception { + RevTree bTree = bCommit.getTree(); + try { + RevTree aTree = aFor(bCommit, walk); + if (aTree == null) { + return; + } + DiffFormatter df = new DiffFormatter(DisabledOutputStream.INSTANCE); + try { + df.setRepository(repo); + List cds = byId.get(bCommit); + if (!cds.isEmpty()) { + List paths = getPaths(df.scan(aTree, bTree)); + for (ChangeData cd : cds) { + cd.setCurrentFilePaths(paths); + indexer.indexTask(cd).call(); + done.update(1); + } + } + } finally { + df.release(); + } + } catch (Exception e) { + log.warn("Failed to index changes for commit " + bCommit.name(), e); + failed.update(1); + } + } + + private List getPaths(List filenames) { + Set paths = Sets.newTreeSet(); + for (DiffEntry e : filenames) { + if (e.getOldPath() != null) { + paths.add(e.getOldPath()); + } + if (e.getNewPath() != null) { + paths.add(e.getNewPath()); + } + } + return ImmutableList.copyOf(paths); + } + + private RevTree aFor(RevCommit b, RevWalk walk) throws IOException { + switch (b.getParentCount()) { + case 0: + return walk.parseTree(emptyTree()); + case 1: + RevCommit a = b.getParent(0); + walk.parseBody(a); + return walk.parseTree(a.getTree()); + case 2: + return PatchListLoader.automerge(repo, walk, b); + default: + return null; + } + } + + private ObjectId emptyTree() throws IOException { + ObjectInserter oi = repo.newObjectInserter(); + try { + ObjectId id = oi.insert(Constants.OBJ_TREE, new byte[] {}); + oi.flush(); + return id; + } finally { + oi.release(); + } + } + } + private void writeVersion() throws IOException, ConfigInvalidException { if (dryRun) { diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexer.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexer.java index aa4634d6c5..ae71fc3694 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexer.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexer.java @@ -16,7 +16,11 @@ package com.google.gerrit.server.index; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.gerrit.reviewdb.client.Change; +import com.google.gerrit.server.query.change.ChangeData; + +import java.util.concurrent.Callable; /** * Helper for (re)indexing a change document. @@ -24,20 +28,50 @@ import com.google.gerrit.reviewdb.client.Change; * Indexing is run in the background, as it may require substantial work to * compute some of the fields and/or update the index. */ -public interface ChangeIndexer { +public abstract class ChangeIndexer { /** Instance indicating secondary index is disabled. */ - public static final ChangeIndexer DISABLED = new ChangeIndexer() { + public static final ChangeIndexer DISABLED = new ChangeIndexer(null) { @Override - public ListenableFuture index(Change change) { + public ListenableFuture index(ChangeData cd) { return Futures.immediateFuture(null); } + + public Callable indexTask(ChangeData cd) { + return new Callable() { + @Override + public Void call() { + return null; + } + }; + } }; + private final ListeningScheduledExecutorService executor; + + protected ChangeIndexer(ListeningScheduledExecutorService executor) { + this.executor = executor; + } + /** * Start indexing a change. * * @param change change to index. * @return future for the indexing task. */ - public ListenableFuture index(Change change); + public ListenableFuture index(Change change) { + return index(new ChangeData(change)); + } + + /** + * Start indexing a change. + * + * @param change change to index. + * @param prop propagator to wrap any created runnables in. + * @return future for the indexing task. + */ + public ListenableFuture index(ChangeData cd) { + return executor.submit(indexTask(cd)); + } + + public abstract Callable indexTask(ChangeData cd); } diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexerImpl.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexerImpl.java index 472d82049d..eff803a2fc 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexerImpl.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/ChangeIndexerImpl.java @@ -14,9 +14,7 @@ package com.google.gerrit.server.index; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; -import com.google.gerrit.reviewdb.client.Change; import com.google.gerrit.reviewdb.server.ReviewDb; import com.google.gerrit.server.CurrentUser; import com.google.gerrit.server.query.change.ChangeData; @@ -39,11 +37,10 @@ import java.util.concurrent.Callable; * Indexing is run in the background, as it may require substantial work to * compute some of the fields and/or update the index. */ -public class ChangeIndexerImpl implements ChangeIndexer { +public class ChangeIndexerImpl extends ChangeIndexer { private static final Logger log = LoggerFactory.getLogger(ChangeIndexerImpl.class); - private final ListeningScheduledExecutorService executor; private final ChangeIndex index; private final SchemaFactory schemaFactory; private final ThreadLocalRequestContext context; @@ -53,22 +50,22 @@ public class ChangeIndexerImpl implements ChangeIndexer { ChangeIndex index, SchemaFactory schemaFactory, ThreadLocalRequestContext context) { - this.executor = executor; + super(executor); this.index = index; this.schemaFactory = schemaFactory; this.context = context; } @Override - public ListenableFuture index(Change change) { - return executor.submit(new Task(change)); + public Callable indexTask(ChangeData cd) { + return new Task(cd); } private class Task implements Callable { - private final Change change; + private final ChangeData cd; - private Task(Change change) { - this.change = change; + private Task(ChangeData cd) { + this.cd = cd; } @Override @@ -87,7 +84,7 @@ public class ChangeIndexerImpl implements ChangeIndexer { throw new OutOfScopeException("No user during ChangeIndexer"); } }); - index.replace(new ChangeData(change)); + index.replace(cd); return null; } finally { context.setContext(null); @@ -96,14 +93,14 @@ public class ChangeIndexerImpl implements ChangeIndexer { } catch (Exception e) { log.error(String.format( "Failed to index change %d in %s", - change.getChangeId(), change.getProject().get()), e); + cd.getId().get(), cd.getChange().getProject().get()), e); throw e; } } @Override public String toString() { - return "index-change-" + change.getId().get(); + return "index-change-" + cd.getId().get(); } } } diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/patch/PatchListLoader.java b/gerrit-server/src/main/java/com/google/gerrit/server/patch/PatchListLoader.java index bb231a505f..267ae49fee 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/patch/PatchListLoader.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/patch/PatchListLoader.java @@ -64,7 +64,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -class PatchListLoader extends CacheLoader { +public class PatchListLoader extends CacheLoader { static final Logger log = LoggerFactory.getLogger(PatchListLoader.class); private final GitRepositoryManager repoManager; @@ -241,7 +241,7 @@ class PatchListLoader extends CacheLoader { } } - private static RevObject automerge(Repository repo, RevWalk rw, RevCommit b) + public static RevTree automerge(Repository repo, RevWalk rw, RevCommit b) throws IOException { String hash = b.name(); String refName = GitRepositoryManager.REFS_CACHE_AUTOMERGE