diff --git a/gerrit-reviewdb/src/main/java/com/google/gerrit/reviewdb/client/RefNames.java b/gerrit-reviewdb/src/main/java/com/google/gerrit/reviewdb/client/RefNames.java index d6ba008f90..1b00485d5a 100644 --- a/gerrit-reviewdb/src/main/java/com/google/gerrit/reviewdb/client/RefNames.java +++ b/gerrit-reviewdb/src/main/java/com/google/gerrit/reviewdb/client/RefNames.java @@ -46,6 +46,9 @@ public class RefNames { /** A change starred by a user */ public static final String REFS_STARRED_CHANGES = "refs/starred-changes/"; + /** Sequence counters in notedb. */ + public static final String REFS_SEQUENCES = "refs/sequences/"; + /** * Prefix applied to merge commit base nodes. *

diff --git a/gerrit-server/BUCK b/gerrit-server/BUCK index 07847fc6e4..c98663acd4 100644 --- a/gerrit-server/BUCK +++ b/gerrit-server/BUCK @@ -37,6 +37,7 @@ java_library( '//lib:grappa', '//lib:gson', '//lib:guava', + '//lib:guava-retrying', '//lib:gwtjsonrpc', '//lib:gwtorm', '//lib:jsch', @@ -195,6 +196,7 @@ java_test( '//lib:args4j', '//lib:grappa', '//lib:guava', + '//lib:guava-retrying', '//lib/dropwizard:dropwizard-core', '//lib/guice:guice-assistedinject', '//lib/prolog:runtime', diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/RepoSequence.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/RepoSequence.java new file mode 100644 index 0000000000..a7a76ab49f --- /dev/null +++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/RepoSequence.java @@ -0,0 +1,206 @@ +// Copyright (C) 2016 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.gerrit.server.notedb; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.eclipse.jgit.lib.Constants.OBJ_BLOB; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.CharMatcher; +import com.google.common.base.Predicates; +import com.google.common.base.Throwables; +import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.Runnables; +import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.reviewdb.client.RefNames; +import com.google.gerrit.server.git.GitRepositoryManager; +import com.google.gwtorm.server.OrmException; + +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; + +import org.eclipse.jgit.errors.IncorrectObjectTypeException; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectInserter; +import org.eclipse.jgit.lib.ObjectLoader; +import org.eclipse.jgit.lib.Ref; +import org.eclipse.jgit.lib.RefUpdate; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevWalk; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Class for managing an incrementing sequence backed by a git repository. + *

+ * The current sequence number is stored as UTF-8 text in a blob pointed to + * by a ref in the {@code refs/sequences/*} namespace. Multiple processes can + * share the same sequence by incrementing the counter using normal git ref + * updates. To amortize the cost of these ref updates, processes can increment + * the counter by a larger number and hand out numbers from that range in memory + * until they run out. This means concurrent processes will hand out somewhat + * non-monotonic numbers. + */ +public class RepoSequence { + @VisibleForTesting + static RetryerBuilder retryerBuilder() { + return RetryerBuilder. newBuilder() + .retryIfResult(Predicates.equalTo(RefUpdate.Result.LOCK_FAILURE)) + .withWaitStrategy( + WaitStrategies.join( + WaitStrategies.exponentialWait(5, TimeUnit.SECONDS), + WaitStrategies.randomWait(50, TimeUnit.MILLISECONDS))) + .withStopStrategy(StopStrategies.stopAfterDelay(30, TimeUnit.SECONDS)); + } + + private static Retryer RETRYER = retryerBuilder().build(); + + private final GitRepositoryManager repoManager; + private final Project.NameKey projectName; + private final String refName; + private final int start; + private final int batchSize; + private final Runnable afterReadRef; + private final Retryer retryer; + + // Protects all non-final fields. + private final Lock counterLock; + + private int limit; + private int counter; + + @VisibleForTesting + int acquireCount; + + public RepoSequence(GitRepositoryManager repoManager, + Project.NameKey projectName, String name, int start, int batchSize) { + this(repoManager, projectName, name, start, batchSize, + Runnables.doNothing(), RETRYER); + } + + @VisibleForTesting + RepoSequence(GitRepositoryManager repoManager, Project.NameKey projectName, + String name, int start, int batchSize, Runnable afterReadRef, + Retryer retryer) { + this.repoManager = checkNotNull(repoManager, "repoManager"); + this.projectName = checkNotNull(projectName, "projectName"); + this.refName = RefNames.REFS_SEQUENCES + checkNotNull(name, "name"); + this.start = start; + checkArgument(batchSize > 0, "expected batchSize > 0, got: %s", batchSize); + this.batchSize = batchSize; + this.afterReadRef = checkNotNull(afterReadRef, "afterReadRef"); + this.retryer = checkNotNull(retryer, "retryer"); + + counterLock = new ReentrantLock(true); + } + + public int next() throws OrmException { + counterLock.lock(); + try { + if (counter >= limit) { + acquire(); + } + return counter++; + } finally { + counterLock.unlock(); + } + } + + private void acquire() throws OrmException { + try (Repository repo = repoManager.openRepository(projectName); + RevWalk rw = new RevWalk(repo)) { + TryAcquire attempt = new TryAcquire(repo, rw); + RefUpdate.Result result = retryer.call(attempt); + if (result != RefUpdate.Result.NEW && result != RefUpdate.Result.FORCED) { + throw new OrmException("failed to update " + refName + ": " + result); + } + counter = attempt.next; + limit = counter + batchSize; + acquireCount++; + } catch (ExecutionException | RetryException e) { + Throwables.propagateIfInstanceOf(e.getCause(), OrmException.class); + throw new OrmException(e); + } catch (IOException e) { + throw new OrmException(e); + } + } + + private class TryAcquire implements Callable { + private final Repository repo; + private final RevWalk rw; + + private int next; + + private TryAcquire(Repository repo, RevWalk rw) { + this.repo = repo; + this.rw = rw; + } + + @Override + public RefUpdate.Result call() throws Exception { + Ref ref = repo.exactRef(refName); + afterReadRef.run(); + ObjectId oldId; + if (ref == null) { + oldId = ObjectId.zeroId(); + next = start; + } else { + oldId = ref.getObjectId(); + next = parse(oldId); + } + return store(oldId, next + batchSize); + } + + private int parse(ObjectId id) throws IOException, OrmException { + ObjectLoader ol = rw.getObjectReader().open(id, OBJ_BLOB); + if (ol.getType() != OBJ_BLOB) { + // In theory this should be thrown by open but not all implementations + // may do it properly (certainly InMemoryRepository doesn't). + throw new IncorrectObjectTypeException(id, OBJ_BLOB); + } + String str = CharMatcher.WHITESPACE.trimFrom( + new String(ol.getCachedBytes(), UTF_8)); + Integer val = Ints.tryParse(str); + if (val == null) { + throw new OrmException( + "invalid value in " + refName + " blob at " + id.name()); + } + return val; + } + + private RefUpdate.Result store(ObjectId oldId, int val) throws IOException { + ObjectId newId; + try (ObjectInserter ins = repo.newObjectInserter()) { + newId = ins.insert(OBJ_BLOB, Integer.toString(val).getBytes(UTF_8)); + ins.flush(); + } + RefUpdate ru = repo.updateRef(refName); + ru.setExpectedOldObjectId(oldId); + ru.setNewObjectId(newId); + ru.setForceUpdate(true); // Required for non-commitish updates. + return ru.update(rw); + } + } +} diff --git a/gerrit-server/src/test/java/com/google/gerrit/server/notedb/RepoSequenceTest.java b/gerrit-server/src/test/java/com/google/gerrit/server/notedb/RepoSequenceTest.java new file mode 100644 index 0000000000..9c265a8277 --- /dev/null +++ b/gerrit-server/src/test/java/com/google/gerrit/server/notedb/RepoSequenceTest.java @@ -0,0 +1,235 @@ +// Copyright (C) 2016 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.gerrit.server.notedb; + +import static com.google.common.truth.Truth.assertThat; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.eclipse.jgit.lib.Constants.OBJ_BLOB; +import static org.junit.Assert.fail; + +import com.google.common.util.concurrent.Runnables; +import com.google.gerrit.reviewdb.client.Project; +import com.google.gerrit.reviewdb.client.RefNames; +import com.google.gerrit.testutil.InMemoryRepositoryManager; +import com.google.gwtorm.server.OrmException; + +import com.github.rholder.retry.BlockStrategy; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; + +import org.eclipse.jgit.errors.IncorrectObjectTypeException; +import org.eclipse.jgit.junit.TestRepository; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.ObjectInserter; +import org.eclipse.jgit.lib.RefUpdate; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevWalk; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class RepoSequenceTest { + private static final Retryer RETRYER = + RepoSequence.retryerBuilder().withBlockStrategy(new BlockStrategy() { + @Override + public void block(long sleepTime) { + // Don't sleep in tests. + } + }).build(); + + @Rule + public ExpectedException exception = ExpectedException.none(); + + private InMemoryRepositoryManager repoManager; + private Project.NameKey project; + + @Before + public void setUp() throws Exception { + repoManager = new InMemoryRepositoryManager(); + project = new Project.NameKey("project"); + repoManager.createRepository(project); + } + + @Test + public void oneCaller() throws Exception { + int max = 20; + for (int batchSize = 1; batchSize <= 10; batchSize++) { + String name = "batch-size-" + batchSize; + RepoSequence s = newSequence(name, 1, batchSize); + for (int i = 1; i <= max; i++) { + try { + assertThat(s.next()).named("next for " + name).isEqualTo(i); + } catch (OrmException e) { + throw new AssertionError( + "failed batchSize=" + batchSize + ", i=" + i, e); + } + } + assertThat(s.acquireCount) + .named("acquireCount for " + name) + .isEqualTo(divCeil(max, batchSize)); + } + } + + @Test + public void twoCallers() throws Exception { + RepoSequence s1 = newSequence("id", 1, 3); + RepoSequence s2 = newSequence("id", 1, 3); + + // s1 acquires 1-3; s2 acquires 4-6. + assertThat(s1.next()).isEqualTo(1); + assertThat(s2.next()).isEqualTo(4); + assertThat(s1.next()).isEqualTo(2); + assertThat(s2.next()).isEqualTo(5); + assertThat(s1.next()).isEqualTo(3); + assertThat(s2.next()).isEqualTo(6); + + // s2 acquires 7-9; s1 acquires 10-12. + assertThat(s2.next()).isEqualTo(7); + assertThat(s1.next()).isEqualTo(10); + assertThat(s2.next()).isEqualTo(8); + assertThat(s1.next()).isEqualTo(11); + assertThat(s2.next()).isEqualTo(9); + assertThat(s1.next()).isEqualTo(12); + } + + @Test + public void populateEmptyRefWithStartValue() throws Exception { + RepoSequence s = newSequence("id", 1234, 10); + assertThat(s.next()).isEqualTo(1234); + assertThat(readBlob("id")).isEqualTo("1244"); + } + + @Test + public void startIsIgnoredIfRefIsPresent() throws Exception { + writeBlob("id", "1234"); + RepoSequence s = newSequence("id", 3456, 10); + assertThat(s.next()).isEqualTo(1234); + assertThat(readBlob("id")).isEqualTo("1244"); + } + + @Test + public void retryOnLockFailure() throws Exception { + // Seed existing ref value. + writeBlob("id", "1"); + + final AtomicBoolean doneBgUpdate = new AtomicBoolean(false); + Runnable bgUpdate = new Runnable() { + @Override + public void run() { + if (!doneBgUpdate.getAndSet(true)) { + writeBlob("id", "1234"); + } + } + }; + + RepoSequence s = newSequence("id", 1, 10, bgUpdate, RETRYER); + assertThat(doneBgUpdate.get()).isFalse(); + assertThat(s.next()).isEqualTo(1234); + // Single acquire call that results in 2 ref reads. + assertThat(s.acquireCount).isEqualTo(1); + assertThat(doneBgUpdate.get()).isTrue(); + } + + @Test + public void failOnInvalidValue() throws Exception { + ObjectId id = writeBlob("id", "not a number"); + exception.expect(OrmException.class); + exception.expectMessage( + "invalid value in refs/sequences/id blob at " + id.name()); + newSequence("id", 1, 3).next(); + } + + @Test + public void failOnWrongType() throws Exception { + try (Repository repo = repoManager.openRepository(project)) { + TestRepository tr = new TestRepository<>(repo); + tr.branch(RefNames.REFS_SEQUENCES + "id").commit().create(); + try { + newSequence("id", 1, 3).next(); + fail(); + } catch (OrmException e) { + assertThat(e.getCause()).isInstanceOf(ExecutionException.class); + assertThat(e.getCause().getCause()) + .isInstanceOf(IncorrectObjectTypeException.class); + } + } + } + + @Test + public void failAfterRetryerGivesUp() throws Exception { + final AtomicInteger bgCounter = new AtomicInteger(1234); + Runnable bgUpdate = new Runnable() { + @Override + public void run() { + writeBlob("id", Integer.toString(bgCounter.getAndAdd(1000))); + } + }; + RepoSequence s = newSequence( + "id", 1, 10, bgUpdate, + RetryerBuilder. newBuilder() + .withStopStrategy(StopStrategies.stopAfterAttempt(3)) + .build()); + exception.expect(OrmException.class); + exception.expectMessage("failed to update refs/sequences/id: LOCK_FAILURE"); + s.next(); + } + + private RepoSequence newSequence(String name, int start, int batchSize) { + return newSequence( + name, start, batchSize, Runnables.doNothing(), RETRYER); + } + + private RepoSequence newSequence(String name, int start, int batchSize, + Runnable afterReadRef, Retryer retryer) { + return new RepoSequence( + repoManager, project, name, start, batchSize, afterReadRef, retryer); + } + + private ObjectId writeBlob(String sequenceName, String value) { + String refName = RefNames.REFS_SEQUENCES + sequenceName; + try (Repository repo = repoManager.openRepository(project); + ObjectInserter ins = repo.newObjectInserter()) { + ObjectId newId = ins.insert(OBJ_BLOB, value.getBytes(UTF_8)); + ins.flush(); + RefUpdate ru = repo.updateRef(refName); + ru.setNewObjectId(newId); + assertThat(ru.forceUpdate()) + .isAnyOf(RefUpdate.Result.NEW, RefUpdate.Result.FORCED); + return newId; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private String readBlob(String sequenceName) throws Exception { + String refName = RefNames.REFS_SEQUENCES + sequenceName; + try (Repository repo = repoManager.openRepository(project); + RevWalk rw = new RevWalk(repo)) { + ObjectId id = repo.exactRef(refName).getObjectId(); + return new String(rw.getObjectReader().open(id).getCachedBytes(), UTF_8); + } + } + + private static long divCeil(float a, float b) { + return Math.round(Math.ceil(a / b)); + } +} diff --git a/lib/BUCK b/lib/BUCK index 8a2d22b6c6..c37f4bc0f4 100644 --- a/lib/BUCK +++ b/lib/BUCK @@ -68,6 +68,26 @@ maven_jar( license = 'Apache2.0', ) +maven_jar( + name = 'guava-retrying', + id = 'com.github.rholder:guava-retrying:2.0.0', + sha1 = '974bc0a04a11cc4806f7c20a34703bd23c34e7f4', + license = 'Apache2.0', + deps = [':jsr305'], +) + +maven_jar( + name = 'jsr305', + id = 'com.google.code.findbugs:jsr305:2.0.2', + sha1 = '516c03b21d50a644d538de0f0369c620989cd8f0', + license = 'Apache2.0', + attach_source = False, + # Whitelist lib targets that have jsr305 as a dependency. Generally speaking + # Gerrit core should not depend on these annotations, and instead use + # equivalent annotations in com.google.gerrit.common. + visibility = ['//lib:guava-retrying'], +) + maven_jar( name = 'velocity', id = 'org.apache.velocity:velocity:1.7',