Merge changes Ie229ef98,I1b58b2f7,I36c6b92e

* changes:
  Use FanOutExectutor for ReviewerRecommender
  Offer a parallelized way of getting accounts from AccountCache
  Parallelize ChangeJson#toChangeInfos
This commit is contained in:
Patrick Hiesel
2018-04-23 07:12:52 +00:00
committed by Gerrit Code Review
10 changed files with 201 additions and 46 deletions

View File

@@ -3925,6 +3925,15 @@ which miscellaneous tasks are handled.
+
Default is 1.
[[execution.fanOutThreadPoolSize]]execution.fanOutThreadPoolSize::
+
Maximum size of thread pool to on which a serving thread can fan-out
work to parallelize it.
+
When set to 0, a direct executor will be used.
+
By default, 25 which means that formatting happens in the caller thread.
[[receiveemail]]
=== Section receiveemail

View File

@@ -0,0 +1,27 @@
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.gerrit.server;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import com.google.inject.BindingAnnotation;
import java.lang.annotation.Retention;
/**
* Marker on the global {@code ThreadPoolExecutor} used to do parallel work from a serving thread.
*/
@Retention(RUNTIME)
@BindingAnnotation
public @interface FanOutExecutor {}

View File

@@ -16,7 +16,9 @@ package com.google.gerrit.server.account;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.reviewdb.client.Account;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/** Caches important (but small) account state to avoid database hits. */
public interface AccountCache {
@@ -30,6 +32,20 @@ public interface AccountCache {
*/
Optional<AccountState> get(Account.Id accountId);
/**
* Returns a {@code Map} of {@code Account.Id} to {@code AccountState} for the given account IDs.
* If not cached yet the accounts are loaded. If an account can't be loaded (e.g. because it is
* missing), the entry will be missing from the result.
*
* <p>Loads accounts in parallel if applicable.
*
* @param accountIds IDs of the account that should be retrieved
* @return {@code Map} of {@code Account.Id} to {@code AccountState} instances for the given
* account IDs, if an account can't be loaded (e.g. because it is missing), the entry will be
* missing from the result
*/
Map<Account.Id, AccountState> get(Set<Account.Id> accountIds);
/**
* Returns an {@code AccountState} instance for the given account ID. If not cached yet the
* account is loaded. Returns an empty {@code AccountState} instance to represent a missing

View File

@@ -18,9 +18,11 @@ import static com.google.gerrit.server.account.externalids.ExternalId.SCHEME_USE
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.common.TimeUtil;
import com.google.gerrit.reviewdb.client.Account;
import com.google.gerrit.server.FanOutExecutor;
import com.google.gerrit.server.account.externalids.ExternalId;
import com.google.gerrit.server.account.externalids.ExternalIds;
import com.google.gerrit.server.cache.CacheModule;
@@ -31,8 +33,16 @@ import com.google.inject.Singleton;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Named;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.eclipse.jgit.errors.ConfigInvalidException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,15 +70,18 @@ public class AccountCacheImpl implements AccountCache {
private final AllUsersName allUsersName;
private final ExternalIds externalIds;
private final LoadingCache<Account.Id, Optional<AccountState>> byId;
private final ExecutorService executor;
@Inject
AccountCacheImpl(
AllUsersName allUsersName,
ExternalIds externalIds,
@Named(BYID_NAME) LoadingCache<Account.Id, Optional<AccountState>> byId) {
@Named(BYID_NAME) LoadingCache<Account.Id, Optional<AccountState>> byId,
@FanOutExecutor ExecutorService executor) {
this.allUsersName = allUsersName;
this.externalIds = externalIds;
this.byId = byId;
this.executor = executor;
}
@Override
@@ -91,6 +104,41 @@ public class AccountCacheImpl implements AccountCache {
}
}
@Override
public Map<Account.Id, AccountState> get(Set<Account.Id> accountIds) {
Map<Account.Id, AccountState> accountStates = new HashMap<>(accountIds.size());
List<Callable<Optional<AccountState>>> callables = new ArrayList<>();
for (Account.Id accountId : accountIds) {
Optional<AccountState> state = byId.asMap().get(accountId);
if (state != null) {
// The value is in-memory, so we just get the state
state.ifPresent(s -> accountStates.put(accountId, s));
} else {
// Queue up a callable so that we can load accounts in parallel
callables.add(() -> get(accountId));
}
}
if (callables.isEmpty()) {
return accountStates;
}
List<Future<Optional<AccountState>>> futures;
try {
futures = executor.invokeAll(callables);
} catch (InterruptedException e) {
log.error("Cannot load AccountStates", e);
return ImmutableMap.of();
}
for (Future<Optional<AccountState>> f : futures) {
try {
f.get().ifPresent(s -> accountStates.put(s.getAccount().getId(), s));
} catch (InterruptedException | ExecutionException e) {
log.error("Cannot load AccountState", e);
}
}
return accountStates;
}
@Override
public Optional<AccountState> getByUsername(String username) {
try {

View File

@@ -15,8 +15,10 @@
package com.google.gerrit.server.account;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import com.google.common.base.Strings;
import com.google.common.collect.Streams;
import com.google.gerrit.extensions.common.AccountInfo;
import com.google.gerrit.extensions.common.AvatarInfo;
import com.google.gerrit.extensions.registration.DynamicItem;
@@ -32,7 +34,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Map;
import java.util.Set;
@Singleton
@@ -66,11 +68,14 @@ public class InternalAccountDirectory extends AccountDirectory {
if (options.equals(ID_ONLY)) {
return;
}
Set<Account.Id> ids =
Streams.stream(in).map(a -> new Account.Id(a._accountId)).collect(toSet());
Map<Account.Id, AccountState> accountStates = accountCache.get(ids);
for (AccountInfo info : in) {
Account.Id id = new Account.Id(info._accountId);
Optional<AccountState> state = accountCache.get(id);
if (state.isPresent()) {
fill(info, state.get(), options);
AccountState state = accountStates.get(id);
if (state != null) {
fill(info, accountStates.get(id), options);
} else {
info._accountId = options.contains(FillOptions.ID) ? id.get() : null;
}

View File

@@ -44,7 +44,6 @@ import com.google.auto.value.AutoValue;
import com.google.common.base.Joiner;
import com.google.common.base.MoreObjects;
import com.google.common.base.Throwables;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -106,6 +105,7 @@ import com.google.gerrit.server.AnonymousUser;
import com.google.gerrit.server.ApprovalsUtil;
import com.google.gerrit.server.ChangeMessagesUtil;
import com.google.gerrit.server.CurrentUser;
import com.google.gerrit.server.FanOutExecutor;
import com.google.gerrit.server.GpgException;
import com.google.gerrit.server.IdentifiedUser;
import com.google.gerrit.server.ReviewerByEmailSet;
@@ -129,7 +129,6 @@ import com.google.gerrit.server.permissions.ChangePermission;
import com.google.gerrit.server.permissions.LabelPermission;
import com.google.gerrit.server.permissions.PermissionBackend;
import com.google.gerrit.server.permissions.PermissionBackendException;
import com.google.gerrit.server.project.NoSuchChangeException;
import com.google.gerrit.server.project.ProjectCache;
import com.google.gerrit.server.project.ProjectState;
import com.google.gerrit.server.project.RemoveReviewerControl;
@@ -155,6 +154,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Ref;
import org.eclipse.jgit.lib.Repository;
@@ -271,6 +274,8 @@ public class ChangeJson {
private final RemoveReviewerControl removeReviewerControl;
private final TrackingFooters trackingFooters;
private final Metrics metrics;
private final ExecutorService fanOutExecutor;
private boolean lazyLoad = true;
private AccountLoader accountLoader;
private FixInput fix;
@@ -304,6 +309,7 @@ public class ChangeJson {
RemoveReviewerControl removeReviewerControl,
TrackingFooters trackingFooters,
Metrics metrics,
@FanOutExecutor ExecutorService fanOutExecutor,
@Assisted Iterable<ListChangesOption> options) {
this.db = db;
this.userProvider = user;
@@ -331,6 +337,7 @@ public class ChangeJson {
this.removeReviewerControl = removeReviewerControl;
this.trackingFooters = trackingFooters;
this.metrics = metrics;
this.fanOutExecutor = fanOutExecutor;
this.options = Sets.immutableEnumSet(options);
}
@@ -411,12 +418,11 @@ public class ChangeJson {
throws OrmException {
try (Timer0.Context ignored = metrics.formatQueryResultsLatency.start()) {
accountLoader = accountLoaderFactory.create(has(DETAILED_ACCOUNTS));
ensureLoaded(FluentIterable.from(in).transformAndConcat(QueryResult::entities));
List<List<ChangeInfo>> res = Lists.newArrayListWithCapacity(in.size());
Map<Change.Id, ChangeInfo> out = new HashMap<>();
List<List<ChangeInfo>> res = new ArrayList<>(in.size());
Map<Change.Id, ChangeInfo> cache = Maps.newHashMapWithExpectedSize(in.size());
for (QueryResult<ChangeData> r : in) {
List<ChangeInfo> infos = toChangeInfos(out, r.entities());
List<ChangeInfo> infos = toChangeInfos(r.entities(), cache);
infos.forEach(c -> cache.put(new Change.Id(c._number), c));
if (!infos.isEmpty() && r.more()) {
infos.get(infos.size() - 1)._moreChanges = true;
}
@@ -478,38 +484,53 @@ public class ChangeJson {
return options.contains(option);
}
private List<ChangeInfo> toChangeInfos(Map<Change.Id, ChangeInfo> out, List<ChangeData> changes) {
private List<ChangeInfo> toChangeInfos(
List<ChangeData> changes, Map<Change.Id, ChangeInfo> cache) {
try (Timer0.Context ignored = metrics.toChangeInfosLatency.start()) {
List<ChangeInfo> info = Lists.newArrayListWithCapacity(changes.size());
// Create a list of formatting calls that can be called sequentially or in parallel
List<Callable<Optional<ChangeInfo>>> formattingCalls = new ArrayList<>(changes.size());
for (ChangeData cd : changes) {
ChangeInfo i = out.get(cd.getId());
if (i == null) {
try {
i = toChangeInfo(cd, Optional.empty());
} catch (PatchListNotAvailableException
| GpgException
| OrmException
| IOException
| PermissionBackendException
| RuntimeException e) {
if (has(CHECK)) {
i = checkOnly(cd);
} else if (e instanceof NoSuchChangeException) {
log.info(
"NoSuchChangeException: Omitting corrupt change "
+ cd.getId()
+ " from results. Seems to be stale in the index.");
continue;
} else {
log.warn("Omitting corrupt change " + cd.getId() + " from results", e);
continue;
}
}
out.put(cd.getId(), i);
}
info.add(i);
formattingCalls.add(
() -> {
ChangeInfo i = cache.get(cd.getId());
if (i != null) {
return Optional.of(i);
}
try {
ensureLoaded(Collections.singleton(cd));
return Optional.of(format(cd, Optional.empty(), false));
} catch (OrmException | RuntimeException e) {
log.warn("Omitting corrupt change " + cd.getId() + " from results", e);
return Optional.empty();
}
});
}
return info;
long numProjects = changes.stream().map(c -> c.project()).distinct().count();
if (!lazyLoad || changes.size() < 3 || numProjects < 2) {
// Format these changes in the request thread as the multithreading overhead would be too
// high.
List<ChangeInfo> result = new ArrayList<>(changes.size());
for (Callable<Optional<ChangeInfo>> c : formattingCalls) {
try {
c.call().ifPresent(result::add);
} catch (Exception e) {
log.warn("Omitting change due to exception", e);
}
}
return result;
}
// Format the changes in parallel on the executor
List<ChangeInfo> result = new ArrayList<>(changes.size());
try {
for (Future<Optional<ChangeInfo>> f : fanOutExecutor.invokeAll(formattingCalls)) {
f.get().ifPresent(result::add);
}
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
return result;
}
}

View File

@@ -17,6 +17,7 @@ package com.google.gerrit.server.config;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gerrit.server.FanOutExecutor;
import com.google.gerrit.server.git.WorkQueue;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
@@ -61,6 +62,17 @@ public class SysExecutorModule extends AbstractModule {
return queues.createQueue(poolSize, "SendEmail");
}
@Provides
@Singleton
@FanOutExecutor
public ExecutorService createFanOutExecutor(@GerritServerConfig Config config, WorkQueue queues) {
int poolSize = config.getInt("execution", null, "fanOutThreadPoolSize", 25);
if (poolSize == 0) {
return MoreExecutors.newDirectExecutorService();
}
return queues.createQueue(poolSize, "FanOut");
}
@Provides
@Singleton
@ChangeUpdateExecutor

View File

@@ -29,10 +29,10 @@ import com.google.gerrit.reviewdb.client.Account;
import com.google.gerrit.reviewdb.client.PatchSetApproval;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.ApprovalsUtil;
import com.google.gerrit.server.FanOutExecutor;
import com.google.gerrit.server.change.ReviewerSuggestion;
import com.google.gerrit.server.change.SuggestedReviewer;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.git.WorkQueue;
import com.google.gerrit.server.index.change.ChangeField;
import com.google.gerrit.server.notedb.ChangeNotes;
import com.google.gerrit.server.project.ProjectState;
@@ -54,6 +54,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
@@ -78,7 +79,7 @@ public class ReviewerRecommender {
private final Config config;
private final DynamicMap<ReviewerSuggestion> reviewerSuggestionPluginMap;
private final Provider<InternalChangeQuery> queryProvider;
private final WorkQueue workQueue;
private final ExecutorService executor;
private final Provider<ReviewDb> dbProvider;
private final ApprovalsUtil approvalsUtil;
@@ -87,7 +88,7 @@ public class ReviewerRecommender {
ChangeQueryBuilder changeQueryBuilder,
DynamicMap<ReviewerSuggestion> reviewerSuggestionPluginMap,
Provider<InternalChangeQuery> queryProvider,
WorkQueue workQueue,
@FanOutExecutor ExecutorService executor,
Provider<ReviewDb> dbProvider,
ApprovalsUtil approvalsUtil,
@GerritServerConfig Config config) {
@@ -95,7 +96,7 @@ public class ReviewerRecommender {
this.config = config;
this.queryProvider = queryProvider;
this.reviewerSuggestionPluginMap = reviewerSuggestionPluginMap;
this.workQueue = workQueue;
this.executor = executor;
this.dbProvider = dbProvider;
this.approvalsUtil = approvalsUtil;
}
@@ -150,7 +151,7 @@ public class ReviewerRecommender {
try {
List<Future<Set<SuggestedReviewer>>> futures =
workQueue.getDefaultQueue().invokeAll(tasks, PLUGIN_QUERY_TIMEOUT, TimeUnit.MILLISECONDS);
executor.invokeAll(tasks, PLUGIN_QUERY_TIMEOUT, TimeUnit.MILLISECONDS);
Iterator<Double> weightIterator = weights.iterator();
for (Future<Set<SuggestedReviewer>> f : futures) {
double weight = weightIterator.next();

View File

@@ -14,6 +14,8 @@
package com.google.gerrit.testing;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.gerrit.common.Nullable;
import com.google.gerrit.common.TimeUtil;
import com.google.gerrit.reviewdb.client.Account;
@@ -24,6 +26,7 @@ import com.google.gerrit.server.config.AllUsersNameProvider;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/** Fake implementation of {@link AccountCache} for testing. */
public class FakeAccountCache implements AccountCache {
@@ -47,6 +50,11 @@ public class FakeAccountCache implements AccountCache {
return Optional.ofNullable(byId.get(accountId));
}
@Override
public synchronized Map<Account.Id, AccountState> get(Set<Account.Id> accountIds) {
return ImmutableMap.copyOf(Maps.filterKeys(byId, accountIds::contains));
}
@Override
public synchronized Optional<AccountState> getByUsername(String username) {
throw new UnsupportedOperationException();

View File

@@ -29,6 +29,7 @@ import com.google.gerrit.index.project.ProjectSchemaDefinitions;
import com.google.gerrit.metrics.DisabledMetricMaker;
import com.google.gerrit.metrics.MetricMaker;
import com.google.gerrit.reviewdb.server.ReviewDb;
import com.google.gerrit.server.FanOutExecutor;
import com.google.gerrit.server.GerritPersonIdent;
import com.google.gerrit.server.GerritPersonIdentProvider;
import com.google.gerrit.server.api.GerritApiModule;
@@ -271,6 +272,13 @@ public class InMemoryModule extends FactoryModule {
return MoreExecutors.newDirectExecutorService();
}
@Provides
@Singleton
@FanOutExecutor
public ExecutorService createChangeJsonExecutor() {
return MoreExecutors.newDirectExecutorService();
}
@Provides
@Singleton
@GerritServerId