diff --git a/Documentation/metrics.txt b/Documentation/metrics.txt index 901f15a332..523973006d 100644 --- a/Documentation/metrics.txt +++ b/Documentation/metrics.txt @@ -53,6 +53,29 @@ objects needing finalization. * `query/query_latency`: Successful query latency, accumulated over the life of the process. +=== Core Queues + +The following queues support metrics: + +* default `WorkQueue` +* index batch +* index interactive +* receive commits +* send email +* ssh batch worker +* ssh command start +* ssh interactive worker +* ssh stream worker + +Each queue provides the following metrics: + +* `queue//pool_size`: Current number of threads in the pool +* `queue//max_pool_size`: Maximum allowed number of threads in the pool +* `queue//active_threads`: Number of threads that are actively executing tasks +* `queue//scheduled_tasks`: Number of scheduled tasks in the queue +* `queue//total_scheduled_tasks_count`: Total number of tasks that have been scheduled +* `queue//total_completed_tasks_count`: Total number of tasks that have completed execution + === SSH sessions * `sshd/sessions/connected`: Number of currently connected SSH sessions. diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/WorkQueue.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/WorkQueue.java index e1f2c5852a..757eb7a282 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/git/WorkQueue.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/WorkQueue.java @@ -14,8 +14,14 @@ package com.google.gerrit.server.git; +import static com.google.gerrit.metrics.dropwizard.DropWizardMetricMaker.sanitizeMetricName; + +import com.google.common.base.CaseFormat; +import com.google.common.base.Supplier; import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.lifecycle.LifecycleModule; +import com.google.gerrit.metrics.Description; +import com.google.gerrit.metrics.MetricMaker; import com.google.gerrit.reviewdb.client.Project; import com.google.gerrit.server.config.GerritServerConfig; import com.google.gerrit.server.util.IdGenerator; @@ -85,18 +91,20 @@ public class WorkQueue { private final ScheduledExecutorService defaultQueue; private final IdGenerator idGenerator; + private final MetricMaker metrics; private final CopyOnWriteArrayList queues; @Inject - WorkQueue(IdGenerator idGenerator, @GerritServerConfig Config cfg) { - this(idGenerator, cfg.getInt("execution", "defaultThreadPoolSize", 1)); + WorkQueue(IdGenerator idGenerator, @GerritServerConfig Config cfg, MetricMaker metrics) { + this(idGenerator, cfg.getInt("execution", "defaultThreadPoolSize", 1), metrics); } /** Constructor to allow binding the WorkQueue more explicitly in a vhost setup. */ - public WorkQueue(IdGenerator idGenerator, int defaultThreadPoolSize) { + public WorkQueue(IdGenerator idGenerator, int defaultThreadPoolSize, MetricMaker metrics) { this.idGenerator = idGenerator; + this.metrics = metrics; this.queues = new CopyOnWriteArrayList<>(); - this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue"); + this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue", true); } /** Get the default work queue, for miscellaneous tasks. */ @@ -104,14 +112,54 @@ public class WorkQueue { return defaultQueue; } - /** Create a new executor queue. */ - public ScheduledExecutorService createQueue(int poolsize, String queueName) { - return createQueue(poolsize, queueName, Thread.NORM_PRIORITY); + /** + * Create a new executor queue. + * + *

Creates a new executor queue without associated metrics. This method is suitable for use by + * plugins. + * + *

If metrics are needed, use {@link #createQueue(int, String, int, boolean)} instead. + * + * @param poolsize the size of the pool. + * @param queueName the name of the queue. + */ + public ScheduledThreadPoolExecutor createQueue(int poolsize, String queueName) { + return createQueue(poolsize, queueName, Thread.NORM_PRIORITY, false); } + /** + * Create a new executor queue, with default priority, optionally with metrics. + * + *

Creates a new executor queue, optionally with associated metrics. Metrics should not be + * requested for queues created by plugins. + * + * @param poolsize the size of the pool. + * @param queueName the name of the queue. + * @param withMetrics whether to create metrics. + */ public ScheduledThreadPoolExecutor createQueue( - int poolsize, String queueName, int threadPriority) { + int poolsize, String queueName, boolean withMetrics) { + return createQueue(poolsize, queueName, Thread.NORM_PRIORITY, withMetrics); + } + + /** + * Create a new executor queue, optionally with metrics. + * + *

Creates a new executor queue, optionally with associated metrics. Metrics should not be + * requested for queues created by plugins. + * + * @param poolsize the size of the pool. + * @param queueName the name of the queue. + * @param threadPriority thread priority. + * @param withMetrics whether to create metrics. + */ + public ScheduledThreadPoolExecutor createQueue( + int poolsize, String queueName, int threadPriority, boolean withMetrics) { Executor executor = new Executor(poolsize, queueName); + if (withMetrics) { + log.info("Adding metrics for '{}' queue", queueName); + executor.buildMetrics(queueName); + } executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false); executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true); queues.add(executor); @@ -223,6 +271,85 @@ public class WorkQueue { queues.remove(this); } + private void buildMetrics(String queueName) { + metrics.newCallbackMetric( + getMetricName(queueName, "max_pool_size"), + Long.class, + new Description("Maximum allowed number of threads in the pool") + .setGauge() + .setUnit("threads"), + new Supplier() { + @Override + public Long get() { + return (long) getMaximumPoolSize(); + } + }); + metrics.newCallbackMetric( + getMetricName(queueName, "pool_size"), + Long.class, + new Description("Current number of threads in the pool").setGauge().setUnit("threads"), + new Supplier() { + @Override + public Long get() { + return (long) getPoolSize(); + } + }); + metrics.newCallbackMetric( + getMetricName(queueName, "active_threads"), + Long.class, + new Description("Number number of threads that are actively executing tasks") + .setGauge() + .setUnit("threads"), + new Supplier() { + @Override + public Long get() { + return (long) getActiveCount(); + } + }); + metrics.newCallbackMetric( + getMetricName(queueName, "scheduled_tasks"), + Integer.class, + new Description("Number of scheduled tasks in the queue").setGauge().setUnit("tasks"), + new Supplier() { + @Override + public Integer get() { + return getQueue().size(); + } + }); + metrics.newCallbackMetric( + getMetricName(queueName, "total_scheduled_tasks_count"), + Long.class, + new Description("Total number of tasks that have been scheduled for execution") + .setCumulative() + .setUnit("tasks"), + new Supplier() { + @Override + public Long get() { + return (long) getTaskCount(); + } + }); + metrics.newCallbackMetric( + getMetricName(queueName, "total_completed_tasks_count"), + Long.class, + new Description("Total number of tasks that have completed execution") + .setCumulative() + .setUnit("tasks"), + new Supplier() { + @Override + public Long get() { + return (long) getCompletedTaskCount(); + } + }); + } + + private String getMetricName(String queueName, String metricName) { + String name = + CaseFormat.UPPER_CAMEL.to( + CaseFormat.LOWER_UNDERSCORE, + queueName.replaceFirst("SSH", "Ssh").replaceAll("-", "")); + return sanitizeMetricName(String.format("queue/%s/%s", name, metricName)); + } + @Override protected RunnableScheduledFuture decorateTask( Runnable runnable, RunnableScheduledFuture r) { diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/git/receive/ReceiveCommitsExecutorModule.java b/gerrit-server/src/main/java/com/google/gerrit/server/git/receive/ReceiveCommitsExecutorModule.java index 4eb760d9c5..8a66e34ac7 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/git/receive/ReceiveCommitsExecutorModule.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/git/receive/ReceiveCommitsExecutorModule.java @@ -48,7 +48,7 @@ public class ReceiveCommitsExecutorModule extends AbstractModule { int poolSize = config.getInt( "receive", null, "threadPoolSize", Runtime.getRuntime().availableProcessors()); - return queues.createQueue(poolSize, "ReceiveCommits"); + return queues.createQueue(poolSize, "ReceiveCommits", true); } @Provides @@ -60,7 +60,7 @@ public class ReceiveCommitsExecutorModule extends AbstractModule { if (poolSize == 0) { return MoreExecutors.newDirectExecutorService(); } - return queues.createQueue(poolSize, "SendEmail"); + return queues.createQueue(poolSize, "SendEmail", true); } @Provides diff --git a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java index 45e0376804..89afec4a6c 100644 --- a/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java +++ b/gerrit-server/src/main/java/com/google/gerrit/server/index/IndexModule.java @@ -186,7 +186,8 @@ public class IndexModule extends LifecycleModule { if (threads <= 0) { threads = Runtime.getRuntime().availableProcessors() / 2 + 1; } - return MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "Index-Interactive")); + return MoreExecutors.listeningDecorator( + workQueue.createQueue(threads, "Index-Interactive", true)); } @Provides @@ -204,7 +205,8 @@ public class IndexModule extends LifecycleModule { if (batchThreads <= 0) { batchThreads = Runtime.getRuntime().availableProcessors(); } - return MoreExecutors.listeningDecorator(workQueue.createQueue(batchThreads, "Index-Batch")); + return MoreExecutors.listeningDecorator( + workQueue.createQueue(batchThreads, "Index-Batch", true)); } @Singleton diff --git a/gerrit-sshd/src/main/java/com/google/gerrit/sshd/CommandExecutorQueueProvider.java b/gerrit-sshd/src/main/java/com/google/gerrit/sshd/CommandExecutorQueueProvider.java index 59185bfd4a..13ca52ec21 100644 --- a/gerrit-sshd/src/main/java/com/google/gerrit/sshd/CommandExecutorQueueProvider.java +++ b/gerrit-sshd/src/main/java/com/google/gerrit/sshd/CommandExecutorQueueProvider.java @@ -44,9 +44,10 @@ public class CommandExecutorQueueProvider implements QueueProvider { } int interactiveThreads = Math.max(1, poolSize - batchThreads); interactiveExecutor = - queues.createQueue(interactiveThreads, "SSH-Interactive-Worker", Thread.MIN_PRIORITY); + queues.createQueue(interactiveThreads, "SSH-Interactive-Worker", Thread.MIN_PRIORITY, true); if (batchThreads != 0) { - batchExecutor = queues.createQueue(batchThreads, "SSH-Batch-Worker", Thread.MIN_PRIORITY); + batchExecutor = + queues.createQueue(batchThreads, "SSH-Batch-Worker", Thread.MIN_PRIORITY, true); } else { batchExecutor = interactiveExecutor; } diff --git a/gerrit-sshd/src/main/java/com/google/gerrit/sshd/CommandFactoryProvider.java b/gerrit-sshd/src/main/java/com/google/gerrit/sshd/CommandFactoryProvider.java index 4cd748733c..e0f458b371 100644 --- a/gerrit-sshd/src/main/java/com/google/gerrit/sshd/CommandFactoryProvider.java +++ b/gerrit-sshd/src/main/java/com/google/gerrit/sshd/CommandFactoryProvider.java @@ -76,7 +76,7 @@ class CommandFactoryProvider implements Provider, LifecycleListe createCommandInterceptor = i; int threads = cfg.getInt("sshd", "commandStartThreads", 2); - startExecutor = workQueue.createQueue(threads, "SshCommandStart"); + startExecutor = workQueue.createQueue(threads, "SshCommandStart", true); destroyExecutor = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() diff --git a/gerrit-sshd/src/main/java/com/google/gerrit/sshd/StreamCommandExecutorProvider.java b/gerrit-sshd/src/main/java/com/google/gerrit/sshd/StreamCommandExecutorProvider.java index c3c6306c85..235da5d4bf 100644 --- a/gerrit-sshd/src/main/java/com/google/gerrit/sshd/StreamCommandExecutorProvider.java +++ b/gerrit-sshd/src/main/java/com/google/gerrit/sshd/StreamCommandExecutorProvider.java @@ -34,6 +34,6 @@ class StreamCommandExecutorProvider implements Provider