Merge "Factor future listener out of AllChangesIndexer"
This commit is contained in:
@@ -17,14 +17,23 @@ package com.google.gerrit.server.index;
|
|||||||
import static com.google.common.base.Preconditions.checkNotNull;
|
import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
import com.google.common.base.Stopwatch;
|
import com.google.common.base.Stopwatch;
|
||||||
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
|
import com.google.common.util.concurrent.MoreExecutors;
|
||||||
|
|
||||||
|
import org.eclipse.jgit.lib.ProgressMonitor;
|
||||||
import org.eclipse.jgit.util.io.NullOutputStream;
|
import org.eclipse.jgit.util.io.NullOutputStream;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.PrintWriter;
|
import java.io.PrintWriter;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
public abstract class SiteIndexer<K, V, I extends Index<K, V>> {
|
public abstract class SiteIndexer<K, V, I extends Index<K, V>> {
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(SiteIndexer.class);
|
||||||
|
|
||||||
public static class Result {
|
public static class Result {
|
||||||
private final long elapsedNanos;
|
private final long elapsedNanos;
|
||||||
private final boolean success;
|
private final boolean success;
|
||||||
@@ -73,4 +82,60 @@ public abstract class SiteIndexer<K, V, I extends Index<K, V>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public abstract Result indexAll(I index);
|
public abstract Result indexAll(I index);
|
||||||
|
|
||||||
|
protected final void addErrorListener(ListenableFuture<?> future,
|
||||||
|
String desc, ProgressMonitor progress, AtomicBoolean ok) {
|
||||||
|
future.addListener(
|
||||||
|
new ErrorListener(future, desc, progress, ok),
|
||||||
|
MoreExecutors.directExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ErrorListener implements Runnable {
|
||||||
|
private final ListenableFuture<?> future;
|
||||||
|
private final String desc;
|
||||||
|
private final ProgressMonitor progress;
|
||||||
|
private final AtomicBoolean ok;
|
||||||
|
|
||||||
|
private ErrorListener(ListenableFuture<?> future, String desc,
|
||||||
|
ProgressMonitor progress, AtomicBoolean ok) {
|
||||||
|
this.future = future;
|
||||||
|
this.desc = desc;
|
||||||
|
this.progress = progress;
|
||||||
|
this.ok = ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
future.get();
|
||||||
|
} catch (ExecutionException | InterruptedException e) {
|
||||||
|
fail(e);
|
||||||
|
} catch (RuntimeException e) {
|
||||||
|
failAndThrow(e);
|
||||||
|
} catch (Error e) {
|
||||||
|
// Can't join with RuntimeException because "RuntimeException |
|
||||||
|
// Error" becomes Throwable, which messes with signatures.
|
||||||
|
failAndThrow(e);
|
||||||
|
} finally {
|
||||||
|
synchronized (progress) {
|
||||||
|
progress.update(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fail(Throwable t) {
|
||||||
|
log.error("Failed to index " + desc, t);
|
||||||
|
ok.set(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void failAndThrow(RuntimeException e) {
|
||||||
|
fail(e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void failAndThrow(Error e) {
|
||||||
|
fail(e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -27,7 +27,6 @@ import com.google.common.util.concurrent.AsyncFunction;
|
|||||||
import com.google.common.util.concurrent.Futures;
|
import com.google.common.util.concurrent.Futures;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||||
import com.google.common.util.concurrent.MoreExecutors;
|
|
||||||
import com.google.gerrit.reviewdb.client.Change;
|
import com.google.gerrit.reviewdb.client.Change;
|
||||||
import com.google.gerrit.reviewdb.client.Project;
|
import com.google.gerrit.reviewdb.client.Project;
|
||||||
import com.google.gerrit.reviewdb.server.ReviewDb;
|
import com.google.gerrit.reviewdb.server.ReviewDb;
|
||||||
@@ -152,43 +151,11 @@ public class AllChangesIndexer
|
|||||||
final AtomicBoolean ok = new AtomicBoolean(true);
|
final AtomicBoolean ok = new AtomicBoolean(true);
|
||||||
|
|
||||||
for (final Project.NameKey project : projects) {
|
for (final Project.NameKey project : projects) {
|
||||||
final ListenableFuture<?> future = executor.submit(reindexProject(
|
ListenableFuture<?> future = executor.submit(reindexProject(
|
||||||
indexerFactory.create(executor, index), project, doneTask, failedTask,
|
indexerFactory.create(executor, index), project, doneTask, failedTask,
|
||||||
verboseWriter));
|
verboseWriter));
|
||||||
|
addErrorListener(future, "project " + project, projTask, ok);
|
||||||
futures.add(future);
|
futures.add(future);
|
||||||
future.addListener(new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
future.get();
|
|
||||||
} catch (ExecutionException | InterruptedException e) {
|
|
||||||
fail(project, e);
|
|
||||||
} catch (RuntimeException e) {
|
|
||||||
failAndThrow(project, e);
|
|
||||||
} catch (Error e) {
|
|
||||||
// Can't join with RuntimeException because "RuntimeException |
|
|
||||||
// Error" becomes Throwable, which messes with signatures.
|
|
||||||
failAndThrow(project, e);
|
|
||||||
} finally {
|
|
||||||
projTask.update(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void fail(Project.NameKey project, Throwable t) {
|
|
||||||
log.error("Failed to index project " + project, t);
|
|
||||||
ok.set(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void failAndThrow(Project.NameKey project, RuntimeException e) {
|
|
||||||
fail(project, e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void failAndThrow(Project.NameKey project, Error e) {
|
|
||||||
fail(project, e);
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}, MoreExecutors.directExecutor());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
|||||||
Reference in New Issue
Block a user