Add more pipeline processing stats

This adds counters for the number of Zuul data model objects, ZNodes,
and bytes read from and written to ZooKeeper during each pipeline
processing run.  These metrics can help Zuul developers determine
where to focus performance optimization.
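
As a rough illustration of the resulting metric layout (not part of
this change; the statsd client setup and the sample values here are
invented), each pipeline processing run now emits per-run gauges
alongside the existing timers:

    # Illustrative sketch only, using the python statsd client.
    import statsd

    client = statsd.StatsClient('localhost', 8125)
    key = 'zuul.tenant.tenant-one.pipeline.gate'

    client.timing(f'{key}.read_time', 12.5)    # existing timer (ms)
    client.gauge(f'{key}.read_znodes', 40)     # new: ZNodes read
    client.gauge(f'{key}.read_objects', 25)    # new: model objects read
    client.gauge(f'{key}.read_bytes', 65536)   # new: bytes read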

Change-Id: Ic2592faeb08d6c2a72b99000864c41ada665cd3b
James E. Blair 2022-02-28 10:51:37 -08:00
parent 72e6234157
commit 88b076e8e3
6 changed files with 106 additions and 5 deletions

View File

@@ -235,6 +235,24 @@ These metrics are emitted by the Zuul :ref:`scheduler`:

      The time spent reading data from ZooKeeper during a single
      pipeline processing run.

+   .. stat:: read_znodes
+      :type: gauge
+
+      The number of ZNodes read from ZooKeeper during a single
+      pipeline processing run.
+
+   .. stat:: read_objects
+      :type: gauge
+
+      The number of Zuul data model objects read from ZooKeeper
+      during a single pipeline processing run.
+
+   .. stat:: read_bytes
+      :type: gauge
+
+      The amount of data read from ZooKeeper during a single
+      pipeline processing run.
+
   .. stat:: refresh
      :type: timer

@@ -273,6 +291,24 @@ These metrics are emitted by the Zuul :ref:`scheduler`:

      The time spent writing data to ZooKeeper during a single
      pipeline processing run.

+   .. stat:: write_znodes
+      :type: gauge
+
+      The number of ZNodes written to ZooKeeper during a single
+      pipeline processing run.
+
+   .. stat:: write_objects
+      :type: gauge
+
+      The number of Zuul data model objects written to ZooKeeper
+      during a single pipeline processing run.
+
+   .. stat:: write_bytes
+      :type: gauge
+
+      The amount of data written to ZooKeeper during a single
+      pipeline processing run.
+
.. stat:: zuul.executor.<executor>

   Holds metrics emitted by individual executors.  The ``<executor>``

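The three read gauges are related but not redundant: a single sharded
Zuul object can span several ZNodes, so a run typically reads at least
as many ZNodes as objects, while read_bytes tracks the compressed
payload size.  A hypothetical single-object read, with made-up numbers:

    # Hypothetical numbers for a run that loads one sharded object
    # stored across three ZNodes:
    read_objects = 1       # one Zuul data model object
    read_znodes = 3        # its data was sharded over three ZNodes
    read_bytes = 196608    # compressed bytes fetched for those shards
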
View File

@@ -14,3 +14,9 @@ features:
     * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.event_job_time`
     * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.read_time`
     * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.write_time`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.read_objects`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.write_objects`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.read_znodes`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.write_znodes`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.read_bytes`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.write_bytes`

View File

@@ -456,6 +456,17 @@ class TestScheduler(ZuulTestCase):
             val = self.assertReportedStat(key, kind='ms')
             self.assertTrue(0.0 < float(val) < 60000.0)

+        for key in [
+                'zuul.tenant.tenant-one.pipeline.gate.read_objects',
+                'zuul.tenant.tenant-one.pipeline.gate.write_objects',
+                'zuul.tenant.tenant-one.pipeline.gate.read_znodes',
+                'zuul.tenant.tenant-one.pipeline.gate.write_znodes',
+                'zuul.tenant.tenant-one.pipeline.gate.read_bytes',
+                'zuul.tenant.tenant-one.pipeline.gate.write_bytes',
+        ]:
+            val = self.assertReportedStat(key, kind='g')
+            self.assertTrue(0.0 < float(val) < 60000.0)
+
         self.assertReportedStat('zuul.tenant.tenant-one.pipeline.gate.'
                                 'data_size_compressed',
                                 kind='g')

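assertReportedStat is Zuul's existing test helper; as a sketch of what
a check like assertReportedStat(key, kind='g') has to do (this is not
the real implementation, and `packets` standing for captured statsd
payloads is an assumption), recall that statsd's wire format is
"name:value|type":

    def find_stat(packets, key, kind):
        # Return the value of the first matching stat, else None.
        for packet in packets:
            name, _, rest = packet.partition(':')
            value, _, stat_kind = rest.partition('|')
            if name == key and stat_kind == kind:
                return value
        return None

    val = find_stat(packets,
                    'zuul.tenant.tenant-one.pipeline.gate.read_bytes',
                    'g')
    assert val is not None and 0.0 < float(val) < 60000.0
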
View File

@@ -1927,11 +1927,7 @@ class Scheduler(threading.Thread):
                     if refreshed:
                         pipeline.summary.update(ctx, self.globals)
                         if self.statsd:
-                            self.statsd.timing(f'{stats_key}.read_time',
-                                               ctx.cumulative_read_time * 1000)
-                            self.statsd.timing(
-                                f'{stats_key}.write_time',
-                                ctx.cumulative_write_time * 1000)
+                            self._contextStats(ctx, stats_key)
         except LockException:
             self.log.debug("Skipping locked pipeline %s in tenant %s",
                            pipeline.name, tenant.name)
@@ -1940,6 +1936,24 @@ class Scheduler(threading.Thread):
                 "Exception processing pipeline %s in tenant %s",
                 pipeline.name, tenant.name)

+    def _contextStats(self, ctx, stats_key):
+        self.statsd.timing(f'{stats_key}.read_time',
+                           ctx.cumulative_read_time * 1000)
+        self.statsd.timing(f'{stats_key}.write_time',
+                           ctx.cumulative_write_time * 1000)
+        self.statsd.gauge(f'{stats_key}.read_objects',
+                          ctx.cumulative_read_objects)
+        self.statsd.gauge(f'{stats_key}.write_objects',
+                          ctx.cumulative_write_objects)
+        self.statsd.gauge(f'{stats_key}.read_znodes',
+                          ctx.cumulative_read_znodes)
+        self.statsd.gauge(f'{stats_key}.write_znodes',
+                          ctx.cumulative_write_znodes)
+        self.statsd.gauge(f'{stats_key}.read_bytes',
+                          ctx.cumulative_read_bytes)
+        self.statsd.gauge(f'{stats_key}.write_bytes',
+                          ctx.cumulative_write_bytes)
+
     def _process_pipeline(self, tenant, pipeline):
         # Return whether or not we refreshed the pipeline.

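The pattern here is accumulate-then-flush: the context object gathers
counters over the whole pipeline run and _contextStats reports them in
one place.  Note the * 1000 on the timers: time.perf_counter() deltas
are in seconds, while statsd timers expect milliseconds.  A condensed
sketch of the same shape (simplified names, not the actual Zuul
classes):

    import time

    class Ctx:
        """Accumulates I/O stats for one pipeline processing run."""
        def __init__(self):
            self.cumulative_read_time = 0.0   # seconds
            self.cumulative_read_bytes = 0

    def timed_read(ctx, fetch):
        start = time.perf_counter()
        data = fetch()
        ctx.cumulative_read_time += time.perf_counter() - start
        ctx.cumulative_read_bytes += len(data)
        return data

    def flush(statsd_client, ctx, stats_key):
        # statsd timers are milliseconds, hence the * 1000.
        statsd_client.timing(f'{stats_key}.read_time',
                             ctx.cumulative_read_time * 1000)
        statsd_client.gauge(f'{stats_key}.read_bytes',
                            ctx.cumulative_read_bytes)
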
View File

@@ -33,6 +33,8 @@ class RawShardIO(io.RawIOBase):
         self.compressed_bytes_written = 0
         self.cumulative_read_time = 0.0
         self.cumulative_write_time = 0.0
+        self.znodes_read = 0
+        self.znodes_written = 0

     def readable(self):
         return True
@@ -61,6 +63,7 @@ class RawShardIO(io.RawIOBase):
         data, _ = self.client.get(path)
         self.cumulative_read_time += time.perf_counter() - start
         self.compressed_bytes_read += len(data)
+        self.znodes_read += 1
         return zlib.decompress(data)

     def readall(self):
@@ -86,6 +89,7 @@ class RawShardIO(io.RawIOBase):
         )
         self.cumulative_write_time += time.perf_counter() - start
         self.compressed_bytes_written += len(shard_bytes)
+        self.znodes_written += 1
         return min(byte_count, NODE_BYTE_SIZE_LIMIT)
@@ -102,6 +106,10 @@ class BufferedShardWriter(io.BufferedWriter):
     def cumulative_write_time(self):
         return self.__raw.cumulative_write_time

+    @property
+    def znodes_written(self):
+        return self.__raw.znodes_written
+

 class BufferedShardReader(io.BufferedReader):
     def __init__(self, client, path):
@@ -115,3 +123,7 @@ class BufferedShardReader(io.BufferedReader):
     @property
     def cumulative_read_time(self):
         return self.__raw.cumulative_read_time
+
+    @property
+    def znodes_read(self):
+        return self.__raw.znodes_read

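BufferedShardReader and BufferedShardWriter wrap a RawShardIO stream,
so the counters live on the raw stream and the buffered wrappers
re-export them as read-only properties, as the hunks above show.  A
self-contained sketch of that delegation pattern (simplified; the
ZooKeeper client and the actual sharding logic are elided):

    import io

    class CountingRawIO(io.RawIOBase):
        """Raw stream that counts each chunk ("ZNode") it reads."""
        def __init__(self, chunks):
            self.chunks = list(chunks)
            self.znodes_read = 0

        def readable(self):
            return True

        def readinto(self, buf):
            if not self.chunks:
                return 0  # EOF
            data = self.chunks.pop(0)
            self.znodes_read += 1
            buf[:len(data)] = data
            return len(data)

    class CountingReader(io.BufferedReader):
        """Buffered wrapper re-exporting the raw stream's counter."""
        def __init__(self, raw):
            self.__raw = raw
            super().__init__(raw)

        @property
        def znodes_read(self):
            return self.__raw.znodes_read

    reader = CountingReader(CountingRawIO([b'shard1', b'shard2']))
    reader.read()
    print(reader.znodes_read)  # 2
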
View File

@@ -33,6 +33,12 @@ class ZKContext:
         self.log = log
         self.cumulative_read_time = 0.0
         self.cumulative_write_time = 0.0
+        self.cumulative_read_objects = 0
+        self.cumulative_write_objects = 0
+        self.cumulative_read_znodes = 0
+        self.cumulative_write_znodes = 0
+        self.cumulative_read_bytes = 0
+        self.cumulative_write_bytes = 0

     def sessionIsValid(self):
         return ((not self.lock or self.lock.is_still_valid()) and
@@ -242,6 +248,9 @@ class ZKObject:
             start = time.perf_counter()
             compressed_data, zstat = context.client.get(path)
             context.cumulative_read_time += time.perf_counter() - start
+            context.cumulative_read_objects += 1
+            context.cumulative_read_znodes += 1
+            context.cumulative_read_bytes += len(compressed_data)

             self._set(_zkobject_hash=None)
             try:
@@ -292,6 +301,10 @@ class ZKObject:
                 zstat = context.client.set(path, compressed_data,
                                            version=self._zstat.version)
             context.cumulative_write_time += time.perf_counter() - start
+            context.cumulative_write_objects += 1
+            context.cumulative_write_znodes += 1
+            context.cumulative_write_bytes += len(compressed_data)
+
             self._set(_zstat=zstat,
                       _zkobject_hash=hash(data),
                       _zkobject_compressed_size=len(compressed_data),
@@ -345,6 +358,11 @@ class ShardedZKObject(ZKObject):
                 compressed_size = stream.compressed_bytes_read
                 context.cumulative_read_time += \
                     stream.cumulative_read_time
+                context.cumulative_read_objects += 1
+                context.cumulative_read_znodes += \
+                    stream.znodes_read
+                context.cumulative_read_bytes += compressed_size
+
                 if not data and context.client.exists(path) is None:
                     raise NoNodeError
                 self._set(**self.deserialize(data, context))
@@ -393,6 +411,10 @@ class ShardedZKObject(ZKObject):
                 compressed_size = stream.compressed_bytes_written
                 context.cumulative_write_time += \
                     stream.cumulative_write_time
+                context.cumulative_write_objects += 1
+                context.cumulative_write_znodes += \
+                    stream.znodes_written
+                context.cumulative_write_bytes += compressed_size

                 self._set(_zkobject_hash=hash(data),
                           _zkobject_compressed_size=compressed_size,