Merge branch 'stable-2.15'
* stable-2.15:
AbstractElasticIndex: fix index name of delete request and add coverage
Bazel: Remove unused commons-lang3
DropWizardMetricMaker: Move sanitizeMetricName to MetricMaker base class
Update git submodules
Update git submodules
Update git submodules
AsciiDoctor: Format with google-java-format version 1.6
Enable metrics for core queues
WorkQueue: Add possibility to enable metrics
Change I7af2f9f6c ("Bazel: Remove unused commons-lang3") is reverted by
this merge since commons-lang3 is actually still used on master.
This reverts commit 589af813cc
.
Change-Id: I5ea8c03d27e7eaefd0067658b25d1c08efdfd0f4
This commit is contained in:
@@ -68,6 +68,29 @@ formatQueryResults invocations in ChangeJson.
|
|||||||
* `query/query_latency`: Successful query latency, accumulated over the life
|
* `query/query_latency`: Successful query latency, accumulated over the life
|
||||||
of the process.
|
of the process.
|
||||||
|
|
||||||
|
=== Core Queues
|
||||||
|
|
||||||
|
The following queues support metrics:
|
||||||
|
|
||||||
|
* default `WorkQueue`
|
||||||
|
* index batch
|
||||||
|
* index interactive
|
||||||
|
* receive commits
|
||||||
|
* send email
|
||||||
|
* ssh batch worker
|
||||||
|
* ssh command start
|
||||||
|
* ssh interactive worker
|
||||||
|
* ssh stream worker
|
||||||
|
|
||||||
|
Each queue provides the following metrics:
|
||||||
|
|
||||||
|
* `queue/<queue_name>/pool_size`: Current number of threads in the pool
|
||||||
|
* `queue/<queue_name>/max_pool_size`: Maximum allowed number of threads in the pool
|
||||||
|
* `queue/<queue_name>/active_threads`: Number of threads that are actively executing tasks
|
||||||
|
* `queue/<queue_name>/scheduled_tasks`: Number of scheduled tasks in the queue
|
||||||
|
* `queue/<queue_name>/total_scheduled_tasks_count`: Total number of tasks that have been scheduled
|
||||||
|
* `queue/<queue_name>/total_completed_tasks_count`: Total number of tasks that have completed execution
|
||||||
|
|
||||||
=== SSH sessions
|
=== SSH sessions
|
||||||
|
|
||||||
* `sshd/sessions/connected`: Number of currently connected SSH sessions.
|
* `sshd/sessions/connected`: Number of currently connected SSH sessions.
|
||||||
|
@@ -182,7 +182,7 @@ abstract class AbstractElasticIndex<K, V> implements Index<K, V> {
|
|||||||
protected abstract String getId(V v);
|
protected abstract String getId(V v);
|
||||||
|
|
||||||
protected String delete(String type, K id) {
|
protected String delete(String type, K id) {
|
||||||
return new DeleteRequest(id.toString(), indexNameRaw, type).toString();
|
return new DeleteRequest(id.toString(), indexName, type).toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract V fromDocument(JsonObject doc, Set<String> fields);
|
protected abstract V fromDocument(JsonObject doc, Set<String> fields);
|
||||||
|
@@ -47,7 +47,7 @@ public class SysExecutorModule extends AbstractModule {
|
|||||||
int poolSize =
|
int poolSize =
|
||||||
config.getInt(
|
config.getInt(
|
||||||
"receive", null, "threadPoolSize", Runtime.getRuntime().availableProcessors());
|
"receive", null, "threadPoolSize", Runtime.getRuntime().availableProcessors());
|
||||||
return queues.createQueue(poolSize, "ReceiveCommits");
|
return queues.createQueue(poolSize, "ReceiveCommits", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
@@ -59,7 +59,7 @@ public class SysExecutorModule extends AbstractModule {
|
|||||||
if (poolSize == 0) {
|
if (poolSize == 0) {
|
||||||
return MoreExecutors.newDirectExecutorService();
|
return MoreExecutors.newDirectExecutorService();
|
||||||
}
|
}
|
||||||
return queues.createQueue(poolSize, "SendEmail");
|
return queues.createQueue(poolSize, "SendEmail", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
|
@@ -14,9 +14,13 @@
|
|||||||
|
|
||||||
package com.google.gerrit.server.git;
|
package com.google.gerrit.server.git;
|
||||||
|
|
||||||
|
import com.google.common.base.CaseFormat;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.flogger.FluentLogger;
|
import com.google.common.flogger.FluentLogger;
|
||||||
import com.google.gerrit.extensions.events.LifecycleListener;
|
import com.google.gerrit.extensions.events.LifecycleListener;
|
||||||
import com.google.gerrit.lifecycle.LifecycleModule;
|
import com.google.gerrit.lifecycle.LifecycleModule;
|
||||||
|
import com.google.gerrit.metrics.Description;
|
||||||
|
import com.google.gerrit.metrics.MetricMaker;
|
||||||
import com.google.gerrit.reviewdb.client.Project;
|
import com.google.gerrit.reviewdb.client.Project;
|
||||||
import com.google.gerrit.server.config.GerritServerConfig;
|
import com.google.gerrit.server.config.GerritServerConfig;
|
||||||
import com.google.gerrit.server.config.ScheduleConfig.Schedule;
|
import com.google.gerrit.server.config.ScheduleConfig.Schedule;
|
||||||
@@ -86,18 +90,20 @@ public class WorkQueue {
|
|||||||
|
|
||||||
private final ScheduledExecutorService defaultQueue;
|
private final ScheduledExecutorService defaultQueue;
|
||||||
private final IdGenerator idGenerator;
|
private final IdGenerator idGenerator;
|
||||||
|
private final MetricMaker metrics;
|
||||||
private final CopyOnWriteArrayList<Executor> queues;
|
private final CopyOnWriteArrayList<Executor> queues;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
WorkQueue(IdGenerator idGenerator, @GerritServerConfig Config cfg) {
|
WorkQueue(IdGenerator idGenerator, @GerritServerConfig Config cfg, MetricMaker metrics) {
|
||||||
this(idGenerator, cfg.getInt("execution", "defaultThreadPoolSize", 1));
|
this(idGenerator, cfg.getInt("execution", "defaultThreadPoolSize", 1), metrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Constructor to allow binding the WorkQueue more explicitly in a vhost setup. */
|
/** Constructor to allow binding the WorkQueue more explicitly in a vhost setup. */
|
||||||
public WorkQueue(IdGenerator idGenerator, int defaultThreadPoolSize) {
|
public WorkQueue(IdGenerator idGenerator, int defaultThreadPoolSize, MetricMaker metrics) {
|
||||||
this.idGenerator = idGenerator;
|
this.idGenerator = idGenerator;
|
||||||
|
this.metrics = metrics;
|
||||||
this.queues = new CopyOnWriteArrayList<>();
|
this.queues = new CopyOnWriteArrayList<>();
|
||||||
this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue");
|
this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Get the default work queue, for miscellaneous tasks. */
|
/** Get the default work queue, for miscellaneous tasks. */
|
||||||
@@ -105,14 +111,54 @@ public class WorkQueue {
|
|||||||
return defaultQueue;
|
return defaultQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Create a new executor queue. */
|
/**
|
||||||
public ScheduledExecutorService createQueue(int poolsize, String queueName) {
|
* Create a new executor queue.
|
||||||
return createQueue(poolsize, queueName, Thread.NORM_PRIORITY);
|
*
|
||||||
|
* <p>Creates a new executor queue without associated metrics. This method is suitable for use by
|
||||||
|
* plugins.
|
||||||
|
*
|
||||||
|
* <p>If metrics are needed, use {@link #createQueue(int, String, int, boolean)} instead.
|
||||||
|
*
|
||||||
|
* @param poolsize the size of the pool.
|
||||||
|
* @param queueName the name of the queue.
|
||||||
|
*/
|
||||||
|
public ScheduledThreadPoolExecutor createQueue(int poolsize, String queueName) {
|
||||||
|
return createQueue(poolsize, queueName, Thread.NORM_PRIORITY, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new executor queue, with default priority, optionally with metrics.
|
||||||
|
*
|
||||||
|
* <p>Creates a new executor queue, optionally with associated metrics. Metrics should not be
|
||||||
|
* requested for queues created by plugins.
|
||||||
|
*
|
||||||
|
* @param poolsize the size of the pool.
|
||||||
|
* @param queueName the name of the queue.
|
||||||
|
* @param withMetrics whether to create metrics.
|
||||||
|
*/
|
||||||
public ScheduledThreadPoolExecutor createQueue(
|
public ScheduledThreadPoolExecutor createQueue(
|
||||||
int poolsize, String queueName, int threadPriority) {
|
int poolsize, String queueName, boolean withMetrics) {
|
||||||
|
return createQueue(poolsize, queueName, Thread.NORM_PRIORITY, withMetrics);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new executor queue, optionally with metrics.
|
||||||
|
*
|
||||||
|
* <p>Creates a new executor queue, optionally with associated metrics. Metrics should not be
|
||||||
|
* requested for queues created by plugins.
|
||||||
|
*
|
||||||
|
* @param poolsize the size of the pool.
|
||||||
|
* @param queueName the name of the queue.
|
||||||
|
* @param threadPriority thread priority.
|
||||||
|
* @param withMetrics whether to create metrics.
|
||||||
|
*/
|
||||||
|
public ScheduledThreadPoolExecutor createQueue(
|
||||||
|
int poolsize, String queueName, int threadPriority, boolean withMetrics) {
|
||||||
Executor executor = new Executor(poolsize, queueName);
|
Executor executor = new Executor(poolsize, queueName);
|
||||||
|
if (withMetrics) {
|
||||||
|
logger.atInfo().log("Adding metrics for '%s' queue", queueName);
|
||||||
|
executor.buildMetrics(queueName);
|
||||||
|
}
|
||||||
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
|
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
|
||||||
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
|
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
|
||||||
queues.add(executor);
|
queues.add(executor);
|
||||||
@@ -233,6 +279,85 @@ public class WorkQueue {
|
|||||||
queues.remove(this);
|
queues.remove(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void buildMetrics(String queueName) {
|
||||||
|
metrics.newCallbackMetric(
|
||||||
|
getMetricName(queueName, "max_pool_size"),
|
||||||
|
Long.class,
|
||||||
|
new Description("Maximum allowed number of threads in the pool")
|
||||||
|
.setGauge()
|
||||||
|
.setUnit("threads"),
|
||||||
|
new Supplier<Long>() {
|
||||||
|
@Override
|
||||||
|
public Long get() {
|
||||||
|
return (long) getMaximumPoolSize();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
metrics.newCallbackMetric(
|
||||||
|
getMetricName(queueName, "pool_size"),
|
||||||
|
Long.class,
|
||||||
|
new Description("Current number of threads in the pool").setGauge().setUnit("threads"),
|
||||||
|
new Supplier<Long>() {
|
||||||
|
@Override
|
||||||
|
public Long get() {
|
||||||
|
return (long) getPoolSize();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
metrics.newCallbackMetric(
|
||||||
|
getMetricName(queueName, "active_threads"),
|
||||||
|
Long.class,
|
||||||
|
new Description("Number number of threads that are actively executing tasks")
|
||||||
|
.setGauge()
|
||||||
|
.setUnit("threads"),
|
||||||
|
new Supplier<Long>() {
|
||||||
|
@Override
|
||||||
|
public Long get() {
|
||||||
|
return (long) getActiveCount();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
metrics.newCallbackMetric(
|
||||||
|
getMetricName(queueName, "scheduled_tasks"),
|
||||||
|
Integer.class,
|
||||||
|
new Description("Number of scheduled tasks in the queue").setGauge().setUnit("tasks"),
|
||||||
|
new Supplier<Integer>() {
|
||||||
|
@Override
|
||||||
|
public Integer get() {
|
||||||
|
return getQueue().size();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
metrics.newCallbackMetric(
|
||||||
|
getMetricName(queueName, "total_scheduled_tasks_count"),
|
||||||
|
Long.class,
|
||||||
|
new Description("Total number of tasks that have been scheduled for execution")
|
||||||
|
.setCumulative()
|
||||||
|
.setUnit("tasks"),
|
||||||
|
new Supplier<Long>() {
|
||||||
|
@Override
|
||||||
|
public Long get() {
|
||||||
|
return (long) getTaskCount();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
metrics.newCallbackMetric(
|
||||||
|
getMetricName(queueName, "total_completed_tasks_count"),
|
||||||
|
Long.class,
|
||||||
|
new Description("Total number of tasks that have completed execution")
|
||||||
|
.setCumulative()
|
||||||
|
.setUnit("tasks"),
|
||||||
|
new Supplier<Long>() {
|
||||||
|
@Override
|
||||||
|
public Long get() {
|
||||||
|
return (long) getCompletedTaskCount();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getMetricName(String queueName, String metricName) {
|
||||||
|
String name =
|
||||||
|
CaseFormat.UPPER_CAMEL.to(
|
||||||
|
CaseFormat.LOWER_UNDERSCORE,
|
||||||
|
queueName.replaceFirst("SSH", "Ssh").replaceAll("-", ""));
|
||||||
|
return metrics.sanitizeMetricName(String.format("queue/%s/%s", name, metricName));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected <V> RunnableScheduledFuture<V> decorateTask(
|
protected <V> RunnableScheduledFuture<V> decorateTask(
|
||||||
Runnable runnable, RunnableScheduledFuture<V> r) {
|
Runnable runnable, RunnableScheduledFuture<V> r) {
|
||||||
|
@@ -217,7 +217,8 @@ public class IndexModule extends LifecycleModule {
|
|||||||
if (threads <= 0) {
|
if (threads <= 0) {
|
||||||
threads = Runtime.getRuntime().availableProcessors() / 2 + 1;
|
threads = Runtime.getRuntime().availableProcessors() / 2 + 1;
|
||||||
}
|
}
|
||||||
return MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "Index-Interactive"));
|
return MoreExecutors.listeningDecorator(
|
||||||
|
workQueue.createQueue(threads, "Index-Interactive", true));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Provides
|
@Provides
|
||||||
@@ -232,7 +233,7 @@ public class IndexModule extends LifecycleModule {
|
|||||||
if (threads <= 0) {
|
if (threads <= 0) {
|
||||||
threads = Runtime.getRuntime().availableProcessors();
|
threads = Runtime.getRuntime().availableProcessors();
|
||||||
}
|
}
|
||||||
return MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "Index-Batch"));
|
return MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "Index-Batch", true));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
|
@@ -44,9 +44,10 @@ public class CommandExecutorQueueProvider implements QueueProvider {
|
|||||||
}
|
}
|
||||||
int interactiveThreads = Math.max(1, poolSize - batchThreads);
|
int interactiveThreads = Math.max(1, poolSize - batchThreads);
|
||||||
interactiveExecutor =
|
interactiveExecutor =
|
||||||
queues.createQueue(interactiveThreads, "SSH-Interactive-Worker", Thread.MIN_PRIORITY);
|
queues.createQueue(interactiveThreads, "SSH-Interactive-Worker", Thread.MIN_PRIORITY, true);
|
||||||
if (batchThreads != 0) {
|
if (batchThreads != 0) {
|
||||||
batchExecutor = queues.createQueue(batchThreads, "SSH-Batch-Worker", Thread.MIN_PRIORITY);
|
batchExecutor =
|
||||||
|
queues.createQueue(batchThreads, "SSH-Batch-Worker", Thread.MIN_PRIORITY, true);
|
||||||
} else {
|
} else {
|
||||||
batchExecutor = interactiveExecutor;
|
batchExecutor = interactiveExecutor;
|
||||||
}
|
}
|
||||||
|
@@ -75,7 +75,7 @@ class CommandFactoryProvider implements Provider<CommandFactory>, LifecycleListe
|
|||||||
createCommandInterceptor = i;
|
createCommandInterceptor = i;
|
||||||
|
|
||||||
int threads = cfg.getInt("sshd", "commandStartThreads", 2);
|
int threads = cfg.getInt("sshd", "commandStartThreads", 2);
|
||||||
startExecutor = workQueue.createQueue(threads, "SshCommandStart");
|
startExecutor = workQueue.createQueue(threads, "SshCommandStart", true);
|
||||||
destroyExecutor =
|
destroyExecutor =
|
||||||
Executors.newSingleThreadExecutor(
|
Executors.newSingleThreadExecutor(
|
||||||
new ThreadFactoryBuilder()
|
new ThreadFactoryBuilder()
|
||||||
|
@@ -34,6 +34,6 @@ class StreamCommandExecutorProvider implements Provider<ScheduledThreadPoolExecu
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ScheduledThreadPoolExecutor get() {
|
public ScheduledThreadPoolExecutor get() {
|
||||||
return queues.createQueue(poolSize, "SSH-Stream-Worker", Thread.MIN_PRIORITY);
|
return queues.createQueue(poolSize, "SSH-Stream-Worker", Thread.MIN_PRIORITY, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -2909,6 +2909,18 @@ public abstract class AbstractQueryChangesTest extends GerritServerTests {
|
|||||||
assertQuery("owner: \"" + nameEmail + "\"\\");
|
assertQuery("owner: \"" + nameEmail + "\"\\");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void byDeletedChange() throws Exception {
|
||||||
|
TestRepository<Repo> repo = createProject("repo");
|
||||||
|
Change change = insert(repo, newChange(repo));
|
||||||
|
|
||||||
|
String query = "change:" + change.getId();
|
||||||
|
assertQuery(query, change);
|
||||||
|
|
||||||
|
gApi.changes().id(change.getChangeId()).delete();
|
||||||
|
assertQuery(query);
|
||||||
|
}
|
||||||
|
|
||||||
protected ChangeInserter newChange(TestRepository<Repo> repo) throws Exception {
|
protected ChangeInserter newChange(TestRepository<Repo> repo) throws Exception {
|
||||||
return newChange(repo, null, null, null, null, false);
|
return newChange(repo, null, null, null, null, false);
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user