WorkQueue: Add possibility to enable metrics
Because the handling of metrics for queues created from plugins is a bit problematic, a new method createQueue with extra parameter: withMetrics has been introduced. At the moment the only queue that has metric enabled is default WorkQueue. Other queues will be added in a follow up patch. For each queue created using this method the following metrics are available: * active_threads - Number of threads that are actively executing tasks * max_pool_size - Maximum allowed number of threads in the pool * pool_size - Current number of threads in the pool * scheduled_tasks - Number of scheduled tasks in the queue * total_completed_tasks_count - Total number of tasks that have completed execution * total_scheduled_tasks_count - Total number of tasks that have been scheduled Change-Id: Ibebe22765079806c3e6240d5bd31dbf27581e998 Signed-off-by: Eryk Szymanski <eryksz@gmail.com>
This commit is contained in:
committed by
David Pursehouse
parent
0f6c9bd34c
commit
366c3a652e
@@ -53,6 +53,15 @@ objects needing finalization.
|
|||||||
* `query/query_latency`: Successful query latency, accumulated over the life
|
* `query/query_latency`: Successful query latency, accumulated over the life
|
||||||
of the process.
|
of the process.
|
||||||
|
|
||||||
|
=== Work Queue
|
||||||
|
|
||||||
|
* `queue/work_queue/pool_size`: Current number of threads in the pool
|
||||||
|
* `queue/work_queue/max_pool_size`: Maximum allowed number of threads in the pool
|
||||||
|
* `queue/work_queue/active_threads`: Number of threads that are actively executing tasks
|
||||||
|
* `queue/work_queue/scheduled_tasks`: Number of scheduled tasks in the queue
|
||||||
|
* `queue/work_queue/total_scheduled_tasks_count`: Total number of tasks that have been scheduled
|
||||||
|
* `queue/work_queue/total_completed_tasks_count`: Total number of tasks that have completed execution
|
||||||
|
|
||||||
=== SSH sessions
|
=== SSH sessions
|
||||||
|
|
||||||
* `sshd/sessions/connected`: Number of currently connected SSH sessions.
|
* `sshd/sessions/connected`: Number of currently connected SSH sessions.
|
||||||
|
|||||||
@@ -14,8 +14,14 @@
|
|||||||
|
|
||||||
package com.google.gerrit.server.git;
|
package com.google.gerrit.server.git;
|
||||||
|
|
||||||
|
import static com.google.gerrit.metrics.dropwizard.DropWizardMetricMaker.sanitizeMetricName;
|
||||||
|
|
||||||
|
import com.google.common.base.CaseFormat;
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import com.google.gerrit.extensions.events.LifecycleListener;
|
import com.google.gerrit.extensions.events.LifecycleListener;
|
||||||
import com.google.gerrit.lifecycle.LifecycleModule;
|
import com.google.gerrit.lifecycle.LifecycleModule;
|
||||||
|
import com.google.gerrit.metrics.Description;
|
||||||
|
import com.google.gerrit.metrics.MetricMaker;
|
||||||
import com.google.gerrit.reviewdb.client.Project;
|
import com.google.gerrit.reviewdb.client.Project;
|
||||||
import com.google.gerrit.server.config.GerritServerConfig;
|
import com.google.gerrit.server.config.GerritServerConfig;
|
||||||
import com.google.gerrit.server.util.IdGenerator;
|
import com.google.gerrit.server.util.IdGenerator;
|
||||||
@@ -84,18 +90,20 @@ public class WorkQueue {
|
|||||||
|
|
||||||
private final Executor defaultQueue;
|
private final Executor defaultQueue;
|
||||||
private final IdGenerator idGenerator;
|
private final IdGenerator idGenerator;
|
||||||
|
private final MetricMaker metrics;
|
||||||
private final CopyOnWriteArrayList<Executor> queues;
|
private final CopyOnWriteArrayList<Executor> queues;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
WorkQueue(IdGenerator idGenerator, @GerritServerConfig Config cfg) {
|
WorkQueue(IdGenerator idGenerator, @GerritServerConfig Config cfg, MetricMaker metrics) {
|
||||||
this(idGenerator, cfg.getInt("execution", "defaultThreadPoolSize", 1));
|
this(idGenerator, cfg.getInt("execution", "defaultThreadPoolSize", 1), metrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Constructor to allow binding the WorkQueue more explicitly in a vhost setup. */
|
/** Constructor to allow binding the WorkQueue more explicitly in a vhost setup. */
|
||||||
public WorkQueue(IdGenerator idGenerator, int defaultThreadPoolSize) {
|
public WorkQueue(IdGenerator idGenerator, int defaultThreadPoolSize, MetricMaker metrics) {
|
||||||
this.idGenerator = idGenerator;
|
this.idGenerator = idGenerator;
|
||||||
|
this.metrics = metrics;
|
||||||
this.queues = new CopyOnWriteArrayList<>();
|
this.queues = new CopyOnWriteArrayList<>();
|
||||||
this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue");
|
this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Get the default work queue, for miscellaneous tasks. */
|
/** Get the default work queue, for miscellaneous tasks. */
|
||||||
@@ -103,9 +111,37 @@ public class WorkQueue {
|
|||||||
return defaultQueue;
|
return defaultQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Create a new executor queue. */
|
/**
|
||||||
|
* Create a new executor queue.
|
||||||
|
*
|
||||||
|
* <p>Creates a new executor queue without associated metrics. This method is suitable for use by
|
||||||
|
* plugins.
|
||||||
|
*
|
||||||
|
* <p>If metrics are needed, use {@link #createQueue(int, String, boolean)} instead.
|
||||||
|
*
|
||||||
|
* @param poolsize the size of the pool.
|
||||||
|
* @param queueName the name of the queue.
|
||||||
|
*/
|
||||||
public Executor createQueue(int poolsize, String queueName) {
|
public Executor createQueue(int poolsize, String queueName) {
|
||||||
|
return createQueue(poolsize, queueName, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new executor queue, optionally with metrics.
|
||||||
|
*
|
||||||
|
* <p>Creates a new executor queue, optionally with associated metrics. Metrics should not be
|
||||||
|
* requested for queues created by plugins.
|
||||||
|
*
|
||||||
|
* @param poolsize the size of the pool.
|
||||||
|
* @param queueName the name of the queue.
|
||||||
|
* @param withMetrics whether to create metrics.
|
||||||
|
*/
|
||||||
|
public Executor createQueue(int poolsize, String queueName, boolean withMetrics) {
|
||||||
final Executor r = new Executor(poolsize, queueName);
|
final Executor r = new Executor(poolsize, queueName);
|
||||||
|
if (withMetrics) {
|
||||||
|
log.info("Adding metrics for '{}' queue", queueName);
|
||||||
|
r.buildMetrics(queueName);
|
||||||
|
}
|
||||||
r.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
|
r.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
|
||||||
r.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
|
r.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
|
||||||
queues.add(r);
|
queues.add(r);
|
||||||
@@ -201,6 +237,85 @@ public class WorkQueue {
|
|||||||
this.queueName = queueName;
|
this.queueName = queueName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void buildMetrics(String queueName) {
|
||||||
|
metrics.newCallbackMetric(
|
||||||
|
getMetricName(queueName, "max_pool_size"),
|
||||||
|
Long.class,
|
||||||
|
new Description("Maximum allowed number of threads in the pool")
|
||||||
|
.setGauge()
|
||||||
|
.setUnit("threads"),
|
||||||
|
new Supplier<Long>() {
|
||||||
|
@Override
|
||||||
|
public Long get() {
|
||||||
|
return (long) getMaximumPoolSize();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
metrics.newCallbackMetric(
|
||||||
|
getMetricName(queueName, "pool_size"),
|
||||||
|
Long.class,
|
||||||
|
new Description("Current number of threads in the pool").setGauge().setUnit("threads"),
|
||||||
|
new Supplier<Long>() {
|
||||||
|
@Override
|
||||||
|
public Long get() {
|
||||||
|
return (long) getPoolSize();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
metrics.newCallbackMetric(
|
||||||
|
getMetricName(queueName, "active_threads"),
|
||||||
|
Long.class,
|
||||||
|
new Description("Number number of threads that are actively executing tasks")
|
||||||
|
.setGauge()
|
||||||
|
.setUnit("threads"),
|
||||||
|
new Supplier<Long>() {
|
||||||
|
@Override
|
||||||
|
public Long get() {
|
||||||
|
return (long) getActiveCount();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
metrics.newCallbackMetric(
|
||||||
|
getMetricName(queueName, "scheduled_tasks"),
|
||||||
|
Integer.class,
|
||||||
|
new Description("Number of scheduled tasks in the queue").setGauge().setUnit("tasks"),
|
||||||
|
new Supplier<Integer>() {
|
||||||
|
@Override
|
||||||
|
public Integer get() {
|
||||||
|
return getQueue().size();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
metrics.newCallbackMetric(
|
||||||
|
getMetricName(queueName, "total_scheduled_tasks_count"),
|
||||||
|
Long.class,
|
||||||
|
new Description("Total number of tasks that have been scheduled for execution")
|
||||||
|
.setCumulative()
|
||||||
|
.setUnit("tasks"),
|
||||||
|
new Supplier<Long>() {
|
||||||
|
@Override
|
||||||
|
public Long get() {
|
||||||
|
return (long) getTaskCount();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
metrics.newCallbackMetric(
|
||||||
|
getMetricName(queueName, "total_completed_tasks_count"),
|
||||||
|
Long.class,
|
||||||
|
new Description("Total number of tasks that have completed execution")
|
||||||
|
.setCumulative()
|
||||||
|
.setUnit("tasks"),
|
||||||
|
new Supplier<Long>() {
|
||||||
|
@Override
|
||||||
|
public Long get() {
|
||||||
|
return (long) getCompletedTaskCount();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getMetricName(String queueName, String metricName) {
|
||||||
|
String name =
|
||||||
|
CaseFormat.UPPER_CAMEL.to(
|
||||||
|
CaseFormat.LOWER_UNDERSCORE,
|
||||||
|
queueName.replaceFirst("SSH", "Ssh").replaceAll("-", ""));
|
||||||
|
return sanitizeMetricName(String.format("queue/%s/%s", name, metricName));
|
||||||
|
}
|
||||||
|
|
||||||
public void unregisterWorkQueue() {
|
public void unregisterWorkQueue() {
|
||||||
queues.remove(this);
|
queues.remove(this);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user