Parallelize ChangeJson#toChangeInfos
99%ile latency of QueryChanges is between 20 seconds and 80 seconds on googlesource.com depending on the time of day. There are two main reasons for that: 1) Performing operations that require loading ChangeNotes 2) Performing operations that require opening the repo 3) Filling accounts with a cold AccountCache This commit parallelizes formatting the individual results to mitigate 1+2. 3 will be addressed by a different change that will make the AccountFiller parallelize, too. Parallelization is done on a newly introduced FanOutExectuor that can be used whenever a serving thread wants to parallelize work. Change-Id: I36c6b92e31488ad001f5aea43efc837d31ba3021
This commit is contained in:
committed by
Patrick Hiesel
parent
a691e63bd6
commit
2ed3982ffb
@@ -3906,6 +3906,15 @@ which miscellaneous tasks are handled.
|
||||
+
|
||||
Default is 1.
|
||||
|
||||
[[execution.fanOutThreadPoolSize]]execution.fanOutThreadPoolSize::
|
||||
+
|
||||
Maximum size of thread pool to on which a serving thread can fan-out
|
||||
work to parallelize it.
|
||||
+
|
||||
When set to 0, a direct executor will be used.
|
||||
+
|
||||
By default, 25 which means that formatting happens in the caller thread.
|
||||
|
||||
[[receiveemail]]
|
||||
=== Section receiveemail
|
||||
|
||||
|
||||
27
java/com/google/gerrit/server/FanOutExecutor.java
Normal file
27
java/com/google/gerrit/server/FanOutExecutor.java
Normal file
@@ -0,0 +1,27 @@
|
||||
// Copyright (C) 2018 The Android Open Source Project
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.google.gerrit.server;
|
||||
|
||||
import static java.lang.annotation.RetentionPolicy.RUNTIME;
|
||||
|
||||
import com.google.inject.BindingAnnotation;
|
||||
import java.lang.annotation.Retention;
|
||||
|
||||
/**
|
||||
* Marker on the global {@code ThreadPoolExecutor} used to do parallel work from a serving thread.
|
||||
*/
|
||||
@Retention(RUNTIME)
|
||||
@BindingAnnotation
|
||||
public @interface FanOutExecutor {}
|
||||
@@ -44,7 +44,6 @@ import com.google.auto.value.AutoValue;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.MoreObjects;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.FluentIterable;
|
||||
import com.google.common.collect.HashBasedTable;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
@@ -106,6 +105,7 @@ import com.google.gerrit.server.AnonymousUser;
|
||||
import com.google.gerrit.server.ApprovalsUtil;
|
||||
import com.google.gerrit.server.ChangeMessagesUtil;
|
||||
import com.google.gerrit.server.CurrentUser;
|
||||
import com.google.gerrit.server.FanOutExecutor;
|
||||
import com.google.gerrit.server.GpgException;
|
||||
import com.google.gerrit.server.IdentifiedUser;
|
||||
import com.google.gerrit.server.ReviewerByEmailSet;
|
||||
@@ -129,7 +129,6 @@ import com.google.gerrit.server.permissions.ChangePermission;
|
||||
import com.google.gerrit.server.permissions.LabelPermission;
|
||||
import com.google.gerrit.server.permissions.PermissionBackend;
|
||||
import com.google.gerrit.server.permissions.PermissionBackendException;
|
||||
import com.google.gerrit.server.project.NoSuchChangeException;
|
||||
import com.google.gerrit.server.project.ProjectCache;
|
||||
import com.google.gerrit.server.project.ProjectState;
|
||||
import com.google.gerrit.server.project.RemoveReviewerControl;
|
||||
@@ -155,6 +154,10 @@ import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import org.eclipse.jgit.lib.ObjectId;
|
||||
import org.eclipse.jgit.lib.Ref;
|
||||
import org.eclipse.jgit.lib.Repository;
|
||||
@@ -271,6 +274,8 @@ public class ChangeJson {
|
||||
private final RemoveReviewerControl removeReviewerControl;
|
||||
private final TrackingFooters trackingFooters;
|
||||
private final Metrics metrics;
|
||||
private final ExecutorService fanOutExecutor;
|
||||
|
||||
private boolean lazyLoad = true;
|
||||
private AccountLoader accountLoader;
|
||||
private FixInput fix;
|
||||
@@ -304,6 +309,7 @@ public class ChangeJson {
|
||||
RemoveReviewerControl removeReviewerControl,
|
||||
TrackingFooters trackingFooters,
|
||||
Metrics metrics,
|
||||
@FanOutExecutor ExecutorService fanOutExecutor,
|
||||
@Assisted Iterable<ListChangesOption> options) {
|
||||
this.db = db;
|
||||
this.userProvider = user;
|
||||
@@ -331,6 +337,7 @@ public class ChangeJson {
|
||||
this.removeReviewerControl = removeReviewerControl;
|
||||
this.trackingFooters = trackingFooters;
|
||||
this.metrics = metrics;
|
||||
this.fanOutExecutor = fanOutExecutor;
|
||||
this.options = Sets.immutableEnumSet(options);
|
||||
}
|
||||
|
||||
@@ -411,12 +418,11 @@ public class ChangeJson {
|
||||
throws OrmException {
|
||||
try (Timer0.Context ignored = metrics.formatQueryResultsLatency.start()) {
|
||||
accountLoader = accountLoaderFactory.create(has(DETAILED_ACCOUNTS));
|
||||
ensureLoaded(FluentIterable.from(in).transformAndConcat(QueryResult::entities));
|
||||
|
||||
List<List<ChangeInfo>> res = Lists.newArrayListWithCapacity(in.size());
|
||||
Map<Change.Id, ChangeInfo> out = new HashMap<>();
|
||||
List<List<ChangeInfo>> res = new ArrayList<>(in.size());
|
||||
Map<Change.Id, ChangeInfo> cache = Maps.newHashMapWithExpectedSize(in.size());
|
||||
for (QueryResult<ChangeData> r : in) {
|
||||
List<ChangeInfo> infos = toChangeInfos(out, r.entities());
|
||||
List<ChangeInfo> infos = toChangeInfos(r.entities(), cache);
|
||||
infos.forEach(c -> cache.put(new Change.Id(c._number), c));
|
||||
if (!infos.isEmpty() && r.more()) {
|
||||
infos.get(infos.size() - 1)._moreChanges = true;
|
||||
}
|
||||
@@ -478,38 +484,53 @@ public class ChangeJson {
|
||||
return options.contains(option);
|
||||
}
|
||||
|
||||
private List<ChangeInfo> toChangeInfos(Map<Change.Id, ChangeInfo> out, List<ChangeData> changes) {
|
||||
private List<ChangeInfo> toChangeInfos(
|
||||
List<ChangeData> changes, Map<Change.Id, ChangeInfo> cache) {
|
||||
try (Timer0.Context ignored = metrics.toChangeInfosLatency.start()) {
|
||||
List<ChangeInfo> info = Lists.newArrayListWithCapacity(changes.size());
|
||||
// Create a list of formatting calls that can be called sequentially or in parallel
|
||||
List<Callable<Optional<ChangeInfo>>> formattingCalls = new ArrayList<>(changes.size());
|
||||
for (ChangeData cd : changes) {
|
||||
ChangeInfo i = out.get(cd.getId());
|
||||
if (i == null) {
|
||||
try {
|
||||
i = toChangeInfo(cd, Optional.empty());
|
||||
} catch (PatchListNotAvailableException
|
||||
| GpgException
|
||||
| OrmException
|
||||
| IOException
|
||||
| PermissionBackendException
|
||||
| RuntimeException e) {
|
||||
if (has(CHECK)) {
|
||||
i = checkOnly(cd);
|
||||
} else if (e instanceof NoSuchChangeException) {
|
||||
log.info(
|
||||
"NoSuchChangeException: Omitting corrupt change "
|
||||
+ cd.getId()
|
||||
+ " from results. Seems to be stale in the index.");
|
||||
continue;
|
||||
} else {
|
||||
log.warn("Omitting corrupt change " + cd.getId() + " from results", e);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
out.put(cd.getId(), i);
|
||||
}
|
||||
info.add(i);
|
||||
formattingCalls.add(
|
||||
() -> {
|
||||
ChangeInfo i = cache.get(cd.getId());
|
||||
if (i != null) {
|
||||
return Optional.of(i);
|
||||
}
|
||||
try {
|
||||
ensureLoaded(Collections.singleton(cd));
|
||||
return Optional.of(format(cd, Optional.empty(), false));
|
||||
} catch (OrmException | RuntimeException e) {
|
||||
log.warn("Omitting corrupt change " + cd.getId() + " from results", e);
|
||||
return Optional.empty();
|
||||
}
|
||||
});
|
||||
}
|
||||
return info;
|
||||
|
||||
long numProjects = changes.stream().map(c -> c.project()).distinct().count();
|
||||
if (!lazyLoad || changes.size() < 3 || numProjects < 2) {
|
||||
// Format these changes in the request thread as the multithreading overhead would be too
|
||||
// high.
|
||||
List<ChangeInfo> result = new ArrayList<>(changes.size());
|
||||
for (Callable<Optional<ChangeInfo>> c : formattingCalls) {
|
||||
try {
|
||||
c.call().ifPresent(result::add);
|
||||
} catch (Exception e) {
|
||||
log.warn("Omitting change due to exception", e);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
// Format the changes in parallel on the executor
|
||||
List<ChangeInfo> result = new ArrayList<>(changes.size());
|
||||
try {
|
||||
for (Future<Optional<ChangeInfo>> f : fanOutExecutor.invokeAll(formattingCalls)) {
|
||||
f.get().ifPresent(result::add);
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new IllegalStateException(e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ package com.google.gerrit.server.config;
|
||||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import com.google.gerrit.server.FanOutExecutor;
|
||||
import com.google.gerrit.server.git.WorkQueue;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Provides;
|
||||
@@ -61,6 +62,17 @@ public class SysExecutorModule extends AbstractModule {
|
||||
return queues.createQueue(poolSize, "SendEmail");
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@FanOutExecutor
|
||||
public ExecutorService createFanOutExecutor(@GerritServerConfig Config config, WorkQueue queues) {
|
||||
int poolSize = config.getInt("execution", null, "fanOutThreadPoolSize", 25);
|
||||
if (poolSize == 0) {
|
||||
return MoreExecutors.newDirectExecutorService();
|
||||
}
|
||||
return queues.createQueue(poolSize, "FanOut");
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@ChangeUpdateExecutor
|
||||
|
||||
@@ -29,6 +29,7 @@ import com.google.gerrit.index.project.ProjectSchemaDefinitions;
|
||||
import com.google.gerrit.metrics.DisabledMetricMaker;
|
||||
import com.google.gerrit.metrics.MetricMaker;
|
||||
import com.google.gerrit.reviewdb.server.ReviewDb;
|
||||
import com.google.gerrit.server.FanOutExecutor;
|
||||
import com.google.gerrit.server.GerritPersonIdent;
|
||||
import com.google.gerrit.server.GerritPersonIdentProvider;
|
||||
import com.google.gerrit.server.api.GerritApiModule;
|
||||
@@ -269,6 +270,13 @@ public class InMemoryModule extends FactoryModule {
|
||||
return MoreExecutors.newDirectExecutorService();
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@FanOutExecutor
|
||||
public ExecutorService createChangeJsonExecutor() {
|
||||
return MoreExecutors.newDirectExecutorService();
|
||||
}
|
||||
|
||||
@Provides
|
||||
@Singleton
|
||||
@GerritServerId
|
||||
|
||||
Reference in New Issue
Block a user