Enable RepoSequence with a config option

When noteDb.changes.sequence is true, read changes from the
refs/sequences/changes blob in All-Projects. Use a lazily evaluated
Seed that reads the next change number from ReviewDb and uses it as
the first number in NoteDb.

In a single-server setting this is sufficient, but in a multi-master
environment it may be desirable to leave a gap after the next
ReviewDb counter to account for the fact that not all servers may
receive the configuration update immediately. Control this with an
undocumented config option.

Change-Id: I2de8fbd42b525a75b3cdbcdb4b3f9e33e382b7a7
This commit is contained in:
Dave Borowitz
2016-07-21 15:20:16 -04:00
parent 9448ed4c81
commit 08de3c852b
7 changed files with 167 additions and 35 deletions

View File

@@ -25,6 +25,7 @@ import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Runnables;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.client.RefNames;
import com.google.gerrit.server.git.GitRepositoryManager;
@@ -64,6 +65,10 @@ import java.util.concurrent.locks.ReentrantLock;
* non-monotonic numbers.
*/
public class RepoSequence {
public interface Seed {
int get() throws OrmException;
}
@VisibleForTesting
static RetryerBuilder<RefUpdate.Result> retryerBuilder() {
return RetryerBuilder.<RefUpdate.Result> newBuilder()
@@ -80,7 +85,7 @@ public class RepoSequence {
private final GitRepositoryManager repoManager;
private final Project.NameKey projectName;
private final String refName;
private final int start;
private final Seed seed;
private final int batchSize;
private final Runnable afterReadRef;
private final Retryer<RefUpdate.Result> retryer;
@@ -94,20 +99,30 @@ public class RepoSequence {
@VisibleForTesting
int acquireCount;
public RepoSequence(GitRepositoryManager repoManager,
Project.NameKey projectName, String name, int start, int batchSize) {
this(repoManager, projectName, name, start, batchSize,
Runnables.doNothing(), RETRYER);
public RepoSequence(
GitRepositoryManager repoManager,
Project.NameKey projectName,
String name,
Seed seed,
int batchSize) {
this(repoManager, projectName, name, seed, batchSize, Runnables.doNothing(),
RETRYER);
}
@VisibleForTesting
RepoSequence(GitRepositoryManager repoManager, Project.NameKey projectName,
String name, int start, int batchSize, Runnable afterReadRef,
RepoSequence(
GitRepositoryManager repoManager,
Project.NameKey projectName,
String name,
Seed seed,
int batchSize,
Runnable afterReadRef,
Retryer<RefUpdate.Result> retryer) {
this.repoManager = checkNotNull(repoManager, "repoManager");
this.projectName = checkNotNull(projectName, "projectName");
this.refName = RefNames.REFS_SEQUENCES + checkNotNull(name, "name");
this.start = start;
this.seed = checkNotNull(seed, "seed");
checkArgument(batchSize > 0, "expected batchSize > 0, got: %s", batchSize);
this.batchSize = batchSize;
this.afterReadRef = checkNotNull(afterReadRef, "afterReadRef");
@@ -128,14 +143,29 @@ public class RepoSequence {
}
}
@VisibleForTesting
public void set(int val) throws OrmException {
// Don't bother spinning. This is only for tests, and a test that calls set
// concurrently with other writes is doing it wrong.
counterLock.lock();
try {
try (Repository repo = repoManager.openRepository(projectName);
RevWalk rw = new RevWalk(repo)) {
checkResult(store(repo, rw, null, val));
counter = limit;
} catch (IOException e) {
throw new OrmException(e);
}
} finally {
counterLock.unlock();
}
}
private void acquire() throws OrmException {
try (Repository repo = repoManager.openRepository(projectName);
RevWalk rw = new RevWalk(repo)) {
TryAcquire attempt = new TryAcquire(repo, rw);
RefUpdate.Result result = retryer.call(attempt);
if (result != RefUpdate.Result.NEW && result != RefUpdate.Result.FORCED) {
throw new OrmException("failed to update " + refName + ": " + result);
}
checkResult(retryer.call(attempt));
counter = attempt.next;
limit = counter + batchSize;
acquireCount++;
@@ -147,6 +177,12 @@ public class RepoSequence {
}
}
private void checkResult(RefUpdate.Result result) throws OrmException {
if (result != RefUpdate.Result.NEW && result != RefUpdate.Result.FORCED) {
throw new OrmException("failed to update " + refName + ": " + result);
}
}
private class TryAcquire implements Callable<RefUpdate.Result> {
private final Repository repo;
private final RevWalk rw;
@@ -165,12 +201,12 @@ public class RepoSequence {
ObjectId oldId;
if (ref == null) {
oldId = ObjectId.zeroId();
next = start;
next = seed.get();
} else {
oldId = ref.getObjectId();
next = parse(oldId);
}
return store(oldId, next + batchSize);
return store(repo, rw, oldId, next + batchSize);
}
private int parse(ObjectId id) throws IOException, OrmException {
@@ -189,18 +225,21 @@ public class RepoSequence {
}
return val;
}
}
private RefUpdate.Result store(ObjectId oldId, int val) throws IOException {
ObjectId newId;
try (ObjectInserter ins = repo.newObjectInserter()) {
newId = ins.insert(OBJ_BLOB, Integer.toString(val).getBytes(UTF_8));
ins.flush();
}
RefUpdate ru = repo.updateRef(refName);
ru.setExpectedOldObjectId(oldId);
ru.setNewObjectId(newId);
ru.setForceUpdate(true); // Required for non-commitish updates.
return ru.update(rw);
private RefUpdate.Result store(Repository repo, RevWalk rw,
@Nullable ObjectId oldId, int val) throws IOException {
ObjectId newId;
try (ObjectInserter ins = repo.newObjectInserter()) {
newId = ins.insert(OBJ_BLOB, Integer.toString(val).getBytes(UTF_8));
ins.flush();
}
RefUpdate ru = repo.updateRef(refName);
if (oldId != null) {
ru.setExpectedOldObjectId(oldId);
}
ru.setNewObjectId(newId);
ru.setForceUpdate(true); // Required for non-commitish updates.
return ru.update(rw);
}
}