BatchUpdate: Update/insert/delete change exactly once

The gwtorm backend for gerrit-review has the very unfortunate property
of not supporting multiple updates to the same entity in one
transaction. This has always been the case, but the recent
reorganization of most update code into composable BatchUpdate.Ops has
brought the issue to the fore: different Ops in different locations
may update different parts of an entity, and may all reasonably want
to call changes().update(c) to save their updates.

Prior to the BatchUpdate refactoring, this was not a problem, because
there was less use of transactions. But now that we're doing more
things in transactions (which, remember, is generally a good thing for
performance reasons), this is more likely to crop up.

Wrap the ReviewDb available to ChangeContext with one that does not
support directly modifying the changes table, and replace it with a
handful of *idempotent* methods for indicating that the change should
be updated later. This loses the ability to read back changes that
were previously written in the transaction, but since all Ops should
share a common ChangeContext instance and hence a common mutable
Change instance, they can just read from that.

This change only solves the problem for the Changes table. However,
that should be the most common place where this crops up, since there
are so many fields in Change. Other operations tend to modify
non-overlapping entities (Idd16eaef notwithstanding), for example
one op inserts a new PatchSet and another op adds a ChangeMessage.

Change-Id: Ie7236c2630707df70d452b3f9c014d5591e36aaf
This commit is contained in:
Dave Borowitz
2016-01-15 10:13:53 -05:00
parent feda0ff046
commit fd6ace7595
14 changed files with 402 additions and 25 deletions

View File

@@ -0,0 +1,275 @@
// Copyright (C) 2016 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.reviewdb.server;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gwtorm.server.Access;
import com.google.gwtorm.server.AtomicUpdate;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.ResultSet;
import com.google.gwtorm.server.StatementExecutor;
import java.util.Map;
public class ReviewDbWrapper implements ReviewDb {
protected final ReviewDb delegate;
protected ReviewDbWrapper(ReviewDb delegate) {
this.delegate = checkNotNull(delegate);
}
@Override
public void commit() throws OrmException {
delegate.commit();
}
@Override
public void rollback() throws OrmException {
delegate.rollback();
}
@Override
public void updateSchema(StatementExecutor e) throws OrmException {
delegate.updateSchema(e);
}
@Override
public void pruneSchema(StatementExecutor e) throws OrmException {
delegate.pruneSchema(e);
}
@Override
public Access<?, ?>[] allRelations() {
return delegate.allRelations();
}
@Override
public void close() {
delegate.close();
}
@Override
public SchemaVersionAccess schemaVersion() {
return delegate.schemaVersion();
}
@Override
public SystemConfigAccess systemConfig() {
return delegate.systemConfig();
}
@Override
public AccountAccess accounts() {
return delegate.accounts();
}
@Override
public AccountExternalIdAccess accountExternalIds() {
return delegate.accountExternalIds();
}
@Override
public AccountSshKeyAccess accountSshKeys() {
return delegate.accountSshKeys();
}
@Override
public AccountGroupAccess accountGroups() {
return delegate.accountGroups();
}
@Override
public AccountGroupNameAccess accountGroupNames() {
return delegate.accountGroupNames();
}
@Override
public AccountGroupMemberAccess accountGroupMembers() {
return delegate.accountGroupMembers();
}
@Override
public AccountGroupMemberAuditAccess accountGroupMembersAudit() {
return delegate.accountGroupMembersAudit();
}
@Override
public StarredChangeAccess starredChanges() {
return delegate.starredChanges();
}
@Override
public AccountProjectWatchAccess accountProjectWatches() {
return delegate.accountProjectWatches();
}
@Override
public AccountPatchReviewAccess accountPatchReviews() {
return delegate.accountPatchReviews();
}
@Override
public ChangeAccess changes() {
return delegate.changes();
}
@Override
public PatchSetApprovalAccess patchSetApprovals() {
return delegate.patchSetApprovals();
}
@Override
public ChangeMessageAccess changeMessages() {
return delegate.changeMessages();
}
@Override
public PatchSetAccess patchSets() {
return delegate.patchSets();
}
@Override
public PatchLineCommentAccess patchComments() {
return delegate.patchComments();
}
@Override
public SubmoduleSubscriptionAccess submoduleSubscriptions() {
return delegate.submoduleSubscriptions();
}
@Override
public AccountGroupByIdAccess accountGroupById() {
return delegate.accountGroupById();
}
@Override
public AccountGroupByIdAudAccess accountGroupByIdAud() {
return delegate.accountGroupByIdAud();
}
@Override
public int nextAccountId() throws OrmException {
return delegate.nextAccountId();
}
@Override
public int nextAccountGroupId() throws OrmException {
return delegate.nextAccountGroupId();
}
@Override
@SuppressWarnings("deprecation")
public int nextChangeId() throws OrmException {
return delegate.nextChangeId();
}
@Override
public int nextChangeMessageId() throws OrmException {
return delegate.nextChangeMessageId();
}
public static class ChangeAccessWrapper implements ChangeAccess {
protected final ChangeAccess delegate;
protected ChangeAccessWrapper(ChangeAccess delegate) {
this.delegate = checkNotNull(delegate);
}
@Override
public String getRelationName() {
return delegate.getRelationName();
}
@Override
public int getRelationID() {
return delegate.getRelationID();
}
@Override
public ResultSet<Change> iterateAllEntities() throws OrmException {
return delegate.iterateAllEntities();
}
@Override
public Change.Id primaryKey(Change entity) {
return delegate.primaryKey(entity);
}
@Override
public Map<Change.Id, Change> toMap(Iterable<Change> c) {
return delegate.toMap(c);
}
@Override
public CheckedFuture<Change, OrmException> getAsync(Change.Id key) {
return delegate.getAsync(key);
}
@Override
public ResultSet<Change> get(Iterable<Change.Id> keys) throws OrmException {
return delegate.get(keys);
}
@Override
public void insert(Iterable<Change> instances) throws OrmException {
delegate.insert(instances);
}
@Override
public void update(Iterable<Change> instances) throws OrmException {
delegate.update(instances);
}
@Override
public void upsert(Iterable<Change> instances) throws OrmException {
delegate.upsert(instances);
}
@Override
public void deleteKeys(Iterable<Change.Id> keys) throws OrmException {
delegate.deleteKeys(keys);
}
@Override
public void delete(Iterable<Change> instances) throws OrmException {
delegate.delete(instances);
}
@Override
public void beginTransaction(Change.Id key) throws OrmException {
delegate.beginTransaction(key);
}
@Override
public Change atomicUpdate(Change.Id key, AtomicUpdate<Change> update)
throws OrmException {
return delegate.atomicUpdate(key, update);
}
@Override
public Change get(Change.Id id) throws OrmException {
return delegate.get(id);
}
@Override
public ResultSet<Change> all() throws OrmException {
return delegate.all();
}
}
}

View File

@@ -48,8 +48,6 @@ import com.google.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
@Singleton
public class Abandon implements RestModifyView<ChangeResource, AbandonInput>,
UiAction<ChangeResource> {
@@ -130,7 +128,7 @@ public class Abandon implements RestModifyView<ChangeResource, AbandonInput>,
patchSet = ctx.getDb().patchSets().get(psId);
change.setStatus(Change.Status.ABANDONED);
change.setLastUpdatedOn(ctx.getWhen());
ctx.getDb().changes().update(Collections.singleton(change));
ctx.saveChange();
update.setStatus(change.getStatus());
message = newMessage(ctx.getDb());

View File

@@ -251,7 +251,7 @@ public class ChangeInserter extends BatchUpdate.InsertChangeOp {
patchSet.setGroups(GroupCollector.getDefaultGroups(patchSet));
}
db.patchSets().insert(Collections.singleton(patchSet));
db.changes().insert(Collections.singleton(change));
ctx.saveChange();
update.setTopic(change.getTopic());
/* TODO: fixStatus is used here because the tests

View File

@@ -37,7 +37,6 @@ import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.transport.ReceiveCommand;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
class DeleteDraftChangeOp extends BatchUpdate.Op {
@@ -94,8 +93,7 @@ class DeleteDraftChangeOp extends BatchUpdate.Op {
db.patchSets().delete(patchSets);
db.changeMessages().delete(db.changeMessages().byChange(id));
starredChangesUtil.unstarAll(id);
db.changes().delete(Collections.singleton(change));
ctx.markDeleted();
ctx.deleteChange();
}
@Override

View File

@@ -152,7 +152,7 @@ public class DeleteDraftPatchSet implements RestModifyView<RevisionResource, Inp
c.setCurrentPatchSet(previousPatchSetInfo(ctx));
}
ChangeUtil.updated(c);
ctx.getDb().changes().update(Collections.singleton(c));
ctx.saveChange();
}
private PatchSetInfo previousPatchSetInfo(ChangeContext ctx)

View File

@@ -121,7 +121,7 @@ public class DeleteVote implements RestModifyView<VoteResource, Input> {
if (psa == null) {
throw new ResourceNotFoundException();
}
ChangeUtil.bumpRowVersionNotLastUpdatedOn(change.getId(), ctx.getDb());
ctx.saveChange();
ctx.getDb().patchSetApprovals().update(Collections.singleton(psa));
if (msg.length() > 0) {

View File

@@ -246,7 +246,7 @@ public class PatchSetInserter extends BatchUpdate.Op {
}
change.setCurrentPatchSet(patchSetInfo);
ChangeUtil.updated(change);
db.changes().update(Collections.singleton(change));
ctx.saveChange();
approvalCopier.copy(db, ctl, patchSet);
if (changeMessage != null) {
cmUtil.addChangeMessage(db, update, changeMessage);

View File

@@ -363,7 +363,7 @@ public class PostReview implements RestModifyView<RevisionResource, ReviewInput>
dirty |= updateLabels(ctx);
dirty |= insertMessage(ctx);
if (dirty) {
ctx.getDb().changes().update(Collections.singleton(change));
ctx.saveChange();
}
}

View File

@@ -180,7 +180,7 @@ public class PublishDraftPatchSet implements RestModifyView<RevisionResource, In
addReviewers(ctx);
}
private void saveChange(ChangeContext ctx) throws OrmException {
private void saveChange(ChangeContext ctx) {
change = ctx.getChange();
ChangeUpdate update = ctx.getUpdate(psId);
wasDraftChange = change.getStatus() == Change.Status.DRAFT;
@@ -188,7 +188,7 @@ public class PublishDraftPatchSet implements RestModifyView<RevisionResource, In
change.setStatus(Change.Status.NEW);
update.setStatus(change.getStatus());
ChangeUtil.updated(change);
ctx.getDb().changes().update(Collections.singleton(change));
ctx.saveChange();
}
}
@@ -201,7 +201,7 @@ public class PublishDraftPatchSet implements RestModifyView<RevisionResource, In
// Force ETag invalidation if not done already
if (!wasDraftChange) {
ChangeUtil.updated(change);
ctx.getDb().changes().update(Collections.singleton(change));
ctx.saveChange();
}
ctx.getDb().patchSets().update(Collections.singleton(patchSet));
}

View File

@@ -41,8 +41,6 @@ import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import java.util.Collections;
@Singleton
public class PutTopic implements RestModifyView<ChangeResource, Input>,
UiAction<ChangeResource> {
@@ -120,7 +118,7 @@ public class PutTopic implements RestModifyView<ChangeResource, Input>,
change.setTopic(Strings.emptyToNull(newTopicName));
update.setTopic(change.getTopic());
ChangeUtil.updated(change);
ctx.getDb().changes().update(Collections.singleton(change));
ctx.saveChange();
ChangeMessage cmsg = new ChangeMessage(
new ChangeMessage.Key(

View File

@@ -48,8 +48,6 @@ import com.google.inject.Singleton;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
@Singleton
public class Restore implements RestModifyView<ChangeResource, RestoreInput>,
UiAction<ChangeResource> {
@@ -118,7 +116,7 @@ public class Restore implements RestModifyView<ChangeResource, RestoreInput>,
patchSet = ctx.getDb().patchSets().get(psId);
change.setStatus(Status.NEW);
change.setLastUpdatedOn(ctx.getWhen());
ctx.getDb().changes().update(Collections.singleton(change));
ctx.saveChange();
update.setStatus(change.getStatus());
message = newMessage(ctx.getDb());

View File

@@ -29,6 +29,7 @@ import com.google.gerrit.reviewdb.client.PatchSet;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.reviewdb.server.ReviewDbUtil;
import com.google.gerrit.reviewdb.server.ReviewDbWrapper;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.GerritPersonIdent;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
@@ -52,6 +53,7 @@ import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
@@ -164,14 +166,22 @@ public class BatchUpdate implements AutoCloseable {
public class ChangeContext extends Context {
private final ChangeControl ctl;
private final Map<PatchSet.Id, ChangeUpdate> updates;
private final ReviewDbWrapper dbWrapper;
private boolean deleted;
private boolean saved;
private ChangeContext(ChangeControl ctl) {
private ChangeContext(ChangeControl ctl, ReviewDbWrapper dbWrapper) {
this.ctl = ctl;
this.dbWrapper = dbWrapper;
updates = new TreeMap<>(ReviewDbUtil.intKeyOrdering());
}
@Override
public ReviewDb getDb() {
return dbWrapper;
}
public ChangeUpdate getUpdate(PatchSet.Id psId) {
ChangeUpdate u = updates.get(psId);
if (u == null) {
@@ -194,8 +204,14 @@ public class BatchUpdate implements AutoCloseable {
return ctl.getChange();
}
public void markDeleted() {
this.deleted = true;
public void saveChange() {
checkState(!deleted, "cannot both save and delete change");
saved = true;
}
public void deleteChange() {
checkState(!saved, "cannot both save and delete change");
deleted = true;
}
}
@@ -530,6 +546,14 @@ public class BatchUpdate implements AutoCloseable {
for (Op op : e.getValue()) {
op.updateChange(ctx);
}
Iterable<Change> changes = Collections.singleton(ctx.getChange());
if (newChanges.containsKey(id)) {
db.changes().insert(changes);
} else if (ctx.saved) {
db.changes().update(changes);
} else if (ctx.deleted) {
db.changes().delete(changes);
}
db.commit();
} finally {
db.rollback();
@@ -567,7 +591,7 @@ public class BatchUpdate implements AutoCloseable {
// - reading from a db that does not belong to this update
// - attempting to read a change that doesn't exist yet
return new ChangeContext(
changeControlFactory.controlFor(c, user));
changeControlFactory.controlFor(c, user), new BatchUpdateReviewDb(db));
}
private void executePostOps() throws Exception {

View File

@@ -0,0 +1,86 @@
// Copyright (C) 2016 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.git;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.server.ChangeAccess;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.reviewdb.server.ReviewDbWrapper;
import com.google.gwtorm.server.AtomicUpdate;
class BatchUpdateReviewDb extends ReviewDbWrapper {
private final ChangeAccess changesWrapper;
BatchUpdateReviewDb(ReviewDb delegate) {
super(delegate);
changesWrapper = new BatchUpdateChanges(delegate.changes());
}
@Override
public ChangeAccess changes() {
return changesWrapper;
}
private static class BatchUpdateChanges extends ChangeAccessWrapper {
private BatchUpdateChanges(ChangeAccess delegate) {
super(delegate);
}
@Override
public void insert(Iterable<Change> instances) {
throw new UnsupportedOperationException(
"do not call insert; change is automatically inserted");
}
@Override
public void upsert(Iterable<Change> instances) {
throw new UnsupportedOperationException(
"do not call upsert; either use InsertChangeOp for insertion, or"
+ " ChangeContext#saveChange() for update");
}
@Override
public void update(Iterable<Change> instances) {
throw new UnsupportedOperationException(
"do not call update; use ChangeContext#saveChange()");
}
@Override
public void beginTransaction(Change.Id key) {
throw new UnsupportedOperationException(
"updateChange is always called within a transaction");
}
@Override
public void deleteKeys(Iterable<Change.Id> keys) {
throw new UnsupportedOperationException(
"do not call deleteKeys; use ChangeContext#deleteChange()");
}
@Override
public void delete(Iterable<Change> instances) {
throw new UnsupportedOperationException(
"do not call delete; use ChangeContext#deleteChange()");
}
@Override
public Change atomicUpdate(Change.Id key,
AtomicUpdate<Change> update) {
throw new UnsupportedOperationException(
"do not call atomicUpdate; updateChange is always called within a"
+ " transaction");
}
}
}

View File

@@ -176,7 +176,7 @@ public class CherryPick extends SubmitStrategy {
ps.setGroups(GroupCollector.getCurrentGroups(args.db, c));
args.db.patchSets().insert(Collections.singleton(ps));
c.setCurrentPatchSet(patchSetInfo);
args.db.changes().update(Collections.singletonList(c));
ctx.saveChange();
List<PatchSetApproval> approvals = Lists.newArrayList();
for (PatchSetApproval a : args.approvalsUtil.byPatchSet(