Replace background mergeability checks with reindexing

Now that mergeability is checked lazily using a cache upon reindexing,
we can ditch the specific mergeability-rechecking codepath and just
depend on the cache to load properly. This reduces complexity of
background merge checking in a number of pleasant ways:

 - We don't have to maintain two sets of background threadpools.
 - Since the cache loader looks at project configuration and runs
   submit rules, we don't need to guess in the ref update listener
   whether submit rules might have changed in a way that affects
   mergeability. If the cache key is the same, it's a hit; if not, it
   isn't.
 - No more --recheck-mergeable flag to reindex; checks only happen for
   those changes that are actually outdated. (We might need to trigger
   a full mergeability recheck at some point in the future, but we can
   do that by simply flushing the persistent cache.)
 - The check interface is pared down; we never actually used arbitrary
   lists of projects/branches/changes.
 - The ref updated listener can always use background priority, while
   callers indexing a single change naturally always use interactive
   priority.
 - No manual locking to avoid duplicate work; LoadingCache takes care
   of that for us.
 - Mergeable is not used by the reindexing path, so we don't need to
   explicitly turn off reindexing in that handler.

The configuration values for mergeability check threadpool sizes are
now used as fallbacks when the index threadpool sizes are not
configured.

Change-Id: I88ae7f4ad417fcba5495b2a5e4b94adf11e2a21b
This commit is contained in:
Dave Borowitz 2014-10-23 10:24:01 -07:00
parent 787af5fbe8
commit 0916cc3605
23 changed files with 223 additions and 634 deletions

View File

@ -820,19 +820,22 @@ Default is 300 seconds (5 minutes).
[[changeMerge.threadPoolSize]]changeMerge.threadPoolSize::
+
Maximum size of the thread pool in which the mergeability flag of open
changes is updated.
_Deprecated:_ Formerly used to control thread pool size for background
mergeability checks. These checks were moved to the indexing threadpool,
so this value is now used for
link:#index.batchThreads[index.batchThreads], only if that value is not
provided.
+
Default is 1.
This option may be removed in a future version.
[[changeMerge.interactiveThreadPoolSize]]changeMerge.interactiveThreadPoolSize::
+
Maximum size of the thread pool in which the mergeability flag of open
changes is updated, when processing interactive user requests (e.g.
pushes to refs/for/*). Set to 0 or negative to share the pool for
background mergeability checks.
_Deprecated:_ Formerly used to control thread pool size for interactive
mergeability checks. These checks were moved to the indexing threadpool,
so this value is now used for link:#index.threads[index.threads], only
if that value is not provided.
+
Default is 1.
This option may be removed in a future version.
[[commentlink]]
=== Section commentlink
@ -2038,7 +2041,9 @@ By default, `LUCENE`.
+
Number of threads to use for indexing in normal interactive operations.
+
Defaults to 1 if not set, or set to a negative value.
Defaults to 1 if not set, or set to a negative value (unless
link:#changeMerge.interactiveThreadPoolSize[changeMerge.interactiveThreadPoolSize]
is iset).
[[index.batchThreads]]index.batchThreads::
+
@ -2046,7 +2051,8 @@ Number of threads to use for indexing in background operations, such as
online schema upgrades.
+
If not set or set to a negative value, defaults to using the same
thread pool as interactive operations.
thread pool as interactive operations (unless
link:#changeMerge.threadPoolSize[changeMerge.threadPoolSize] is set).
==== Lucene configuration

View File

@ -43,18 +43,10 @@ public class ReindexIT {
@Test
public void reindexEmptySite() throws Exception {
initSite();
runGerrit("reindex", "-d", sitePath.getPath(),
runGerrit("reindex", "-d", sitePath.toString(),
"--show-stack-trace");
}
@Test
public void reindexEmptySiteWithRecheckMergeable() throws Exception {
initSite();
runGerrit("reindex", "-d", sitePath.getPath(),
"--show-stack-trace",
"--recheck-mergeable");
}
private void initSite() throws Exception {
runGerrit("init", "-d", sitePath.getPath(),
"--batch", "--no-auto-start", "--skip-plugins", "--show-stack-trace");

View File

@ -36,12 +36,12 @@ import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.account.GroupBackend;
import com.google.gerrit.server.change.ChangeResource;
import com.google.gerrit.server.change.ChangesCollection;
import com.google.gerrit.server.change.MergeabilityChecker;
import com.google.gerrit.server.change.PostReviewers;
import com.google.gerrit.server.config.AllProjectsNameProvider;
import com.google.gerrit.server.git.MetaDataUpdate;
import com.google.gerrit.server.git.ProjectConfig;
import com.google.gerrit.server.group.SystemGroupBackend;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.mail.CreateChangeSender;
import com.google.gerrit.server.patch.PatchSetInfoFactory;
import com.google.gerrit.server.project.ProjectCache;
@ -79,7 +79,7 @@ public class ReviewProjectAccess extends ProjectAccessHandler<Change.Id> {
private final IdentifiedUser user;
private final PatchSetInfoFactory patchSetInfoFactory;
private final Provider<PostReviewers> reviewersProvider;
private final MergeabilityChecker mergeabilityChecker;
private final ChangeIndexer indexer;
private final ChangeHooks hooks;
private final CreateChangeSender.Factory createChangeSenderFactory;
private final ProjectCache projectCache;
@ -91,7 +91,8 @@ public class ReviewProjectAccess extends ProjectAccessHandler<Change.Id> {
MetaDataUpdate.User metaDataUpdateFactory, ReviewDb db,
IdentifiedUser user, PatchSetInfoFactory patchSetInfoFactory,
Provider<PostReviewers> reviewersProvider,
MergeabilityChecker mergeabilityChecker, ChangeHooks hooks,
ChangeIndexer indexer,
ChangeHooks hooks,
CreateChangeSender.Factory createChangeSenderFactory,
ProjectCache projectCache,
AllProjectsNameProvider allProjects,
@ -110,7 +111,7 @@ public class ReviewProjectAccess extends ProjectAccessHandler<Change.Id> {
this.user = user;
this.patchSetInfoFactory = patchSetInfoFactory;
this.reviewersProvider = reviewersProvider;
this.mergeabilityChecker = mergeabilityChecker;
this.indexer = indexer;
this.hooks = hooks;
this.createChangeSenderFactory = createChangeSenderFactory;
this.projectCache = projectCache;
@ -155,7 +156,7 @@ public class ReviewProjectAccess extends ProjectAccessHandler<Change.Id> {
} finally {
db.rollback();
}
mergeabilityChecker.newCheck().addChange(change).reindex().run();
indexer.index(db, change);
hooks.doPatchsetCreatedHook(change, ps, db);
try {
CreateChangeSender cm =

View File

@ -44,7 +44,6 @@ import com.google.gerrit.pgm.util.SiteProgram;
import com.google.gerrit.reviewdb.client.AuthType;
import com.google.gerrit.server.account.InternalAccountDirectory;
import com.google.gerrit.server.cache.h2.DefaultCacheFactory;
import com.google.gerrit.server.change.MergeabilityChecksExecutorModule;
import com.google.gerrit.server.config.AuthConfig;
import com.google.gerrit.server.config.AuthConfigModule;
import com.google.gerrit.server.config.CanonicalWebUrlModule;
@ -317,7 +316,6 @@ public class Daemon extends SiteProgram {
modules.add(new WorkQueue.Module());
modules.add(new ChangeHookRunner.Module());
modules.add(new ReceiveCommitsExecutorModule());
modules.add(new MergeabilityChecksExecutorModule());
modules.add(new IntraLineWorkerPool.Module());
modules.add(cfgInjector.getInstance(GerritGlobalModule.class));
modules.add(new InternalAccountDirectory.Module());

View File

@ -27,7 +27,6 @@ import com.google.gerrit.pgm.util.ThreadLimiter;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.change.MergeabilityChecker;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.index.ChangeBatchIndexer;
import com.google.gerrit.server.index.ChangeIndex;
@ -61,9 +60,6 @@ public class Reindex extends SiteProgram {
@Option(name = "--output", usage = "Prefix for output; path for local disk index, or prefix for remote index")
private String outputBase;
@Option(name = "--recheck-mergeable", usage = "Recheck mergeable flag on all changes")
private boolean recheckMergeable;
@Option(name = "--verbose", usage = "Output debug information for each change")
private boolean verbose;
@ -164,10 +160,6 @@ public class Reindex extends SiteProgram {
ChangeBatchIndexer batchIndexer =
sysInjector.getInstance(ChangeBatchIndexer.class);
if (recheckMergeable) {
batchIndexer.setMergeabilityChecker(
sysInjector.getInstance(MergeabilityChecker.class));
}
ChangeBatchIndexer.Result result = batchIndexer.setNumChanges(changeCount)
.setProgressOut(System.err)
.setVerboseOut(verbose ? System.out : NullOutputStream.INSTANCE)

View File

@ -31,8 +31,6 @@ import com.google.gerrit.server.cache.CacheRemovalListener;
import com.google.gerrit.server.cache.h2.DefaultCacheFactory;
import com.google.gerrit.server.change.ChangeKindCacheImpl;
import com.google.gerrit.server.change.MergeabilityCache;
import com.google.gerrit.server.change.MergeabilityChecksExecutor;
import com.google.gerrit.server.change.MergeabilityChecksExecutor.Priority;
import com.google.gerrit.server.change.PatchSetInserter;
import com.google.gerrit.server.config.CanonicalWebUrl;
import com.google.gerrit.server.config.CanonicalWebUrlProvider;
@ -42,7 +40,6 @@ import com.google.gerrit.server.config.FactoryModule;
import com.google.gerrit.server.git.ChangeCache;
import com.google.gerrit.server.git.MergeUtil;
import com.google.gerrit.server.git.TagCache;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.group.GroupModule;
import com.google.gerrit.server.mail.ReplacePatchSetSender;
import com.google.gerrit.server.notedb.NoteDbModule;
@ -56,8 +53,6 @@ import com.google.gerrit.server.project.SectionSortCache;
import com.google.gerrit.server.query.change.ChangeData;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.util.Providers;
@ -122,21 +117,4 @@ public class BatchProgramModule extends FactoryModule {
factory(ChangeData.Factory.class);
factory(ProjectState.Factory.class);
}
@Provides
@Singleton
@MergeabilityChecksExecutor(Priority.BACKGROUND)
public WorkQueue.Executor createMergeabilityChecksExecutor(
WorkQueue queues) {
return queues.createQueue(1, "MergeabilityChecks");
}
@Provides
@Singleton
@MergeabilityChecksExecutor(Priority.INTERACTIVE)
public WorkQueue.Executor createInteractiveMergeabilityChecksExecutor(
@MergeabilityChecksExecutor(Priority.BACKGROUND)
WorkQueue.Executor bg) {
return bg;
}
}

View File

@ -33,6 +33,7 @@ import com.google.gerrit.server.ChangeUtil;
import com.google.gerrit.server.account.AccountCache;
import com.google.gerrit.server.auth.AuthException;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.mail.CreateChangeSender;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.patch.PatchSetInfoFactory;
@ -68,7 +69,7 @@ public class ChangeInserter {
private final ChangeHooks hooks;
private final ApprovalsUtil approvalsUtil;
private final ChangeMessagesUtil cmUtil;
private final MergeabilityChecker mergeabilityChecker;
private final ChangeIndexer indexer;
private final CreateChangeSender.Factory createChangeSenderFactory;
private final HashtagsUtil hashtagsUtil;
private final AccountCache accountCache;
@ -95,7 +96,7 @@ public class ChangeInserter {
ChangeHooks hooks,
ApprovalsUtil approvalsUtil,
ChangeMessagesUtil cmUtil,
MergeabilityChecker mergeabilityChecker,
ChangeIndexer indexer,
CreateChangeSender.Factory createChangeSenderFactory,
HashtagsUtil hashtagsUtil,
AccountCache accountCache,
@ -108,7 +109,7 @@ public class ChangeInserter {
this.hooks = hooks;
this.approvalsUtil = approvalsUtil;
this.cmUtil = cmUtil;
this.mergeabilityChecker = mergeabilityChecker;
this.indexer = indexer;
this.createChangeSenderFactory = createChangeSenderFactory;
this.hashtagsUtil = hashtagsUtil;
this.accountCache = accountCache;
@ -221,10 +222,7 @@ public class ChangeInserter {
}
}
CheckedFuture<?, IOException> f = mergeabilityChecker.newCheck()
.addChange(change)
.reindex()
.runAsync();
CheckedFuture<?, IOException> f = indexer.indexAsync(change.getId());
if(!messageIsForChange()) {
commitMessageNotForChange();

View File

@ -273,7 +273,10 @@ public class ChangeJson {
out.topic = in.getTopic();
out.hashtags = ctl.getNotes().load().getHashtags();
out.changeId = in.getKey().get();
out.mergeable = isMergeable(in);
// TODO(dborowitz): This gets the submit type, so we could include that in
// the response and avoid making a request to /submit_type from the UI.
out.mergeable = in.getStatus() == Change.Status.MERGED
? null : cd.isMergeable();
ChangedLines changedLines = cd.changedLines();
if (changedLines != null) {
out.insertions = changedLines.insertions;
@ -344,14 +347,6 @@ public class ChangeJson {
return out;
}
private Boolean isMergeable(Change c) {
if (c.getStatus() == Change.Status.MERGED
|| c.getLastSha1MergeTested() == null) {
return null;
}
return c.isMergeable();
}
private List<SubmitRecord> submitRecords(ChangeData cd) throws OrmException {
if (cd.getSubmitRecords() != null) {
return cd.getSubmitRecords();

View File

@ -1,45 +0,0 @@
// Copyright (C) 2013 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.change;
import com.google.common.collect.Sets;
import com.google.gerrit.reviewdb.client.Change;
import com.google.inject.Singleton;
import java.util.Collection;
import java.util.Set;
@Singleton
class MergeabilityCheckQueue {
private final Set<Change.Id> pending = Sets.newHashSet();
private final Set<Change.Id> forcePending = Sets.newHashSet();
synchronized Set<Change> addAll(Collection<Change> changes, boolean force) {
Set<Change> r = Sets.newLinkedHashSetWithExpectedSize(changes.size());
for (Change c : changes) {
if (force ? forcePending.add(c.getId()) : pending.add(c.getId())) {
r.add(c);
}
}
return r;
}
synchronized void updatingMergeabilityFlag(Change change, boolean force) {
if (force) {
forcePending.remove(change.getId());
}
pending.remove(change.getId());
}
}

View File

@ -1,366 +0,0 @@
// Copyright (C) 2013 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.change;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.extensions.restapi.ResourceConflictException;
import com.google.gerrit.reviewdb.client.Branch;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.PatchSet;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.client.RefNames;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.change.MergeabilityChecksExecutor.Priority;
import com.google.gerrit.server.git.MetaDataUpdate;
import com.google.gerrit.server.git.ProjectConfig;
import com.google.gerrit.server.git.WorkQueue.Executor;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.project.ChangeControl;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.ProvisionException;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.eclipse.jgit.errors.RepositoryNotFoundException;
import org.eclipse.jgit.lib.Constants;
import org.eclipse.jgit.lib.ObjectId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
public class MergeabilityChecker implements GitReferenceUpdatedListener {
private static final Logger log = LoggerFactory
.getLogger(MergeabilityChecker.class);
private static final Function<Exception, IOException> MAPPER =
new Function<Exception, IOException>() {
@Override
public IOException apply(Exception in) {
if (in instanceof IOException) {
return (IOException) in;
} else if (in instanceof ExecutionException
&& in.getCause() instanceof IOException) {
return (IOException) in.getCause();
} else {
return new IOException(in);
}
}
};
public class Check {
private List<Change> changes;
private List<Branch.NameKey> branches;
private List<Project.NameKey> projects;
private boolean force;
private boolean reindex;
private boolean interactive;
private Check() {
changes = Lists.newArrayListWithExpectedSize(1);
branches = Lists.newArrayListWithExpectedSize(1);
projects = Lists.newArrayListWithExpectedSize(1);
interactive = true;
}
public Check addChange(Change change) {
changes.add(change);
return this;
}
public Check addBranch(Branch.NameKey branch) {
branches.add(branch);
interactive = false;
return this;
}
public Check addProject(Project.NameKey project) {
projects.add(project);
interactive = false;
return this;
}
/** Force reindexing regardless of whether mergeable flag was modified. */
public Check reindex() {
reindex = true;
return this;
}
/** Force mergeability check even if change is not stale. */
private Check force() {
force = true;
return this;
}
private ListeningExecutorService getExecutor() {
return interactive ? interactiveExecutor : backgroundExecutor;
}
public CheckedFuture<?, IOException> runAsync() {
final ListeningExecutorService executor = getExecutor();
ListenableFuture<List<Change>> getChanges;
if (branches.isEmpty() && projects.isEmpty()) {
getChanges = Futures.immediateFuture(changes);
} else {
getChanges = executor.submit(
new Callable<List<Change>>() {
@Override
public List<Change> call() throws OrmException {
return getChanges();
}
});
}
return Futures.makeChecked(Futures.transform(getChanges,
new AsyncFunction<List<Change>, List<Object>>() {
@Override
public ListenableFuture<List<Object>> apply(List<Change> changes) {
List<ListenableFuture<?>> result =
Lists.newArrayListWithCapacity(changes.size());
for (final Change c : changes) {
// Don't try to guess whether Mergeable will reindex; just turn
// off reindexing in that code path and do it explicitly below.
ListenableFuture<Void> b =
executor.submit(new Task(c, force, !reindex));
if (reindex) {
result.add(Futures.transform(
b, new AsyncFunction<Void, Void>() {
@SuppressWarnings("unchecked")
@Override
public ListenableFuture<Void> apply(Void o) {
return (ListenableFuture<Void>)
indexer.indexAsync(c.getId());
}
}));
} else {
result.add(b);
}
}
return Futures.allAsList(result);
}
}), MAPPER);
}
public void run() throws IOException {
try {
runAsync().checkedGet();
} catch (Exception e) {
Throwables.propagateIfPossible(e, IOException.class);
throw MAPPER.apply(e);
}
}
private List<Change> getChanges() throws OrmException {
ReviewDb db = schemaFactory.open();
try {
List<Change> results = Lists.newArrayList();
results.addAll(changes);
for (Project.NameKey p : projects) {
Iterables.addAll(results, db.changes().byProjectOpenAll(p));
}
for (Branch.NameKey b : branches) {
Iterables.addAll(results, db.changes().byBranchOpenAll(b));
}
return results;
} catch (OrmException e) {
log.error("Failed to fetch changes for mergeability check", e);
throw e;
} finally {
db.close();
}
}
}
private final ThreadLocalRequestContext tl;
private final SchemaFactory<ReviewDb> schemaFactory;
private final IdentifiedUser.GenericFactory identifiedUserFactory;
private final ChangeControl.GenericFactory changeControlFactory;
private final Provider<Mergeable> mergeable;
private final ChangeIndexer indexer;
private final ListeningExecutorService backgroundExecutor;
private final ListeningExecutorService interactiveExecutor;
private final MergeabilityCheckQueue mergeabilityCheckQueue;
private final MetaDataUpdate.Server metaDataUpdateFactory;
@Inject
public MergeabilityChecker(ThreadLocalRequestContext tl,
SchemaFactory<ReviewDb> schemaFactory,
IdentifiedUser.GenericFactory identifiedUserFactory,
ChangeControl.GenericFactory changeControlFactory,
Provider<Mergeable> mergeable, ChangeIndexer indexer,
@MergeabilityChecksExecutor(Priority.BACKGROUND)
Executor backgroundExecutor,
@MergeabilityChecksExecutor(Priority.INTERACTIVE)
Executor interactiveExecutor,
MergeabilityCheckQueue mergeabilityCheckQueue,
MetaDataUpdate.Server metaDataUpdateFactory) {
this.tl = tl;
this.schemaFactory = schemaFactory;
this.identifiedUserFactory = identifiedUserFactory;
this.changeControlFactory = changeControlFactory;
this.mergeable = mergeable;
this.indexer = indexer;
this.backgroundExecutor =
MoreExecutors.listeningDecorator(backgroundExecutor);
this.interactiveExecutor =
MoreExecutors.listeningDecorator(interactiveExecutor);
this.mergeabilityCheckQueue = mergeabilityCheckQueue;
this.metaDataUpdateFactory = metaDataUpdateFactory;
}
public Check newCheck() {
return new Check();
}
@Override
public void onGitReferenceUpdated(GitReferenceUpdatedListener.Event event) {
String ref = event.getRefName();
if (ref.startsWith(Constants.R_HEADS) || ref.equals(RefNames.REFS_CONFIG)) {
Branch.NameKey branch = new Branch.NameKey(
new Project.NameKey(event.getProjectName()), ref);
newCheck().addBranch(branch).runAsync();
}
if (ref.equals(RefNames.REFS_CONFIG)) {
Project.NameKey p = new Project.NameKey(event.getProjectName());
try {
ProjectConfig oldCfg = parseConfig(p, event.getOldObjectId());
ProjectConfig newCfg = parseConfig(p, event.getNewObjectId());
if (recheckMerges(oldCfg, newCfg)) {
newCheck().addProject(p).force().runAsync();
}
} catch (ConfigInvalidException | IOException e) {
String msg = "Failed to update mergeability flags for project " + p.get()
+ " on update of " + RefNames.REFS_CONFIG;
log.error(msg, e);
throw new RuntimeException(msg, e);
}
}
}
private boolean recheckMerges(ProjectConfig oldCfg, ProjectConfig newCfg) {
if (oldCfg == null || newCfg == null) {
return true;
}
return !oldCfg.getProject().getSubmitType().equals(newCfg.getProject().getSubmitType())
|| oldCfg.getProject().getUseContentMerge() != newCfg.getProject().getUseContentMerge()
|| (oldCfg.getRulesId() == null
? newCfg.getRulesId() != null
: !oldCfg.getRulesId().equals(newCfg.getRulesId()));
}
private ProjectConfig parseConfig(Project.NameKey p, String idStr)
throws IOException, ConfigInvalidException, RepositoryNotFoundException {
ObjectId id = ObjectId.fromString(idStr);
if (ObjectId.zeroId().equals(id)) {
return null;
}
return ProjectConfig.read(metaDataUpdateFactory.create(p), id);
}
private class Task implements Callable<Void> {
private final Change change;
private final boolean force;
private final boolean reindex;
private ReviewDb reviewDb;
Task(Change change, boolean force, boolean reindex) {
this.change = change;
this.force = force;
this.reindex = reindex;
}
@Override
public Void call() throws Exception {
mergeabilityCheckQueue.updatingMergeabilityFlag(change, force);
RequestContext context = new RequestContext() {
@Override
public CurrentUser getCurrentUser() {
return identifiedUserFactory.create(change.getOwner());
}
@Override
public Provider<ReviewDb> getReviewDbProvider() {
return new Provider<ReviewDb>() {
@Override
public ReviewDb get() {
if (reviewDb == null) {
try {
reviewDb = schemaFactory.open();
} catch (OrmException e) {
throw new ProvisionException("Cannot open ReviewDb", e);
}
}
return reviewDb;
}
};
}
};
RequestContext old = tl.setContext(context);
ReviewDb db = context.getReviewDbProvider().get();
try {
PatchSet ps = db.patchSets().get(change.currentPatchSetId());
if (ps == null) {
// Cannot compute mergeability if current patch set is missing.
return null;
}
Mergeable m = mergeable.get();
m.setForce(force);
m.setReindex(reindex);
ChangeControl control =
changeControlFactory.controlFor(change, context.getCurrentUser());
m.apply(new RevisionResource(new ChangeResource(control), ps));
return null;
} catch (ResourceConflictException e) {
// change is closed
return null;
} catch (Exception e) {
log.error(String.format(
"cannot update mergeability flag of change %d in project %s after update of %s",
change.getId().get(),
change.getDest().getParentKey(), change.getDest().get()), e);
throw e;
} finally {
tl.setContext(old);
if (reviewDb != null) {
reviewDb.close();
reviewDb = null;
}
}
}
}
}

View File

@ -1,36 +0,0 @@
// Copyright (C) 2013 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.change;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.gerrit.server.git.WorkQueue.Executor;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
/**
* Marker on the global {@link Executor} used by
* {@link MergeabilityChecker}.
*/
@Retention(RUNTIME)
@BindingAnnotation
public @interface MergeabilityChecksExecutor {
public enum Priority {
BACKGROUND, INTERACTIVE;
}
Priority value();
}

View File

@ -1,57 +0,0 @@
// Copyright (C) 2013 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.change;
import com.google.gerrit.server.change.MergeabilityChecksExecutor.Priority;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import org.eclipse.jgit.lib.Config;
/** Module providing the {@link MergeabilityChecksExecutor}. */
public class MergeabilityChecksExecutorModule extends AbstractModule {
@Override
protected void configure() {
}
@Provides
@Singleton
@MergeabilityChecksExecutor(Priority.BACKGROUND)
public WorkQueue.Executor createMergeabilityChecksExecutor(
@GerritServerConfig Config config,
WorkQueue queues) {
int poolSize = config.getInt("changeMerge", null, "threadPoolSize", 1);
return queues.createQueue(poolSize, "MergeabilityChecks-Background");
}
@Provides
@Singleton
@MergeabilityChecksExecutor(Priority.INTERACTIVE)
public WorkQueue.Executor createMergeabilityChecksExecutor(
@GerritServerConfig Config config,
WorkQueue queues,
@MergeabilityChecksExecutor(Priority.BACKGROUND)
WorkQueue.Executor backgroundExecutor) {
int poolSize =
config.getInt("changeMerge", null, "interactiveThreadPoolSize", 1);
if (poolSize <= 0) {
return backgroundExecutor;
}
return queues.createQueue(poolSize, "MergeabilityChecks-Interactive");
}
}

View File

@ -70,10 +70,6 @@ public class Mergeable implements RestReadView<RevisionResource> {
this.force = force;
}
public void setReindex(boolean reindex) {
this.reindex = reindex;
}
private final GitRepositoryManager gitManager;
private final ProjectCache projectCache;
private final MergeUtil.Factory mergeUtilFactory;
@ -83,7 +79,6 @@ public class Mergeable implements RestReadView<RevisionResource> {
private final MergeabilityCache cache;
private boolean force;
private boolean reindex;
@Inject
Mergeable(GitRepositoryManager gitManager,
@ -100,7 +95,6 @@ public class Mergeable implements RestReadView<RevisionResource> {
this.db = db;
this.indexer = indexer;
this.cache = cache;
reindex = true;
}
@Override
@ -196,7 +190,7 @@ public class Mergeable implements RestReadView<RevisionResource> {
}
}
});
if (reindex && c != null) {
if (c != null) {
indexer.index(db.get(), c);
}
return mergeable;

View File

@ -37,6 +37,7 @@ import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.git.BanCommit;
import com.google.gerrit.server.git.validators.CommitValidationException;
import com.google.gerrit.server.git.validators.CommitValidators;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.mail.ReplacePatchSetSender;
import com.google.gerrit.server.notedb.ChangeUpdate;
import com.google.gerrit.server.notedb.ReviewerState;
@ -89,7 +90,7 @@ public class PatchSetInserter {
private final ChangeControl.GenericFactory ctlFactory;
private final GitReferenceUpdated gitRefUpdated;
private final CommitValidators.Factory commitValidatorsFactory;
private final MergeabilityChecker mergeabilityChecker;
private final ChangeIndexer indexer;
private final ReplacePatchSetSender.Factory replacePatchSetFactory;
private final ApprovalsUtil approvalsUtil;
private final ApprovalCopier approvalCopier;
@ -122,7 +123,7 @@ public class PatchSetInserter {
PatchSetInfoFactory patchSetInfoFactory,
GitReferenceUpdated gitRefUpdated,
CommitValidators.Factory commitValidatorsFactory,
MergeabilityChecker mergeabilityChecker,
ChangeIndexer indexer,
ReplacePatchSetSender.Factory replacePatchSetFactory,
@Assisted Repository git,
@Assisted RevWalk revWalk,
@ -141,7 +142,7 @@ public class PatchSetInserter {
this.patchSetInfoFactory = patchSetInfoFactory;
this.gitRefUpdated = gitRefUpdated;
this.commitValidatorsFactory = commitValidatorsFactory;
this.mergeabilityChecker = mergeabilityChecker;
this.indexer = indexer;
this.replacePatchSetFactory = replacePatchSetFactory;
this.git = git;
@ -316,10 +317,7 @@ public class PatchSetInserter {
} finally {
db.rollback();
}
mergeabilityChecker.newCheck()
.addChange(updatedChange)
.reindex()
.run();
indexer.index(db, c);
if (runHooks) {
hooks.doPatchsetCreatedHook(updatedChange, patchSet, db);
}

View File

@ -30,6 +30,7 @@ import com.google.gerrit.server.ChangeMessagesUtil;
import com.google.gerrit.server.ChangeUtil;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.change.ChangeJson.ChangeInfo;
import com.google.gerrit.server.index.ChangeIndexer;
import com.google.gerrit.server.mail.ReplyToChangeSender;
import com.google.gerrit.server.mail.RestoredSender;
import com.google.gerrit.server.notedb.ChangeUpdate;
@ -54,7 +55,7 @@ public class Restore implements RestModifyView<ChangeResource, RestoreInput>,
private final RestoredSender.Factory restoredSenderFactory;
private final Provider<ReviewDb> dbProvider;
private final ChangeJson json;
private final MergeabilityChecker mergeabilityChecker;
private final ChangeIndexer indexer;
private final ChangeMessagesUtil cmUtil;
private final ChangeUpdate.Factory updateFactory;
@ -63,14 +64,14 @@ public class Restore implements RestModifyView<ChangeResource, RestoreInput>,
RestoredSender.Factory restoredSenderFactory,
Provider<ReviewDb> dbProvider,
ChangeJson json,
MergeabilityChecker mergeabilityChecker,
ChangeIndexer indexer,
ChangeMessagesUtil cmUtil,
ChangeUpdate.Factory updateFactory) {
this.hooks = hooks;
this.restoredSenderFactory = restoredSenderFactory;
this.dbProvider = dbProvider;
this.json = json;
this.mergeabilityChecker = mergeabilityChecker;
this.indexer = indexer;
this.cmUtil = cmUtil;
this.updateFactory = updateFactory;
}
@ -121,10 +122,7 @@ public class Restore implements RestModifyView<ChangeResource, RestoreInput>,
}
update.commit();
CheckedFuture<?, IOException> f = mergeabilityChecker.newCheck()
.addChange(change)
.reindex()
.runAsync();
CheckedFuture<?, IOException> f = indexer.indexAsync(change.getId());
try {
ReplyToChangeSender cm = restoredSenderFactory.create(change);

View File

@ -72,7 +72,6 @@ import com.google.gerrit.server.avatar.AvatarProvider;
import com.google.gerrit.server.cache.CacheRemovalListener;
import com.google.gerrit.server.change.ChangeKindCacheImpl;
import com.google.gerrit.server.change.MergeabilityCache;
import com.google.gerrit.server.change.MergeabilityChecker;
import com.google.gerrit.server.events.EventFactory;
import com.google.gerrit.server.extensions.events.GitReferenceUpdated;
import com.google.gerrit.server.git.ChangeCache;
@ -95,6 +94,7 @@ import com.google.gerrit.server.git.validators.RefOperationValidators;
import com.google.gerrit.server.git.validators.UploadValidationListener;
import com.google.gerrit.server.git.validators.UploadValidators;
import com.google.gerrit.server.group.GroupModule;
import com.google.gerrit.server.index.ReindexAfterUpdate;
import com.google.gerrit.server.mail.AddReviewerSender;
import com.google.gerrit.server.mail.CreateChangeSender;
import com.google.gerrit.server.mail.EmailModule;
@ -268,7 +268,7 @@ public class GerritGlobalModule extends FactoryModule {
DynamicSet.setOf(binder(), HeadUpdatedListener.class);
DynamicSet.setOf(binder(), UsageDataPublishedListener.class);
DynamicSet.bind(binder(), GitReferenceUpdatedListener.class).to(ChangeCache.class);
DynamicSet.bind(binder(), GitReferenceUpdatedListener.class).to(MergeabilityChecker.class);
DynamicSet.bind(binder(), GitReferenceUpdatedListener.class).to(ReindexAfterUpdate.class);
DynamicSet.bind(binder(), GitReferenceUpdatedListener.class)
.to(ProjectConfigEntry.UpdateChecker.class);
DynamicSet.setOf(binder(), ChangeListener.class);

View File

@ -96,5 +96,11 @@ public class GitReferenceUpdated {
public String getNewObjectId() {
return newObjectId;
}
@Override
public String toString() {
return String.format("%s[%s,%s: %s -> %s]", getClass().getSimpleName(),
projectName, ref, oldObjectId, newObjectId);
}
}
}

View File

@ -82,7 +82,6 @@ import com.google.gerrit.server.change.ChangeInserter;
import com.google.gerrit.server.change.ChangeKind;
import com.google.gerrit.server.change.ChangeKindCache;
import com.google.gerrit.server.change.ChangesCollection;
import com.google.gerrit.server.change.MergeabilityChecker;
import com.google.gerrit.server.change.RevisionResource;
import com.google.gerrit.server.change.Submit;
import com.google.gerrit.server.config.AllProjectsName;
@ -300,7 +299,6 @@ public class ReceiveCommits {
private final ListeningExecutorService changeUpdateExector;
private final RequestScopePropagator requestScopePropagator;
private final ChangeIndexer indexer;
private final MergeabilityChecker mergeabilityChecker;
private final SshInfo sshInfo;
private final AllProjectsName allProjectsName;
private final ReceiveConfig receiveConfig;
@ -370,7 +368,6 @@ public class ReceiveCommits {
@ChangeUpdateExecutor ListeningExecutorService changeUpdateExector,
final RequestScopePropagator requestScopePropagator,
final ChangeIndexer indexer,
final MergeabilityChecker mergeabilityChecker,
final SshInfo sshInfo,
final AllProjectsName allProjectsName,
ReceiveConfig config,
@ -410,7 +407,6 @@ public class ReceiveCommits {
this.changeUpdateExector = changeUpdateExector;
this.requestScopePropagator = requestScopePropagator;
this.indexer = indexer;
this.mergeabilityChecker = mergeabilityChecker;
this.sshInfo = sshInfo;
this.allProjectsName = allProjectsName;
this.receiveConfig = config;
@ -2159,10 +2155,7 @@ public class ReceiveCommits {
if (cmd.getResult() == NOT_ATTEMPTED) {
cmd.execute(rp);
}
CheckedFuture<?, IOException> f = mergeabilityChecker.newCheck()
.addChange(change)
.reindex()
.runAsync();
CheckedFuture<?, IOException> f = indexer.indexAsync(change.getId());
workQueue.getDefaultQueue()
.submit(requestScopePropagator.wrap(new Runnable() {
@Override

View File

@ -32,7 +32,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.change.MergeabilityChecker;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.GitRepositoryManager;
import com.google.gerrit.server.git.MergeUtil;
@ -118,7 +117,6 @@ public class ChangeBatchIndexer {
private final ChangeIndexer.Factory indexerFactory;
private final ThreeWayMergeStrategy mergeStrategy;
private MergeabilityChecker mergeabilityChecker;
private int numChanges = -1;
private OutputStream progressOut = NullOutputStream.INSTANCE;
private PrintWriter verboseWriter =
@ -139,12 +137,6 @@ public class ChangeBatchIndexer {
this.mergeStrategy = MergeUtil.getMergeStrategy(config);
}
public ChangeBatchIndexer setMergeabilityChecker(
MergeabilityChecker checker) {
mergeabilityChecker = checkNotNull(checker);
return this;
}
public ChangeBatchIndexer setNumChanges(int num) {
numChanges = num;
return this;
@ -177,9 +169,6 @@ public class ChangeBatchIndexer {
final AtomicBoolean ok = new AtomicBoolean(true);
for (final Project.NameKey project : projects) {
if (!updateMergeable(project)) {
ok.set(false);
}
final ListenableFuture<?> future = executor.submit(reindexProject(
indexerFactory.create(executor, index), project, doneTask, failedTask,
verboseWriter));
@ -235,18 +224,6 @@ public class ChangeBatchIndexer {
return new Result(sw, ok.get(), doneTask.getCount(), failedTask.getCount());
}
private boolean updateMergeable(Project.NameKey project) {
if (mergeabilityChecker != null) {
try {
mergeabilityChecker.newCheck().addProject(project).run();
} catch (IOException e) {
log.error("Error in mergeability checker", e);
return false;
}
}
return true;
}
private Callable<Void> reindexProject(final ChangeIndexer indexer,
final Project.NameKey project, final Task done, final Task failed,
final PrintWriter verboseWriter) {

View File

@ -99,6 +99,10 @@ public class IndexModule extends LifecycleModule {
if (threads <= 0) {
threads = config.getInt("index", null, "threads", 0);
}
if (threads <= 0) {
threads =
config.getInt("changeMerge", null, "interactiveThreadPoolSize", 0);
}
if (threads <= 0) {
return MoreExecutors.newDirectExecutorService();
}
@ -117,6 +121,9 @@ public class IndexModule extends LifecycleModule {
return batchExecutor;
}
int threads = config.getInt("index", null, "batchThreads", 0);
if (threads <= 0) {
threads = config.getInt("changeMerge", null, "threadPoolSize", 0);
}
if (threads <= 0) {
return interactive;
}

View File

@ -0,0 +1,164 @@
// Copyright (C) 2014 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server.index;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.gerrit.extensions.events.GitReferenceUpdatedListener;
import com.google.gerrit.reviewdb.client.Branch;
import com.google.gerrit.reviewdb.client.Change;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.reviewdb.client.RefNames;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.git.QueueProvider.QueueType;
import com.google.gerrit.server.util.RequestContext;
import com.google.gerrit.server.util.ThreadLocalRequestContext;
import com.google.gwtorm.server.OrmException;
import com.google.gwtorm.server.SchemaFactory;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.util.Providers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
public class ReindexAfterUpdate implements GitReferenceUpdatedListener {
private static final Logger log = LoggerFactory
.getLogger(ReindexAfterUpdate.class);
private final ThreadLocalRequestContext tl;
private final SchemaFactory<ReviewDb> schemaFactory;
private final IdentifiedUser.GenericFactory userFactory;
private final ChangeIndexer.Factory indexerFactory;
private final IndexCollection indexes;
private final ListeningExecutorService executor;
@Inject
ReindexAfterUpdate(
ThreadLocalRequestContext tl,
SchemaFactory<ReviewDb> schemaFactory,
IdentifiedUser.GenericFactory userFactory,
ChangeIndexer.Factory indexerFactory,
IndexCollection indexes,
@IndexExecutor(QueueType.BATCH) ListeningExecutorService executor) {
this.tl = tl;
this.schemaFactory = schemaFactory;
this.userFactory = userFactory;
this.indexerFactory = indexerFactory;
this.indexes = indexes;
this.executor = executor;
}
@Override
public void onGitReferenceUpdated(final Event event) {
Futures.transform(
executor.submit(new GetChanges(event)),
new AsyncFunction<List<Change>, List<Void>>() {
@Override
public ListenableFuture<List<Void>> apply(List<Change> changes) {
List<ListenableFuture<Void>> result =
Lists.newArrayListWithCapacity(changes.size());
for (Change c : changes) {
result.add(executor.submit(new Index(event, c)));
}
return Futures.allAsList(result);
}
});
}
private abstract class Task<V> implements Callable<V> {
protected ReviewDb db;
protected Event event;
protected Task(Event event) {
this.event = event;
}
@Override
public final V call() throws Exception {
try {
db = schemaFactory.open();
return impl();
} catch (Exception e) {
log.error("Failed to reindex changes after " + event, e);
throw e;
} finally {
if (db != null) {
db.close();
}
}
}
protected abstract V impl() throws Exception;
}
private class GetChanges extends Task<List<Change>> {
private GetChanges(Event event) {
super(event);
}
@Override
protected List<Change> impl() throws OrmException {
String ref = event.getRefName();
Project.NameKey project = new Project.NameKey(event.getProjectName());
if (ref.equals(RefNames.REFS_CONFIG)) {
return db.changes().byProjectOpenAll(project).toList();
} else {
return db.changes().byBranchOpenAll(new Branch.NameKey(project, ref))
.toList();
}
}
}
private class Index extends Task<Void> {
private final Change change;
Index(Event event, Change change) {
super(event);
this.change = change;
}
@Override
protected Void impl() throws IOException {
RequestContext context = new RequestContext() {
@Override
public CurrentUser getCurrentUser() {
return userFactory.create(change.getOwner());
}
@Override
public Provider<ReviewDb> getReviewDbProvider() {
return Providers.of(db);
}
};
RequestContext old = tl.setContext(context);
try {
indexerFactory.create(executor, indexes).index(db, change);
return null;
} finally {
tl.setContext(old);
}
}
}
}

View File

@ -26,7 +26,6 @@ import com.google.gerrit.server.GerritPersonIdent;
import com.google.gerrit.server.GerritPersonIdentProvider;
import com.google.gerrit.server.RemotePeer;
import com.google.gerrit.server.cache.h2.DefaultCacheFactory;
import com.google.gerrit.server.change.MergeabilityChecksExecutorModule;
import com.google.gerrit.server.config.AllProjectsName;
import com.google.gerrit.server.config.AllProjectsNameProvider;
import com.google.gerrit.server.config.AllUsersName;
@ -168,7 +167,6 @@ public class InMemoryModule extends FactoryModule {
install(new DefaultCacheFactory.Module());
install(new SmtpEmailSender.Module());
install(new SignedTokenEmailTokenVerifier.Module());
install(new MergeabilityChecksExecutorModule());
IndexType indexType = null;
try {

View File

@ -27,7 +27,6 @@ import com.google.gerrit.lucene.LuceneIndexModule;
import com.google.gerrit.reviewdb.client.AuthType;
import com.google.gerrit.server.account.InternalAccountDirectory;
import com.google.gerrit.server.cache.h2.DefaultCacheFactory;
import com.google.gerrit.server.change.MergeabilityChecksExecutorModule;
import com.google.gerrit.server.config.AuthConfig;
import com.google.gerrit.server.config.AuthConfigModule;
import com.google.gerrit.server.config.CanonicalWebUrlModule;
@ -278,7 +277,6 @@ public class WebAppInitializer extends GuiceServletContextListener
modules.add(new WorkQueue.Module());
modules.add(new ChangeHookRunner.Module());
modules.add(new ReceiveCommitsExecutorModule());
modules.add(new MergeabilityChecksExecutorModule());
modules.add(new IntraLineWorkerPool.Module());
modules.add(cfgInjector.getInstance(GerritGlobalModule.class));
modules.add(new InternalAccountDirectory.Module());