diff --git a/Documentation/config-gerrit.txt b/Documentation/config-gerrit.txt index 444f5b1a8e..cadab83cfc 100644 --- a/Documentation/config-gerrit.txt +++ b/Documentation/config-gerrit.txt @@ -3925,6 +3925,15 @@ which miscellaneous tasks are handled. + Default is 1. +[[execution.fanOutThreadPoolSize]]execution.fanOutThreadPoolSize:: ++ +Maximum size of thread pool to on which a serving thread can fan-out +work to parallelize it. ++ +When set to 0, a direct executor will be used. ++ +By default, 25 which means that formatting happens in the caller thread. + [[receiveemail]] === Section receiveemail diff --git a/java/com/google/gerrit/server/FanOutExecutor.java b/java/com/google/gerrit/server/FanOutExecutor.java new file mode 100644 index 0000000000..a489890eb6 --- /dev/null +++ b/java/com/google/gerrit/server/FanOutExecutor.java @@ -0,0 +1,27 @@ +// Copyright (C) 2018 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.gerrit.server; + +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import com.google.inject.BindingAnnotation; +import java.lang.annotation.Retention; + +/** + * Marker on the global {@code ThreadPoolExecutor} used to do parallel work from a serving thread. + */ +@Retention(RUNTIME) +@BindingAnnotation +public @interface FanOutExecutor {} diff --git a/java/com/google/gerrit/server/account/AccountCache.java b/java/com/google/gerrit/server/account/AccountCache.java index b6ca1cbe5b..17493bf842 100644 --- a/java/com/google/gerrit/server/account/AccountCache.java +++ b/java/com/google/gerrit/server/account/AccountCache.java @@ -16,7 +16,9 @@ package com.google.gerrit.server.account; import com.google.gerrit.common.Nullable; import com.google.gerrit.reviewdb.client.Account; +import java.util.Map; import java.util.Optional; +import java.util.Set; /** Caches important (but small) account state to avoid database hits. */ public interface AccountCache { @@ -30,6 +32,20 @@ public interface AccountCache { */ Optional get(Account.Id accountId); + /** + * Returns a {@code Map} of {@code Account.Id} to {@code AccountState} for the given account IDs. + * If not cached yet the accounts are loaded. If an account can't be loaded (e.g. because it is + * missing), the entry will be missing from the result. + * + *

Loads accounts in parallel if applicable. + * + * @param accountIds IDs of the account that should be retrieved + * @return {@code Map} of {@code Account.Id} to {@code AccountState} instances for the given + * account IDs, if an account can't be loaded (e.g. because it is missing), the entry will be + * missing from the result + */ + Map get(Set accountIds); + /** * Returns an {@code AccountState} instance for the given account ID. If not cached yet the * account is loaded. Returns an empty {@code AccountState} instance to represent a missing diff --git a/java/com/google/gerrit/server/account/AccountCacheImpl.java b/java/com/google/gerrit/server/account/AccountCacheImpl.java index 895c5b68ba..e72e6ea476 100644 --- a/java/com/google/gerrit/server/account/AccountCacheImpl.java +++ b/java/com/google/gerrit/server/account/AccountCacheImpl.java @@ -18,9 +18,11 @@ import static com.google.gerrit.server.account.externalids.ExternalId.SCHEME_USE import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableMap; import com.google.gerrit.common.Nullable; import com.google.gerrit.common.TimeUtil; import com.google.gerrit.reviewdb.client.Account; +import com.google.gerrit.server.FanOutExecutor; import com.google.gerrit.server.account.externalids.ExternalId; import com.google.gerrit.server.account.externalids.ExternalIds; import com.google.gerrit.server.cache.CacheModule; @@ -31,8 +33,16 @@ import com.google.inject.Singleton; import com.google.inject.TypeLiteral; import com.google.inject.name.Named; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.eclipse.jgit.errors.ConfigInvalidException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,15 +70,18 @@ public class AccountCacheImpl implements AccountCache { private final AllUsersName allUsersName; private final ExternalIds externalIds; private final LoadingCache> byId; + private final ExecutorService executor; @Inject AccountCacheImpl( AllUsersName allUsersName, ExternalIds externalIds, - @Named(BYID_NAME) LoadingCache> byId) { + @Named(BYID_NAME) LoadingCache> byId, + @FanOutExecutor ExecutorService executor) { this.allUsersName = allUsersName; this.externalIds = externalIds; this.byId = byId; + this.executor = executor; } @Override @@ -91,6 +104,41 @@ public class AccountCacheImpl implements AccountCache { } } + @Override + public Map get(Set accountIds) { + Map accountStates = new HashMap<>(accountIds.size()); + List>> callables = new ArrayList<>(); + for (Account.Id accountId : accountIds) { + Optional state = byId.asMap().get(accountId); + if (state != null) { + // The value is in-memory, so we just get the state + state.ifPresent(s -> accountStates.put(accountId, s)); + } else { + // Queue up a callable so that we can load accounts in parallel + callables.add(() -> get(accountId)); + } + } + if (callables.isEmpty()) { + return accountStates; + } + + List>> futures; + try { + futures = executor.invokeAll(callables); + } catch (InterruptedException e) { + log.error("Cannot load AccountStates", e); + return ImmutableMap.of(); + } + for (Future> f : futures) { + try { + f.get().ifPresent(s -> accountStates.put(s.getAccount().getId(), s)); + } catch (InterruptedException | ExecutionException e) { + log.error("Cannot load AccountState", e); + } + } + return accountStates; + } + @Override public Optional getByUsername(String username) { try { diff --git a/java/com/google/gerrit/server/account/InternalAccountDirectory.java b/java/com/google/gerrit/server/account/InternalAccountDirectory.java index 77752da3da..8c2bc10f5d 100644 --- a/java/com/google/gerrit/server/account/InternalAccountDirectory.java +++ b/java/com/google/gerrit/server/account/InternalAccountDirectory.java @@ -15,8 +15,10 @@ package com.google.gerrit.server.account; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; import com.google.common.base.Strings; +import com.google.common.collect.Streams; import com.google.gerrit.extensions.common.AccountInfo; import com.google.gerrit.extensions.common.AvatarInfo; import com.google.gerrit.extensions.registration.DynamicItem; @@ -32,7 +34,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.List; -import java.util.Optional; +import java.util.Map; import java.util.Set; @Singleton @@ -66,11 +68,14 @@ public class InternalAccountDirectory extends AccountDirectory { if (options.equals(ID_ONLY)) { return; } + Set ids = + Streams.stream(in).map(a -> new Account.Id(a._accountId)).collect(toSet()); + Map accountStates = accountCache.get(ids); for (AccountInfo info : in) { Account.Id id = new Account.Id(info._accountId); - Optional state = accountCache.get(id); - if (state.isPresent()) { - fill(info, state.get(), options); + AccountState state = accountStates.get(id); + if (state != null) { + fill(info, accountStates.get(id), options); } else { info._accountId = options.contains(FillOptions.ID) ? id.get() : null; } diff --git a/java/com/google/gerrit/server/change/ChangeJson.java b/java/com/google/gerrit/server/change/ChangeJson.java index c27c09ada8..82affe0e84 100644 --- a/java/com/google/gerrit/server/change/ChangeJson.java +++ b/java/com/google/gerrit/server/change/ChangeJson.java @@ -44,7 +44,6 @@ import com.google.auto.value.AutoValue; import com.google.common.base.Joiner; import com.google.common.base.MoreObjects; import com.google.common.base.Throwables; -import com.google.common.collect.FluentIterable; import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -106,6 +105,7 @@ import com.google.gerrit.server.AnonymousUser; import com.google.gerrit.server.ApprovalsUtil; import com.google.gerrit.server.ChangeMessagesUtil; import com.google.gerrit.server.CurrentUser; +import com.google.gerrit.server.FanOutExecutor; import com.google.gerrit.server.GpgException; import com.google.gerrit.server.IdentifiedUser; import com.google.gerrit.server.ReviewerByEmailSet; @@ -129,7 +129,6 @@ import com.google.gerrit.server.permissions.ChangePermission; import com.google.gerrit.server.permissions.LabelPermission; import com.google.gerrit.server.permissions.PermissionBackend; import com.google.gerrit.server.permissions.PermissionBackendException; -import com.google.gerrit.server.project.NoSuchChangeException; import com.google.gerrit.server.project.ProjectCache; import com.google.gerrit.server.project.ProjectState; import com.google.gerrit.server.project.RemoveReviewerControl; @@ -155,6 +154,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.eclipse.jgit.lib.ObjectId; import org.eclipse.jgit.lib.Ref; import org.eclipse.jgit.lib.Repository; @@ -271,6 +274,8 @@ public class ChangeJson { private final RemoveReviewerControl removeReviewerControl; private final TrackingFooters trackingFooters; private final Metrics metrics; + private final ExecutorService fanOutExecutor; + private boolean lazyLoad = true; private AccountLoader accountLoader; private FixInput fix; @@ -304,6 +309,7 @@ public class ChangeJson { RemoveReviewerControl removeReviewerControl, TrackingFooters trackingFooters, Metrics metrics, + @FanOutExecutor ExecutorService fanOutExecutor, @Assisted Iterable options) { this.db = db; this.userProvider = user; @@ -331,6 +337,7 @@ public class ChangeJson { this.removeReviewerControl = removeReviewerControl; this.trackingFooters = trackingFooters; this.metrics = metrics; + this.fanOutExecutor = fanOutExecutor; this.options = Sets.immutableEnumSet(options); } @@ -411,12 +418,11 @@ public class ChangeJson { throws OrmException { try (Timer0.Context ignored = metrics.formatQueryResultsLatency.start()) { accountLoader = accountLoaderFactory.create(has(DETAILED_ACCOUNTS)); - ensureLoaded(FluentIterable.from(in).transformAndConcat(QueryResult::entities)); - - List> res = Lists.newArrayListWithCapacity(in.size()); - Map out = new HashMap<>(); + List> res = new ArrayList<>(in.size()); + Map cache = Maps.newHashMapWithExpectedSize(in.size()); for (QueryResult r : in) { - List infos = toChangeInfos(out, r.entities()); + List infos = toChangeInfos(r.entities(), cache); + infos.forEach(c -> cache.put(new Change.Id(c._number), c)); if (!infos.isEmpty() && r.more()) { infos.get(infos.size() - 1)._moreChanges = true; } @@ -478,38 +484,53 @@ public class ChangeJson { return options.contains(option); } - private List toChangeInfos(Map out, List changes) { + private List toChangeInfos( + List changes, Map cache) { try (Timer0.Context ignored = metrics.toChangeInfosLatency.start()) { - List info = Lists.newArrayListWithCapacity(changes.size()); + // Create a list of formatting calls that can be called sequentially or in parallel + List>> formattingCalls = new ArrayList<>(changes.size()); for (ChangeData cd : changes) { - ChangeInfo i = out.get(cd.getId()); - if (i == null) { - try { - i = toChangeInfo(cd, Optional.empty()); - } catch (PatchListNotAvailableException - | GpgException - | OrmException - | IOException - | PermissionBackendException - | RuntimeException e) { - if (has(CHECK)) { - i = checkOnly(cd); - } else if (e instanceof NoSuchChangeException) { - log.info( - "NoSuchChangeException: Omitting corrupt change " - + cd.getId() - + " from results. Seems to be stale in the index."); - continue; - } else { - log.warn("Omitting corrupt change " + cd.getId() + " from results", e); - continue; - } - } - out.put(cd.getId(), i); - } - info.add(i); + formattingCalls.add( + () -> { + ChangeInfo i = cache.get(cd.getId()); + if (i != null) { + return Optional.of(i); + } + try { + ensureLoaded(Collections.singleton(cd)); + return Optional.of(format(cd, Optional.empty(), false)); + } catch (OrmException | RuntimeException e) { + log.warn("Omitting corrupt change " + cd.getId() + " from results", e); + return Optional.empty(); + } + }); } - return info; + + long numProjects = changes.stream().map(c -> c.project()).distinct().count(); + if (!lazyLoad || changes.size() < 3 || numProjects < 2) { + // Format these changes in the request thread as the multithreading overhead would be too + // high. + List result = new ArrayList<>(changes.size()); + for (Callable> c : formattingCalls) { + try { + c.call().ifPresent(result::add); + } catch (Exception e) { + log.warn("Omitting change due to exception", e); + } + } + return result; + } + + // Format the changes in parallel on the executor + List result = new ArrayList<>(changes.size()); + try { + for (Future> f : fanOutExecutor.invokeAll(formattingCalls)) { + f.get().ifPresent(result::add); + } + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + return result; } } diff --git a/java/com/google/gerrit/server/config/SysExecutorModule.java b/java/com/google/gerrit/server/config/SysExecutorModule.java index 99edb6567f..b9fe34c943 100644 --- a/java/com/google/gerrit/server/config/SysExecutorModule.java +++ b/java/com/google/gerrit/server/config/SysExecutorModule.java @@ -17,6 +17,7 @@ package com.google.gerrit.server.config; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.gerrit.server.FanOutExecutor; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.AbstractModule; import com.google.inject.Provides; @@ -61,6 +62,17 @@ public class SysExecutorModule extends AbstractModule { return queues.createQueue(poolSize, "SendEmail"); } + @Provides + @Singleton + @FanOutExecutor + public ExecutorService createFanOutExecutor(@GerritServerConfig Config config, WorkQueue queues) { + int poolSize = config.getInt("execution", null, "fanOutThreadPoolSize", 25); + if (poolSize == 0) { + return MoreExecutors.newDirectExecutorService(); + } + return queues.createQueue(poolSize, "FanOut"); + } + @Provides @Singleton @ChangeUpdateExecutor diff --git a/java/com/google/gerrit/server/restapi/change/ReviewerRecommender.java b/java/com/google/gerrit/server/restapi/change/ReviewerRecommender.java index 78687cde2a..dae37d64dd 100644 --- a/java/com/google/gerrit/server/restapi/change/ReviewerRecommender.java +++ b/java/com/google/gerrit/server/restapi/change/ReviewerRecommender.java @@ -29,10 +29,10 @@ import com.google.gerrit.reviewdb.client.Account; import com.google.gerrit.reviewdb.client.PatchSetApproval; import com.google.gerrit.reviewdb.server.ReviewDb; import com.google.gerrit.server.ApprovalsUtil; +import com.google.gerrit.server.FanOutExecutor; import com.google.gerrit.server.change.ReviewerSuggestion; import com.google.gerrit.server.change.SuggestedReviewer; import com.google.gerrit.server.config.GerritServerConfig; -import com.google.gerrit.server.git.WorkQueue; import com.google.gerrit.server.index.change.ChangeField; import com.google.gerrit.server.notedb.ChangeNotes; import com.google.gerrit.server.project.ProjectState; @@ -54,6 +54,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -78,7 +79,7 @@ public class ReviewerRecommender { private final Config config; private final DynamicMap reviewerSuggestionPluginMap; private final Provider queryProvider; - private final WorkQueue workQueue; + private final ExecutorService executor; private final Provider dbProvider; private final ApprovalsUtil approvalsUtil; @@ -87,7 +88,7 @@ public class ReviewerRecommender { ChangeQueryBuilder changeQueryBuilder, DynamicMap reviewerSuggestionPluginMap, Provider queryProvider, - WorkQueue workQueue, + @FanOutExecutor ExecutorService executor, Provider dbProvider, ApprovalsUtil approvalsUtil, @GerritServerConfig Config config) { @@ -95,7 +96,7 @@ public class ReviewerRecommender { this.config = config; this.queryProvider = queryProvider; this.reviewerSuggestionPluginMap = reviewerSuggestionPluginMap; - this.workQueue = workQueue; + this.executor = executor; this.dbProvider = dbProvider; this.approvalsUtil = approvalsUtil; } @@ -150,7 +151,7 @@ public class ReviewerRecommender { try { List>> futures = - workQueue.getDefaultQueue().invokeAll(tasks, PLUGIN_QUERY_TIMEOUT, TimeUnit.MILLISECONDS); + executor.invokeAll(tasks, PLUGIN_QUERY_TIMEOUT, TimeUnit.MILLISECONDS); Iterator weightIterator = weights.iterator(); for (Future> f : futures) { double weight = weightIterator.next(); diff --git a/java/com/google/gerrit/testing/FakeAccountCache.java b/java/com/google/gerrit/testing/FakeAccountCache.java index e549e08b8a..224a5bfb46 100644 --- a/java/com/google/gerrit/testing/FakeAccountCache.java +++ b/java/com/google/gerrit/testing/FakeAccountCache.java @@ -14,6 +14,8 @@ package com.google.gerrit.testing; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import com.google.gerrit.common.Nullable; import com.google.gerrit.common.TimeUtil; import com.google.gerrit.reviewdb.client.Account; @@ -24,6 +26,7 @@ import com.google.gerrit.server.config.AllUsersNameProvider; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; /** Fake implementation of {@link AccountCache} for testing. */ public class FakeAccountCache implements AccountCache { @@ -47,6 +50,11 @@ public class FakeAccountCache implements AccountCache { return Optional.ofNullable(byId.get(accountId)); } + @Override + public synchronized Map get(Set accountIds) { + return ImmutableMap.copyOf(Maps.filterKeys(byId, accountIds::contains)); + } + @Override public synchronized Optional getByUsername(String username) { throw new UnsupportedOperationException(); diff --git a/java/com/google/gerrit/testing/InMemoryModule.java b/java/com/google/gerrit/testing/InMemoryModule.java index 6d90510a2a..b63830ea63 100644 --- a/java/com/google/gerrit/testing/InMemoryModule.java +++ b/java/com/google/gerrit/testing/InMemoryModule.java @@ -29,6 +29,7 @@ import com.google.gerrit.index.project.ProjectSchemaDefinitions; import com.google.gerrit.metrics.DisabledMetricMaker; import com.google.gerrit.metrics.MetricMaker; import com.google.gerrit.reviewdb.server.ReviewDb; +import com.google.gerrit.server.FanOutExecutor; import com.google.gerrit.server.GerritPersonIdent; import com.google.gerrit.server.GerritPersonIdentProvider; import com.google.gerrit.server.api.GerritApiModule; @@ -271,6 +272,13 @@ public class InMemoryModule extends FactoryModule { return MoreExecutors.newDirectExecutorService(); } + @Provides + @Singleton + @FanOutExecutor + public ExecutorService createChangeJsonExecutor() { + return MoreExecutors.newDirectExecutorService(); + } + @Provides @Singleton @GerritServerId