From 3f4a1748b22003539e797d7ed36dc41535e1b0b2 Mon Sep 17 00:00:00 2001
From: Dave Borowitz
Date: Mon, 12 May 2014 13:52:23 -0700
Subject: [PATCH] Create RebuildNotedb to migrate data from ReviewDb to notedb

RebuildNotedb is responsible for the migration of pre-existing changes
in the ReviewDb to the notedb. It asynchronously rebuilds every change in
the ReviewDb and all of the related pieces of metadata that can be
stored in the notedb (i.e. PatchSets and PatchSetApprovals). As we add
the ability to store more data in the notedb, this can be updated to
migrate that data as well.

Change-Id: If0fb123630a08ef514611944e7bd0ab1d9e48f80
---
 .../com/google/gerrit/pgm/RebuildNotedb.java  | 203 +++++++++++++++
 .../server/notedb/AbstractChangeUpdate.java   |   4 +
 .../gerrit/server/notedb/ChangeRebuilder.java | 247 ++++++++++++++++++
 .../gerrit/server/notedb/NotesMigration.java  |   4 +-
 4 files changed, 455 insertions(+), 3 deletions(-)
 create mode 100644 gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java
 create mode 100644 gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java

diff --git a/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java
new file mode 100644
index 0000000000..db5058e18d
--- /dev/null
+++ b/gerrit-pgm/src/main/java/com/google/gerrit/pgm/RebuildNotedb.java
@@ -0,0 +1,203 @@
+// Copyright (C) 2014 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gerrit.pgm;
+
+import static com.google.gerrit.server.schema.DataSourceProvider.Context.MULTI_USER;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.gerrit.lifecycle.LifecycleManager;
+import com.google.gerrit.pgm.util.BatchGitModule;
+import com.google.gerrit.pgm.util.BatchProgramModule;
+import com.google.gerrit.pgm.util.SiteProgram;
+import com.google.gerrit.pgm.util.ThreadLimiter;
+import com.google.gerrit.reviewdb.client.Change;
+import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.git.MultiProgressMonitor;
+import com.google.gerrit.server.git.MultiProgressMonitor.Task;
+import com.google.gerrit.server.git.WorkQueue;
+import com.google.gerrit.server.notedb.ChangeRebuilder;
+import com.google.gerrit.server.notedb.NoteDbModule;
+import com.google.gerrit.server.notedb.NotesMigration;
+import com.google.gwtorm.server.OrmException;
+import com.google.gwtorm.server.SchemaFactory;
+import com.google.inject.AbstractModule;
+import com.google.inject.Injector;
+import com.google.inject.Key;
+import com.google.inject.TypeLiteral;
+
+import org.kohsuke.args4j.Option;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Site program that rebuilds the notedb representation of every change in
+ * ReviewDb.
+ * <p>
+ * Each change is rebuilt as a separate task submitted to an executor; the
+ * program exits with status 1 if any change failed to rebuild.
+ */
+public class RebuildNotedb extends SiteProgram {
+  private static final Logger log =
+      LoggerFactory.getLogger(RebuildNotedb.class);
+
+  @Option(name = "--threads", usage = "Number of threads to use for rebuilding")
+  private int threads = Runtime.getRuntime().availableProcessors();
+
+  private Injector dbInjector;
+  private Injector sysInjector;
+
+  @Override
+  public int run() throws Exception {
+    mustHaveValidSite();
+    dbInjector = createDbInjector(MULTI_USER);
+    threads = ThreadLimiter.limitThreads(dbInjector, threads);
+
+    LifecycleManager dbManager = new LifecycleManager();
+    dbManager.add(dbInjector);
+    dbManager.start();
+
+    sysInjector = createSysInjector();
+    LifecycleManager sysManager = new LifecycleManager();
+    sysManager.add(sysInjector);
+    sysManager.start();
+
+    ListeningExecutorService executor = newExecutor();
+    final MultiProgressMonitor mpm =
+        new MultiProgressMonitor(System.out, "Rebuilding notedb");
+    final Task doneTask =
+        mpm.beginSubTask("changes", MultiProgressMonitor.UNKNOWN);
+    final Task failedTask =
+        mpm.beginSubTask("failed", MultiProgressMonitor.UNKNOWN);
+    ChangeRebuilder rebuilder = sysInjector.getInstance(ChangeRebuilder.class);
+
+    List<Change> allChanges = getAllChanges();
+    final List<ListenableFuture<?>> futures = Lists.newArrayList();
+    final AtomicBoolean ok = new AtomicBoolean(true);
+    Stopwatch sw = Stopwatch.createStarted();
+    for (final Change c : allChanges) {
+      final ListenableFuture<?> future = rebuilder.rebuildAsync(c, executor);
+      futures.add(future);
+      // The listener only records the outcome in the progress tasks and the
+      // ok flag; sameThreadExecutor() runs it inline rather than queueing it.
+      future.addListener(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            future.get();
+            doneTask.update(1);
+          } catch (ExecutionException | InterruptedException e) {
+            fail(e);
+          } catch (RuntimeException e) {
+            failAndThrow(e);
+          } catch (Error e) {
+            // Can't join with RuntimeException because "RuntimeException |
+            // Error" becomes Throwable, which messes with signatures.
+            failAndThrow(e);
+          }
+        }
+
+        private void fail(Throwable t) {
+          log.error("Failed to rebuild change " + c.getId(), t);
+          ok.set(false);
+          failedTask.update(1);
+        }
+
+        private void failAndThrow(RuntimeException e) {
+          fail(e);
+          throw e;
+        }
+
+        private void failAndThrow(Error e) {
+          fail(e);
+          throw e;
+        }
+      }, MoreExecutors.sameThreadExecutor());
+    }
+    try {
+      mpm.waitFor(Futures.transform(Futures.successfulAsList(futures),
+          new AsyncFunction<List<?>, Void>() {
+            @Override
+            public ListenableFuture<Void> apply(List<?> input) {
+              mpm.end();
+              return Futures.immediateFuture(null);
+            }
+          }));
+    } catch (ExecutionException e) {
+      log.error("Error rebuilding notedb", e);
+      ok.set(false);
+    }
+    double t = sw.elapsed(TimeUnit.MILLISECONDS) / 1000d;
+    System.out.format("Rebuilt %d changes in %.01fs (%.01f/s)\n",
+        allChanges.size(), t, allChanges.size() / t);
+    return ok.get() ? 0 : 1;
+  }
+
+  /**
+   * Creates the child injector used for rebuilding, binding
+   * {@link NotesMigration} to {@link NotesMigration#allEnabled()}.
+   */
+  private Injector createSysInjector() {
+    return dbInjector.createChildInjector(new AbstractModule() {
+      @Override
+      public void configure() {
+        install(dbInjector.getInstance(BatchProgramModule.class));
+        install(new BatchGitModule());
+        install(new NoteDbModule());
+        bind(NotesMigration.class).toInstance(NotesMigration.allEnabled());
+      }
+    });
+  }
+
+  /**
+   * Returns an executor backed by a work queue of {@code threads} workers, or
+   * a same-thread executor when {@code threads} is not positive.
+   */
+  private ListeningExecutorService newExecutor() {
+    if (threads > 0) {
+      return MoreExecutors.listeningDecorator(
+          dbInjector.getInstance(WorkQueue.class)
+              .createQueue(threads, "RebuildChange"));
+    } else {
+      return MoreExecutors.sameThreadExecutor();
+    }
+  }
+
+  private List<Change> getAllChanges() throws OrmException {
+    // Memoize all changes to a list so we can close the db connection and allow
+    // rebuilder threads to use the full connection pool.
+    // TODO(dborowitz): May need to batch changes, e.g. by project (though note
+    // that unlike Reindex, we don't think there is an inherent benefit to
+    // grouping by project), to avoid wasting too much memory here.
+    SchemaFactory<ReviewDb> schemaFactory = sysInjector.getInstance(Key.get(
+        new TypeLiteral<SchemaFactory<ReviewDb>>() {}));
+    ReviewDb db = schemaFactory.open();
+    try {
+      return db.changes().all().toList();
+    } finally {
+      db.close();
+    }
+  }
+}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/AbstractChangeUpdate.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/AbstractChangeUpdate.java
index f114af0d77..6d804d3fe1 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/AbstractChangeUpdate.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/AbstractChangeUpdate.java
@@ -81,6 +81,10 @@ public abstract class AbstractChangeUpdate extends VersionedMetaData {
     return (IdentifiedUser) ctl.getCurrentUser();
   }
 
+  public PatchSet.Id getPatchSetId() {
+    return psId;
+  }
+
   public void setPatchSetId(PatchSet.Id psId) {
     checkArgument(psId == null
         || psId.getParentKey().equals(getChange().getId()));
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java
new file mode 100644
index 0000000000..861188c01c
--- /dev/null
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/ChangeRebuilder.java
@@ -0,0 +1,247 @@
+// Copyright (C) 2014 The Android Open Source Project
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package com.google.gerrit.server.notedb;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ComparisonChain;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.gerrit.reviewdb.client.Account;
+import com.google.gerrit.reviewdb.client.Change;
+import com.google.gerrit.reviewdb.client.PatchSet;
+import com.google.gerrit.reviewdb.client.PatchSetApproval;
+import com.google.gerrit.reviewdb.server.ReviewDb;
+import com.google.gerrit.server.GerritPersonIdent;
+import com.google.gerrit.server.IdentifiedUser;
+import com.google.gerrit.server.git.GitRepositoryManager;
+import com.google.gerrit.server.git.VersionedMetaData.BatchMetaDataUpdate;
+import com.google.gerrit.server.project.ChangeControl;
+import com.google.gerrit.server.project.NoSuchChangeException;
+import com.google.gwtorm.server.OrmException;
+import com.google.inject.Inject;
+import com.google.inject.Provider;
+
+import org.eclipse.jgit.lib.BatchRefUpdate;
+import org.eclipse.jgit.lib.CommitBuilder;
+import org.eclipse.jgit.lib.PersonIdent;
+
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Rebuilds the notedb state of a change from its ReviewDb rows.
+ * <p>
+ * Patch sets and approvals are converted to events, sorted, and grouped into
+ * updates: consecutive events in the same one-second window by the same user
+ * on the same patch set are written to the batch as a single update.
+ */
+public class ChangeRebuilder {
+  private static final long TS_WINDOW_MS =
+      TimeUnit.MILLISECONDS.convert(1, TimeUnit.SECONDS);
+
+  private final PersonIdent serverIdent;
+  private final Provider<ReviewDb> dbProvider;
+  private final GitRepositoryManager repoManager;
+  private final ChangeControl.GenericFactory controlFactory;
+  private final IdentifiedUser.GenericFactory userFactory;
+  private final ChangeUpdate.Factory updateFactory;
+
+  @Inject
+  ChangeRebuilder(@GerritPersonIdent PersonIdent serverIdent,
+      Provider<ReviewDb> dbProvider,
+      GitRepositoryManager repoManager,
+      ChangeControl.GenericFactory controlFactory,
+      IdentifiedUser.GenericFactory userFactory,
+      ChangeUpdate.Factory updateFactory) {
+    this.serverIdent = serverIdent;
+    this.dbProvider = dbProvider;
+    this.repoManager = repoManager;
+    this.controlFactory = controlFactory;
+    this.userFactory = userFactory;
+    this.updateFactory = updateFactory;
+  }
+
+  /**
+   * Submits a rebuild of {@code change} to {@code executor}.
+   *
+   * @return a future that completes when the rebuild finishes or fails.
+   */
+  public ListenableFuture<?> rebuildAsync(
+      final Change change, ListeningExecutorService executor) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        rebuild(change, null);
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Rebuilds the notedb data for {@code change} from its ReviewDb rows.
+   *
+   * @param change the change to rebuild.
+   * @param bru not used by this implementation; callers may pass null.
+   */
+  public void rebuild(Change change, BatchRefUpdate bru)
+      throws NoSuchChangeException, IOException, OrmException {
+    ReviewDb db = dbProvider.get();
+    Change.Id changeId = change.getId();
+
+    List<Event> events = Lists.newArrayList();
+    for (PatchSet ps : db.patchSets().byChange(changeId)) {
+      events.add(new PatchSetEvent(ps));
+    }
+    for (PatchSetApproval psa : db.patchSetApprovals().byChange(changeId)) {
+      events.add(new ApprovalEvent(psa));
+    }
+    Collections.sort(events);
+
+    BatchMetaDataUpdate batch = null;
+    ChangeUpdate update = null;
+    for (Event e : events) {
+      if (!sameUpdate(e, update)) {
+        if (update != null) {
+          writeToBatch(batch, update);
+        }
+        IdentifiedUser user = userFactory.create(dbProvider, e.who);
+        update = updateFactory.create(
+            controlFactory.controlFor(change, user), e.when);
+        update.setPatchSetId(e.psId);
+        if (batch == null) {
+          batch = update.openUpdate();
+        }
+      }
+      e.apply(update);
+    }
+    if (batch != null) {
+      if (update != null) {
+        writeToBatch(batch, update);
+      }
+      batch.commit();
+    }
+  }
+
+  private void writeToBatch(BatchMetaDataUpdate batch, ChangeUpdate update)
+      throws IOException {
+    CommitBuilder commit = new CommitBuilder();
+    commit.setCommitter(new PersonIdent(serverIdent, update.getWhen()));
+    batch.write(update, commit);
+  }
+
+  private static long round(Date when) {
+    return when.getTime() / TS_WINDOW_MS;
+  }
+
+  /**
+   * Returns whether {@code event} can be folded into {@code update}: same
+   * timestamp window, same account, and same patch set.
+   */
+  private static boolean sameUpdate(Event event, ChangeUpdate update) {
+    return update != null
+        && round(event.when) == round(update.getWhen())
+        && event.who.equals(update.getUser().getAccountId())
+        && event.psId.equals(update.getPatchSetId());
+  }
+
+  private static abstract class Event implements Comparable<Event> {
+    final PatchSet.Id psId;
+    final Account.Id who;
+    final Timestamp when;
+
+    protected Event(PatchSet.Id psId, Account.Id who, Timestamp when) {
+      this.psId = psId;
+      this.who = who;
+      this.when = when;
+    }
+
+    protected void checkUpdate(ChangeUpdate update) {
+      checkState(Objects.equal(update.getPatchSetId(), psId),
+          "cannot apply event for %s to update for %s",
+          update.getPatchSetId(), psId);
+      checkState(when.getTime() - update.getWhen().getTime() <= TS_WINDOW_MS,
+          "event at %s outside update window starting at %s",
+          when, update.getWhen());
+      checkState(Objects.equal(update.getUser().getAccountId(), who),
+          "cannot apply event by %s to update by %s",
+          who, update.getUser().getAccountId());
+    }
+
+    abstract void apply(ChangeUpdate update);
+
+    @Override
+    public int compareTo(Event other) {
+      return ComparisonChain.start()
+          // TODO(dborowitz): Smarter bucketing: pick a bucket start time T and
+          // include all events up to T + TS_WINDOW_MS but no further.
+          // Interleaving different authors complicates things.
+          .compare(round(when), round(other.when))
+          .compare(who.get(), other.who.get())
+          .compare(psId.get(), other.psId.get())
+          .result();
+    }
+
+    @Override
+    public String toString() {
+      return Objects.toStringHelper(this)
+          .add("psId", psId)
+          .add("who", who)
+          .add("when", when)
+          .toString();
+    }
+  }
+
+  private static class ApprovalEvent extends Event {
+    private final PatchSetApproval psa;
+
+    ApprovalEvent(PatchSetApproval psa) {
+      super(psa.getPatchSetId(), psa.getAccountId(), psa.getGranted());
+      this.psa = psa;
+    }
+
+    @Override
+    void apply(ChangeUpdate update) {
+      checkUpdate(update);
+      update.putApproval(psa.getLabel(), psa.getValue());
+    }
+  }
+
+  private static class PatchSetEvent extends Event {
+    private final PatchSet ps;
+
+    PatchSetEvent(PatchSet ps) {
+      super(ps.getId(), ps.getUploader(), ps.getCreatedOn());
+      this.ps = ps;
+    }
+
+    @Override
+    void apply(ChangeUpdate update) {
+      checkUpdate(update);
+      if (ps.getPatchSetId() == 1) {
+        update.setSubject("Create change");
+      } else {
+        update.setSubject("Create patch set " + ps.getPatchSetId());
+      }
+    }
+  }
+}
diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/NotesMigration.java b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/NotesMigration.java
index 679e27287e..d2747bf8bf 100644
--- a/gerrit-server/src/main/java/com/google/gerrit/server/notedb/NotesMigration.java
+++ b/gerrit-server/src/main/java/com/google/gerrit/server/notedb/NotesMigration.java
@@ -14,7 +14,6 @@
 
 package com.google.gerrit.server.notedb;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.gerrit.server.config.GerritServerConfig;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -30,8 +29,7 @@ import org.eclipse.jgit.lib.Config;
  */
 @Singleton
 public class NotesMigration {
-  @VisibleForTesting
-  static NotesMigration allEnabled() {
+  public static NotesMigration allEnabled() {
     Config cfg = new Config();
     cfg.setBoolean("notedb", null, "write", true);
     cfg.setBoolean("notedb", "patchSetApprovals", "read", true);