Implement an overall timeout for ReceiveCommits
ReceiveCommits does a lot of work, potentially involving RPCs to remote servers that may have long timeouts. If something really goes off the rails, e.g. a database RPC hanging forever, we want to be able to kill stalled ReceiveCommits threads. Note that the timeout is only for ReceiveCommits, not for the various steps in receive-pack processing that happen before that, like actually reading the data from the wire or indexing the pack. So the amount of time should really be fairly well bounded; even so, the default timeout is a relatively high 2 minutes. Change-Id: Ib9e8045faff9a9acaefd8c55ecc46757b4f54e02
This commit is contained in:
@@ -1714,6 +1714,14 @@ processed.
|
||||
+
|
||||
Defaults to the number of available CPUs according to the Java runtime.
|
||||
|
||||
[[receive.timeout]]receive.timeout::
|
||||
+
|
||||
Overall timeout on the time taken to process the change data in received packs.
|
||||
Only includes the time processing Gerrit changes and updating references, not
|
||||
the time to index the pack.
|
||||
+
|
||||
Default is 120 seconds.
|
||||
|
||||
|
||||
[[repository]]Section repository
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
@@ -15,6 +15,8 @@
|
||||
package com.google.gerrit.server.git;
|
||||
|
||||
import com.google.gerrit.reviewdb.client.Project;
|
||||
import com.google.gerrit.server.config.ConfigUtil;
|
||||
import com.google.gerrit.server.config.GerritServerConfig;
|
||||
import com.google.gerrit.server.git.ReceiveCommits.MessageSender;
|
||||
import com.google.gerrit.server.git.WorkQueue.Executor;
|
||||
import com.google.gerrit.server.project.ProjectControl;
|
||||
@@ -22,8 +24,13 @@ import com.google.gerrit.server.util.RequestScopePropagator;
|
||||
import com.google.inject.assistedinject.Assisted;
|
||||
import com.google.inject.assistedinject.FactoryModuleBuilder;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.PrivateModule;
|
||||
|
||||
import com.google.inject.name.Named;
|
||||
import com.google.inject.PrivateModule;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
import org.eclipse.jgit.lib.Config;
|
||||
import org.eclipse.jgit.lib.Repository;
|
||||
import org.eclipse.jgit.transport.PreReceiveHook;
|
||||
import org.eclipse.jgit.transport.ReceiveCommand;
|
||||
@@ -34,12 +41,15 @@ import org.slf4j.LoggerFactory;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/** Hook that delegates to {@link ReceiveCommits} in a worker thread. */
|
||||
public class AsyncReceiveCommits implements PreReceiveHook {
|
||||
private static final Logger log =
|
||||
LoggerFactory.getLogger(AsyncReceiveCommits.class);
|
||||
|
||||
private static final String TIMEOUT_NAME = "ReceiveCommitsOverallTimeout";
|
||||
|
||||
public interface Factory {
|
||||
AsyncReceiveCommits create(ProjectControl projectControl,
|
||||
Repository repository);
|
||||
@@ -56,6 +66,16 @@ public class AsyncReceiveCommits implements PreReceiveHook {
|
||||
install(new FactoryModuleBuilder()
|
||||
.build(ReceiveCommits.Factory.class));
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@Named(TIMEOUT_NAME)
|
||||
long getTimeoutMillis(@GerritServerConfig final Config cfg) {
|
||||
return ConfigUtil.getTimeUnit(
|
||||
cfg, "receive", null, "timeout",
|
||||
TimeUnit.SECONDS.convert(2, TimeUnit.MINUTES),
|
||||
TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private class Worker implements ProjectRunnable {
|
||||
@@ -119,11 +139,13 @@ public class AsyncReceiveCommits implements PreReceiveHook {
|
||||
private final Executor executor;
|
||||
private final RequestScopePropagator scopePropagator;
|
||||
private final MultiProgressMonitor progress;
|
||||
private final long timeoutMillis;
|
||||
|
||||
@Inject
|
||||
AsyncReceiveCommits(final ReceiveCommits.Factory factory,
|
||||
@ReceiveCommitsExecutor final Executor executor,
|
||||
final RequestScopePropagator scopePropagator,
|
||||
@Named(TIMEOUT_NAME) final long timeoutMillis,
|
||||
@Assisted final ProjectControl projectControl,
|
||||
@Assisted final Repository repo) {
|
||||
this.executor = executor;
|
||||
@@ -133,14 +155,16 @@ public class AsyncReceiveCommits implements PreReceiveHook {
|
||||
|
||||
progress = new MultiProgressMonitor(
|
||||
new MessageSenderOutputStream(), "Updating changes");
|
||||
this.timeoutMillis = timeoutMillis;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onPreReceive(final ReceivePack rp,
|
||||
final Collection<ReceiveCommand> commands) {
|
||||
try {
|
||||
progress.waitFor(executor.submit(
|
||||
scopePropagator.wrap(new Worker(commands))));
|
||||
progress.waitFor(
|
||||
executor.submit(scopePropagator.wrap(new Worker(commands))),
|
||||
timeoutMillis, TimeUnit.MILLISECONDS);
|
||||
} catch (ExecutionException e) {
|
||||
log.warn("Error in ReceiveCommits", e);
|
||||
rc.getMessageSender().sendError("internal error while processing changes");
|
||||
|
||||
@@ -134,6 +134,15 @@ public class MultiProgressMonitor {
|
||||
maxIntervalNanos = NANOSECONDS.convert(maxIntervalTime, maxIntervalUnit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a task managed by a {@link Future}, with no timeout.
|
||||
*
|
||||
* @see #waitFor(Future, long, TimeUnit)
|
||||
*/
|
||||
public void waitFor(final Future<?> workerFuture) throws ExecutionException {
|
||||
waitFor(workerFuture, 0, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for a task managed by a {@link Future}.
|
||||
* <p>
|
||||
@@ -144,11 +153,23 @@ public class MultiProgressMonitor {
|
||||
*
|
||||
* @param workerFuture a future that returns when the worker thread is
|
||||
* finished.
|
||||
*
|
||||
* @param timeoutTime overall timeout for the task; the future is forcefully
|
||||
* cancelled if the task exceeds the timeout. Non-positive values indicate
|
||||
* no timeout.
|
||||
* @param timeoutUnit unit for overall task timeout.
|
||||
* @throws ExecutionException if this thread or the worker thread was
|
||||
* interrupted, the worker was cancelled, or the worker timed out.
|
||||
*/
|
||||
public void waitFor(Future<?> workerFuture) throws ExecutionException {
|
||||
public void waitFor(final Future<?> workerFuture, final long timeoutTime,
|
||||
final TimeUnit timeoutUnit) throws ExecutionException {
|
||||
long overallStart = System.nanoTime();
|
||||
long deadline;
|
||||
if (timeoutTime > 0) {
|
||||
deadline = overallStart + NANOSECONDS.convert(timeoutTime, timeoutUnit);
|
||||
} else {
|
||||
deadline = 0;
|
||||
}
|
||||
|
||||
synchronized (this) {
|
||||
long left = maxIntervalNanos;
|
||||
while (!done) {
|
||||
@@ -161,7 +182,17 @@ public class MultiProgressMonitor {
|
||||
|
||||
// Send an update on every wakeup (manual or spurious), but only move
|
||||
// the spinner every maxInterval.
|
||||
left -= System.nanoTime() - start;
|
||||
long now = System.nanoTime();
|
||||
|
||||
if (deadline > 0 && now > deadline) {
|
||||
log.warn(String.format(
|
||||
"MultiProgressMonitor worker killed after %sms",
|
||||
TimeUnit.MILLISECONDS.convert(now - overallStart, NANOSECONDS)));
|
||||
workerFuture.cancel(true);
|
||||
break;
|
||||
}
|
||||
|
||||
left -= now - start;
|
||||
if (left <= 0) {
|
||||
moveSpinner();
|
||||
left = maxIntervalNanos;
|
||||
|
||||
Reference in New Issue
Block a user