Merge branch stable-2.9

* stable-2.9
  Split mergeability checks by priority
  Refactor MergeabilityChecker to use a builder pattern

Change-Id: Ibf2e6d86158e72728a2bebdd5f285dc15fbb5228
This commit is contained in:
Dave Borowitz
2014-03-27 15:31:52 -07:00
10 changed files with 238 additions and 190 deletions

View File

@@ -824,6 +824,15 @@ changes is updated.
+ +
Default is 1. Default is 1.
[[changeMerge.interactiveThreadPoolSize]]changeMerge.interactiveThreadPoolSize::
+
Maximum size of the thread pool in which the mergeability flag of open
changes is updated, when processing interactive user requests (e.g.
pushes to refs/for/*). Set to 0 or negative to share the pool for
background mergeability checks.
+
Default is 1.
[[commentlink]] [[commentlink]]
=== Section commentlink === Section commentlink

View File

@@ -155,7 +155,7 @@ public class ReviewProjectAccess extends ProjectAccessHandler<Change.Id> {
} finally { } finally {
db.rollback(); db.rollback();
} }
mergeabilityChecker.updateAndIndexAsync(change).checkedGet(); mergeabilityChecker.newCheck().addChange(change).reindex().run();
hooks.doPatchsetCreatedHook(change, ps, db); hooks.doPatchsetCreatedHook(change, ps, db);
try { try {
CreateChangeSender cm = CreateChangeSender cm =

View File

@@ -46,6 +46,7 @@ import com.google.gerrit.server.cache.h2.DefaultCacheFactory;
import com.google.gerrit.server.change.ChangeKindCache; import com.google.gerrit.server.change.ChangeKindCache;
import com.google.gerrit.server.change.MergeabilityChecker; import com.google.gerrit.server.change.MergeabilityChecker;
import com.google.gerrit.server.change.MergeabilityChecksExecutor; import com.google.gerrit.server.change.MergeabilityChecksExecutor;
import com.google.gerrit.server.change.MergeabilityChecksExecutor.Priority;
import com.google.gerrit.server.change.PatchSetInserter; import com.google.gerrit.server.change.PatchSetInserter;
import com.google.gerrit.server.config.CanonicalWebUrl; import com.google.gerrit.server.config.CanonicalWebUrl;
import com.google.gerrit.server.config.CanonicalWebUrlProvider; import com.google.gerrit.server.config.CanonicalWebUrlProvider;
@@ -294,11 +295,20 @@ public class Reindex extends SiteProgram {
@Provides @Provides
@Singleton @Singleton
@MergeabilityChecksExecutor @MergeabilityChecksExecutor(Priority.BACKGROUND)
public WorkQueue.Executor createMergeabilityChecksExecutor( public WorkQueue.Executor createMergeabilityChecksExecutor(
WorkQueue queues) { WorkQueue queues) {
return queues.createQueue(1, "MergeabilityChecks"); return queues.createQueue(1, "MergeabilityChecks");
} }
@Provides
@Singleton
@MergeabilityChecksExecutor(Priority.INTERACTIVE)
public WorkQueue.Executor createInteractiveMergeabilityChecksExecutor(
@MergeabilityChecksExecutor(Priority.BACKGROUND)
WorkQueue.Executor bg) {
return bg;
}
} }
private int indexAll() throws Exception { private int indexAll() throws Exception {

View File

@@ -178,8 +178,10 @@ public class ChangeInserter {
} }
update.commit(); update.commit();
CheckedFuture<?, IOException> f = CheckedFuture<?, IOException> f = mergeabilityChecker.newCheck()
mergeabilityChecker.updateAndIndexAsync(change); .addChange(change)
.reindex()
.runAsync();
if (!messageIsForChange()) { if (!messageIsForChange()) {
insertMessage(db); insertMessage(db);
} }

View File

@@ -16,6 +16,8 @@ package com.google.gerrit.server.change;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.CheckedFuture; import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
@@ -32,6 +34,7 @@ import com.google.gerrit.reviewdb.client.RefNames;
import com.google.gerrit.reviewdb.server.ReviewDb; import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.CurrentUser; import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.IdentifiedUser; import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.change.MergeabilityChecksExecutor.Priority;
import com.google.gerrit.server.change.Mergeable.MergeableInfo; import com.google.gerrit.server.change.Mergeable.MergeableInfo;
import com.google.gerrit.server.git.MetaDataUpdate; import com.google.gerrit.server.git.MetaDataUpdate;
import com.google.gerrit.server.git.ProjectConfig; import com.google.gerrit.server.git.ProjectConfig;
@@ -54,7 +57,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@@ -63,36 +65,6 @@ public class MergeabilityChecker implements GitReferenceUpdatedListener {
private static final Logger log = LoggerFactory private static final Logger log = LoggerFactory
.getLogger(MergeabilityChecker.class); .getLogger(MergeabilityChecker.class);
private final ThreadLocalRequestContext tl;
private final SchemaFactory<ReviewDb> schemaFactory;
private final IdentifiedUser.GenericFactory identifiedUserFactory;
private final ChangeControl.GenericFactory changeControlFactory;
private final Provider<Mergeable> mergeable;
private final ChangeIndexer indexer;
private final ListeningExecutorService executor;
private final MergeabilityCheckQueue mergeabilityCheckQueue;
private final MetaDataUpdate.Server metaDataUpdateFactory;
@Inject
public MergeabilityChecker(ThreadLocalRequestContext tl,
SchemaFactory<ReviewDb> schemaFactory,
IdentifiedUser.GenericFactory identifiedUserFactory,
ChangeControl.GenericFactory changeControlFactory,
Provider<Mergeable> mergeable, ChangeIndexer indexer,
@MergeabilityChecksExecutor Executor executor,
MergeabilityCheckQueue mergeabilityCheckQueue,
MetaDataUpdate.Server metaDataUpdateFactory) {
this.tl = tl;
this.schemaFactory = schemaFactory;
this.identifiedUserFactory = identifiedUserFactory;
this.changeControlFactory = changeControlFactory;
this.mergeable = mergeable;
this.indexer = indexer;
this.executor = MoreExecutors.listeningDecorator(executor);
this.mergeabilityCheckQueue = mergeabilityCheckQueue;
this.metaDataUpdateFactory = metaDataUpdateFactory;
}
private static final Function<Exception, IOException> MAPPER = private static final Function<Exception, IOException> MAPPER =
new Function<Exception, IOException>() { new Function<Exception, IOException>() {
@Override @Override
@@ -108,12 +80,179 @@ public class MergeabilityChecker implements GitReferenceUpdatedListener {
} }
}; };
public class Check {
private List<Change> changes;
private List<Branch.NameKey> branches;
private List<Project.NameKey> projects;
private boolean force;
private boolean reindex;
private boolean interactive;
private Check() {
changes = Lists.newArrayListWithExpectedSize(1);
branches = Lists.newArrayListWithExpectedSize(1);
projects = Lists.newArrayListWithExpectedSize(1);
interactive = true;
}
public Check addChange(Change change) {
changes.add(change);
return this;
}
public Check addBranch(Branch.NameKey branch) {
branches.add(branch);
interactive = false;
return this;
}
public Check addProject(Project.NameKey project) {
projects.add(project);
interactive = false;
return this;
}
/** Force reindexing regardless of whether mergeable flag was modified. */
public Check reindex() {
reindex = true;
return this;
}
/** Force mergeability check even if change is not stale. */
private Check force() {
force = true;
return this;
}
private ListeningExecutorService getExecutor() {
return interactive ? interactiveExecutor : backgroundExecutor;
}
public CheckedFuture<?, IOException> runAsync() {
final ListeningExecutorService executor = getExecutor();
ListenableFuture<List<Change>> getChanges;
if (branches.isEmpty() && projects.isEmpty()) {
getChanges = Futures.immediateFuture(changes);
} else {
getChanges = executor.submit(
new Callable<List<Change>>() {
@Override
public List<Change> call() throws OrmException {
return getChanges();
}
});
}
return Futures.makeChecked(Futures.transform(getChanges,
new AsyncFunction<List<Change>, List<Object>>() {
@Override
public ListenableFuture<List<Object>> apply(List<Change> changes) {
List<ListenableFuture<?>> result =
Lists.newArrayListWithCapacity(changes.size());
for (final Change c : changes) {
ListenableFuture<Boolean> b =
executor.submit(new Task(c, force));
if (reindex) {
result.add(Futures.transform(
b, new AsyncFunction<Boolean, Object>() {
@SuppressWarnings("unchecked")
@Override
public ListenableFuture<Object> apply(
Boolean indexUpdated) throws Exception {
if (!indexUpdated) {
return (ListenableFuture<Object>)
indexer.indexAsync(c.getId());
}
return Futures.immediateFuture(null);
}
}));
} else {
result.add(b);
}
}
return Futures.allAsList(result);
}
}), MAPPER);
}
public void run() throws IOException {
try {
runAsync().checkedGet();
} catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw MAPPER.apply(e);
}
}
private List<Change> getChanges() throws OrmException {
ReviewDb db = schemaFactory.open();
try {
List<Change> results = Lists.newArrayList();
results.addAll(changes);
for (Project.NameKey p : projects) {
Iterables.addAll(results, db.changes().byProjectOpenAll(p));
}
for (Branch.NameKey b : branches) {
Iterables.addAll(results, db.changes().byBranchOpenAll(b));
}
return results;
} catch (OrmException e) {
log.error("Failed to fetch changes for mergeability check", e);
throw e;
} finally {
db.close();
}
}
}
private final ThreadLocalRequestContext tl;
private final SchemaFactory<ReviewDb> schemaFactory;
private final IdentifiedUser.GenericFactory identifiedUserFactory;
private final ChangeControl.GenericFactory changeControlFactory;
private final Provider<Mergeable> mergeable;
private final ChangeIndexer indexer;
private final ListeningExecutorService backgroundExecutor;
private final ListeningExecutorService interactiveExecutor;
private final MergeabilityCheckQueue mergeabilityCheckQueue;
private final MetaDataUpdate.Server metaDataUpdateFactory;
@Inject
public MergeabilityChecker(ThreadLocalRequestContext tl,
SchemaFactory<ReviewDb> schemaFactory,
IdentifiedUser.GenericFactory identifiedUserFactory,
ChangeControl.GenericFactory changeControlFactory,
Provider<Mergeable> mergeable, ChangeIndexer indexer,
@MergeabilityChecksExecutor(Priority.BACKGROUND)
Executor backgroundExecutor,
@MergeabilityChecksExecutor(Priority.INTERACTIVE)
Executor interactiveExecutor,
MergeabilityCheckQueue mergeabilityCheckQueue,
MetaDataUpdate.Server metaDataUpdateFactory) {
this.tl = tl;
this.schemaFactory = schemaFactory;
this.identifiedUserFactory = identifiedUserFactory;
this.changeControlFactory = changeControlFactory;
this.mergeable = mergeable;
this.indexer = indexer;
this.backgroundExecutor =
MoreExecutors.listeningDecorator(backgroundExecutor);
this.interactiveExecutor =
MoreExecutors.listeningDecorator(interactiveExecutor);
this.mergeabilityCheckQueue = mergeabilityCheckQueue;
this.metaDataUpdateFactory = metaDataUpdateFactory;
}
public Check newCheck() {
return new Check();
}
@Override @Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) { public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
String ref = event.getRefName(); String ref = event.getRefName();
if (ref.startsWith(Constants.R_HEADS) || ref.equals(RefNames.REFS_CONFIG)) { if (ref.startsWith(Constants.R_HEADS) || ref.equals(RefNames.REFS_CONFIG)) {
executor.submit(new BranchUpdateTask(schemaFactory, Branch.NameKey branch = new Branch.NameKey(
new Project.NameKey(event.getProjectName()), ref)); new Project.NameKey(event.getProjectName()), ref);
newCheck().addBranch(branch).runAsync();
} }
if (ref.equals(RefNames.REFS_CONFIG)) { if (ref.equals(RefNames.REFS_CONFIG)) {
Project.NameKey p = new Project.NameKey(event.getProjectName()); Project.NameKey p = new Project.NameKey(event.getProjectName());
@@ -121,15 +260,7 @@ public class MergeabilityChecker implements GitReferenceUpdatedListener {
ProjectConfig oldCfg = parseConfig(p, event.getOldObjectId()); ProjectConfig oldCfg = parseConfig(p, event.getOldObjectId());
ProjectConfig newCfg = parseConfig(p, event.getNewObjectId()); ProjectConfig newCfg = parseConfig(p, event.getNewObjectId());
if (recheckMerges(oldCfg, newCfg)) { if (recheckMerges(oldCfg, newCfg)) {
try { newCheck().addProject(p).force().runAsync();
new ProjectUpdateTask(schemaFactory, p, true).call();
} catch (Exception e) {
String msg = "Failed to update mergeability flags for project " + p.get()
+ " on update of " + RefNames.REFS_CONFIG;
log.error(msg, e);
Throwables.propagateIfPossible(e);
throw new RuntimeException(msg, e);
}
} }
} catch (ConfigInvalidException | IOException e) { } catch (ConfigInvalidException | IOException e) {
String msg = "Failed to update mergeability flags for project " + p.get() String msg = "Failed to update mergeability flags for project " + p.get()
@@ -160,87 +291,13 @@ public class MergeabilityChecker implements GitReferenceUpdatedListener {
return ProjectConfig.read(metaDataUpdateFactory.create(p), id); return ProjectConfig.read(metaDataUpdateFactory.create(p), id);
} }
/** private class Task implements Callable<Boolean> {
* Updates the mergeability flag of the change asynchronously. If the
* mergeability flag is updated the change is reindexed.
*
* @param change the change for which the mergeability flag should be updated
* @return CheckedFuture that updates the mergeability flag of the change and
* returns {@code true} if the mergeability flag was updated and
* the change was reindexed, and {@code false} if the
* mergeability flag was not updated and the change was not reindexed
*/
public CheckedFuture<Boolean, IOException> updateAsync(Change change) {
return updateAsync(change, false);
}
private CheckedFuture<Boolean, IOException> updateAsync(Change change, boolean force) {
return Futures.makeChecked(
executor.submit(new ChangeUpdateTask(schemaFactory, change, force)),
MAPPER);
}
/**
* Updates the mergeability flag of the change asynchronously and reindexes
* the change in any case.
*
* @param change the change for which the mergeability flag should be updated
* @return CheckedFuture that updates the mergeability flag of the change and
* reindexes the change (whether the mergeability flag was updated or
* not)
*/
public CheckedFuture<?, IOException> updateAndIndexAsync(Change change) {
final Change.Id id = change.getId();
return Futures.makeChecked(
Futures.transform(updateAsync(change),
new AsyncFunction<Boolean, Object>() {
@SuppressWarnings("unchecked")
@Override
public ListenableFuture<Object> apply(Boolean indexUpdated)
throws Exception {
if (!indexUpdated) {
return (ListenableFuture<Object>) indexer.indexAsync(id);
}
return Futures.immediateFuture(null);
}
}), MAPPER);
}
public boolean update(Change change) throws IOException {
try {
return new ChangeUpdateTask(schemaFactory, change).call();
} catch (Exception e) {
Throwables.propagateIfPossible(e);
throw MAPPER.apply(e);
}
}
public void update(Project.NameKey project) throws IOException {
try {
for (CheckedFuture<?, IOException> f : new ProjectUpdateTask(
schemaFactory, project, false).call()) {
f.checkedGet();
}
} catch (Exception e) {
Throwables.propagateIfPossible(e);
throw MAPPER.apply(e);
}
}
private class ChangeUpdateTask implements Callable<Boolean> {
private final SchemaFactory<ReviewDb> schemaFactory;
private final Change change; private final Change change;
private final boolean force; private final boolean force;
private ReviewDb reviewDb; private ReviewDb reviewDb;
ChangeUpdateTask(SchemaFactory<ReviewDb> schemaFactory, Change change) { Task(Change change, boolean force) {
this(schemaFactory, change, false);
}
ChangeUpdateTask(SchemaFactory<ReviewDb> schemaFactory, Change change,
boolean force) {
this.schemaFactory = schemaFactory;
this.change = change; this.change = change;
this.force = force; this.force = force;
} }
@@ -287,6 +344,12 @@ public class MergeabilityChecker implements GitReferenceUpdatedListener {
} catch (ResourceConflictException e) { } catch (ResourceConflictException e) {
// change is closed // change is closed
return false; return false;
} catch (Exception e) {
String msg = "Failed to update mergeability flags for project "
+ change.getDest().getParentKey() + " on update of "
+ change.getDest().get();
log.error(msg, e);
throw e;
} finally { } finally {
tl.setContext(old); tl.setContext(old);
if (reviewDb != null) { if (reviewDb != null) {
@@ -296,65 +359,4 @@ public class MergeabilityChecker implements GitReferenceUpdatedListener {
} }
} }
} }
private abstract class UpdateTask implements
Callable<List<CheckedFuture<Boolean, IOException>>> {
private final SchemaFactory<ReviewDb> schemaFactory;
private final boolean force;
UpdateTask(SchemaFactory<ReviewDb> schemaFactory, boolean force) {
this.schemaFactory = schemaFactory;
this.force = force;
}
@Override
public List<CheckedFuture<Boolean, IOException>> call() throws Exception {
List<Change> openChanges;
ReviewDb db = schemaFactory.open();
try {
openChanges = loadChanges(db);
} finally {
db.close();
}
List<CheckedFuture<Boolean, IOException>> futures =
new ArrayList<>(openChanges.size());
for (Change change : mergeabilityCheckQueue.addAll(openChanges, force)) {
futures.add(updateAsync(change, force));
}
return futures;
}
protected abstract List<Change> loadChanges(ReviewDb db) throws OrmException;
}
private class BranchUpdateTask extends UpdateTask {
private final Branch.NameKey branch;
BranchUpdateTask(SchemaFactory<ReviewDb> schemaFactory,
Project.NameKey project, String ref) {
super(schemaFactory, false);
this.branch = new Branch.NameKey(project, ref);
}
@Override
protected List<Change> loadChanges(ReviewDb db) throws OrmException {
return db.changes().byBranchOpenAll(branch).toList();
}
}
private class ProjectUpdateTask extends UpdateTask {
private final Project.NameKey project;
ProjectUpdateTask(SchemaFactory<ReviewDb> schemaFactory,
Project.NameKey project, boolean force) {
super(schemaFactory, force);
this.project = project;
}
@Override
protected List<Change> loadChanges(ReviewDb db) throws OrmException {
return db.changes().byProjectOpenAll(project).toList();
}
}
} }

View File

@@ -28,4 +28,9 @@ import java.lang.annotation.Retention;
@Retention(RUNTIME) @Retention(RUNTIME)
@BindingAnnotation @BindingAnnotation
public @interface MergeabilityChecksExecutor { public @interface MergeabilityChecksExecutor {
public enum Priority {
BACKGROUND, INTERACTIVE;
}
Priority value();
} }

View File

@@ -14,6 +14,7 @@
package com.google.gerrit.server.change; package com.google.gerrit.server.change;
import com.google.gerrit.server.change.MergeabilityChecksExecutor.Priority;
import com.google.gerrit.server.config.GerritServerConfig; import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue; import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
@@ -22,7 +23,6 @@ import com.google.inject.Singleton;
import org.eclipse.jgit.lib.Config; import org.eclipse.jgit.lib.Config;
/** Module providing the {@link MergeabilityChecksExecutor}. */ /** Module providing the {@link MergeabilityChecksExecutor}. */
public class MergeabilityChecksExecutorModule extends AbstractModule { public class MergeabilityChecksExecutorModule extends AbstractModule {
@Override @Override
@@ -31,11 +31,27 @@ public class MergeabilityChecksExecutorModule extends AbstractModule {
@Provides @Provides
@Singleton @Singleton
@MergeabilityChecksExecutor @MergeabilityChecksExecutor(Priority.BACKGROUND)
public WorkQueue.Executor createMergeabilityChecksExecutor( public WorkQueue.Executor createMergeabilityChecksExecutor(
@GerritServerConfig Config config, @GerritServerConfig Config config,
WorkQueue queues) { WorkQueue queues) {
int poolSize = config.getInt("changeMerge", null, "threadPoolSize", 1); int poolSize = config.getInt("changeMerge", null, "threadPoolSize", 1);
return queues.createQueue(poolSize, "MergeabilityChecks"); return queues.createQueue(poolSize, "MergeabilityChecks");
} }
@Provides
@Singleton
@MergeabilityChecksExecutor(Priority.INTERACTIVE)
public WorkQueue.Executor createMergeabilityChecksExecutor(
@GerritServerConfig Config config,
WorkQueue queues,
@MergeabilityChecksExecutor(Priority.BACKGROUND)
WorkQueue.Executor backgroundExecutor) {
int poolSize =
config.getInt("changeMerge", null, "interactiveThreadPoolSize", 1);
if (poolSize <= 0) {
return backgroundExecutor;
}
return queues.createQueue(poolSize, "InteractiveMergeabilityChecks");
}
} }

View File

@@ -306,8 +306,10 @@ public class PatchSetInserter {
} finally { } finally {
db.rollback(); db.rollback();
} }
CheckedFuture<?, IOException> f = CheckedFuture<?, IOException> f = mergeabilityChecker.newCheck()
mergeabilityChecker.updateAndIndexAsync(updatedChange); .addChange(updatedChange)
.reindex()
.runAsync();
if (runHooks) { if (runHooks) {
hooks.doPatchsetCreatedHook(updatedChange, patchSet, db); hooks.doPatchsetCreatedHook(updatedChange, patchSet, db);
} }

View File

@@ -2004,8 +2004,10 @@ public class ReceiveCommits {
if (cmd.getResult() == NOT_ATTEMPTED) { if (cmd.getResult() == NOT_ATTEMPTED) {
cmd.execute(rp); cmd.execute(rp);
} }
CheckedFuture<?, IOException> f = CheckedFuture<?, IOException> f = mergeabilityChecker.newCheck()
mergeabilityChecker.updateAndIndexAsync(change); .addChange(change)
.reindex()
.runAsync();
gitRefUpdated.fire(project.getNameKey(), newPatchSet.getRefName(), gitRefUpdated.fire(project.getNameKey(), newPatchSet.getRefName(),
ObjectId.zeroId(), newCommit); ObjectId.zeroId(), newCommit);
hooks.doPatchsetCreatedHook(change, newPatchSet, db); hooks.doPatchsetCreatedHook(change, newPatchSet, db);

View File

@@ -211,7 +211,7 @@ public class ChangeBatchIndexer {
private boolean updateMergeable(Project.NameKey project) { private boolean updateMergeable(Project.NameKey project) {
if (mergeabilityChecker != null) { if (mergeabilityChecker != null) {
try { try {
mergeabilityChecker.update(project); mergeabilityChecker.newCheck().addProject(project).run();
} catch (IOException e) { } catch (IOException e) {
log.error("Error in mergeability checker", e); log.error("Error in mergeability checker", e);
return false; return false;