diff --git a/doc/source/monitoring.rst b/doc/source/monitoring.rst
index 3c0b2a1ff0..10e358e49e 100644
--- a/doc/source/monitoring.rst
+++ b/doc/source/monitoring.rst
@@ -235,6 +235,24 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
       The time spent reading data from ZooKeeper during a single
       pipeline processing run.
 
+   .. stat:: read_znodes
+      :type: gauge
+
+      The number of ZNodes read from ZooKeeper during a single
+      pipeline processing run.
+
+   .. stat:: read_objects
+      :type: gauge
+
+      The number of Zuul data model objects read from ZooKeeper
+      during a single pipeline processing run.
+
+   .. stat:: read_bytes
+      :type: gauge
+
+      The amount of data read from ZooKeeper during a single
+      pipeline processing run.
+
    .. stat:: refresh
       :type: timer
 
@@ -273,6 +291,24 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
       The time spent writing data to ZooKeeper during a single
       pipeline processing run.
 
+   .. stat:: write_znodes
+      :type: gauge
+
+      The number of ZNodes written to ZooKeeper during a single
+      pipeline processing run.
+
+   .. stat:: write_objects
+      :type: gauge
+
+      The number of Zuul data model objects written to ZooKeeper
+      during a single pipeline processing run.
+
+   .. stat:: write_bytes
+      :type: gauge
+
+      The amount of data written to ZooKeeper during a single
+      pipeline processing run.
+
 .. stat:: zuul.executor.<hostname>
 
    Holds metrics emitted by individual executors.  The ``<hostname>``
diff --git a/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml b/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml
index cab32e1469..b6e7f96b0b 100644
--- a/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml
+++ b/releasenotes/notes/pipeline-timing-ea263e6e5939b1aa.yaml
@@ -14,3 +14,9 @@ features:
     * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.event_job_time`
     * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.read_time`
     * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.write_time`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.read_objects`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.write_objects`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.read_znodes`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.write_znodes`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.read_bytes`
+    * :stat:`zuul.tenant.<tenant>.pipeline.<pipeline>.write_bytes`
diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py
index 69597401b8..88916171dd 100644
--- a/tests/unit/test_scheduler.py
+++ b/tests/unit/test_scheduler.py
@@ -456,6 +456,17 @@ class TestScheduler(ZuulTestCase):
             val = self.assertReportedStat(key, kind='ms')
             self.assertTrue(0.0 < float(val) < 60000.0)
 
+        for key in [
+                'zuul.tenant.tenant-one.pipeline.gate.read_objects',
+                'zuul.tenant.tenant-one.pipeline.gate.write_objects',
+                'zuul.tenant.tenant-one.pipeline.gate.read_znodes',
+                'zuul.tenant.tenant-one.pipeline.gate.write_znodes',
+                'zuul.tenant.tenant-one.pipeline.gate.read_bytes',
+                'zuul.tenant.tenant-one.pipeline.gate.write_bytes',
+        ]:
+            val = self.assertReportedStat(key, kind='g')
+            self.assertTrue(0.0 < float(val) < 60000.0)
+
         self.assertReportedStat('zuul.tenant.tenant-one.pipeline.gate.'
                                 'data_size_compressed', kind='g')
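
For a concrete sense of what the test above asserts, here is a rough,
illustrative sketch that emits the same style of gauges with the plain
"statsd" Python client rather than Zuul's internal statsd wrapper; the
metric names come from the documentation above, while the host, port,
and numeric values are invented:

    # Illustrative only; Zuul emits these itself via its statsd wrapper.
    import statsd

    client = statsd.StatsClient('localhost', 8125)
    client.gauge('zuul.tenant.tenant-one.pipeline.gate.read_znodes', 142)
    client.gauge('zuul.tenant.tenant-one.pipeline.gate.read_bytes', 81432)
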
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index 26fa6ac514..ebcfbd71d1 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -1927,11 +1927,7 @@ class Scheduler(threading.Thread):
                 if refreshed:
                     pipeline.summary.update(ctx, self.globals)
                     if self.statsd:
-                        self.statsd.timing(f'{stats_key}.read_time',
-                                           ctx.cumulative_read_time * 1000)
-                        self.statsd.timing(
-                            f'{stats_key}.write_time',
-                            ctx.cumulative_write_time * 1000)
+                        self._contextStats(ctx, stats_key)
             except LockException:
                 self.log.debug("Skipping locked pipeline %s in tenant %s",
                                pipeline.name, tenant.name)
@@ -1940,6 +1936,24 @@ class Scheduler(threading.Thread):
                     "Exception processing pipeline %s in tenant %s",
                     pipeline.name, tenant.name)
 
+    def _contextStats(self, ctx, stats_key):
+        self.statsd.timing(f'{stats_key}.read_time',
+                           ctx.cumulative_read_time * 1000)
+        self.statsd.timing(f'{stats_key}.write_time',
+                           ctx.cumulative_write_time * 1000)
+        self.statsd.gauge(f'{stats_key}.read_objects',
+                          ctx.cumulative_read_objects)
+        self.statsd.gauge(f'{stats_key}.write_objects',
+                          ctx.cumulative_write_objects)
+        self.statsd.gauge(f'{stats_key}.read_znodes',
+                          ctx.cumulative_read_znodes)
+        self.statsd.gauge(f'{stats_key}.write_znodes',
+                          ctx.cumulative_write_znodes)
+        self.statsd.gauge(f'{stats_key}.read_bytes',
+                          ctx.cumulative_read_bytes)
+        self.statsd.gauge(f'{stats_key}.write_bytes',
+                          ctx.cumulative_write_bytes)
+
     def _process_pipeline(self, tenant, pipeline):
         # Return whether or not we refreshed the pipeline.
diff --git a/zuul/zk/sharding.py b/zuul/zk/sharding.py
index 5ca1158d5f..e8a3638e72 100644
--- a/zuul/zk/sharding.py
+++ b/zuul/zk/sharding.py
@@ -33,6 +33,8 @@ class RawShardIO(io.RawIOBase):
         self.compressed_bytes_written = 0
         self.cumulative_read_time = 0.0
         self.cumulative_write_time = 0.0
+        self.znodes_read = 0
+        self.znodes_written = 0
 
     def readable(self):
         return True
@@ -61,6 +63,7 @@ class RawShardIO(io.RawIOBase):
         data, _ = self.client.get(path)
         self.cumulative_read_time += time.perf_counter() - start
         self.compressed_bytes_read += len(data)
+        self.znodes_read += 1
         return zlib.decompress(data)
 
     def readall(self):
@@ -86,6 +89,7 @@ class RawShardIO(io.RawIOBase):
         )
         self.cumulative_write_time += time.perf_counter() - start
         self.compressed_bytes_written += len(shard_bytes)
+        self.znodes_written += 1
         return min(byte_count, NODE_BYTE_SIZE_LIMIT)
 
@@ -102,6 +106,10 @@ class BufferedShardWriter(io.BufferedWriter):
     def cumulative_write_time(self):
         return self.__raw.cumulative_write_time
 
+    @property
+    def znodes_written(self):
+        return self.__raw.znodes_written
+
 
 class BufferedShardReader(io.BufferedReader):
     def __init__(self, client, path):
@@ -115,3 +123,7 @@ class BufferedShardReader(io.BufferedReader):
     @property
     def cumulative_read_time(self):
         return self.__raw.cumulative_read_time
+
+    @property
+    def znodes_read(self):
+        return self.__raw.znodes_read
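
The per-stream counters added above are exposed through the znodes_read
and znodes_written properties. A minimal sketch of reading one back
after a sharded write, assuming a connected kazoo client and the
existing BufferedShardWriter(client, path) constructor (not shown in
this hunk):

    from zuul.zk.sharding import BufferedShardWriter

    def write_sharded(client, path, data):
        # Write the bytes as one or more ZNode shards and report how
        # many shards (ZNodes) the write produced.
        with BufferedShardWriter(client, path) as stream:
            stream.write(data)
            stream.flush()
            return stream.znodes_written
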
diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py
index e837bf3ea6..215f442919 100644
--- a/zuul/zk/zkobject.py
+++ b/zuul/zk/zkobject.py
@@ -33,6 +33,12 @@ class ZKContext:
         self.log = log
         self.cumulative_read_time = 0.0
         self.cumulative_write_time = 0.0
+        self.cumulative_read_objects = 0
+        self.cumulative_write_objects = 0
+        self.cumulative_read_znodes = 0
+        self.cumulative_write_znodes = 0
+        self.cumulative_read_bytes = 0
+        self.cumulative_write_bytes = 0
 
     def sessionIsValid(self):
         return ((not self.lock or self.lock.is_still_valid()) and
@@ -242,6 +248,9 @@ class ZKObject:
             start = time.perf_counter()
             compressed_data, zstat = context.client.get(path)
             context.cumulative_read_time += time.perf_counter() - start
+            context.cumulative_read_objects += 1
+            context.cumulative_read_znodes += 1
+            context.cumulative_read_bytes += len(compressed_data)
 
             self._set(_zkobject_hash=None)
             try:
@@ -292,6 +301,10 @@ class ZKObject:
                 zstat = context.client.set(path, compressed_data,
                                            version=self._zstat.version)
             context.cumulative_write_time += time.perf_counter() - start
+            context.cumulative_write_objects += 1
+            context.cumulative_write_znodes += 1
+            context.cumulative_write_bytes += len(compressed_data)
+
             self._set(_zstat=zstat,
                       _zkobject_hash=hash(data),
                       _zkobject_compressed_size=len(compressed_data),
@@ -345,6 +358,11 @@ class ShardedZKObject(ZKObject):
                 compressed_size = stream.compressed_bytes_read
                 context.cumulative_read_time += \
                     stream.cumulative_read_time
+                context.cumulative_read_objects += 1
+                context.cumulative_read_znodes += \
+                    stream.znodes_read
+                context.cumulative_read_bytes += compressed_size
+
                 if not data and context.client.exists(path) is None:
                     raise NoNodeError
                 self._set(**self.deserialize(data, context))
@@ -393,6 +411,10 @@ class ShardedZKObject(ZKObject):
                 compressed_size = stream.compressed_bytes_written
                 context.cumulative_write_time += \
                     stream.cumulative_write_time
+                context.cumulative_write_objects += 1
+                context.cumulative_write_znodes += \
+                    stream.znodes_written
+                context.cumulative_write_bytes += compressed_size
                 self._set(_zkobject_hash=hash(data),
                           _zkobject_compressed_size=compressed_size,
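
Since the ZKContext counters are cumulative totals for a single pipeline
processing run, derived figures can be computed from the same values
that _contextStats() reports as gauges. A hypothetical helper, not part
of the change, purely to illustrate how the byte and ZNode counters
relate:

    def average_compressed_znode_size(ctx):
        # Average compressed bytes per ZNode read during this run.
        if not ctx.cumulative_read_znodes:
            return 0
        return ctx.cumulative_read_bytes / ctx.cumulative_read_znodes
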