Merge "Rewrite Reindex around projects"

This commit is contained in:
Shawn Pearce 2013-06-26 14:31:57 +00:00 committed by Gerrit Code Review
commit 704699a89e
4 changed files with 272 additions and 61 deletions

View File

@ -17,11 +17,18 @@ package com.google.gerrit.pgm;
import static com.google.gerrit.lucene.IndexVersionCheck.SCHEMA_VERSIONS;
import static com.google.gerrit.lucene.IndexVersionCheck.gerritIndexConfig;
import static com.google.gerrit.lucene.LuceneChangeIndex.LUCENE_VERSION;
import static com.google.gerrit.server.schema.DataSourceProvider.Context.SINGLE_USER;
import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.extensions.registration.DynamicSet;
@ -30,16 +37,20 @@ import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.lucene.LuceneIndexModule;
import com.google.gerrit.pgm.util.SiteProgram;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.cache.CacheRemovalListener;
import com.google.gerrit.server.cache.h2.DefaultCacheFactory;
import com.google.gerrit.server.config.SitePaths;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.MultiProgressMonitor;
import com.google.gerrit.server.git.MultiProgressMonitor.Task;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.index.IndexExecutor;
import com.google.gerrit.server.index.IndexModule;
import com.google.gerrit.server.patch.PatchListCacheImpl;
import com.google.gerrit.server.patch.PatchListLoader;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.AbstractModule;
@ -52,22 +63,43 @@ import com.google.inject.TypeLiteral;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.eclipse.jgit.diff.DiffEntry;
import org.eclipse.jgit.diff.DiffFormatter;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.ObjectInserter;
import org.eclipse.jgit.lib.ProgressMonitor;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.lib.RepositoryCache;
import org.eclipse.jgit.lib.TextProgressMonitor;
import org.eclipse.jgit.revwalk.RevCommit;
import org.eclipse.jgit.revwalk.RevObject;
import org.eclipse.jgit.revwalk.RevTree;
import org.eclipse.jgit.revwalk.RevWalk;
import org.eclipse.jgit.storage.file.FileBasedConfig;
import org.eclipse.jgit.util.FS;
import org.eclipse.jgit.util.io.DisabledOutputStream;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class Reindex extends SiteProgram {
private static final Logger log = LoggerFactory.getLogger(Reindex.class);
@Option(name = "--threads", usage = "Number of threads to use for indexing")
private int threads = Runtime.getRuntime().availableProcessors();
@ -81,7 +113,7 @@ public class Reindex extends SiteProgram {
@Override
public int run() throws Exception {
mustHaveValidSite();
dbInjector = createDbInjector(SINGLE_USER);
dbInjector = createDbInjector(MULTI_USER);
if (!IndexModule.isEnabled(dbInjector)) {
throw die("Secondary index not enabled");
}
@ -90,7 +122,7 @@ public class Reindex extends SiteProgram {
dbManager.start();
sitePaths = dbInjector.getInstance(SitePaths.class);
// Delete before any LuceneChangeIndex may be created.
// Delete before any index may be created depending on this data.
deleteAll();
sysInjector = createSysInjector();
@ -187,77 +219,225 @@ public class Reindex extends SiteProgram {
private int indexAll() throws Exception {
ReviewDb db = sysInjector.getInstance(ReviewDb.class);
ChangeIndexer indexer = sysInjector.getInstance(ChangeIndexer.class);
ListeningScheduledExecutorService executor = sysInjector.getInstance(
Key.get(ListeningScheduledExecutorService.class, IndexExecutor.class));
ProgressMonitor pm = new TextProgressMonitor(new PrintWriter(System.out));
pm.start(1);
pm.beginTask("Collecting projects", ProgressMonitor.UNKNOWN);
Set<Project.NameKey> projects = Sets.newTreeSet();
try {
for (Change change : db.changes().all()) {
if (projects.add(change.getProject())) {
pm.update(1);
}
}
} finally {
db.close();
}
pm.endTask();
final MultiProgressMonitor mpm =
new MultiProgressMonitor(System.out, "Reindexing changes");
final Task projTask = mpm.beginSubTask("projects", projects.size());
final Task doneTask = mpm.beginSubTask(null, MultiProgressMonitor.UNKNOWN);
final Task failedTask = mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
Stopwatch sw = new Stopwatch().start();
final int queueLen = 2 * threads;
final Semaphore sem = new Semaphore(queueLen);
final List<ListenableFuture<?>> futures =
Lists.newArrayListWithCapacity(projects.size());
final AtomicBoolean ok = new AtomicBoolean(true);
final MultiProgressMonitor pm =
new MultiProgressMonitor(System.out, "Reindexing changes");
final Task done = pm.beginSubTask(null, MultiProgressMonitor.UNKNOWN);
final Task failed = pm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
int i = 0;
for (final Change change : db.changes().all()) {
sem.acquire();
final ListenableFuture<?> future = indexer.index(change);
for (final Project.NameKey project : projects) {
final ListenableFuture<?> future = executor.submit(
new ReindexProject(project, doneTask, failedTask));
futures.add(future);
future.addListener(new Runnable() {
@Override
public void run() {
try {
future.get();
done.update(1);
} catch (InterruptedException e) {
fail(change, e);
fail(project, e);
} catch (ExecutionException e) {
fail(change, e);
ok.set(false); // Logged by indexer.
} catch (RuntimeException e) {
failAndThrow(change, e);
failAndThrow(project, e);
} catch (Error e) {
failAndThrow(change, e);
failAndThrow(project, e);
} finally {
sem.release();
projTask.update(1);
}
}
private void fail(Change change, Throwable t) {
private void fail(Project.NameKey project, Throwable t) {
log.error("Failed to index project " + project, t);
ok.set(false);
failed.update(1);
}
private void failAndThrow(Change change, RuntimeException e) {
fail(change, e);
private void failAndThrow(Project.NameKey project, RuntimeException e) {
fail(project, e);
throw e;
}
private void failAndThrow(Change change, Error e) {
fail(change, e);
private void failAndThrow(Project.NameKey project, Error e) {
fail(project, e);
throw e;
}
}, MoreExecutors.sameThreadExecutor());
i++;
}
pm.waitFor(sysInjector.getInstance(WorkQueue.class).getDefaultQueue()
.submit(new Runnable() {
mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
new AsyncFunction<List<?>, Void>() {
@Override
public void run() {
try {
sem.acquire(queueLen);
} catch (InterruptedException e) {
e.printStackTrace();
}
pm.end();
public ListenableFuture<Void> apply(List<?> input) throws Exception {
mpm.end();
return Futures.immediateFuture(null);
}
}));
}));
double elapsed = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
System.out.format("Reindexed %d changes in %.02fs (%.01f/s)\n",
i, elapsed, i/elapsed);
int n = doneTask.getCount() + failedTask.getCount();
System.out.format("Reindexed %d changes in %.01fs (%.01f/s)\n",
n, elapsed, n/elapsed);
return ok.get() ? 0 : 1;
}
private class ReindexProject implements Callable<Void> {
private final ChangeIndexer indexer;
private final Project.NameKey project;
private final ListMultimap<ObjectId, ChangeData> byId;
private final Task done;
private final Task failed;
private Repository repo;
private RevWalk walk;
private ReindexProject(Project.NameKey project, Task done, Task failed) {
this.indexer = sysInjector.getInstance(ChangeIndexer.class);
this.project = project;
this.byId = ArrayListMultimap.create();
this.done = done;
this.failed = failed;
}
@Override
public Void call() throws Exception {
ReviewDb db = sysInjector.getInstance(ReviewDb.class);
GitRepositoryManager mgr = sysInjector.getInstance(GitRepositoryManager.class);
repo = mgr.openRepository(project);
try {
Map<String, Ref> refs = repo.getAllRefs();
for (Change c : db.changes().byProject(project)) {
Ref r = refs.get(c.currentPatchSetId().toRefName());
if (r != null) {
byId.put(r.getObjectId(), new ChangeData(c));
}
}
walk();
} finally {
repo.close();
RepositoryCache.close(repo); // Only used once per Reindex call.
}
return null;
}
private void walk() throws Exception {
walk = new RevWalk(repo);
try {
// Walk only refs first to cover as many changes as we can without having
// to mark every single change.
for (Ref ref : repo.getRefDatabase().getRefs(Constants.R_HEADS).values()) {
RevObject o = walk.parseAny(ref.getObjectId());
if (o instanceof RevCommit) {
walk.markStart((RevCommit) o);
}
}
RevCommit bCommit;
while ((bCommit = walk.next()) != null && !byId.isEmpty()) {
if (byId.containsKey(bCommit)) {
getPathsAndIndex(bCommit);
byId.removeAll(bCommit);
}
}
for (ObjectId id : byId.keySet()) {
getPathsAndIndex(walk.parseCommit(id));
}
} finally {
walk.release();
}
}
private void getPathsAndIndex(RevCommit bCommit) throws Exception {
RevTree bTree = bCommit.getTree();
try {
RevTree aTree = aFor(bCommit, walk);
if (aTree == null) {
return;
}
DiffFormatter df = new DiffFormatter(DisabledOutputStream.INSTANCE);
try {
df.setRepository(repo);
List<ChangeData> cds = byId.get(bCommit);
if (!cds.isEmpty()) {
List<String> paths = getPaths(df.scan(aTree, bTree));
for (ChangeData cd : cds) {
cd.setCurrentFilePaths(paths);
indexer.indexTask(cd).call();
done.update(1);
}
}
} finally {
df.release();
}
} catch (Exception e) {
log.warn("Failed to index changes for commit " + bCommit.name(), e);
failed.update(1);
}
}
private List<String> getPaths(List<DiffEntry> filenames) {
Set<String> paths = Sets.newTreeSet();
for (DiffEntry e : filenames) {
if (e.getOldPath() != null) {
paths.add(e.getOldPath());
}
if (e.getNewPath() != null) {
paths.add(e.getNewPath());
}
}
return ImmutableList.copyOf(paths);
}
private RevTree aFor(RevCommit b, RevWalk walk) throws IOException {
switch (b.getParentCount()) {
case 0:
return walk.parseTree(emptyTree());
case 1:
RevCommit a = b.getParent(0);
walk.parseBody(a);
return walk.parseTree(a.getTree());
case 2:
return PatchListLoader.automerge(repo, walk, b);
default:
return null;
}
}
private ObjectId emptyTree() throws IOException {
ObjectInserter oi = repo.newObjectInserter();
try {
ObjectId id = oi.insert(Constants.OBJ_TREE, new byte[] {});
oi.flush();
return id;
} finally {
oi.release();
}
}
}
private void writeVersion() throws IOException,
ConfigInvalidException {
if (dryRun) {

View File

@ -16,7 +16,11 @@ package com.google.gerrit.server.index;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.server.query.change.ChangeData;
import java.util.concurrent.Callable;
/**
* Helper for (re)indexing a change document.
@ -24,20 +28,50 @@ import com.google.gerrit.reviewdb.client.Change;
* Indexing is run in the background, as it may require substantial work to
* compute some of the fields and/or update the index.
*/
public interface ChangeIndexer {
public abstract class ChangeIndexer {
/** Instance indicating secondary index is disabled. */
public static final ChangeIndexer DISABLED = new ChangeIndexer() {
public static final ChangeIndexer DISABLED = new ChangeIndexer(null) {
@Override
public ListenableFuture<?> index(Change change) {
public ListenableFuture<?> index(ChangeData cd) {
return Futures.immediateFuture(null);
}
public Callable<Void> indexTask(ChangeData cd) {
return new Callable<Void>() {
@Override
public Void call() {
return null;
}
};
}
};
private final ListeningScheduledExecutorService executor;
protected ChangeIndexer(ListeningScheduledExecutorService executor) {
this.executor = executor;
}
/**
* Start indexing a change.
*
* @param change change to index.
* @return future for the indexing task.
*/
public ListenableFuture<?> index(Change change);
public ListenableFuture<?> index(Change change) {
return index(new ChangeData(change));
}
/**
* Start indexing a change.
*
* @param change change to index.
* @param prop propagator to wrap any created runnables in.
* @return future for the indexing task.
*/
public ListenableFuture<?> index(ChangeData cd) {
return executor.submit(indexTask(cd));
}
public abstract Callable<Void> indexTask(ChangeData cd);
}

View File

@ -14,9 +14,7 @@
package com.google.gerrit.server.index;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.query.change.ChangeData;
@ -39,11 +37,10 @@ import java.util.concurrent.Callable;
* Indexing is run in the background, as it may require substantial work to
* compute some of the fields and/or update the index.
*/
public class ChangeIndexerImpl implements ChangeIndexer {
public class ChangeIndexerImpl extends ChangeIndexer {
private static final Logger log =
LoggerFactory.getLogger(ChangeIndexerImpl.class);
private final ListeningScheduledExecutorService executor;
private final ChangeIndex index;
private final SchemaFactory<ReviewDb> schemaFactory;
private final ThreadLocalRequestContext context;
@ -53,22 +50,22 @@ public class ChangeIndexerImpl implements ChangeIndexer {
ChangeIndex index,
SchemaFactory<ReviewDb> schemaFactory,
ThreadLocalRequestContext context) {
this.executor = executor;
super(executor);
this.index = index;
this.schemaFactory = schemaFactory;
this.context = context;
}
@Override
public ListenableFuture<?> index(Change change) {
return executor.submit(new Task(change));
public Callable<Void> indexTask(ChangeData cd) {
return new Task(cd);
}
private class Task implements Callable<Void> {
private final Change change;
private final ChangeData cd;
private Task(Change change) {
this.change = change;
private Task(ChangeData cd) {
this.cd = cd;
}
@Override
@ -87,7 +84,7 @@ public class ChangeIndexerImpl implements ChangeIndexer {
throw new OutOfScopeException("No user during ChangeIndexer");
}
});
index.replace(new ChangeData(change));
index.replace(cd);
return null;
} finally {
context.setContext(null);
@ -96,14 +93,14 @@ public class ChangeIndexerImpl implements ChangeIndexer {
} catch (Exception e) {
log.error(String.format(
"Failed to index change %d in %s",
change.getChangeId(), change.getProject().get()), e);
cd.getId().get(), cd.getChange().getProject().get()), e);
throw e;
}
}
@Override
public String toString() {
return "index-change-" + change.getId().get();
return "index-change-" + cd.getId().get();
}
}
}

View File

@ -64,7 +64,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
class PatchListLoader extends CacheLoader<PatchListKey, PatchList> {
public class PatchListLoader extends CacheLoader<PatchListKey, PatchList> {
static final Logger log = LoggerFactory.getLogger(PatchListLoader.class);
private final GitRepositoryManager repoManager;
@ -241,7 +241,7 @@ class PatchListLoader extends CacheLoader<PatchListKey, PatchList> {
}
}
private static RevObject automerge(Repository repo, RevWalk rw, RevCommit b)
public static RevTree automerge(Repository repo, RevWalk rw, RevCommit b)
throws IOException {
String hash = b.name();
String refName = GitRepositoryManager.REFS_CACHE_AUTOMERGE