Rewrite batching behavior of ChangeRebuilder
Now that we're freed from the constraints of BatchMetaDataUpdate, we don't have to eagerly create ChangeUpdates until we're ready for them, which simplifies the looping logic. Improve batching behavior by subclassing ArrayList with a special implementation that knows when it's safe to add events to the batch. It inspects all elements already in the list when deciding whether to add a new one. This lets us use more flexible time batching behavior than simply rounding, and allows us to set additional constraints not based on time. One constraint that would otherwise cause test failures is that we were accidentally trying to batch multiple ChangeMessages into the same update, making the second one step on the first. Change-Id: Icc57bf55fb1390ea1c0494467159fe830f4ac1f8
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
|
||||
package com.google.gerrit.server.notedb;
|
||||
|
||||
import static com.google.common.base.Preconditions.checkArgument;
|
||||
import static com.google.common.base.Preconditions.checkState;
|
||||
import static com.google.gerrit.server.PatchLineCommentsUtil.setCommentRevId;
|
||||
import static com.google.gerrit.server.notedb.ChangeNoteUtil.FOOTER_HASHTAGS;
|
||||
@@ -26,6 +27,7 @@ import com.google.common.collect.ComparisonChain;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Ordering;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.primitives.Ints;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
@@ -66,7 +68,6 @@ import java.io.IOException;
|
||||
import java.sql.Timestamp;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
@@ -169,40 +170,30 @@ public class ChangeRebuilder {
|
||||
events.add(new ChangeMessageEvent(msg, notedbChange));
|
||||
}
|
||||
|
||||
Collections.sort(events);
|
||||
events.add(new FinalUpdatesEvent(change, notedbChange));
|
||||
ChangeUpdate update = null;
|
||||
for (Event e : events) {
|
||||
if (!sameUpdate(e, update)) {
|
||||
if (update != null) {
|
||||
manager.add(update);
|
||||
}
|
||||
CurrentUser user = e.who != null
|
||||
? userFactory.create(Providers.of(db), e.who)
|
||||
: internalUserFactory.create();
|
||||
update = updateFactory.create(
|
||||
controlFactory.controlFor(db, change, user), e.when);
|
||||
update.setPatchSetId(e.psId);
|
||||
}
|
||||
e.apply(update);
|
||||
}
|
||||
manager.add(update);
|
||||
Collections.sort(events, EVENT_ORDER);
|
||||
|
||||
for (Account.Id author : draftCommentEvents.keys()) {
|
||||
IdentifiedUser user = userFactory.create(Providers.of(db), author);
|
||||
ChangeDraftUpdate draftUpdate = null;
|
||||
for (PatchLineCommentEvent e : draftCommentEvents.get(author)) {
|
||||
if (!sameUpdate(e, draftUpdate)) {
|
||||
if (draftUpdate != null) {
|
||||
manager.add(draftUpdate);
|
||||
}
|
||||
draftUpdate = draftUpdateFactory.create(
|
||||
controlFactory.controlFor(db, change, user), e.when);
|
||||
draftUpdate.setPatchSetId(e.psId);
|
||||
}
|
||||
e.applyDraft(draftUpdate);
|
||||
events.add(new FinalUpdatesEvent(change, notedbChange));
|
||||
|
||||
EventList<Event> el = new EventList<>();
|
||||
for (Event e : events) {
|
||||
if (!el.canAdd(e)) {
|
||||
flushEventsToUpdate(db, manager, el, change);
|
||||
checkState(el.canAdd(e));
|
||||
}
|
||||
manager.add(draftUpdate);
|
||||
el.add(e);
|
||||
}
|
||||
flushEventsToUpdate(db, manager, el, change);
|
||||
|
||||
EventList<PatchLineCommentEvent> plcel = new EventList<>();
|
||||
for (Account.Id author : draftCommentEvents.keys()) {
|
||||
for (PatchLineCommentEvent e : draftCommentEvents.get(author)) {
|
||||
if (!plcel.canAdd(e)) {
|
||||
flushEventsToDraftUpdate(db, manager, plcel, change);
|
||||
checkState(plcel.canAdd(e));
|
||||
}
|
||||
plcel.add(e);
|
||||
}
|
||||
flushEventsToDraftUpdate(db, manager, plcel, change);
|
||||
}
|
||||
|
||||
createStarredChangesRefs(db, changeId, manager.getAllUsersCommands(),
|
||||
@@ -210,6 +201,40 @@ public class ChangeRebuilder {
|
||||
manager.execute();
|
||||
}
|
||||
|
||||
private void flushEventsToUpdate(ReviewDb db, NoteDbUpdateManager manager,
|
||||
EventList<Event> events, Change change)
|
||||
throws NoSuchChangeException, OrmException, IOException {
|
||||
if (events.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
ChangeUpdate update = updateFactory.create(
|
||||
controlFactory.controlFor(db, change, events.getUser(db)),
|
||||
events.getWhen());
|
||||
update.setPatchSetId(events.getPatchSetId());
|
||||
for (Event e : events) {
|
||||
e.apply(update);
|
||||
}
|
||||
manager.add(update);
|
||||
events.clear();
|
||||
}
|
||||
|
||||
private void flushEventsToDraftUpdate(ReviewDb db, NoteDbUpdateManager manager,
|
||||
EventList<PatchLineCommentEvent> events, Change change)
|
||||
throws NoSuchChangeException, OrmException {
|
||||
if (events.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
ChangeDraftUpdate update = draftUpdateFactory.create(
|
||||
controlFactory.controlFor(db, change, events.getUser(db)),
|
||||
events.getWhen());
|
||||
update.setPatchSetId(events.getPatchSetId());
|
||||
for (PatchLineCommentEvent e : events) {
|
||||
e.applyDraft(update);
|
||||
}
|
||||
manager.add(update);
|
||||
events.clear();
|
||||
}
|
||||
|
||||
private void createStarredChangesRefs(ReviewDb db, Change.Id changeId,
|
||||
ChainedReceiveCommands allUsersCmds, Repository allUsersRepo)
|
||||
throws IOException, OrmException {
|
||||
@@ -291,19 +316,21 @@ public class ChangeRebuilder {
|
||||
}
|
||||
}
|
||||
|
||||
private static long round(Date when) {
|
||||
return when.getTime() / TS_WINDOW_MS;
|
||||
}
|
||||
private static final Ordering<Event> EVENT_ORDER = new Ordering<Event>() {
|
||||
@Override
|
||||
public int compare(Event a, Event b) {
|
||||
return ComparisonChain.start()
|
||||
.compare(a.when, b.when)
|
||||
.compare(a.who.get(), b.who.get())
|
||||
.compare(a.psId.get(), b.psId.get())
|
||||
.result();
|
||||
}
|
||||
};
|
||||
|
||||
private static boolean sameUpdate(Event event, AbstractChangeUpdate update) {
|
||||
return update != null
|
||||
&& round(event.when) == round(update.getWhen())
|
||||
&& event.who.equals(update.getUser().getAccountId())
|
||||
&& event.psId.equals(update.getPatchSetId())
|
||||
&& !(event instanceof FinalUpdatesEvent);
|
||||
}
|
||||
private abstract static class Event {
|
||||
// NOTE: EventList only supports direct subclasses, not an arbitrary
|
||||
// hierarchy.
|
||||
|
||||
private abstract static class Event implements Comparable<Event> {
|
||||
final PatchSet.Id psId;
|
||||
final Account.Id who;
|
||||
final Timestamp when;
|
||||
@@ -326,19 +353,13 @@ public class ChangeRebuilder {
|
||||
who, update.getUser().getAccountId());
|
||||
}
|
||||
|
||||
abstract void apply(ChangeUpdate update) throws OrmException, IOException;
|
||||
/**
|
||||
* @return whether this event type must be unique per {@link ChangeUpdate},
|
||||
* i.e. there may be at most one of this type.
|
||||
*/
|
||||
abstract boolean uniquePerUpdate();
|
||||
|
||||
@Override
|
||||
public int compareTo(Event other) {
|
||||
return ComparisonChain.start()
|
||||
// TODO(dborowitz): Smarter bucketing: pick a bucket start time T and
|
||||
// include all events up to T + TS_WINDOW_MS but no further.
|
||||
// Interleaving different authors complicates things.
|
||||
.compare(round(when), round(other.when))
|
||||
.compare(who.get(), other.who.get())
|
||||
.compare(psId.get(), other.psId.get())
|
||||
.result();
|
||||
}
|
||||
abstract void apply(ChangeUpdate update) throws OrmException, IOException;
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
@@ -350,6 +371,88 @@ public class ChangeRebuilder {
|
||||
}
|
||||
}
|
||||
|
||||
private class EventList<E extends Event> extends ArrayList<E> {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private static final long MAX_DELTA_MS = 1000;
|
||||
private static final long MAX_WINDOW_MS = 5000;
|
||||
|
||||
private E getLast() {
|
||||
return get(size() - 1);
|
||||
}
|
||||
|
||||
private long getLastTime() {
|
||||
return getLast().when.getTime();
|
||||
}
|
||||
|
||||
private long getFirstTime() {
|
||||
return get(0).when.getTime();
|
||||
}
|
||||
|
||||
boolean canAdd(E e) {
|
||||
if (isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
if (e instanceof FinalUpdatesEvent) {
|
||||
return false; // FinalUpdatesEvent always gets its own update.
|
||||
}
|
||||
|
||||
Event last = getLast();
|
||||
if (!Objects.equals(e.who, last.who)
|
||||
|| !Objects.equals(e.psId, last.psId)) {
|
||||
return false; // Different patch set or author.
|
||||
}
|
||||
|
||||
long t = e.when.getTime();
|
||||
long tFirst = getFirstTime();
|
||||
long tLast = getLastTime();
|
||||
checkArgument(t >= tLast,
|
||||
"event %s is before previous event in list %s", e, last);
|
||||
if (t - tLast > MAX_DELTA_MS || t - tFirst > MAX_WINDOW_MS) {
|
||||
return false; // Too much time elapsed.
|
||||
}
|
||||
|
||||
if (!e.uniquePerUpdate()) {
|
||||
return true;
|
||||
}
|
||||
for (Event o : this) {
|
||||
if (e.getClass() == o.getClass()) {
|
||||
return false; // Only one event of this type allowed per update.
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(dborowitz): Additional heuristics, like keeping ChangeEvents
|
||||
// separate if they affect overlapping fields.
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
Timestamp getWhen() {
|
||||
return get(0).when;
|
||||
}
|
||||
|
||||
PatchSet.Id getPatchSetId() {
|
||||
PatchSet.Id id = get(0).psId;
|
||||
for (int i = 1; i < size(); i++) {
|
||||
checkState(Objects.equals(id, get(i).psId),
|
||||
"mismatched patch sets in EventList: %s != %s", id, get(i).psId);
|
||||
}
|
||||
return id;
|
||||
}
|
||||
|
||||
CurrentUser getUser(ReviewDb db) {
|
||||
Account.Id id = get(0).who;
|
||||
for (int i = 1; i < size(); i++) {
|
||||
checkState(Objects.equals(id, get(i).who),
|
||||
"mismatched users in EventList: %s != %s", id, get(i).who);
|
||||
}
|
||||
|
||||
return id != null
|
||||
? userFactory.create(Providers.of(db), id)
|
||||
: internalUserFactory.create();
|
||||
}
|
||||
}
|
||||
|
||||
private static class ApprovalEvent extends Event {
|
||||
private PatchSetApproval psa;
|
||||
|
||||
@@ -358,6 +461,11 @@ public class ChangeRebuilder {
|
||||
this.psa = psa;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean uniquePerUpdate() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
void apply(ChangeUpdate update) {
|
||||
checkUpdate(update);
|
||||
@@ -377,6 +485,11 @@ public class ChangeRebuilder {
|
||||
this.rw = rw;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean uniquePerUpdate() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
void apply(ChangeUpdate update) throws IOException, OrmException {
|
||||
checkUpdate(update);
|
||||
@@ -412,6 +525,11 @@ public class ChangeRebuilder {
|
||||
this.cache = cache;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean uniquePerUpdate() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
void apply(ChangeUpdate update) throws OrmException {
|
||||
checkUpdate(update);
|
||||
@@ -438,6 +556,13 @@ public class ChangeRebuilder {
|
||||
this.hashtags = hashtags;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean uniquePerUpdate() {
|
||||
// Since these are produced from existing commits in the old NoteDb graph,
|
||||
// we know that there must be one per commit in the rebuilt graph.
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
void apply(ChangeUpdate update) throws OrmException {
|
||||
update.setHashtags(hashtags);
|
||||
@@ -467,6 +592,11 @@ public class ChangeRebuilder {
|
||||
this.notedbChange = notedbChange;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean uniquePerUpdate() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
void apply(ChangeUpdate update) throws OrmException {
|
||||
checkUpdate(update);
|
||||
@@ -520,11 +650,20 @@ public class ChangeRebuilder {
|
||||
|
||||
FinalUpdatesEvent(Change change, Change notedbChange) {
|
||||
super(change.currentPatchSetId(), change.getOwner(),
|
||||
// TODO(dborowitz): This should maybe be a synthetic timestamp just
|
||||
// after the actual last update in the history. On the one hand using
|
||||
// the commit updated time is reasonable, but on the other it might be
|
||||
// non-monotonic, and who knows what would break then.
|
||||
change.getLastUpdatedOn());
|
||||
this.change = change;
|
||||
this.notedbChange = notedbChange;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean uniquePerUpdate() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
void apply(ChangeUpdate update) throws OrmException {
|
||||
if (!Objects.equals(change.getTopic(), notedbChange.getTopic())) {
|
||||
|
||||
Reference in New Issue
Block a user