Merge changes from topic 'notedb-sequence'

* changes:
  Configure NoteDb change sequence batch size with an option
  ReceiveCommits: Request change IDs from Sequences in bulk
  RepoSequence: Add method to get IDs in bulk
  Enable RepoSequence with a config option
This commit is contained in:
Dave Borowitz
2016-07-26 16:49:27 +00:00
committed by Gerrit Code Review
8 changed files with 320 additions and 55 deletions

View File

@@ -48,6 +48,7 @@ import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.reviewdb.server.ReviewDbUtil;
import com.google.gerrit.server.ChangeUtil;
import com.google.gerrit.server.PatchLineCommentsUtil;
import com.google.gerrit.server.Sequences;
import com.google.gerrit.server.change.PostReview;
import com.google.gerrit.server.change.Rebuild;
import com.google.gerrit.server.change.RevisionResource;
@@ -118,8 +119,11 @@ public class ChangeRebuilderIT extends AbstractDaemonTest {
@Inject
private BatchUpdate.Factory batchUpdateFactory;
@Inject
private Sequences seq;
@Before
public void setUp() {
public void setUp() throws Exception {
assume().that(NoteDbMode.readWrite()).isFalse();
TestTimeUtil.resetWithClockStep(1, TimeUnit.SECONDS);
setNotesMigration(false, false);
@@ -130,10 +134,20 @@ public class ChangeRebuilderIT extends AbstractDaemonTest {
TestTimeUtil.useSystemTime();
}
private void setNotesMigration(boolean writeChanges, boolean readChanges) {
@SuppressWarnings("deprecation")
private void setNotesMigration(boolean writeChanges, boolean readChanges)
throws Exception {
notesMigration.setWriteChanges(writeChanges);
notesMigration.setReadChanges(readChanges);
db = atrScope.reopenDb().getReviewDbProvider().get();
if (notesMigration.readChangeSequence()) {
// Copy next ReviewDb ID to NoteDb.
seq.getChangeIdRepoSequence().set(db.nextChangeId());
} else {
// Copy next NoteDb ID to ReviewDb.
while (db.nextChangeId() < seq.getChangeIdRepoSequence().next()) {}
}
}
@Test
@@ -163,8 +177,7 @@ public class ChangeRebuilderIT extends AbstractDaemonTest {
@Test
public void patchSetWithNullGroups() throws Exception {
Timestamp ts = TimeUtil.nowTs();
@SuppressWarnings("deprecation")
Change c = TestChanges.newChange(project, user.getId(), db.nextChangeId());
Change c = TestChanges.newChange(project, user.getId(), seq.nextChangeId());
c.setCreatedOn(ts);
c.setLastUpdatedOn(ts);
PatchSet ps = TestChanges.newPatchSet(

View File

@@ -14,25 +14,82 @@
package com.google.gerrit.server;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.config.AllProjectsName;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.notedb.NotesMigration;
import com.google.gerrit.server.notedb.RepoSequence;
import com.google.gwtorm.server.OrmException;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import org.eclipse.jgit.lib.Config;
import java.util.ArrayList;
import java.util.List;
@SuppressWarnings("deprecation")
@Singleton
public class Sequences {
private final Provider<ReviewDb> db;
private final NotesMigration migration;
private final RepoSequence changeSeq;
@Inject
Sequences(Provider<ReviewDb> db) {
Sequences(@GerritServerConfig Config cfg,
final Provider<ReviewDb> db,
NotesMigration migration,
GitRepositoryManager repoManager,
AllProjectsName allProjects) {
this.db = db;
this.migration = migration;
final int gap = cfg.getInt("noteDb", "changes", "initialSequenceGap", 0);
changeSeq = new RepoSequence(
repoManager,
allProjects,
"changes",
new RepoSequence.Seed() {
@Override
public int get() throws OrmException {
return db.get().nextChangeId() + gap;
}
},
cfg.getInt("noteDb", "changes", "sequenceBatchSize", 20));
}
@SuppressWarnings("deprecation")
public int nextChangeId() throws OrmException {
// TODO(dborowitz): Use repo sequence when we have ability to turn off
// ReviewDb entirely. Until then it's simpler to just keep using ReviewDb.
return db.get().nextChangeId();
if (!migration.readChangeSequence()) {
return db.get().nextChangeId();
}
return changeSeq.next();
}
public ImmutableList<Integer> nextChangeIds(int count) throws OrmException {
if (migration.readChangeSequence()) {
return changeSeq.next(count);
}
if (count == 0) {
return ImmutableList.of();
}
checkArgument(count > 0, "count is negative: %s", count);
List<Integer> ids = new ArrayList<>(count);
ReviewDb db = this.db.get();
for (int i = 0; i < count; i++) {
ids.add(db.nextChangeId());
}
return ImmutableList.copyOf(ids);
}
@VisibleForTesting
public RepoSequence getChangeIdRepoSequence() {
return changeSeq;
}
}

View File

@@ -1709,9 +1709,12 @@ public class ReceiveCommits {
try {
SortedSetMultimap<ObjectId, String> groups = groupCollector.getGroups();
for (CreateRequest create : newChanges) {
List<Integer> newIds = seq.nextChangeIds(newChanges.size());
for (int i = 0; i < newChanges.size(); i++) {
CreateRequest create = newChanges.get(i);
create.setChangeId(newIds.get(i));
batch.addCommand(create.cmd);
create.groups = ImmutableList.copyOf(groups.get(create.commitId));
create.groups = ImmutableList.copyOf(groups.get(create.commit));
}
for (ReplaceRequest replace : replaceByChange.values()) {
replace.groups = ImmutableList.copyOf(groups.get(replace.newCommitId));
@@ -1757,32 +1760,37 @@ public class ReceiveCommits {
}
private class CreateRequest {
final ObjectId commitId;
final ReceiveCommand cmd;
final ChangeInserter ins;
final RevCommit commit;
private final String refName;
Change.Id changeId;
ReceiveCommand cmd;
ChangeInserter ins;
List<String> groups = ImmutableList.of();
Change change;
CreateRequest(RevCommit c, String refName)
throws OrmException {
commitId = c.copy();
changeId = new Change.Id(seq.nextChangeId());
ins = changeInserterFactory.create(changeId, c, refName)
CreateRequest(RevCommit commit, String refName) {
this.commit = commit;
this.refName = refName;
}
private void setChangeId(int id) {
changeId = new Change.Id(id);
ins = changeInserterFactory.create(changeId, commit, refName)
.setDraft(magicBranch.draft)
.setTopic(magicBranch.topic)
// Changes already validated in validateNewCommits.
.setValidatePolicy(CommitValidators.Policy.NONE);
cmd = new ReceiveCommand(ObjectId.zeroId(), c,
cmd = new ReceiveCommand(ObjectId.zeroId(), commit,
ins.getPatchSetId().toRefName());
ins.setUpdateRefCommand(cmd);
}
private void addOps(BatchUpdate bu) throws RestApiException {
checkState(changeId != null, "must call setChangeId before addOps");
try {
RevWalk rw = rp.getRevWalk();
RevCommit commit = rw.parseCommit(commitId);
rw.parseBody(commit);
final PatchSet.Id psId = ins.setGroups(groups).getPatchSetId();
Account.Id me = user.getAccountId();
@@ -1852,7 +1860,7 @@ public class ReceiveCommits {
for (CreateRequest r : create) {
checkNotNull(r.change,
"cannot submit new change %s; op may not have run", r.changeId);
bySha.put(r.commitId, r.change);
bySha.put(r.commit, r.change);
}
for (ReplaceRequest r : replace) {
bySha.put(r.newCommitId, r.notes.getChange());

View File

@@ -50,6 +50,7 @@ public class ConfigNotesMigration extends NotesMigration {
private static final String NOTE_DB = "noteDb";
private static final String READ = "read";
private static final String WRITE = "write";
private static final String SEQUENCE = "sequence";
private static void checkConfig(Config cfg) {
Set<String> keys = new HashSet<>();
@@ -78,14 +79,24 @@ public class ConfigNotesMigration extends NotesMigration {
private final boolean writeChanges;
private final boolean readChanges;
private final boolean readChangeSequence;
private final boolean writeAccounts;
private final boolean readAccounts;
@Inject
ConfigNotesMigration(@GerritServerConfig Config cfg) {
checkConfig(cfg);
writeChanges = cfg.getBoolean(NOTE_DB, CHANGES.key(), WRITE, false);
readChanges = cfg.getBoolean(NOTE_DB, CHANGES.key(), READ, false);
// Reading change sequence numbers from NoteDb is not the default even if
// reading changes themselves is. Once this is enabled, it's not easy to
// undo: ReviewDb might hand out numbers that have already been assigned by
// NoteDb. This decision for the default may be reevaluated later.
readChangeSequence = cfg.getBoolean(NOTE_DB, CHANGES.key(), SEQUENCE, false);
writeAccounts = cfg.getBoolean(NOTE_DB, ACCOUNTS.key(), WRITE, false);
readAccounts = cfg.getBoolean(NOTE_DB, ACCOUNTS.key(), READ, false);
}
@@ -100,6 +111,11 @@ public class ConfigNotesMigration extends NotesMigration {
return readChanges;
}
@Override
public boolean readChangeSequence() {
return readChangeSequence;
}
@Override
public boolean writeAccounts() {
return writeAccounts;

View File

@@ -62,6 +62,15 @@ public abstract class NotesMigration {
*/
protected abstract boolean writeChanges();
/**
* Read sequential change ID numbers from NoteDb.
* <p>
* If true, change IDs are read from {@code refs/sequences/changes} in
* All-Projects. If false, change IDs are read from ReviewDb's native
* sequences.
*/
public abstract boolean readChangeSequence();
public abstract boolean readAccounts();
public abstract boolean writeAccounts();

View File

@@ -23,8 +23,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.CharMatcher;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Runnables;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.client.RefNames;
import com.google.gerrit.server.git.GitRepositoryManager;
@@ -46,6 +48,8 @@ import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevWalk;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -64,6 +68,10 @@ import java.util.concurrent.locks.ReentrantLock;
* non-monotonic numbers.
*/
public class RepoSequence {
public interface Seed {
int get() throws OrmException;
}
@VisibleForTesting
static RetryerBuilder<RefUpdate.Result> retryerBuilder() {
return RetryerBuilder.<RefUpdate.Result> newBuilder()
@@ -80,7 +88,7 @@ public class RepoSequence {
private final GitRepositoryManager repoManager;
private final Project.NameKey projectName;
private final String refName;
private final int start;
private final Seed seed;
private final int batchSize;
private final Runnable afterReadRef;
private final Retryer<RefUpdate.Result> retryer;
@@ -94,20 +102,30 @@ public class RepoSequence {
@VisibleForTesting
int acquireCount;
public RepoSequence(GitRepositoryManager repoManager,
Project.NameKey projectName, String name, int start, int batchSize) {
this(repoManager, projectName, name, start, batchSize,
Runnables.doNothing(), RETRYER);
public RepoSequence(
GitRepositoryManager repoManager,
Project.NameKey projectName,
String name,
Seed seed,
int batchSize) {
this(repoManager, projectName, name, seed, batchSize, Runnables.doNothing(),
RETRYER);
}
@VisibleForTesting
RepoSequence(GitRepositoryManager repoManager, Project.NameKey projectName,
String name, int start, int batchSize, Runnable afterReadRef,
RepoSequence(
GitRepositoryManager repoManager,
Project.NameKey projectName,
String name,
Seed seed,
int batchSize,
Runnable afterReadRef,
Retryer<RefUpdate.Result> retryer) {
this.repoManager = checkNotNull(repoManager, "repoManager");
this.projectName = checkNotNull(projectName, "projectName");
this.refName = RefNames.REFS_SEQUENCES + checkNotNull(name, "name");
this.start = start;
this.seed = checkNotNull(seed, "seed");
checkArgument(batchSize > 0, "expected batchSize > 0, got: %s", batchSize);
this.batchSize = batchSize;
this.afterReadRef = checkNotNull(afterReadRef, "afterReadRef");
@@ -120,7 +138,7 @@ public class RepoSequence {
counterLock.lock();
try {
if (counter >= limit) {
acquire();
acquire(batchSize);
}
return counter++;
} finally {
@@ -128,16 +146,55 @@ public class RepoSequence {
}
}
private void acquire() throws OrmException {
public ImmutableList<Integer> next(int count) throws OrmException {
if (count == 0) {
return ImmutableList.of();
}
checkArgument(count > 0, "count is negative: %s", count);
counterLock.lock();
try {
List<Integer> ids = new ArrayList<>(count);
while (counter < limit) {
ids.add(counter++);
if (ids.size() == count) {
return ImmutableList.copyOf(ids);
}
}
acquire(Math.max(count - ids.size(), batchSize));
while (ids.size() < count) {
ids.add(counter++);
}
return ImmutableList.copyOf(ids);
} finally {
counterLock.unlock();
}
}
@VisibleForTesting
public void set(int val) throws OrmException {
// Don't bother spinning. This is only for tests, and a test that calls set
// concurrently with other writes is doing it wrong.
counterLock.lock();
try {
try (Repository repo = repoManager.openRepository(projectName);
RevWalk rw = new RevWalk(repo)) {
checkResult(store(repo, rw, null, val));
counter = limit;
} catch (IOException e) {
throw new OrmException(e);
}
} finally {
counterLock.unlock();
}
}
private void acquire(int count) throws OrmException {
try (Repository repo = repoManager.openRepository(projectName);
RevWalk rw = new RevWalk(repo)) {
TryAcquire attempt = new TryAcquire(repo, rw);
RefUpdate.Result result = retryer.call(attempt);
if (result != RefUpdate.Result.NEW && result != RefUpdate.Result.FORCED) {
throw new OrmException("failed to update " + refName + ": " + result);
}
TryAcquire attempt = new TryAcquire(repo, rw, count);
checkResult(retryer.call(attempt));
counter = attempt.next;
limit = counter + batchSize;
limit = counter + count;
acquireCount++;
} catch (ExecutionException | RetryException e) {
Throwables.propagateIfInstanceOf(e.getCause(), OrmException.class);
@@ -147,15 +204,23 @@ public class RepoSequence {
}
}
private void checkResult(RefUpdate.Result result) throws OrmException {
if (result != RefUpdate.Result.NEW && result != RefUpdate.Result.FORCED) {
throw new OrmException("failed to update " + refName + ": " + result);
}
}
private class TryAcquire implements Callable<RefUpdate.Result> {
private final Repository repo;
private final RevWalk rw;
private final int count;
private int next;
private TryAcquire(Repository repo, RevWalk rw) {
private TryAcquire(Repository repo, RevWalk rw, int count) {
this.repo = repo;
this.rw = rw;
this.count = count;
}
@Override
@@ -165,12 +230,12 @@ public class RepoSequence {
ObjectId oldId;
if (ref == null) {
oldId = ObjectId.zeroId();
next = start;
next = seed.get();
} else {
oldId = ref.getObjectId();
next = parse(oldId);
}
return store(oldId, next + batchSize);
return store(repo, rw, oldId, next + count);
}
private int parse(ObjectId id) throws IOException, OrmException {
@@ -189,18 +254,21 @@ public class RepoSequence {
}
return val;
}
}
private RefUpdate.Result store(ObjectId oldId, int val) throws IOException {
ObjectId newId;
try (ObjectInserter ins = repo.newObjectInserter()) {
newId = ins.insert(OBJ_BLOB, Integer.toString(val).getBytes(UTF_8));
ins.flush();
}
RefUpdate ru = repo.updateRef(refName);
ru.setExpectedOldObjectId(oldId);
ru.setNewObjectId(newId);
ru.setForceUpdate(true); // Required for non-commitish updates.
return ru.update(rw);
private RefUpdate.Result store(Repository repo, RevWalk rw,
@Nullable ObjectId oldId, int val) throws IOException {
ObjectId newId;
try (ObjectInserter ins = repo.newObjectInserter()) {
newId = ins.insert(OBJ_BLOB, Integer.toString(val).getBytes(UTF_8));
ins.flush();
}
RefUpdate ru = repo.updateRef(refName);
if (oldId != null) {
ru.setExpectedOldObjectId(oldId);
}
ru.setNewObjectId(newId);
ru.setForceUpdate(true); // Required for non-commitish updates.
return ru.update(rw);
}
}

View File

@@ -77,7 +77,7 @@ public class RepoSequenceTest {
RepoSequence s = newSequence(name, 1, batchSize);
for (int i = 1; i <= max; i++) {
try {
assertThat(s.next()).named("next for " + name).isEqualTo(i);
assertThat(s.next()).named("i=" + i + " for " + name).isEqualTo(i);
} catch (OrmException e) {
throw new AssertionError(
"failed batchSize=" + batchSize + ", i=" + i, e);
@@ -89,6 +89,36 @@ public class RepoSequenceTest {
}
}
@Test
public void oneCallerNoLoop() throws Exception {
RepoSequence s = newSequence("id", 1, 3);
assertThat(s.acquireCount).isEqualTo(0);
assertThat(s.next()).isEqualTo(1);
assertThat(s.acquireCount).isEqualTo(1);
assertThat(s.next()).isEqualTo(2);
assertThat(s.acquireCount).isEqualTo(1);
assertThat(s.next()).isEqualTo(3);
assertThat(s.acquireCount).isEqualTo(1);
assertThat(s.next()).isEqualTo(4);
assertThat(s.acquireCount).isEqualTo(2);
assertThat(s.next()).isEqualTo(5);
assertThat(s.acquireCount).isEqualTo(2);
assertThat(s.next()).isEqualTo(6);
assertThat(s.acquireCount).isEqualTo(2);
assertThat(s.next()).isEqualTo(7);
assertThat(s.acquireCount).isEqualTo(3);
assertThat(s.next()).isEqualTo(8);
assertThat(s.acquireCount).isEqualTo(3);
assertThat(s.next()).isEqualTo(9);
assertThat(s.acquireCount).isEqualTo(3);
assertThat(s.next()).isEqualTo(10);
assertThat(s.acquireCount).isEqualTo(4);
}
@Test
public void twoCallers() throws Exception {
RepoSequence s1 = newSequence("id", 1, 3);
@@ -193,15 +223,72 @@ public class RepoSequenceTest {
s.next();
}
@Test
public void nextWithCountOneCaller() throws Exception {
RepoSequence s = newSequence("id", 1, 3);
assertThat(s.next(2)).containsExactly(1, 2).inOrder();
assertThat(s.acquireCount).isEqualTo(1);
assertThat(s.next(2)).containsExactly(3, 4).inOrder();
assertThat(s.acquireCount).isEqualTo(2);
assertThat(s.next(2)).containsExactly(5, 6).inOrder();
assertThat(s.acquireCount).isEqualTo(2);
assertThat(s.next(3)).containsExactly(7, 8, 9).inOrder();
assertThat(s.acquireCount).isEqualTo(3);
assertThat(s.next(3)).containsExactly(10, 11, 12).inOrder();
assertThat(s.acquireCount).isEqualTo(4);
assertThat(s.next(3)).containsExactly(13, 14, 15).inOrder();
assertThat(s.acquireCount).isEqualTo(5);
assertThat(s.next(7)).containsExactly(16, 17, 18, 19, 20, 21, 22).inOrder();
assertThat(s.acquireCount).isEqualTo(6);
assertThat(s.next(7)).containsExactly(23, 24, 25, 26, 27, 28, 29).inOrder();
assertThat(s.acquireCount).isEqualTo(7);
assertThat(s.next(7)).containsExactly(30, 31, 32, 33, 34, 35, 36).inOrder();
assertThat(s.acquireCount).isEqualTo(8);
}
@Test
public void nextWithCountMultipleCallers() throws Exception {
RepoSequence s1 = newSequence("id", 1, 3);
RepoSequence s2 = newSequence("id", 1, 4);
assertThat(s1.next(2)).containsExactly(1, 2).inOrder();
assertThat(s1.acquireCount).isEqualTo(1);
// s1 hasn't exhausted its last batch.
assertThat(s2.next(2)).containsExactly(4, 5).inOrder();
assertThat(s2.acquireCount).isEqualTo(1);
// s1 acquires again to cover this request, plus a whole new batch.
assertThat(s1.next(3)).containsExactly(3, 8, 9);
assertThat(s1.acquireCount).isEqualTo(2);
// s2 hasn't exhausted its last batch, do so now.
assertThat(s2.next(2)).containsExactly(6, 7);
assertThat(s2.acquireCount).isEqualTo(1);
}
private RepoSequence newSequence(String name, int start, int batchSize) {
return newSequence(
name, start, batchSize, Runnables.doNothing(), RETRYER);
}
private RepoSequence newSequence(String name, int start, int batchSize,
private RepoSequence newSequence(String name, final int start, int batchSize,
Runnable afterReadRef, Retryer<RefUpdate.Result> retryer) {
return new RepoSequence(
repoManager, project, name, start, batchSize, afterReadRef, retryer);
repoManager,
project,
name,
new RepoSequence.Seed() {
@Override
public int get() {
return start;
}
},
batchSize,
afterReadRef,
retryer);
}
private ObjectId writeBlob(String sequenceName, String value) {

View File

@@ -29,6 +29,13 @@ public class TestNotesMigration extends NotesMigration {
return readChanges;
}
@Override
public boolean readChangeSequence() {
// Unlike ConfigNotesMigration, read change numbers from NoteDb by default
// when reads are enabled, to improve test coverage.
return readChanges;
}
// Increase visbility from superclass, as tests may want to check whether
// NoteDb data is written in specific migration scenarios.
@Override