Merge branch 'stable-2.14' into stable-2.15

* stable-2.14:
  Update git submodules
  Enable metrics for core queues
  WorkQueue: Add possibility to enable metrics

Change-Id: Id15a3d36d0a5e5fe41cf5fbb0746f19ee6df649e
This commit is contained in:
David Pursehouse 2018-06-06 15:51:45 +09:00
commit 300fd71243
7 changed files with 169 additions and 16 deletions

View File

@ -53,6 +53,29 @@ objects needing finalization.
* `query/query_latency`: Successful query latency, accumulated over the life
of the process.
=== Core Queues
The following queues support metrics:
* default `WorkQueue`
* index batch
* index interactive
* receive commits
* send email
* ssh batch worker
* ssh command start
* ssh interactive worker
* ssh stream worker
Each queue provides the following metrics:
* `queue/<queue_name>/pool_size`: Current number of threads in the pool
* `queue/<queue_name>/max_pool_size`: Maximum allowed number of threads in the pool
* `queue/<queue_name>/active_threads`: Number of threads that are actively executing tasks
* `queue/<queue_name>/scheduled_tasks`: Number of scheduled tasks in the queue
* `queue/<queue_name>/total_scheduled_tasks_count`: Total number of tasks that have been scheduled
* `queue/<queue_name>/total_completed_tasks_count`: Total number of tasks that have completed execution
=== SSH sessions
* `sshd/sessions/connected`: Number of currently connected SSH sessions.

View File

@ -14,8 +14,14 @@
package com.google.gerrit.server.git;
import static com.google.gerrit.metrics.dropwizard.DropWizardMetricMaker.sanitizeMetricName;
import com.google.common.base.CaseFormat;
import com.google.common.base.Supplier;
import com.google.gerrit.extensions.events.LifecycleListener;
import com.google.gerrit.lifecycle.LifecycleModule;
import com.google.gerrit.metrics.Description;
import com.google.gerrit.metrics.MetricMaker;
import com.google.gerrit.reviewdb.client.Project;
import com.google.gerrit.server.config.GerritServerConfig;
import com.google.gerrit.server.util.IdGenerator;
@ -85,18 +91,20 @@ public class WorkQueue {
private final ScheduledExecutorService defaultQueue;
private final IdGenerator idGenerator;
private final MetricMaker metrics;
private final CopyOnWriteArrayList<Executor> queues;
@Inject
WorkQueue(IdGenerator idGenerator, @GerritServerConfig Config cfg) {
this(idGenerator, cfg.getInt("execution", "defaultThreadPoolSize", 1));
WorkQueue(IdGenerator idGenerator, @GerritServerConfig Config cfg, MetricMaker metrics) {
this(idGenerator, cfg.getInt("execution", "defaultThreadPoolSize", 1), metrics);
}
/** Constructor to allow binding the WorkQueue more explicitly in a vhost setup. */
public WorkQueue(IdGenerator idGenerator, int defaultThreadPoolSize) {
public WorkQueue(IdGenerator idGenerator, int defaultThreadPoolSize, MetricMaker metrics) {
this.idGenerator = idGenerator;
this.metrics = metrics;
this.queues = new CopyOnWriteArrayList<>();
this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue");
this.defaultQueue = createQueue(defaultThreadPoolSize, "WorkQueue", true);
}
/** Get the default work queue, for miscellaneous tasks. */
@ -104,14 +112,54 @@ public class WorkQueue {
return defaultQueue;
}
/** Create a new executor queue. */
public ScheduledExecutorService createQueue(int poolsize, String queueName) {
return createQueue(poolsize, queueName, Thread.NORM_PRIORITY);
/**
* Create a new executor queue.
*
* <p>Creates a new executor queue without associated metrics. This method is suitable for use by
* plugins.
*
* <p>If metrics are needed, use {@link #createQueue(int, String, int, boolean)} instead.
*
* @param poolsize the size of the pool.
* @param queueName the name of the queue.
*/
public ScheduledThreadPoolExecutor createQueue(int poolsize, String queueName) {
return createQueue(poolsize, queueName, Thread.NORM_PRIORITY, false);
}
/**
* Create a new executor queue, with default priority, optionally with metrics.
*
* <p>Creates a new executor queue, optionally with associated metrics. Metrics should not be
* requested for queues created by plugins.
*
* @param poolsize the size of the pool.
* @param queueName the name of the queue.
* @param withMetrics whether to create metrics.
*/
public ScheduledThreadPoolExecutor createQueue(
int poolsize, String queueName, int threadPriority) {
int poolsize, String queueName, boolean withMetrics) {
return createQueue(poolsize, queueName, Thread.NORM_PRIORITY, withMetrics);
}
/**
* Create a new executor queue, optionally with metrics.
*
* <p>Creates a new executor queue, optionally with associated metrics. Metrics should not be
* requested for queues created by plugins.
*
* @param poolsize the size of the pool.
* @param queueName the name of the queue.
* @param threadPriority thread priority.
* @param withMetrics whether to create metrics.
*/
public ScheduledThreadPoolExecutor createQueue(
int poolsize, String queueName, int threadPriority, boolean withMetrics) {
Executor executor = new Executor(poolsize, queueName);
if (withMetrics) {
log.info("Adding metrics for '{}' queue", queueName);
executor.buildMetrics(queueName);
}
executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);
queues.add(executor);
@ -223,6 +271,85 @@ public class WorkQueue {
queues.remove(this);
}
private void buildMetrics(String queueName) {
metrics.newCallbackMetric(
getMetricName(queueName, "max_pool_size"),
Long.class,
new Description("Maximum allowed number of threads in the pool")
.setGauge()
.setUnit("threads"),
new Supplier<Long>() {
@Override
public Long get() {
return (long) getMaximumPoolSize();
}
});
metrics.newCallbackMetric(
getMetricName(queueName, "pool_size"),
Long.class,
new Description("Current number of threads in the pool").setGauge().setUnit("threads"),
new Supplier<Long>() {
@Override
public Long get() {
return (long) getPoolSize();
}
});
metrics.newCallbackMetric(
getMetricName(queueName, "active_threads"),
Long.class,
new Description("Number number of threads that are actively executing tasks")
.setGauge()
.setUnit("threads"),
new Supplier<Long>() {
@Override
public Long get() {
return (long) getActiveCount();
}
});
metrics.newCallbackMetric(
getMetricName(queueName, "scheduled_tasks"),
Integer.class,
new Description("Number of scheduled tasks in the queue").setGauge().setUnit("tasks"),
new Supplier<Integer>() {
@Override
public Integer get() {
return getQueue().size();
}
});
metrics.newCallbackMetric(
getMetricName(queueName, "total_scheduled_tasks_count"),
Long.class,
new Description("Total number of tasks that have been scheduled for execution")
.setCumulative()
.setUnit("tasks"),
new Supplier<Long>() {
@Override
public Long get() {
return (long) getTaskCount();
}
});
metrics.newCallbackMetric(
getMetricName(queueName, "total_completed_tasks_count"),
Long.class,
new Description("Total number of tasks that have completed execution")
.setCumulative()
.setUnit("tasks"),
new Supplier<Long>() {
@Override
public Long get() {
return (long) getCompletedTaskCount();
}
});
}
private String getMetricName(String queueName, String metricName) {
String name =
CaseFormat.UPPER_CAMEL.to(
CaseFormat.LOWER_UNDERSCORE,
queueName.replaceFirst("SSH", "Ssh").replaceAll("-", ""));
return sanitizeMetricName(String.format("queue/%s/%s", name, metricName));
}
@Override
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> r) {

View File

@ -48,7 +48,7 @@ public class ReceiveCommitsExecutorModule extends AbstractModule {
int poolSize =
config.getInt(
"receive", null, "threadPoolSize", Runtime.getRuntime().availableProcessors());
return queues.createQueue(poolSize, "ReceiveCommits");
return queues.createQueue(poolSize, "ReceiveCommits", true);
}
@Provides
@ -60,7 +60,7 @@ public class ReceiveCommitsExecutorModule extends AbstractModule {
if (poolSize == 0) {
return MoreExecutors.newDirectExecutorService();
}
return queues.createQueue(poolSize, "SendEmail");
return queues.createQueue(poolSize, "SendEmail", true);
}
@Provides

View File

@ -186,7 +186,8 @@ public class IndexModule extends LifecycleModule {
if (threads <= 0) {
threads = Runtime.getRuntime().availableProcessors() / 2 + 1;
}
return MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "Index-Interactive"));
return MoreExecutors.listeningDecorator(
workQueue.createQueue(threads, "Index-Interactive", true));
}
@Provides
@ -204,7 +205,8 @@ public class IndexModule extends LifecycleModule {
if (batchThreads <= 0) {
batchThreads = Runtime.getRuntime().availableProcessors();
}
return MoreExecutors.listeningDecorator(workQueue.createQueue(batchThreads, "Index-Batch"));
return MoreExecutors.listeningDecorator(
workQueue.createQueue(batchThreads, "Index-Batch", true));
}
@Singleton

View File

@ -44,9 +44,10 @@ public class CommandExecutorQueueProvider implements QueueProvider {
}
int interactiveThreads = Math.max(1, poolSize - batchThreads);
interactiveExecutor =
queues.createQueue(interactiveThreads, "SSH-Interactive-Worker", Thread.MIN_PRIORITY);
queues.createQueue(interactiveThreads, "SSH-Interactive-Worker", Thread.MIN_PRIORITY, true);
if (batchThreads != 0) {
batchExecutor = queues.createQueue(batchThreads, "SSH-Batch-Worker", Thread.MIN_PRIORITY);
batchExecutor =
queues.createQueue(batchThreads, "SSH-Batch-Worker", Thread.MIN_PRIORITY, true);
} else {
batchExecutor = interactiveExecutor;
}

View File

@ -76,7 +76,7 @@ class CommandFactoryProvider implements Provider<CommandFactory>, LifecycleListe
createCommandInterceptor = i;
int threads = cfg.getInt("sshd", "commandStartThreads", 2);
startExecutor = workQueue.createQueue(threads, "SshCommandStart");
startExecutor = workQueue.createQueue(threads, "SshCommandStart", true);
destroyExecutor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder()

View File

@ -34,6 +34,6 @@ class StreamCommandExecutorProvider implements Provider<ScheduledThreadPoolExecu
@Override
public ScheduledThreadPoolExecutor get() {
return queues.createQueue(poolSize, "SSH-Stream-Worker", Thread.MIN_PRIORITY);
return queues.createQueue(poolSize, "SSH-Stream-Worker", Thread.MIN_PRIORITY, true);
}
}