Merge changes from topic "queue-metrics" into stable-2.14
* changes: Enable metrics for core queues WorkQueue: Add possibility to enable metrics
This commit is contained in:
commit
7d293e4e5d
@ -53,6 +53,29 @@ objects needing finalization.
|
||||
* `query/query_latency`: Successful query latency, accumulated over the life
|
||||
of the process.
|
||||
|
||||
=== Core Queues
|
||||
|
||||
The following queues support metrics:
|
||||
|
||||
* default `WorkQueue`
|
||||
* index batch
|
||||
* index interactive
|
||||
* receive commits
|
||||
* send email
|
||||
* ssh batch worker
|
||||
* ssh command start
|
||||
* ssh interactive worker
|
||||
* ssh stream worker
|
||||
|
||||
Each queue provides the following metrics:
|
||||
|
||||
* `queue/<queue_name>/pool_size`: Current number of threads in the pool
|
||||
* `queue/<queue_name>/max_pool_size`: Maximum allowed number of threads in the pool
|
||||
* `queue/<queue_name>/active_threads`: Number of threads that are actively executing tasks
|
||||
* `queue/<queue_name>/scheduled_tasks`: Number of scheduled tasks in the queue
|
||||
* `queue/<queue_name>/total_scheduled_tasks_count`: Total number of tasks that have been scheduled
|
||||
* `queue/<queue_name>/total_completed_tasks_count`: Total number of tasks that have completed execution
|
||||
|
||||
=== SSH sessions
|
||||
|
||||
* `sshd/sessions/connected`: Number of currently connected SSH sessions.
|
||||
|
@ -41,7 +41,7 @@ public class ReceiveCommitsExecutorModule extends AbstractModule {
|
||||
int poolSize =
|
||||
config.getInt(
|
||||
"receive", null, "threadPoolSize", Runtime.getRuntime().availableProcessors());
|
||||
return queues.createQueue(poolSize, "ReceiveCommits");
|
||||
return queues.createQueue(poolSize, "ReceiveCommits", true);
|
||||
}
|
||||
|
||||
@Provides
|
||||
@ -53,7 +53,7 @@ public class ReceiveCommitsExecutorModule extends AbstractModule {
|
||||
if (poolSize == 0) {
|
||||
return MoreExecutors.newDirectExecutorService();
|
||||
}
|
||||
return queues.createQueue(poolSize, "SendEmail");
|
||||
return queues.createQueue(poolSize, "SendEmail", true);
|
||||
}
|
||||
|
||||
@Provides
|
||||
|
@ -14,8 +14,14 @@
|
||||
|
||||
package com.google.gerrit.server.git;
|
||||
|
||||
import static com.google.gerrit.metrics.dropwizard.DropWizardMetricMaker.sanitizeMetricName;
|
||||
|
||||
import com.google.common.base.CaseFormat;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.gerrit.extensions.events.LifecycleListener;
|
||||
import com.google.gerrit.lifecycle.LifecycleModule;
|
||||
import com.google.gerrit.metrics.Description;
|
||||
import com.google.gerrit.metrics.MetricMaker;
|
||||
import com.google.gerrit.reviewdb.client.Project;
|
||||
import com.google.gerrit.server.config.GerritServerConfig;
|
||||
import com.google.gerrit.server.util.IdGenerator;
|
||||
@ -84,18 +90,20 @@ public class WorkQueue {
|
||||
|
||||
private final Executor defaultQueue;
|
||||
private final IdGenerator idGenerator;
|
||||
private final MetricMaker metrics;
|
||||
private final CopyOnWriteArrayList<Executor> queues;
|
||||
|
||||
@Inject
|
||||
WorkQueue(IdGenerator idGenerator, @GerritServerConfig Config cfg) {
|
||||
this(idGenerator, cfg.getInt("execution", "defaultThreadPoolSize", 1));
|
||||
WorkQueue(IdGenerator idGenerator, @GerritServerConfig Config cfg, MetricMaker metrics) {
|
||||
this(idGenerator, cfg.getInt("execution", "defaultThreadPoolSize", 1), metrics);
|
||||
}
|
||||
|
||||
/** Constructor to allow binding the WorkQueue more explicitly in a vhost setup. */
|
||||
public WorkQueue(IdGenerator idGenerator, int defaultThreadPoolSize) {
|
||||
public WorkQueue(IdGenerator idGenerator, int defaultThreadPoolSize, MetricMaker metrics) {
|
||||
this.idGenerator = idGenerator;
|
||||
this.metrics = metrics;
|
||||
this.queues = new CopyOnWriteArrayList<>();
|
||||
this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue");
|
||||
this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue", true);
|
||||
}
|
||||
|
||||
/** Get the default work queue, for miscellaneous tasks. */
|
||||
@ -103,9 +111,37 @@ public class WorkQueue {
|
||||
return defaultQueue;
|
||||
}
|
||||
|
||||
/** Create a new executor queue. */
|
||||
/**
|
||||
* Create a new executor queue.
|
||||
*
|
||||
* <p>Creates a new executor queue without associated metrics. This method is suitable for use by
|
||||
* plugins.
|
||||
*
|
||||
* <p>If metrics are needed, use {@link #createQueue(int, String, boolean)} instead.
|
||||
*
|
||||
* @param poolsize the size of the pool.
|
||||
* @param queueName the name of the queue.
|
||||
*/
|
||||
public Executor createQueue(int poolsize, String queueName) {
|
||||
return createQueue(poolsize, queueName, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new executor queue, optionally with metrics.
|
||||
*
|
||||
* <p>Creates a new executor queue, optionally with associated metrics. Metrics should not be
|
||||
* requested for queues created by plugins.
|
||||
*
|
||||
* @param poolsize the size of the pool.
|
||||
* @param queueName the name of the queue.
|
||||
* @param withMetrics whether to create metrics.
|
||||
*/
|
||||
public Executor createQueue(int poolsize, String queueName, boolean withMetrics) {
|
||||
final Executor r = new Executor(poolsize, queueName);
|
||||
if (withMetrics) {
|
||||
log.info("Adding metrics for '{}' queue", queueName);
|
||||
r.buildMetrics(queueName);
|
||||
}
|
||||
r.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
|
||||
r.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
|
||||
queues.add(r);
|
||||
@ -201,6 +237,85 @@ public class WorkQueue {
|
||||
this.queueName = queueName;
|
||||
}
|
||||
|
||||
private void buildMetrics(String queueName) {
|
||||
metrics.newCallbackMetric(
|
||||
getMetricName(queueName, "max_pool_size"),
|
||||
Long.class,
|
||||
new Description("Maximum allowed number of threads in the pool")
|
||||
.setGauge()
|
||||
.setUnit("threads"),
|
||||
new Supplier<Long>() {
|
||||
@Override
|
||||
public Long get() {
|
||||
return (long) getMaximumPoolSize();
|
||||
}
|
||||
});
|
||||
metrics.newCallbackMetric(
|
||||
getMetricName(queueName, "pool_size"),
|
||||
Long.class,
|
||||
new Description("Current number of threads in the pool").setGauge().setUnit("threads"),
|
||||
new Supplier<Long>() {
|
||||
@Override
|
||||
public Long get() {
|
||||
return (long) getPoolSize();
|
||||
}
|
||||
});
|
||||
metrics.newCallbackMetric(
|
||||
getMetricName(queueName, "active_threads"),
|
||||
Long.class,
|
||||
new Description("Number number of threads that are actively executing tasks")
|
||||
.setGauge()
|
||||
.setUnit("threads"),
|
||||
new Supplier<Long>() {
|
||||
@Override
|
||||
public Long get() {
|
||||
return (long) getActiveCount();
|
||||
}
|
||||
});
|
||||
metrics.newCallbackMetric(
|
||||
getMetricName(queueName, "scheduled_tasks"),
|
||||
Integer.class,
|
||||
new Description("Number of scheduled tasks in the queue").setGauge().setUnit("tasks"),
|
||||
new Supplier<Integer>() {
|
||||
@Override
|
||||
public Integer get() {
|
||||
return getQueue().size();
|
||||
}
|
||||
});
|
||||
metrics.newCallbackMetric(
|
||||
getMetricName(queueName, "total_scheduled_tasks_count"),
|
||||
Long.class,
|
||||
new Description("Total number of tasks that have been scheduled for execution")
|
||||
.setCumulative()
|
||||
.setUnit("tasks"),
|
||||
new Supplier<Long>() {
|
||||
@Override
|
||||
public Long get() {
|
||||
return (long) getTaskCount();
|
||||
}
|
||||
});
|
||||
metrics.newCallbackMetric(
|
||||
getMetricName(queueName, "total_completed_tasks_count"),
|
||||
Long.class,
|
||||
new Description("Total number of tasks that have completed execution")
|
||||
.setCumulative()
|
||||
.setUnit("tasks"),
|
||||
new Supplier<Long>() {
|
||||
@Override
|
||||
public Long get() {
|
||||
return (long) getCompletedTaskCount();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private String getMetricName(String queueName, String metricName) {
|
||||
String name =
|
||||
CaseFormat.UPPER_CAMEL.to(
|
||||
CaseFormat.LOWER_UNDERSCORE,
|
||||
queueName.replaceFirst("SSH", "Ssh").replaceAll("-", ""));
|
||||
return sanitizeMetricName(String.format("queue/%s/%s", name, metricName));
|
||||
}
|
||||
|
||||
public void unregisterWorkQueue() {
|
||||
queues.remove(this);
|
||||
}
|
||||
|
@ -181,7 +181,8 @@ public class IndexModule extends LifecycleModule {
|
||||
if (threads <= 0) {
|
||||
threads = Runtime.getRuntime().availableProcessors() / 2 + 1;
|
||||
}
|
||||
return MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "Index-Interactive"));
|
||||
return MoreExecutors.listeningDecorator(
|
||||
workQueue.createQueue(threads, "Index-Interactive", true));
|
||||
}
|
||||
|
||||
@Provides
|
||||
@ -199,7 +200,8 @@ public class IndexModule extends LifecycleModule {
|
||||
if (batchThreads <= 0) {
|
||||
batchThreads = Runtime.getRuntime().availableProcessors();
|
||||
}
|
||||
return MoreExecutors.listeningDecorator(workQueue.createQueue(batchThreads, "Index-Batch"));
|
||||
return MoreExecutors.listeningDecorator(
|
||||
workQueue.createQueue(batchThreads, "Index-Batch", true));
|
||||
}
|
||||
|
||||
@Singleton
|
||||
|
@ -43,9 +43,9 @@ public class CommandExecutorQueueProvider implements QueueProvider {
|
||||
poolSize += batchThreads;
|
||||
}
|
||||
int interactiveThreads = Math.max(1, poolSize - batchThreads);
|
||||
interactiveExecutor = queues.createQueue(interactiveThreads, "SSH-Interactive-Worker");
|
||||
interactiveExecutor = queues.createQueue(interactiveThreads, "SSH-Interactive-Worker", true);
|
||||
if (batchThreads != 0) {
|
||||
batchExecutor = queues.createQueue(batchThreads, "SSH-Batch-Worker");
|
||||
batchExecutor = queues.createQueue(batchThreads, "SSH-Batch-Worker", true);
|
||||
setThreadFactory(batchExecutor);
|
||||
} else {
|
||||
batchExecutor = interactiveExecutor;
|
||||
|
@ -76,7 +76,7 @@ class CommandFactoryProvider implements Provider<CommandFactory>, LifecycleListe
|
||||
createCommandInterceptor = i;
|
||||
|
||||
int threads = cfg.getInt("sshd", "commandStartThreads", 2);
|
||||
startExecutor = workQueue.createQueue(threads, "SshCommandStart");
|
||||
startExecutor = workQueue.createQueue(threads, "SshCommandStart", true);
|
||||
destroyExecutor =
|
||||
Executors.newSingleThreadExecutor(
|
||||
new ThreadFactoryBuilder()
|
||||
|
@ -36,7 +36,7 @@ class StreamCommandExecutorProvider implements Provider<WorkQueue.Executor> {
|
||||
public WorkQueue.Executor get() {
|
||||
final WorkQueue.Executor executor;
|
||||
|
||||
executor = queues.createQueue(poolSize, "SSH-Stream-Worker");
|
||||
executor = queues.createQueue(poolSize, "SSH-Stream-Worker", true);
|
||||
|
||||
final ThreadFactory parent = executor.getThreadFactory();
|
||||
executor.setThreadFactory(
|
||||
|
Loading…
x
Reference in New Issue
Block a user