Finish NoteDbMigrator
The final phase is converting all changes to NoteDb primary. This is very straightforward, as PrimaryStorageMigrator was designed to be run in arbitrary order with arbitrary parallelism. The only tricky thing is that this phase changes to _NOTE_DB_PRIMARY before doing any work, then NOTE_DB(_UNFUSED) after. The only way it's possible to iterate through the main state loop in _PRIMARY is if there was a failure, in which case we run the failed step again on the next execution. This exposed one now-faulty assumption in the NoteDbBatchUpdate implementations that the specific ReviewDb instance handed to the constructor has its changes tables disabled. It's possible for the config read from the NotesMigration to determine which BatchUpdate implementation to use to differ from the bits used by NotesMigrationSchemaFactory at context creation time to open a ReviewDb. There's no harm in removing these overly-conservative checks. Change-Id: I0a7def4dc7e1da18eb928920b2538225fc6fea57
This commit is contained in:
@@ -17,6 +17,7 @@ package com.google.gerrit.acceptance.server.notedb;
|
||||
import static com.google.common.truth.Truth.assertThat;
|
||||
import static com.google.common.truth.Truth8.assertThat;
|
||||
import static com.google.common.truth.TruthJUnit.assume;
|
||||
import static com.google.gerrit.server.notedb.NotesMigrationState.NOTE_DB_UNFUSED;
|
||||
import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_NO_SEQUENCE;
|
||||
import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY;
|
||||
import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY;
|
||||
@@ -292,6 +293,50 @@ public class OnlineNoteDbMigrationIT extends AbstractDaemonTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void fullMigration() throws Exception {
|
||||
PushOneCommit.Result r = createChange();
|
||||
Change.Id id = r.getChange().getId();
|
||||
|
||||
migrate(b -> b);
|
||||
assertNotesMigrationState(NOTE_DB_UNFUSED);
|
||||
|
||||
assertThat(sequences.nextChangeId()).isEqualTo(502);
|
||||
|
||||
ObjectId oldMetaId;
|
||||
int rowVersion;
|
||||
try (ReviewDb db = schemaFactory.open();
|
||||
Repository repo = repoManager.openRepository(project)) {
|
||||
Ref ref = repo.exactRef(RefNames.changeMetaRef(id));
|
||||
assertThat(ref).isNotNull();
|
||||
oldMetaId = ref.getObjectId();
|
||||
|
||||
Change c = db.changes().get(id);
|
||||
assertThat(c.getTopic()).isNull();
|
||||
rowVersion = c.getRowVersion();
|
||||
NoteDbChangeState s = NoteDbChangeState.parse(c);
|
||||
assertThat(s.getPrimaryStorage()).isEqualTo(PrimaryStorage.NOTE_DB);
|
||||
assertThat(s.getRefState()).isEmpty();
|
||||
}
|
||||
|
||||
// Do not open a new context, to simulate races with other threads that opened a context earlier
|
||||
// in the migration process; this needs to work.
|
||||
gApi.changes().id(id.get()).topic(name("a-topic"));
|
||||
|
||||
// Of course, it should also work with a new context.
|
||||
resetCurrentApiUser();
|
||||
gApi.changes().id(id.get()).topic(name("another-topic"));
|
||||
|
||||
try (ReviewDb db = schemaFactory.open();
|
||||
Repository repo = repoManager.openRepository(project)) {
|
||||
assertThat(repo.exactRef(RefNames.changeMetaRef(id)).getObjectId()).isNotEqualTo(oldMetaId);
|
||||
|
||||
Change c = db.changes().get(id);
|
||||
assertThat(c.getTopic()).isNull();
|
||||
assertThat(c.getRowVersion()).isEqualTo(rowVersion);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertNotesMigrationState(NotesMigrationState expected) throws Exception {
|
||||
assertThat(NotesMigrationState.forNotesMigration(notesMigration)).hasValue(expected);
|
||||
gerritConfig.load();
|
||||
|
||||
@@ -16,20 +16,21 @@ package com.google.gerrit.server.notedb.rebuild;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static com.google.gerrit.reviewdb.server.ReviewDbUtil.unwrapDb;
|
||||
import static com.google.gerrit.server.notedb.NotesMigrationState.NOTE_DB_UNFUSED;
|
||||
import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_NO_SEQUENCE;
|
||||
import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY;
|
||||
import static com.google.gerrit.server.notedb.NotesMigrationState.READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY;
|
||||
import static com.google.gerrit.server.notedb.NotesMigrationState.WRITE;
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static java.util.Comparator.comparing;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableListMultimap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.MultimapBuilder;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.SetMultimap;
|
||||
@@ -53,6 +54,7 @@ import com.google.gerrit.server.git.WorkQueue;
|
||||
import com.google.gerrit.server.notedb.ConfigNotesMigration;
|
||||
import com.google.gerrit.server.notedb.NotesMigration;
|
||||
import com.google.gerrit.server.notedb.NotesMigrationState;
|
||||
import com.google.gerrit.server.notedb.PrimaryStorageMigrator;
|
||||
import com.google.gerrit.server.notedb.RepoSequence;
|
||||
import com.google.gerrit.server.notedb.rebuild.ChangeRebuilder.NoPatchSetsException;
|
||||
import com.google.gwtorm.server.OrmException;
|
||||
@@ -93,6 +95,7 @@ public class NoteDbMigrator implements AutoCloseable {
|
||||
private final ChangeRebuilder rebuilder;
|
||||
private final WorkQueue workQueue;
|
||||
private final NotesMigration globalNotesMigration;
|
||||
private final PrimaryStorageMigrator primaryStorageMigrator;
|
||||
|
||||
private int threads;
|
||||
private ImmutableList<Project.NameKey> projects = ImmutableList.of();
|
||||
@@ -112,7 +115,8 @@ public class NoteDbMigrator implements AutoCloseable {
|
||||
AllProjectsName allProjects,
|
||||
ChangeRebuilder rebuilder,
|
||||
WorkQueue workQueue,
|
||||
NotesMigration globalNotesMigration) {
|
||||
NotesMigration globalNotesMigration,
|
||||
PrimaryStorageMigrator primaryStorageMigrator) {
|
||||
this.cfg = cfg;
|
||||
this.sitePaths = sitePaths;
|
||||
this.schemaFactory = schemaFactory;
|
||||
@@ -121,6 +125,7 @@ public class NoteDbMigrator implements AutoCloseable {
|
||||
this.rebuilder = rebuilder;
|
||||
this.workQueue = workQueue;
|
||||
this.globalNotesMigration = globalNotesMigration;
|
||||
this.primaryStorageMigrator = primaryStorageMigrator;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -258,6 +263,7 @@ public class NoteDbMigrator implements AutoCloseable {
|
||||
allProjects,
|
||||
rebuilder,
|
||||
globalNotesMigration,
|
||||
primaryStorageMigrator,
|
||||
threads > 1
|
||||
? MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "RebuildChange"))
|
||||
: MoreExecutors.newDirectExecutorService(),
|
||||
@@ -277,6 +283,7 @@ public class NoteDbMigrator implements AutoCloseable {
|
||||
private final AllProjectsName allProjects;
|
||||
private final ChangeRebuilder rebuilder;
|
||||
private final NotesMigration globalNotesMigration;
|
||||
private final PrimaryStorageMigrator primaryStorageMigrator;
|
||||
|
||||
private final ListeningExecutorService executor;
|
||||
private final ImmutableList<Project.NameKey> projects;
|
||||
@@ -294,6 +301,7 @@ public class NoteDbMigrator implements AutoCloseable {
|
||||
AllProjectsName allProjects,
|
||||
ChangeRebuilder rebuilder,
|
||||
NotesMigration globalNotesMigration,
|
||||
PrimaryStorageMigrator primaryStorageMigrator,
|
||||
ListeningExecutorService executor,
|
||||
ImmutableList<Project.NameKey> projects,
|
||||
ImmutableList<Change.Id> changes,
|
||||
@@ -315,6 +323,7 @@ public class NoteDbMigrator implements AutoCloseable {
|
||||
this.repoManager = repoManager;
|
||||
this.allProjects = allProjects;
|
||||
this.globalNotesMigration = globalNotesMigration;
|
||||
this.primaryStorageMigrator = primaryStorageMigrator;
|
||||
this.gerritConfig = new FileBasedConfig(sitePaths.gerrit_config.toFile(), FS.detect());
|
||||
this.executor = executor;
|
||||
this.projects = projects;
|
||||
@@ -387,11 +396,16 @@ public class NoteDbMigrator implements AutoCloseable {
|
||||
state = rebuildAndEnableReads(state);
|
||||
rebuilt = true;
|
||||
} else {
|
||||
state = setNoteDbPrimary();
|
||||
state = setNoteDbPrimary(state);
|
||||
}
|
||||
break;
|
||||
case READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY:
|
||||
state = disableReviewDb();
|
||||
// The only way we can get here is if there was a failure on a previous run of
|
||||
// setNoteDbPrimary, since that method moves to NOTE_DB_UNFUSED if it completes
|
||||
// successfully. Assume that not all changes were converted and re-run the step.
|
||||
// migrateToNoteDbPrimary is a relatively fast no-op for already-migrated changes, so this
|
||||
// isn't actually repeating work.
|
||||
state = setNoteDbPrimary(state);
|
||||
break;
|
||||
case NOTE_DB_UNFUSED:
|
||||
// Done!
|
||||
@@ -435,12 +449,61 @@ public class NoteDbMigrator implements AutoCloseable {
|
||||
return saveState(prev, READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY);
|
||||
}
|
||||
|
||||
private NotesMigrationState setNoteDbPrimary() {
|
||||
throw new UnsupportedOperationException("not yet implemented");
|
||||
private NotesMigrationState setNoteDbPrimary(NotesMigrationState prev)
|
||||
throws MigrationException, OrmException, IOException {
|
||||
checkState(
|
||||
projects.isEmpty() && changes.isEmpty(),
|
||||
"Should not have attempted setNoteDbPrimary with a subset of changes");
|
||||
checkState(
|
||||
prev == READ_WRITE_WITH_SEQUENCE_REVIEW_DB_PRIMARY
|
||||
|| prev == READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY,
|
||||
"Unexpected start state for setNoteDbPrimary: %s",
|
||||
prev);
|
||||
|
||||
// Before changing the primary storage of old changes, ensure new changes are created with
|
||||
// NoteDb primary.
|
||||
prev = saveState(prev, READ_WRITE_WITH_SEQUENCE_NOTE_DB_PRIMARY);
|
||||
|
||||
Stopwatch sw = Stopwatch.createStarted();
|
||||
log.info("Setting primary storage to NoteDb");
|
||||
List<Change.Id> allChanges;
|
||||
try (ReviewDb db = unwrapDb(schemaFactory.open())) {
|
||||
allChanges = Streams.stream(db.changes().all()).map(Change::getId).collect(toList());
|
||||
}
|
||||
|
||||
private NotesMigrationState disableReviewDb() {
|
||||
throw new UnsupportedOperationException("not yet implemented");
|
||||
List<ListenableFuture<Boolean>> futures =
|
||||
allChanges
|
||||
.stream()
|
||||
.map(
|
||||
id ->
|
||||
executor.submit(
|
||||
() -> {
|
||||
// TODO(dborowitz): Avoid reopening db if using a single thread.
|
||||
try (ReviewDb db = unwrapDb(schemaFactory.open())) {
|
||||
primaryStorageMigrator.migrateToNoteDbPrimary(id);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
log.error("Error migrating primary storage for " + id, e);
|
||||
return false;
|
||||
}
|
||||
}))
|
||||
.collect(toList());
|
||||
|
||||
boolean ok = futuresToBoolean(futures, "Error migrating primary storage");
|
||||
double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
|
||||
log.info(
|
||||
String.format(
|
||||
"Migrated primary storage of %d changes in %.01fs (%.01f/s)\n",
|
||||
allChanges.size(), t, allChanges.size() / t));
|
||||
if (!ok) {
|
||||
throw new MigrationException("Migrating primary storage for some changes failed, see log");
|
||||
}
|
||||
|
||||
return disableReviewDb(prev);
|
||||
}
|
||||
|
||||
private NotesMigrationState disableReviewDb(NotesMigrationState prev) throws IOException {
|
||||
return saveState(prev, NOTE_DB_UNFUSED);
|
||||
}
|
||||
|
||||
private Optional<NotesMigrationState> loadState() throws IOException {
|
||||
@@ -485,7 +548,6 @@ public class NoteDbMigrator implements AutoCloseable {
|
||||
if (!globalNotesMigration.commitChangeWrites()) {
|
||||
throw new MigrationException("Cannot rebuild without noteDb.changes.write=true");
|
||||
}
|
||||
boolean ok;
|
||||
Stopwatch sw = Stopwatch.createStarted();
|
||||
log.info("Rebuilding changes in NoteDb");
|
||||
|
||||
@@ -507,13 +569,7 @@ public class NoteDbMigrator implements AutoCloseable {
|
||||
futures.add(future);
|
||||
}
|
||||
|
||||
try {
|
||||
ok = Iterables.all(Futures.allAsList(futures).get(), Predicates.equalTo(true));
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
log.error("Error rebuilding projects", e);
|
||||
ok = false;
|
||||
}
|
||||
|
||||
boolean ok = futuresToBoolean(futures, "Error rebuilding projects");
|
||||
double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
|
||||
log.info(
|
||||
String.format(
|
||||
@@ -593,4 +649,13 @@ public class NoteDbMigrator implements AutoCloseable {
|
||||
}
|
||||
return ok;
|
||||
}
|
||||
|
||||
private static boolean futuresToBoolean(List<ListenableFuture<Boolean>> futures, String errMsg) {
|
||||
try {
|
||||
return Futures.allAsList(futures).get().stream().allMatch(b -> b);
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
log.error(errMsg, e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -286,7 +286,6 @@ class FusedNoteDbBatchUpdate extends BatchUpdate {
|
||||
@Assisted CurrentUser user,
|
||||
@Assisted Timestamp when) {
|
||||
super(repoManager, serverIdent, project, user, when);
|
||||
checkArgument(!db.changesTablesEnabled(), "expected Change tables to be disabled on %s", db);
|
||||
this.changeNotesFactory = changeNotesFactory;
|
||||
this.changeControlFactory = changeControlFactory;
|
||||
this.changeUpdateFactory = changeUpdateFactory;
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
package com.google.gerrit.server.update;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkNotNull;
|
||||
import static java.util.Comparator.comparing;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
@@ -267,7 +266,6 @@ class UnfusedNoteDbBatchUpdate extends BatchUpdate {
|
||||
@Assisted CurrentUser user,
|
||||
@Assisted Timestamp when) {
|
||||
super(repoManager, serverIdent, project, user, when);
|
||||
checkArgument(!db.changesTablesEnabled(), "expected Change tables to be disabled on %s", db);
|
||||
this.changeNotesFactory = changeNotesFactory;
|
||||
this.changeControlFactory = changeControlFactory;
|
||||
this.changeUpdateFactory = changeUpdateFactory;
|
||||
|
||||
Reference in New Issue
Block a user