Upon replication failure, retry and block future pushes to same URL

Upon getting a replication failure to a URL, Gerrit should block all
future replication to that URL and use a background timer to
continuously retry with a reasonable delay between attempts, until
the remote comes back and starts to answer us again.

A new option in the replication config file determines the number of
minutes to wait before retrying replication to a previously offline
machine.

Bug: issue 482
Change-Id: If46862787a4f9a60d555e96634e25c96efa3332e
rafael.rabelosilva 2010-09-07 15:33:11 +02:00 committed by Shawn O. Pearce
parent 9d5008a05e
commit 357d861b7d
3 changed files with 190 additions and 33 deletions

@@ -142,6 +142,19 @@ This is a Gerrit specific extension to the Git remote block.
 +
 By default, 15 seconds.
 
+[[remote.name.replicationRetry]]remote.<name>.replicationRetry::
++
+Number of minutes to wait before retrying a remote push operation
+that previously failed because the remote server was offline.
++
+If a remote push operation fails because the remote server was
+offline, all push operations to the same destination URL are
+blocked, and the remote push is continuously retried.
++
+This is a Gerrit specific extension to the Git remote block.
++
+By default, 1 minute.
+
 [[remote.name.threads]]remote.<name>.threads::
 +
 Number of worker threads to dedicate to pushing to the repositories
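For illustration, a remote section in the replication config file
combining the new option with the existing ones might look like the
following (host name and path are made up):

  [remote "mirror"]
    url = gerrit@mirror.example.com:/srv/git/${name}.git
    replicationDelay = 15
    replicationRetry = 5

With such a configuration, a push that fails because mirror.example.com
is offline blocks further pushes to that URL, and the push is retried
every 5 minutes until the host answers again.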

@@ -78,6 +78,13 @@ class PushOp implements ProjectRunnable {
 
   private Repository db;
 
+  /**
+   * Indicates whether this instance is retrying a previously failed push.
+   */
+  private boolean retrying;
+
+  private boolean canceled;
+
   @Inject
   PushOp(final GitRepositoryManager grm, final SchemaFactory<ReviewDb> s,
       final PushReplication.ReplicationConfig p, final RemoteConfig c,
@@ -90,6 +97,22 @@ class PushOp implements ProjectRunnable {
     uri = u;
   }
 
+  public boolean isRetrying() {
+    return retrying;
+  }
+
+  public void setToRetry() {
+    retrying = true;
+  }
+
+  public void cancel() {
+    canceled = true;
+  }
+
+  public boolean wasCanceled() {
+    return canceled;
+  }
+
   URIish getURI() {
     return uri;
   }
@@ -103,45 +126,73 @@ class PushOp implements ProjectRunnable {
     }
   }
 
-  public void run() {
-    try {
-      // Lock the queue, and remove ourselves, so we can't be modified once
-      // we start replication (instead a new instance, with the same URI, is
-      // created and scheduled for a future point in time.)
-      //
-      pool.notifyStarting(this);
-
-      db = repoManager.openRepository(projectName.get());
-      runImpl();
-    } catch (RepositoryNotFoundException e) {
-      log.error("Cannot replicate " + projectName + "; " + e.getMessage());
-
-    } catch (NoRemoteRepositoryException e) {
-      log.error("Cannot replicate to " + uri + "; repository not found");
-
-    } catch (NotSupportedException e) {
-      log.error("Cannot replicate to " + uri, e);
-
-    } catch (TransportException e) {
-      final Throwable cause = e.getCause();
-      if (cause instanceof JSchException
-          && cause.getMessage().startsWith("UnknownHostKey:")) {
-        log.error("Cannot replicate to " + uri + ": " + cause.getMessage());
-      } else {
-        log.error("Cannot replicate to " + uri, e);
-      }
-
-    } catch (IOException e) {
-      log.error("Cannot replicate to " + uri, e);
-
-    } catch (RuntimeException e) {
-      log.error("Unexpected error during replication to " + uri, e);
-
-    } catch (Error e) {
-      log.error("Unexpected error during replication to " + uri, e);
-
-    } finally {
-      if (db != null) {
-        db.close();
-      }
-    }
-  }
+  public Set<String> getRefs() {
+    final Set<String> refs;
+
+    if (mirror) {
+      refs = new HashSet<String>(1);
+      refs.add(MIRROR_ALL);
+    } else {
+      refs = delta;
+    }
+
+    return refs;
+  }
+
+  public void addRefs(Set<String> refs) {
+    if (!mirror) {
+      for (String ref : refs) {
+        addRef(ref);
+      }
+    }
+  }
+
+  public void run() {
+    // Lock the queue, and remove ourselves, so we can't be modified once
+    // we start replication (instead a new instance, with the same URI, is
+    // created and scheduled for a future point in time.)
+    //
+    pool.notifyStarting(this);
+
+    // Only check the canceled flag after calling notifyStarting, since
+    // the flag is set while holding the queue lock.
+    if (!canceled) {
+      try {
+        db = repoManager.openRepository(projectName.get());
+        runImpl();
+      } catch (RepositoryNotFoundException e) {
+        log.error("Cannot replicate " + projectName + "; " + e.getMessage());
+
+      } catch (NoRemoteRepositoryException e) {
+        log.error("Cannot replicate to " + uri + "; repository not found");
+
+      } catch (NotSupportedException e) {
+        log.error("Cannot replicate to " + uri, e);
+
+      } catch (TransportException e) {
+        final Throwable cause = e.getCause();
+        if (cause instanceof JSchException
+            && cause.getMessage().startsWith("UnknownHostKey:")) {
+          log.error("Cannot replicate to " + uri + ": " + cause.getMessage());
+        } else {
+          log.error("Cannot replicate to " + uri, e);
+        }
+
+        // The remote push operation should be retried.
+        pool.reschedule(this);
+
+      } catch (IOException e) {
+        log.error("Cannot replicate to " + uri, e);
+
+      } catch (RuntimeException e) {
+        log.error("Unexpected error during replication to " + uri, e);
+
+      } catch (Error e) {
+        log.error("Unexpected error during replication to " + uri, e);
+
+      } finally {
+        if (db != null) {
+          db.close();
+        }
+      }
+    }
+  }
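The ordering in run() above is the delicate part: notifyStarting() must
run before the canceled check, because both the cancel() call and the
pending-map bookkeeping happen under the same lock. Below is a minimal,
self-contained sketch of that handshake; the names (RetryQueue, Task)
are hypothetical and are not Gerrit's API.

import java.util.HashMap;
import java.util.Map;

class RetryQueue {
  private final Map<String, Task> pending = new HashMap<String, Task>();

  // Called by a task as its first action. A canceled task must not
  // remove the entry: by the time it runs, the entry already belongs
  // to the replacement op that was registered for retry.
  void notifyStarting(final Task t) {
    synchronized (pending) {
      if (!t.canceled) {
        pending.remove(t.uri);
      }
    }
  }

  // Called while rescheduling: marks the queued task so that, when its
  // turn comes, it neither touches the map nor does any work.
  void cancel(final String uri) {
    synchronized (pending) {
      final Task t = pending.get(uri);
      if (t != null) {
        t.canceled = true;
        pending.remove(uri);
      }
    }
  }
}

class Task implements Runnable {
  final String uri;
  final RetryQueue queue;
  volatile boolean canceled;

  Task(final String uri, final RetryQueue queue) {
    this.uri = uri;
    this.queue = queue;
  }

  public void run() {
    queue.notifyStarting(this);
    // The flag is only trusted after notifyStarting(), mirroring
    // PushOp.run(): it is set under the queue lock, so this check
    // cannot race with cancel().
    if (!canceled) {
      System.out.println("pushing to " + uri);
    }
  }
}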

@@ -55,7 +55,6 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -307,6 +306,7 @@ public class PushReplication implements ReplicationQueue {
   static class ReplicationConfig {
     private final RemoteConfig remote;
     private final int delay;
+    private final int retryDelay;
     private final WorkQueue.Executor pool;
     private final Map<URIish, PushOp> pending = new HashMap<URIish, PushOp>();
     private final PushOp.Factory opFactory;
@@ -318,6 +318,7 @@ public class PushReplication implements ReplicationQueue {
       remote = rc;
       delay = Math.max(0, getInt(rc, cfg, "replicationdelay", 15));
+      retryDelay = Math.max(0, getInt(rc, cfg, "replicationretry", 1));
 
       final int poolSize = Math.max(0, getInt(rc, cfg, "threads", 1));
       final String poolName = "ReplicateTo-" + rc.getName();
@@ -383,6 +384,96 @@ public class PushReplication implements ReplicationQueue {
       }
     }
 
+    /**
+     * Schedules a PushOp instance again, for retry.
+     * <p>
+     * The instance is assumed to have been scheduled before and to
+     * have hit a transport exception. It is rescheduled as a push
+     * operation to be retried after the number of minutes given by
+     * the retryDelay class attribute.
+     * <p>
+     * If the PushOp instance to be scheduled has the same URI as one
+     * already pending for retry, the refs of the parameter instance
+     * are added to the pending one.
+     * <p>
+     * If the PushOp instance to be scheduled has the same URI as one
+     * pending, but not pending for retry, the pending one is marked to
+     * be canceled when it starts executing, removed from the pending
+     * list, and its refs are added to the parameter instance, which is
+     * then scheduled for retry.
+     * <p>
+     * Note that every operation that marks a PushOp as canceled or as
+     * retrying, or that removes/adds it from/to the pending map, must
+     * be protected by the lock on the pending map instance attribute.
+     *
+     * @param pushOp The PushOp instance to be scheduled.
+     */
+    void reschedule(final PushOp pushOp) {
+      try {
+        if (!controlFor(pushOp.getProjectNameKey()).isVisible()) {
+          return;
+        }
+      } catch (NoSuchProjectException e1) {
+        log.error("Internal error: project " + pushOp.getProjectNameKey()
+            + " not found during replication");
+        return;
+      }
+
+      // Lock access to the pending map.
+      synchronized (pending) {
+        PushOp pendingPushOp = pending.get(pushOp.getURI());
+
+        if (pendingPushOp != null) {
+          // A PushOp instance to the same URI is already pending.
+
+          if (pendingPushOp.isRetrying()) {
+            // The pending one is already retrying, so keep it and add
+            // to it the refs of the op passed as parameter.
+            //
+            // This happens if a PushOp started running and, before it
+            // failed with a transport exception, another one to the
+            // same URI was scheduled. The first one fails and is
+            // rescheduled, sitting in the pending list. When the
+            // second one fails too, it is also rescheduled and then,
+            // here, finds that replication to its URI is already
+            // pending for retry (blocking).
+            pendingPushOp.addRefs(pushOp.getRefs());
+
+          } else {
+            // The pending one is NOT retrying; it was scheduled in the
+            // belief that nothing would go wrong. Cancel it by setting
+            // its canceled flag, remove it from the pending list, and
+            // add its refs to the pushOp instance, which is scheduled
+            // for retry below.
+            //
+            // Note that the canceled PushOp will still start running
+            // and, when notifying that it is starting (under the
+            // pending lock), will see it was canceled: it then touches
+            // neither the pending list nor its run implementation.
+            pendingPushOp.cancel();
+            pending.remove(pendingPushOp.getURI());
+
+            pushOp.addRefs(pendingPushOp.getRefs());
+          }
+        }
+
+        if (pendingPushOp == null || !pendingPushOp.isRetrying()) {
+          // The pushOp parameter instance should be scheduled for
+          // retry; remember that retries use a different delay.
+          pushOp.setToRetry();
+
+          pending.put(pushOp.getURI(), pushOp);
+          pool.schedule(pushOp, retryDelay, TimeUnit.MINUTES);
+        }
+      }
+    }
+
     ProjectControl controlFor(final Project.NameKey project)
         throws NoSuchProjectException {
       return projectControlFactory.controlFor(project);
@@ -390,7 +481,9 @@ public class PushReplication implements ReplicationQueue {
     void notifyStarting(final PushOp op) {
       synchronized (pending) {
-        pending.remove(op.getURI());
+        if (!op.wasCanceled()) {
+          pending.remove(op.getURI());
+        }
       }
     }
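The retry scheduling itself rests on a standard delayed executor
(Gerrit's WorkQueue.Executor is built on java.util.concurrent's
scheduled thread pool). A minimal stand-alone equivalent of the
pool.schedule(pushOp, retryDelay, TimeUnit.MINUTES) call above, with a
hypothetical class name, looks like this:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class RetryDemo {
  public static void main(String[] args) throws InterruptedException {
    final ScheduledExecutorService pool =
        Executors.newSingleThreadScheduledExecutor();

    // Corresponds to remote.<name>.replicationRetry = 1 (minute).
    final int retryDelay = 1;

    // Like reschedule(): the failed op is simply queued to run again
    // once the delay has elapsed, instead of being dropped.
    pool.schedule(new Runnable() {
      public void run() {
        System.out.println("retrying push");
      }
    }, retryDelay, TimeUnit.MINUTES);

    pool.shutdown();
    pool.awaitTermination(2, TimeUnit.MINUTES);
  }
}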