Reindex: add a progress monitor
Change ChangeIndexerImpl to use Callable and let Exceptions propagate so we can separate success from failure. Fix the units in the final output message and print a rate as well. Change-Id: I341a14a44e0d5e7233b4220077ee82345b229b20
This commit is contained in:
@@ -34,6 +34,9 @@ import com.google.gerrit.reviewdb.server.ReviewDb;
|
|||||||
import com.google.gerrit.server.cache.CacheRemovalListener;
|
import com.google.gerrit.server.cache.CacheRemovalListener;
|
||||||
import com.google.gerrit.server.cache.h2.DefaultCacheFactory;
|
import com.google.gerrit.server.cache.h2.DefaultCacheFactory;
|
||||||
import com.google.gerrit.server.config.SitePaths;
|
import com.google.gerrit.server.config.SitePaths;
|
||||||
|
import com.google.gerrit.server.git.MultiProgressMonitor;
|
||||||
|
import com.google.gerrit.server.git.MultiProgressMonitor.Task;
|
||||||
|
import com.google.gerrit.server.git.WorkQueue;
|
||||||
import com.google.gerrit.server.index.ChangeIndexer;
|
import com.google.gerrit.server.index.ChangeIndexer;
|
||||||
import com.google.gerrit.server.index.IndexModule;
|
import com.google.gerrit.server.index.IndexModule;
|
||||||
import com.google.gerrit.server.patch.PatchListCacheImpl;
|
import com.google.gerrit.server.patch.PatchListCacheImpl;
|
||||||
@@ -188,9 +191,15 @@ public class Reindex extends SiteProgram {
|
|||||||
ReviewDb db = sysInjector.getInstance(ReviewDb.class);
|
ReviewDb db = sysInjector.getInstance(ReviewDb.class);
|
||||||
ChangeIndexer indexer = sysInjector.getInstance(ChangeIndexer.class);
|
ChangeIndexer indexer = sysInjector.getInstance(ChangeIndexer.class);
|
||||||
Stopwatch sw = new Stopwatch().start();
|
Stopwatch sw = new Stopwatch().start();
|
||||||
int queueLen = 2 * threads;
|
final int queueLen = 2 * threads;
|
||||||
final Semaphore sem = new Semaphore(queueLen);
|
final Semaphore sem = new Semaphore(queueLen);
|
||||||
final AtomicBoolean ok = new AtomicBoolean(true);
|
final AtomicBoolean ok = new AtomicBoolean(true);
|
||||||
|
|
||||||
|
final MultiProgressMonitor pm =
|
||||||
|
new MultiProgressMonitor(System.out, "Reindexing changes");
|
||||||
|
final Task done = pm.beginSubTask(null, MultiProgressMonitor.UNKNOWN);
|
||||||
|
final Task failed = pm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
|
||||||
|
|
||||||
int i = 0;
|
int i = 0;
|
||||||
for (final Change change : db.changes().all()) {
|
for (final Change change : db.changes().all()) {
|
||||||
sem.acquire();
|
sem.acquire();
|
||||||
@@ -200,22 +209,54 @@ public class Reindex extends SiteProgram {
|
|||||||
public void run() {
|
public void run() {
|
||||||
try {
|
try {
|
||||||
future.get();
|
future.get();
|
||||||
|
done.update(1);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
log.error("Failed to index change " + change.getId(), e);
|
fail(change, e);
|
||||||
ok.set(false);
|
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
log.error("Failed to index change " + change.getId(), e);
|
fail(change, e);
|
||||||
ok.set(false);
|
} catch (RuntimeException e) {
|
||||||
|
failAndThrow(change, e);
|
||||||
|
} catch (Error e) {
|
||||||
|
failAndThrow(change, e);
|
||||||
} finally {
|
} finally {
|
||||||
sem.release();
|
sem.release();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void fail(Change change, Throwable t) {
|
||||||
|
log.error("Failed to index change " + change.getId(), t);
|
||||||
|
ok.set(false);
|
||||||
|
failed.update(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void failAndThrow(Change change, RuntimeException e) {
|
||||||
|
fail(change, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void failAndThrow(Change change, Error e) {
|
||||||
|
fail(change, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
}, MoreExecutors.sameThreadExecutor());
|
}, MoreExecutors.sameThreadExecutor());
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
sem.acquire(queueLen);
|
|
||||||
|
pm.waitFor(sysInjector.getInstance(WorkQueue.class).getDefaultQueue()
|
||||||
|
.submit(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
sem.acquire(queueLen);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
pm.end();
|
||||||
|
}
|
||||||
|
}));
|
||||||
double elapsed = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
|
double elapsed = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
|
||||||
System.out.format("Reindexed %d changes in %.02fms\n", i, elapsed);
|
System.out.format("Reindexed %d changes in %.02fs (%.01f/s)\n",
|
||||||
|
i, elapsed, i/elapsed);
|
||||||
|
|
||||||
return ok.get() ? 0 : 1;
|
return ok.get() ? 0 : 1;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,6 +16,8 @@ package com.google.gerrit.server.git;
|
|||||||
|
|
||||||
import static java.util.concurrent.TimeUnit.NANOSECONDS;
|
import static java.util.concurrent.TimeUnit.NANOSECONDS;
|
||||||
|
|
||||||
|
import com.google.common.base.Strings;
|
||||||
|
|
||||||
import org.eclipse.jgit.lib.Constants;
|
import org.eclipse.jgit.lib.Constants;
|
||||||
import org.eclipse.jgit.lib.ProgressMonitor;
|
import org.eclipse.jgit.lib.ProgressMonitor;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -319,7 +321,10 @@ public class MultiProgressMonitor {
|
|||||||
first = false;
|
first = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
s.append(' ').append(t.name).append(": ");
|
s.append(' ');
|
||||||
|
if (!Strings.isNullOrEmpty(t.name)) {
|
||||||
|
s.append(t.name).append(": ");
|
||||||
|
}
|
||||||
if (t.total == UNKNOWN) {
|
if (t.total == UNKNOWN) {
|
||||||
s.append(count);
|
s.append(count);
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -21,10 +21,8 @@ import com.google.gerrit.server.query.change.ChangeData;
|
|||||||
import com.google.gerrit.server.util.RequestScopePropagator;
|
import com.google.gerrit.server.util.RequestScopePropagator;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Helper for (re)indexing a change document.
|
* Helper for (re)indexing a change document.
|
||||||
@@ -33,9 +31,6 @@ import java.io.IOException;
|
|||||||
* compute some of the fields and/or update the index.
|
* compute some of the fields and/or update the index.
|
||||||
*/
|
*/
|
||||||
public class ChangeIndexerImpl implements ChangeIndexer {
|
public class ChangeIndexerImpl implements ChangeIndexer {
|
||||||
private static final Logger log =
|
|
||||||
LoggerFactory.getLogger(ChangeIndexerImpl.class);
|
|
||||||
|
|
||||||
private final ListeningScheduledExecutorService executor;
|
private final ListeningScheduledExecutorService executor;
|
||||||
private final ChangeIndex index;
|
private final ChangeIndex index;
|
||||||
|
|
||||||
@@ -54,14 +49,14 @@ public class ChangeIndexerImpl implements ChangeIndexer {
|
|||||||
@Override
|
@Override
|
||||||
public ListenableFuture<?> index(Change change,
|
public ListenableFuture<?> index(Change change,
|
||||||
RequestScopePropagator prop) {
|
RequestScopePropagator prop) {
|
||||||
Runnable task = new Task(change);
|
Callable<?> task = new Task(change);
|
||||||
if (prop != null) {
|
if (prop != null) {
|
||||||
task = prop.wrap(task);
|
task = prop.wrap(task);
|
||||||
}
|
}
|
||||||
return executor.submit(task);
|
return executor.submit(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
private class Task implements Runnable {
|
private class Task implements Callable<Void> {
|
||||||
private final Change change;
|
private final Change change;
|
||||||
|
|
||||||
private Task(Change change) {
|
private Task(Change change) {
|
||||||
@@ -69,13 +64,9 @@ public class ChangeIndexerImpl implements ChangeIndexer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public Void call() throws IOException {
|
||||||
ChangeData cd = new ChangeData(change);
|
index.replace(new ChangeData(change));
|
||||||
try {
|
return null;
|
||||||
index.replace(cd);
|
|
||||||
} catch (IOException e) {
|
|
||||||
log.error("Error indexing change", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Reference in New Issue
Block a user