Allow monitoring length of connection event queues
In order to expose the length of the connection event queues we need to introduce `getEventQueue()` as a new interface in the connection base class. The queue length will be available in the status JSON output for display in zuul-web as well as a statsd gauge metric: zuul.scheduler.eventqueues.connection.<connection-name> This gives an operator more insight into possible bottlenecks during event pre-processing. Change-Id: Ie2e09e0c7cd166dce5b884c3e3d12ead42b05397
This commit is contained in:
parent
22e7551fb8
commit
5bc219d5cd
|
@ -464,6 +464,11 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
|
|||
|
||||
The size of the current management event queue.
|
||||
|
||||
.. stat:: connection.<connection-name>
|
||||
:type: gauge
|
||||
|
||||
The size of the current connection event queue.
|
||||
|
||||
.. stat:: zuul.geard
|
||||
|
||||
Gearman job distribution statistics. Gearman jobs encompass the
|
||||
|
|
|
@ -438,6 +438,8 @@ class TestScheduler(ZuulTestCase):
|
|||
'zuul.executors.accepting', value='1', kind='g')
|
||||
self.assertReportedStat(
|
||||
'zuul.mergers.online', value='1', kind='g')
|
||||
self.assertReportedStat('zuul.scheduler.eventqueues.connection.gerrit',
|
||||
value='0', kind='g')
|
||||
|
||||
# Catch time / monotonic errors
|
||||
val = self.assertReportedStat(
|
||||
|
|
|
@ -148,6 +148,9 @@ class TestWeb(BaseTestWeb):
|
|||
|
||||
data = resp.json()
|
||||
status_jobs = []
|
||||
self.assertEqual(
|
||||
data["connection_event_queues"]["gerrit"]["length"], 0)
|
||||
|
||||
for p in data['pipelines']:
|
||||
for q in p['change_queues']:
|
||||
if p['name'] in ['gate', 'conflict']:
|
||||
|
|
|
@ -106,6 +106,14 @@ class BaseConnection(object, metaclass=abc.ABCMeta):
|
|||
"""
|
||||
return None
|
||||
|
||||
def getEventQueue(self):
|
||||
"""Return the event queue for this connection.
|
||||
|
||||
:returns: A `zuul.zk.event_queues.ConnectionEventQueue` instance
|
||||
or None.
|
||||
"""
|
||||
return None
|
||||
|
||||
def validateWebConfig(self, config, connections):
|
||||
"""Validate web config.
|
||||
|
||||
|
|
|
@ -1523,6 +1523,9 @@ class GerritConnection(BaseConnection):
|
|||
self._stop_ref_watcher_thread()
|
||||
self._stop_event_connector()
|
||||
|
||||
def getEventQueue(self):
|
||||
return getattr(self, "event_queue", None)
|
||||
|
||||
def _stop_watcher_thread(self):
|
||||
if self.watcher_thread:
|
||||
self.watcher_thread.stop()
|
||||
|
|
|
@ -2209,6 +2209,9 @@ class GithubConnection(CachedBranchConnection):
|
|||
def getWebController(self, zuul_web):
|
||||
return GithubWebController(zuul_web, self)
|
||||
|
||||
def getEventQueue(self):
|
||||
return getattr(self, "event_queue", None)
|
||||
|
||||
def validateWebConfig(self, config, connections):
|
||||
if 'webhook_token' not in self.connection_config:
|
||||
raise Exception(
|
||||
|
|
|
@ -414,6 +414,9 @@ class GitlabConnection(CachedBranchConnection):
|
|||
def getWebController(self, zuul_web):
|
||||
return GitlabWebController(zuul_web, self)
|
||||
|
||||
def getEventQueue(self):
|
||||
return getattr(self, "event_queue", None)
|
||||
|
||||
def getProject(self, name):
|
||||
return self.projects.get(name)
|
||||
|
||||
|
|
|
@ -539,6 +539,9 @@ class PagureConnection(BaseConnection):
|
|||
def getWebController(self, zuul_web):
|
||||
return PagureWebController(zuul_web, self)
|
||||
|
||||
def getEventQueue(self):
|
||||
return getattr(self, "event_queue", None)
|
||||
|
||||
def validateWebConfig(self, config, connections):
|
||||
return True
|
||||
|
||||
|
|
|
@ -348,6 +348,12 @@ class Scheduler(threading.Thread):
|
|||
self.result_event_queue.qsize())
|
||||
self.statsd.gauge('zuul.scheduler.eventqueues.management',
|
||||
len(self.management_events))
|
||||
base = 'zuul.scheduler.eventqueues.connection'
|
||||
for connection in self.connections.connections.values():
|
||||
queue = connection.getEventQueue()
|
||||
if queue is not None:
|
||||
self.statsd.gauge(f'{base}.{connection.connection_name}',
|
||||
len(queue))
|
||||
|
||||
def runCleanup(self):
|
||||
# Run the first cleanup immediately after the first
|
||||
|
@ -1837,6 +1843,13 @@ class Scheduler(threading.Thread):
|
|||
self.result_event_queue.qsize()
|
||||
data['management_event_queue'] = {}
|
||||
data['management_event_queue']['length'] = len(self.management_events)
|
||||
data['connection_event_queues'] = {}
|
||||
for connection in self.connections.connections.values():
|
||||
queue = connection.getEventQueue()
|
||||
if queue is not None:
|
||||
data['connection_event_queues'][connection.connection_name] = {
|
||||
'length': len(queue),
|
||||
}
|
||||
|
||||
if self.last_reconfigured:
|
||||
data['last_reconfigured'] = self.last_reconfigured * 1000
|
||||
|
|
Loading…
Reference in New Issue