Add some pipeline processing stats

This will give us a better idea of the ZooKeeper (ZK) overhead in
pipeline processing.

Change-Id: I47648a08a75351f6af1e14ff04b33de10276f9d9
This commit is contained in:
James E. Blair 2021-11-11 11:02:25 -08:00
parent 3c56f1eafc
commit 38ee8fb429
3 changed files with 48 additions and 3 deletions

View File

@ -79,6 +79,11 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
The number of items currently being processed by this
pipeline.
.. stat:: process
:type: timer
The time taken to process the pipeline.
.. stat:: project
This hierarchy holds more specific metrics for each project
@ -140,6 +145,11 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
The number of changes for this project processed by the
pipeline since Zuul started.
.. stat:: refresh
:type: timer
The time taken to refresh the state from ZooKeeper.
.. stat:: resident_time
:type: timer

View File

@ -23,6 +23,7 @@ import time
import traceback
import uuid
from contextlib import suppress
from zuul.vendor.contextlib import nullcontext
from collections import defaultdict
from apscheduler.schedulers.background import BackgroundScheduler
@ -174,6 +175,10 @@ class Scheduler(threading.Thread):
self.connections = connections
self.sql = self.connections.getSqlReporter(None)
self.statsd = get_statsd(config)
if self.statsd:
self.statsd_timer = self.statsd.timer
else:
self.statsd_timer = nullcontext
self.times = Times(self.sql, self.statsd)
self.rpc = rpclistener.RPCListener(config, self)
self.rpc_slow = rpclistener.RPCListenerSlow(config, self)
@ -1767,6 +1772,7 @@ class Scheduler(threading.Thread):
def process_pipelines(self, tenant):
for pipeline in tenant.layout.pipelines.values():
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
if self._stopped:
return
try:
@ -1775,7 +1781,8 @@ class Scheduler(threading.Thread):
) as lock:
ctx = self.createZKContext(lock, self.log)
with pipeline.manager.currentContext(ctx):
pipeline.state.refresh(ctx)
with self.statsd_timer(f'{stats_key}.refresh'):
pipeline.state.refresh(ctx)
if pipeline.state.old_queues:
self._reenqueuePipeline(tenant, pipeline, ctx)
pipeline.state.cleanup(ctx)
@ -1791,14 +1798,16 @@ class Scheduler(threading.Thread):
pipeline.name, tenant.name)
def _process_pipeline(self, tenant, pipeline):
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
self.process_pipeline_management_queue(tenant, pipeline)
# Give result events priority -- they let us stop builds, whereas
# trigger events cause us to execute builds.
self.process_pipeline_result_queue(tenant, pipeline)
self.process_pipeline_trigger_queue(tenant, pipeline)
try:
while not self._stopped and pipeline.manager.processQueue():
pass
with self.statsd_timer(f'{stats_key}.process'):
while not self._stopped and pipeline.manager.processQueue():
pass
except Exception:
self.log.exception("Exception in pipeline processing:")
pipeline.state.updateAttributes(

26
zuul/vendor/contextlib.py vendored Normal file
View File

@ -0,0 +1,26 @@
# Copied from https://github.com/python/cpython/blob/main/Lib/contextlib.py
# Licensed under the Python Software Foundation License Version 2
#
# contextlib.nullcontext is included in the standard library in Python 3.7+
# TODO: Remove after Zuul drops support for 3.6
from contextlib import AbstractContextManager
class nullcontext(AbstractContextManager):
    """No-op context manager.

    Serves as a placeholder wherever a ``with`` statement needs a context
    manager but there is nothing to set up or tear down, e.g.:

        cm = optional_cm if condition else nullcontext()
        with cm:
            # Runs under optional_cm only when condition is True
            ...

    The value passed as ``enter_result`` (default ``None``) is what the
    ``with ... as`` target is bound to.
    """

    def __init__(self, enter_result=None):
        # Remembered so __enter__ can hand it back unchanged.
        self.enter_result = enter_result

    def __enter__(self):
        return self.enter_result

    def __exit__(self, *exc_details):
        # Nothing to release; the falsy return lets any exception propagate.
        return None