Merge "Add scheduler run handler metric"
This commit is contained in:
commit
af96e5786f
@ -716,6 +716,12 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
|
||||
|
||||
The size of the current connection event queue.
|
||||
|
||||
.. stat:: run_handler
|
||||
:type: timer
|
||||
|
||||
A timer metric reporting the time taken for one scheduler run
|
||||
handler iteration.
|
||||
|
||||
.. stat:: time_query
|
||||
:type: timer
|
||||
|
||||
|
@ -461,6 +461,7 @@ class TestScheduler(ZuulTestCase):
|
||||
'zuul.mergers.online', value='1', kind='g')
|
||||
self.assertReportedStat('zuul.scheduler.eventqueues.connection.gerrit',
|
||||
value='0', kind='g')
|
||||
self.assertReportedStat('zuul.scheduler.run_handler', kind='ms')
|
||||
|
||||
# Catch time / monotonic errors
|
||||
for key in [
|
||||
|
@ -2126,70 +2126,74 @@ class Scheduler(threading.Thread):
|
||||
return
|
||||
self.log.debug("Run handler awake")
|
||||
self.run_handler_lock.acquire()
|
||||
with self.statsd_timer("zuul.scheduler.run_handler"):
|
||||
try:
|
||||
self._run()
|
||||
except Exception:
|
||||
self.log.exception("Exception in run handler:")
|
||||
# There may still be more events to process
|
||||
self.wake_event.set()
|
||||
finally:
|
||||
self.run_handler_lock.release()
|
||||
|
||||
def _run(self):
|
||||
if not self._stopped:
|
||||
self.process_reconfigure_queue()
|
||||
|
||||
if self.unparsed_abide.ltime < self.system_config_cache.ltime:
|
||||
self.updateSystemConfig()
|
||||
|
||||
for tenant_name in self.unparsed_abide.tenants:
|
||||
if self._stopped:
|
||||
break
|
||||
|
||||
tenant = self.abide.tenants.get(tenant_name)
|
||||
if not tenant:
|
||||
continue
|
||||
|
||||
# This will also forward events for the pipelines
|
||||
# (e.g. enqueue or dequeue events) to the matching
|
||||
# pipeline event queues that are processed afterwards.
|
||||
self.process_tenant_management_queue(tenant)
|
||||
|
||||
if self._stopped:
|
||||
break
|
||||
|
||||
try:
|
||||
if not self._stopped:
|
||||
self.process_reconfigure_queue()
|
||||
|
||||
if self.unparsed_abide.ltime < self.system_config_cache.ltime:
|
||||
self.updateSystemConfig()
|
||||
|
||||
for tenant_name in self.unparsed_abide.tenants:
|
||||
if self._stopped:
|
||||
break
|
||||
|
||||
tenant = self.abide.tenants.get(tenant_name)
|
||||
if not tenant:
|
||||
with tenant_read_lock(
|
||||
self.zk_client, tenant_name, blocking=False
|
||||
) as tlock:
|
||||
if not self.isTenantLayoutUpToDate(tenant_name):
|
||||
continue
|
||||
|
||||
# This will also forward events for the pipelines
|
||||
# (e.g. enqueue or dequeue events) to the matching
|
||||
# pipeline event queues that are processed afterwards.
|
||||
self.process_tenant_management_queue(tenant)
|
||||
# Get tenant again, as it might have been updated
|
||||
# by a tenant reconfig or layout change.
|
||||
tenant = self.abide.tenants[tenant_name]
|
||||
|
||||
if self._stopped:
|
||||
break
|
||||
if not self._stopped:
|
||||
# This will forward trigger events to pipeline
|
||||
# event queues that are processed below.
|
||||
self.process_tenant_trigger_queue(tenant)
|
||||
|
||||
try:
|
||||
with tenant_read_lock(
|
||||
self.zk_client, tenant_name, blocking=False
|
||||
) as tlock:
|
||||
if not self.isTenantLayoutUpToDate(tenant_name):
|
||||
continue
|
||||
|
||||
# Get tenant again, as it might have been updated
|
||||
# by a tenant reconfig or layout change.
|
||||
tenant = self.abide.tenants[tenant_name]
|
||||
|
||||
if not self._stopped:
|
||||
# This will forward trigger events to pipeline
|
||||
# event queues that are processed below.
|
||||
self.process_tenant_trigger_queue(tenant)
|
||||
|
||||
self.process_pipelines(tenant, tlock)
|
||||
except LockException:
|
||||
self.log.debug("Skipping locked tenant %s",
|
||||
tenant.name)
|
||||
remote_state = self.tenant_layout_state.get(
|
||||
tenant_name)
|
||||
local_state = self.local_layout_state.get(
|
||||
tenant_name)
|
||||
if (remote_state is None or
|
||||
local_state is None or
|
||||
remote_state > local_state):
|
||||
# Let's keep looping until we've updated to the
|
||||
# latest tenant layout.
|
||||
self.wake_event.set()
|
||||
except Exception:
|
||||
self.log.exception("Exception processing tenant %s:",
|
||||
tenant_name)
|
||||
# There may still be more events to process
|
||||
self.wake_event.set()
|
||||
self.process_pipelines(tenant, tlock)
|
||||
except LockException:
|
||||
self.log.debug("Skipping locked tenant %s",
|
||||
tenant.name)
|
||||
remote_state = self.tenant_layout_state.get(
|
||||
tenant_name)
|
||||
local_state = self.local_layout_state.get(
|
||||
tenant_name)
|
||||
if (remote_state is None or
|
||||
local_state is None or
|
||||
remote_state > local_state):
|
||||
# Let's keep looping until we've updated to the
|
||||
# latest tenant layout.
|
||||
self.wake_event.set()
|
||||
except Exception:
|
||||
self.log.exception("Exception in run handler:")
|
||||
self.log.exception("Exception processing tenant %s:",
|
||||
tenant_name)
|
||||
# There may still be more events to process
|
||||
self.wake_event.set()
|
||||
finally:
|
||||
self.run_handler_lock.release()
|
||||
|
||||
def primeSystemConfig(self):
|
||||
with self.layout_lock:
|
||||
|
Loading…
Reference in New Issue
Block a user