Enable metrics for core queues

The following queues will provide metrics:

* index_batch
* index_interactive
* receive_commits
* send_email
* ssh_batch_worker
* ssh_command_start
* ssh_interactive_worker
* ssh_stream_worker

For each queue following metrics are available:

* active_threads - Number of threads that are actively executing tasks
* max_pool_size - Maximum allowed number of threads in the pool
* pool_size - Current number of threads in the pool
* scheduled_tasks - Number of scheduled tasks in the queue
* total_completed_tasks_count - Total number of tasks that have completed execution
* total_scheduled_tasks_count - Total number of tasks that have been scheduled

Change-Id: I9430c2a90d1fb07630c1c8b599abf6dd18e01b2d
Signed-off-by: Eryk Szymanski <eryksz@gmail.com>
This commit is contained in:
Eryk Szymanski
2018-05-30 22:42:20 +02:00
committed by David Pursehouse
parent 366c3a652e
commit 04586d81fd
6 changed files with 31 additions and 15 deletions

View File

@@ -53,14 +53,28 @@ objects needing finalization.
* `query/query_latency`: Successful query latency, accumulated over the life * `query/query_latency`: Successful query latency, accumulated over the life
of the process. of the process.
=== Work Queue === Core Queues
* `queue/work_queue/pool_size`: Current number of threads in the pool The following queues support metrics:
* `queue/work_queue/max_pool_size`: Maximum allowed number of threads in the pool
* `queue/work_queue/active_threads`: Number of threads that are actively executing tasks * default `WorkQueue`
* `queue/work_queue/scheduled_tasks`: Number of scheduled tasks in the queue * index batch
* `queue/work_queue/total_scheduled_tasks_count`: Total number of tasks that have been scheduled * index interactive
* `queue/work_queue/total_completed_tasks_count`: Total number of tasks that have completed execution * receive commits
* send email
* ssh batch worker
* ssh command start
* ssh interactive worker
* ssh stream worker
Each queue provides the following metrics:
* `queue/<queue_name>/pool_size`: Current number of threads in the pool
* `queue/<queue_name>/max_pool_size`: Maximum allowed number of threads in the pool
* `queue/<queue_name>/active_threads`: Number of threads that are actively executing tasks
* `queue/<queue_name>/scheduled_tasks`: Number of scheduled tasks in the queue
* `queue/<queue_name>/total_scheduled_tasks_count`: Total number of tasks that have been scheduled
* `queue/<queue_name>/total_completed_tasks_count`: Total number of tasks that have completed execution
=== SSH sessions === SSH sessions

View File

@@ -41,7 +41,7 @@ public class ReceiveCommitsExecutorModule extends AbstractModule {
int poolSize = int poolSize =
config.getInt( config.getInt(
"receive", null, "threadPoolSize", Runtime.getRuntime().availableProcessors()); "receive", null, "threadPoolSize", Runtime.getRuntime().availableProcessors());
return queues.createQueue(poolSize, "ReceiveCommits"); return queues.createQueue(poolSize, "ReceiveCommits", true);
} }
@Provides @Provides
@@ -53,7 +53,7 @@ public class ReceiveCommitsExecutorModule extends AbstractModule {
if (poolSize == 0) { if (poolSize == 0) {
return MoreExecutors.newDirectExecutorService(); return MoreExecutors.newDirectExecutorService();
} }
return queues.createQueue(poolSize, "SendEmail"); return queues.createQueue(poolSize, "SendEmail", true);
} }
@Provides @Provides

View File

@@ -181,7 +181,8 @@ public class IndexModule extends LifecycleModule {
if (threads <= 0) { if (threads <= 0) {
threads = Runtime.getRuntime().availableProcessors() / 2 + 1; threads = Runtime.getRuntime().availableProcessors() / 2 + 1;
} }
return MoreExecutors.listeningDecorator(workQueue.createQueue(threads, "Index-Interactive")); return MoreExecutors.listeningDecorator(
workQueue.createQueue(threads, "Index-Interactive", true));
} }
@Provides @Provides
@@ -199,7 +200,8 @@ public class IndexModule extends LifecycleModule {
if (batchThreads <= 0) { if (batchThreads <= 0) {
batchThreads = Runtime.getRuntime().availableProcessors(); batchThreads = Runtime.getRuntime().availableProcessors();
} }
return MoreExecutors.listeningDecorator(workQueue.createQueue(batchThreads, "Index-Batch")); return MoreExecutors.listeningDecorator(
workQueue.createQueue(batchThreads, "Index-Batch", true));
} }
@Singleton @Singleton

View File

@@ -43,9 +43,9 @@ public class CommandExecutorQueueProvider implements QueueProvider {
poolSize += batchThreads; poolSize += batchThreads;
} }
int interactiveThreads = Math.max(1, poolSize - batchThreads); int interactiveThreads = Math.max(1, poolSize - batchThreads);
interactiveExecutor = queues.createQueue(interactiveThreads, "SSH-Interactive-Worker"); interactiveExecutor = queues.createQueue(interactiveThreads, "SSH-Interactive-Worker", true);
if (batchThreads != 0) { if (batchThreads != 0) {
batchExecutor = queues.createQueue(batchThreads, "SSH-Batch-Worker"); batchExecutor = queues.createQueue(batchThreads, "SSH-Batch-Worker", true);
setThreadFactory(batchExecutor); setThreadFactory(batchExecutor);
} else { } else {
batchExecutor = interactiveExecutor; batchExecutor = interactiveExecutor;

View File

@@ -76,7 +76,7 @@ class CommandFactoryProvider implements Provider<CommandFactory>, LifecycleListe
createCommandInterceptor = i; createCommandInterceptor = i;
int threads = cfg.getInt("sshd", "commandStartThreads", 2); int threads = cfg.getInt("sshd", "commandStartThreads", 2);
startExecutor = workQueue.createQueue(threads, "SshCommandStart"); startExecutor = workQueue.createQueue(threads, "SshCommandStart", true);
destroyExecutor = destroyExecutor =
Executors.newSingleThreadExecutor( Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder() new ThreadFactoryBuilder()

View File

@@ -36,7 +36,7 @@ class StreamCommandExecutorProvider implements Provider<WorkQueue.Executor> {
public WorkQueue.Executor get() { public WorkQueue.Executor get() {
final WorkQueue.Executor executor; final WorkQueue.Executor executor;
executor = queues.createQueue(poolSize, "SSH-Stream-Worker"); executor = queues.createQueue(poolSize, "SSH-Stream-Worker", true);
final ThreadFactory parent = executor.getThreadFactory(); final ThreadFactory parent = executor.getThreadFactory();
executor.setThreadFactory( executor.setThreadFactory(