From 2ed3982ffb18697cf0baf7cb84894ccd1a3e3e32 Mon Sep 17 00:00:00 2001 From: Patrick Hiesel Date: Mon, 16 Apr 2018 12:04:02 +0200 Subject: [PATCH 1/3] Parallelize ChangeJson#toChangeInfos 99%ile latency of QueryChanges is between 20 seconds and 80 seconds on googlesource.com depending on the time of day. There are three main reasons for that: 1) Performing operations that require loading ChangeNotes 2) Performing operations that require opening the repo 3) Filling accounts with a cold AccountCache This commit parallelizes formatting the individual results to mitigate 1+2. 3 will be addressed by a different change that will make the AccountFiller parallelize, too. Parallelization is done on a newly introduced FanOutExecutor that can be used whenever a serving thread wants to parallelize work. Change-Id: I36c6b92e31488ad001f5aea43efc837d31ba3021 --- Documentation/config-gerrit.txt | 9 ++ .../google/gerrit/server/FanOutExecutor.java | 27 ++++++ .../gerrit/server/change/ChangeJson.java | 93 ++++++++++++------- .../server/config/SysExecutorModule.java | 12 +++ .../google/gerrit/testing/InMemoryModule.java | 8 ++ 5 files changed, 113 insertions(+), 36 deletions(-) create mode 100644 java/com/google/gerrit/server/FanOutExecutor.java diff --git a/Documentation/config-gerrit.txt b/Documentation/config-gerrit.txt index accfcd25b7..5356455cdc 100644 --- a/Documentation/config-gerrit.txt +++ b/Documentation/config-gerrit.txt @@ -3906,6 +3906,15 @@ which miscellaneous tasks are handled. + Default is 1. +[[execution.fanOutThreadPoolSize]]execution.fanOutThreadPoolSize:: ++ +Maximum size of the thread pool on which a serving thread can fan out +work to parallelize it. ++ +When set to 0, a direct executor will be used, which means that the work happens in the caller thread. ++ +By default, 25. 
+ [[receiveemail]] === Section receiveemail diff --git a/java/com/google/gerrit/server/FanOutExecutor.java b/java/com/google/gerrit/server/FanOutExecutor.java new file mode 100644 index 0000000000..a489890eb6 --- /dev/null +++ b/java/com/google/gerrit/server/FanOutExecutor.java @@ -0,0 +1,27 @@ +// Copyright (C) 2018 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.gerrit.server; + +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +import com.google.inject.BindingAnnotation; +import java.lang.annotation.Retention; + +/** + * Marker on the global {@code ThreadPoolExecutor} used to do parallel work from a serving thread. 
+ */ +@Retention(RUNTIME) +@BindingAnnotation +public @interface FanOutExecutor {} diff --git a/java/com/google/gerrit/server/change/ChangeJson.java b/java/com/google/gerrit/server/change/ChangeJson.java index acf2b1829f..9bf6e5fe20 100644 --- a/java/com/google/gerrit/server/change/ChangeJson.java +++ b/java/com/google/gerrit/server/change/ChangeJson.java @@ -44,7 +44,6 @@ import com.google.auto.value.AutoValue; import com.google.common.base.Joiner; import com.google.common.base.MoreObjects; import com.google.common.base.Throwables; -import com.google.common.collect.FluentIterable; import com.google.common.collect.HashBasedTable; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -106,6 +105,7 @@ import com.google.gerrit.server.AnonymousUser; import com.google.gerrit.server.ApprovalsUtil; import com.google.gerrit.server.ChangeMessagesUtil; import com.google.gerrit.server.CurrentUser; +import com.google.gerrit.server.FanOutExecutor; import com.google.gerrit.server.GpgException; import com.google.gerrit.server.IdentifiedUser; import com.google.gerrit.server.ReviewerByEmailSet; @@ -129,7 +129,6 @@ import com.google.gerrit.server.permissions.ChangePermission; import com.google.gerrit.server.permissions.LabelPermission; import com.google.gerrit.server.permissions.PermissionBackend; import com.google.gerrit.server.permissions.PermissionBackendException; -import com.google.gerrit.server.project.NoSuchChangeException; import com.google.gerrit.server.project.ProjectCache; import com.google.gerrit.server.project.ProjectState; import com.google.gerrit.server.project.RemoveReviewerControl; @@ -155,6 +154,10 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.eclipse.jgit.lib.ObjectId; import 
org.eclipse.jgit.lib.Ref; import org.eclipse.jgit.lib.Repository; @@ -271,6 +274,8 @@ public class ChangeJson { private final RemoveReviewerControl removeReviewerControl; private final TrackingFooters trackingFooters; private final Metrics metrics; + private final ExecutorService fanOutExecutor; + private boolean lazyLoad = true; private AccountLoader accountLoader; private FixInput fix; @@ -304,6 +309,7 @@ public class ChangeJson { RemoveReviewerControl removeReviewerControl, TrackingFooters trackingFooters, Metrics metrics, + @FanOutExecutor ExecutorService fanOutExecutor, @Assisted Iterable options) { this.db = db; this.userProvider = user; @@ -331,6 +337,7 @@ public class ChangeJson { this.removeReviewerControl = removeReviewerControl; this.trackingFooters = trackingFooters; this.metrics = metrics; + this.fanOutExecutor = fanOutExecutor; this.options = Sets.immutableEnumSet(options); } @@ -411,12 +418,11 @@ public class ChangeJson { throws OrmException { try (Timer0.Context ignored = metrics.formatQueryResultsLatency.start()) { accountLoader = accountLoaderFactory.create(has(DETAILED_ACCOUNTS)); - ensureLoaded(FluentIterable.from(in).transformAndConcat(QueryResult::entities)); - - List> res = Lists.newArrayListWithCapacity(in.size()); - Map out = new HashMap<>(); + List> res = new ArrayList<>(in.size()); + Map cache = Maps.newHashMapWithExpectedSize(in.size()); for (QueryResult r : in) { - List infos = toChangeInfos(out, r.entities()); + List infos = toChangeInfos(r.entities(), cache); + infos.forEach(c -> cache.put(new Change.Id(c._number), c)); if (!infos.isEmpty() && r.more()) { infos.get(infos.size() - 1)._moreChanges = true; } @@ -478,38 +484,53 @@ public class ChangeJson { return options.contains(option); } - private List toChangeInfos(Map out, List changes) { + private List toChangeInfos( + List changes, Map cache) { try (Timer0.Context ignored = metrics.toChangeInfosLatency.start()) { - List info = Lists.newArrayListWithCapacity(changes.size()); + // 
Create a list of formatting calls that can be called sequentially or in parallel + List>> formattingCalls = new ArrayList<>(changes.size()); for (ChangeData cd : changes) { - ChangeInfo i = out.get(cd.getId()); - if (i == null) { - try { - i = toChangeInfo(cd, Optional.empty()); - } catch (PatchListNotAvailableException - | GpgException - | OrmException - | IOException - | PermissionBackendException - | RuntimeException e) { - if (has(CHECK)) { - i = checkOnly(cd); - } else if (e instanceof NoSuchChangeException) { - log.info( - "NoSuchChangeException: Omitting corrupt change " - + cd.getId() - + " from results. Seems to be stale in the index."); - continue; - } else { - log.warn("Omitting corrupt change " + cd.getId() + " from results", e); - continue; - } - } - out.put(cd.getId(), i); - } - info.add(i); + formattingCalls.add( + () -> { + ChangeInfo i = cache.get(cd.getId()); + if (i != null) { + return Optional.of(i); + } + try { + ensureLoaded(Collections.singleton(cd)); + return Optional.of(format(cd, Optional.empty(), false)); + } catch (OrmException | RuntimeException e) { + log.warn("Omitting corrupt change " + cd.getId() + " from results", e); + return Optional.empty(); + } + }); } - return info; + + long numProjects = changes.stream().map(c -> c.project()).distinct().count(); + if (!lazyLoad || changes.size() < 3 || numProjects < 2) { + // Format these changes in the request thread as the multithreading overhead would be too + // high. 
+ List result = new ArrayList<>(changes.size()); + for (Callable> c : formattingCalls) { + try { + c.call().ifPresent(result::add); + } catch (Exception e) { + log.warn("Omitting change due to exception", e); + } + } + return result; + } + + // Format the changes in parallel on the executor + List result = new ArrayList<>(changes.size()); + try { + for (Future> f : fanOutExecutor.invokeAll(formattingCalls)) { + f.get().ifPresent(result::add); + } + } catch (InterruptedException | ExecutionException e) { + throw new IllegalStateException(e); + } + return result; } } diff --git a/java/com/google/gerrit/server/config/SysExecutorModule.java b/java/com/google/gerrit/server/config/SysExecutorModule.java index 99edb6567f..b9fe34c943 100644 --- a/java/com/google/gerrit/server/config/SysExecutorModule.java +++ b/java/com/google/gerrit/server/config/SysExecutorModule.java @@ -17,6 +17,7 @@ package com.google.gerrit.server.config; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.gerrit.server.FanOutExecutor; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.AbstractModule; import com.google.inject.Provides; @@ -61,6 +62,17 @@ public class SysExecutorModule extends AbstractModule { return queues.createQueue(poolSize, "SendEmail"); } + @Provides + @Singleton + @FanOutExecutor + public ExecutorService createFanOutExecutor(@GerritServerConfig Config config, WorkQueue queues) { + int poolSize = config.getInt("execution", null, "fanOutThreadPoolSize", 25); + if (poolSize == 0) { + return MoreExecutors.newDirectExecutorService(); + } + return queues.createQueue(poolSize, "FanOut"); + } + @Provides @Singleton @ChangeUpdateExecutor diff --git a/java/com/google/gerrit/testing/InMemoryModule.java b/java/com/google/gerrit/testing/InMemoryModule.java index 0a9165f1a5..cff1a971d4 100644 --- 
a/java/com/google/gerrit/testing/InMemoryModule.java +++ b/java/com/google/gerrit/testing/InMemoryModule.java @@ -29,6 +29,7 @@ import com.google.gerrit.index.project.ProjectSchemaDefinitions; import com.google.gerrit.metrics.DisabledMetricMaker; import com.google.gerrit.metrics.MetricMaker; import com.google.gerrit.reviewdb.server.ReviewDb; +import com.google.gerrit.server.FanOutExecutor; import com.google.gerrit.server.GerritPersonIdent; import com.google.gerrit.server.GerritPersonIdentProvider; import com.google.gerrit.server.api.GerritApiModule; @@ -269,6 +270,13 @@ public class InMemoryModule extends FactoryModule { return MoreExecutors.newDirectExecutorService(); } + @Provides + @Singleton + @FanOutExecutor + public ExecutorService createChangeJsonExecutor() { + return MoreExecutors.newDirectExecutorService(); + } + @Provides @Singleton @GerritServerId From b6e9a0661c5d0835971d0780cf071740016a6f00 Mon Sep 17 00:00:00 2001 From: Patrick Hiesel Date: Mon, 16 Apr 2018 16:06:37 +0200 Subject: [PATCH 2/3] Offer a parallelized way of getting accounts from AccountCache Loading accounts from disk if they are not present in the cache is OK for single accounts but creates a significant latency burden when multiple accounts need to be loaded. This commit adds a method to AccountCache that allows for parallelization and implements it on a new executor in AccountCacheImpl. 
Change-Id: I1b58b2f71c8b1c62e809314e6c50ac1b25a3a166 --- .../gerrit/server/account/AccountCache.java | 16 ++++++ .../server/account/AccountCacheImpl.java | 50 ++++++++++++++++++- .../account/InternalAccountDirectory.java | 13 +++-- .../gerrit/testing/FakeAccountCache.java | 8 +++ 4 files changed, 82 insertions(+), 5 deletions(-) diff --git a/java/com/google/gerrit/server/account/AccountCache.java b/java/com/google/gerrit/server/account/AccountCache.java index b6ca1cbe5b..17493bf842 100644 --- a/java/com/google/gerrit/server/account/AccountCache.java +++ b/java/com/google/gerrit/server/account/AccountCache.java @@ -16,7 +16,9 @@ package com.google.gerrit.server.account; import com.google.gerrit.common.Nullable; import com.google.gerrit.reviewdb.client.Account; +import java.util.Map; import java.util.Optional; +import java.util.Set; /** Caches important (but small) account state to avoid database hits. */ public interface AccountCache { @@ -30,6 +32,20 @@ public interface AccountCache { */ Optional get(Account.Id accountId); + /** + * Returns a {@code Map} of {@code Account.Id} to {@code AccountState} for the given account IDs. + * If not cached yet the accounts are loaded. If an account can't be loaded (e.g. because it is + * missing), the entry will be missing from the result. + * + *

<p>Loads accounts in parallel if applicable. + * + * @param accountIds IDs of the accounts that should be retrieved + * @return {@code Map} of {@code Account.Id} to {@code AccountState} instances for the given + * account IDs; if an account can't be loaded (e.g. because it is missing), the entry will be + * missing from the result + */ + Map get(Set accountIds); + /** * Returns an {@code AccountState} instance for the given account ID. If not cached yet the * account is loaded. Returns an empty {@code AccountState} instance to represent a missing diff --git a/java/com/google/gerrit/server/account/AccountCacheImpl.java b/java/com/google/gerrit/server/account/AccountCacheImpl.java index 895c5b68ba..e72e6ea476 100644 --- a/java/com/google/gerrit/server/account/AccountCacheImpl.java +++ b/java/com/google/gerrit/server/account/AccountCacheImpl.java @@ -18,9 +18,11 @@ import static com.google.gerrit.server.account.externalids.ExternalId.SCHEME_USE import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableMap; import com.google.gerrit.common.Nullable; import com.google.gerrit.common.TimeUtil; import com.google.gerrit.reviewdb.client.Account; +import com.google.gerrit.server.FanOutExecutor; import com.google.gerrit.server.account.externalids.ExternalId; import com.google.gerrit.server.account.externalids.ExternalIds; import com.google.gerrit.server.cache.CacheModule; @@ -31,8 +33,16 @@ import com.google.inject.Singleton; import com.google.inject.TypeLiteral; import com.google.inject.name.Named; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import org.eclipse.jgit.errors.ConfigInvalidException; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,15 +70,18 @@ public class AccountCacheImpl implements AccountCache { private final AllUsersName allUsersName; private final ExternalIds externalIds; private final LoadingCache> byId; + private final ExecutorService executor; @Inject AccountCacheImpl( AllUsersName allUsersName, ExternalIds externalIds, - @Named(BYID_NAME) LoadingCache> byId) { + @Named(BYID_NAME) LoadingCache> byId, + @FanOutExecutor ExecutorService executor) { this.allUsersName = allUsersName; this.externalIds = externalIds; this.byId = byId; + this.executor = executor; } @Override @@ -91,6 +104,41 @@ public class AccountCacheImpl implements AccountCache { } } + @Override + public Map get(Set accountIds) { + Map accountStates = new HashMap<>(accountIds.size()); + List>> callables = new ArrayList<>(); + for (Account.Id accountId : accountIds) { + Optional state = byId.asMap().get(accountId); + if (state != null) { + // The value is in-memory, so we just get the state + state.ifPresent(s -> accountStates.put(accountId, s)); + } else { + // Queue up a callable so that we can load accounts in parallel + callables.add(() -> get(accountId)); + } + } + if (callables.isEmpty()) { + return accountStates; + } + + List>> futures; + try { + futures = executor.invokeAll(callables); + } catch (InterruptedException e) { + log.error("Cannot load AccountStates", e); + return ImmutableMap.of(); + } + for (Future> f : futures) { + try { + f.get().ifPresent(s -> accountStates.put(s.getAccount().getId(), s)); + } catch (InterruptedException | ExecutionException e) { + log.error("Cannot load AccountState", e); + } + } + return accountStates; + } + @Override public Optional getByUsername(String username) { try { diff --git a/java/com/google/gerrit/server/account/InternalAccountDirectory.java b/java/com/google/gerrit/server/account/InternalAccountDirectory.java index 77752da3da..8c2bc10f5d 100644 --- 
a/java/com/google/gerrit/server/account/InternalAccountDirectory.java +++ b/java/com/google/gerrit/server/account/InternalAccountDirectory.java @@ -15,8 +15,10 @@ package com.google.gerrit.server.account; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; import com.google.common.base.Strings; +import com.google.common.collect.Streams; import com.google.gerrit.extensions.common.AccountInfo; import com.google.gerrit.extensions.common.AvatarInfo; import com.google.gerrit.extensions.registration.DynamicItem; @@ -32,7 +34,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.List; -import java.util.Optional; +import java.util.Map; import java.util.Set; @Singleton @@ -66,11 +68,14 @@ public class InternalAccountDirectory extends AccountDirectory { if (options.equals(ID_ONLY)) { return; } + Set ids = + Streams.stream(in).map(a -> new Account.Id(a._accountId)).collect(toSet()); + Map accountStates = accountCache.get(ids); for (AccountInfo info : in) { Account.Id id = new Account.Id(info._accountId); - Optional state = accountCache.get(id); - if (state.isPresent()) { - fill(info, state.get(), options); + AccountState state = accountStates.get(id); + if (state != null) { + fill(info, accountStates.get(id), options); } else { info._accountId = options.contains(FillOptions.ID) ? 
id.get() : null; } diff --git a/java/com/google/gerrit/testing/FakeAccountCache.java b/java/com/google/gerrit/testing/FakeAccountCache.java index e549e08b8a..224a5bfb46 100644 --- a/java/com/google/gerrit/testing/FakeAccountCache.java +++ b/java/com/google/gerrit/testing/FakeAccountCache.java @@ -14,6 +14,8 @@ package com.google.gerrit.testing; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import com.google.gerrit.common.Nullable; import com.google.gerrit.common.TimeUtil; import com.google.gerrit.reviewdb.client.Account; @@ -24,6 +26,7 @@ import com.google.gerrit.server.config.AllUsersNameProvider; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; /** Fake implementation of {@link AccountCache} for testing. */ public class FakeAccountCache implements AccountCache { @@ -47,6 +50,11 @@ public class FakeAccountCache implements AccountCache { return Optional.ofNullable(byId.get(accountId)); } + @Override + public synchronized Map get(Set accountIds) { + return ImmutableMap.copyOf(Maps.filterKeys(byId, accountIds::contains)); + } + @Override public synchronized Optional getByUsername(String username) { throw new UnsupportedOperationException(); From 86e01bca9b00a4486960174e36d51c69918a6584 Mon Sep 17 00:00:00 2001 From: Patrick Hiesel Date: Wed, 18 Apr 2018 11:31:56 +0200 Subject: [PATCH 3/3] Use FanOutExecutor for ReviewerRecommender The newly created thread pool should be used for parallelizing work from request serving threads. This is a fairly large pool that makes it less likely that ReviewerRecommender uses a congested queue. 
Change-Id: Ie229ef98c33ec475b94fc76d8fc2bd0c0872465a --- .../server/restapi/change/ReviewerRecommender.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/java/com/google/gerrit/server/restapi/change/ReviewerRecommender.java b/java/com/google/gerrit/server/restapi/change/ReviewerRecommender.java index 78687cde2a..dae37d64dd 100644 --- a/java/com/google/gerrit/server/restapi/change/ReviewerRecommender.java +++ b/java/com/google/gerrit/server/restapi/change/ReviewerRecommender.java @@ -29,10 +29,10 @@ import com.google.gerrit.reviewdb.client.Account; import com.google.gerrit.reviewdb.client.PatchSetApproval; import com.google.gerrit.reviewdb.server.ReviewDb; import com.google.gerrit.server.ApprovalsUtil; +import com.google.gerrit.server.FanOutExecutor; import com.google.gerrit.server.change.ReviewerSuggestion; import com.google.gerrit.server.change.SuggestedReviewer; import com.google.gerrit.server.config.GerritServerConfig; -import com.google.gerrit.server.git.WorkQueue; import com.google.gerrit.server.index.change.ChangeField; import com.google.gerrit.server.notedb.ChangeNotes; import com.google.gerrit.server.project.ProjectState; @@ -54,6 +54,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -78,7 +79,7 @@ public class ReviewerRecommender { private final Config config; private final DynamicMap reviewerSuggestionPluginMap; private final Provider queryProvider; - private final WorkQueue workQueue; + private final ExecutorService executor; private final Provider dbProvider; private final ApprovalsUtil approvalsUtil; @@ -87,7 +88,7 @@ public class ReviewerRecommender { ChangeQueryBuilder changeQueryBuilder, DynamicMap reviewerSuggestionPluginMap, Provider queryProvider, - WorkQueue workQueue, + 
@FanOutExecutor ExecutorService executor, Provider dbProvider, ApprovalsUtil approvalsUtil, @GerritServerConfig Config config) { @@ -95,7 +96,7 @@ public class ReviewerRecommender { this.config = config; this.queryProvider = queryProvider; this.reviewerSuggestionPluginMap = reviewerSuggestionPluginMap; - this.workQueue = workQueue; + this.executor = executor; this.dbProvider = dbProvider; this.approvalsUtil = approvalsUtil; } @@ -150,7 +151,7 @@ public class ReviewerRecommender { try { List>> futures = - workQueue.getDefaultQueue().invokeAll(tasks, PLUGIN_QUERY_TIMEOUT, TimeUnit.MILLISECONDS); + executor.invokeAll(tasks, PLUGIN_QUERY_TIMEOUT, TimeUnit.MILLISECONDS); Iterator weightIterator = weights.iterator(); for (Future> f : futures) { double weight = weightIterator.next();