Add some ZK data size stats
This adds reporting of compressed and uncompressed data sizes for the serialized pipeline states and the change caches. These can be used to observe trends over time in data usage and potentially find areas for optimization.

Change-Id: I6504e5c2b539295b58644a4ac3d7501da48fe7c1
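The underlying measurement is simple: the serialized state is JSON that is zlib-compressed before being written to ZooKeeper, so both sizes can be captured at write or read time. A minimal standalone sketch of the idea (not Zuul code; the sample dict is made up):

    import json
    import zlib

    # Stand-in for a serialized pipeline state or change cache entry.
    state = {"queue": ["change-1", "change-2"], "status": "active"}
    raw = json.dumps(state, sort_keys=True).encode("utf8")
    compressed = zlib.compress(raw)

    uncompressed_size = len(raw)        # reported as *data_size_uncompressed
    compressed_size = len(compressed)   # reported as *data_size_compressed
    print(compressed_size, uncompressed_size)

In the change below, the sharded readers and writers accumulate the compressed byte counts shard by shard, but the two reported quantities are the same.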
@@ -32,6 +32,22 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
 Zuul will report counters for each type of event it receives from
 each of its configured drivers.
 
+.. stat:: zuul.connection.<connection>
+
+   Holds metrics specific to connections.  This hierarchy includes:
+
+   .. stat:: cache.data_size_compressed
+      :type: gauge
+
+      The number of bytes stored in ZooKeeper for all items in this
+      connection's change cache.
+
+   .. stat:: cache.data_size_uncompressed
+      :type: gauge
+
+      The number of bytes required for the change cache (the
+      decompressed value of ``data_size_compressed``).
+
 .. stat:: zuul.tenant.<tenant>.event_enqueue_processing_time
    :type: timer
 
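For example, with a connection named ``gerrit`` (the name exercised by the test assertions later in this change), the gauges above are emitted as:

    zuul.connection.gerrit.cache.data_size_compressed
    zuul.connection.gerrit.cache.data_size_uncompressed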
@@ -84,6 +100,19 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
 
          The time taken to process the pipeline.
 
+      .. stat:: data_size_compressed
+         :type: gauge
+
+         The number of bytes stored in ZooKeeper to represent the
+         serialized state of the pipeline.
+
+      .. stat:: data_size_uncompressed
+         :type: gauge
+
+         The number of bytes required to represent the serialized
+         state of the pipeline (the decompressed value of
+         ``data_size_compressed``).
+
       .. stat:: project
 
          This hierarchy holds more specific metrics for each project
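For example, for the ``gate`` pipeline of tenant ``tenant-one`` (the combination asserted in the tests below), the corresponding gauges are:

    zuul.tenant.tenant-one.pipeline.gate.data_size_compressed
    zuul.tenant.tenant-one.pipeline.gate.data_size_uncompressed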
@@ -447,6 +447,19 @@ class TestScheduler(ZuulTestCase):
             'zuul.tenant.tenant-one.event_enqueue_time', kind='ms')
         self.assertTrue(0.0 < float(val) < 60000.0)
 
+        self.assertReportedStat('zuul.tenant.tenant-one.pipeline.gate.'
+                                'data_size_compressed',
+                                kind='g')
+        self.assertReportedStat('zuul.tenant.tenant-one.pipeline.gate.'
+                                'data_size_uncompressed',
+                                kind='g')
+        self.assertReportedStat('zuul.connection.gerrit.cache.'
+                                'data_size_compressed',
+                                kind='g')
+        self.assertReportedStat('zuul.connection.gerrit.cache.'
+                                'data_size_uncompressed',
+                                kind='g')
+
         for build in self.history:
             self.assertTrue(build.parameters['zuul']['voting'])
 
@@ -1390,7 +1403,7 @@ class TestScheduler(ZuulTestCase):
             change1.cache_stat.uuid,
             change1.cache_stat.version,
             change1.cache_stat.mzxid,
-            0.0)
+            0.0, 0, 0)
         # We should not delete change1 since it's needed by change2
         # which we want to keep.
         for connection in sched.connections.connections.values():
@@ -1402,7 +1415,7 @@ class TestScheduler(ZuulTestCase):
             change2.cache_stat.uuid,
             change2.cache_stat.version,
             change1.cache_stat.mzxid,
-            0.0)
+            0.0, 0, 0)
         for connection in sched.connections.connections.values():
             connection.maintainCache([], max_age=7200)
         # The master branch change remains
@@ -1324,6 +1324,9 @@ class TestChangeCache(ZooKeeperBaseTestCase):
         self.assertEqual(self.cache.get(key_foo), change_foo)
         self.assertEqual(self.cache.get(key_bar), change_bar)
 
+        compressed_size, uncompressed_size = self.cache.estimateDataSize()
+        self.assertTrue(compressed_size != uncompressed_size != 0)
+
     def test_update(self):
         change = DummyChange("project", {"foo": "bar"})
         key = ChangeKey('conn', 'project', 'change', 'foo', '1')
@@ -1400,7 +1403,7 @@ class TestChangeCache(ZooKeeperBaseTestCase):
             uuid.uuid4().hex,
             change.cache_version - 1,
             change.cache_stat.mzxid - 1,
-            0)
+            0, 0, 0)
         updated_change = self.cache.updateChangeWithRetry(
             key, change, updater)
         self.assertEqual(updated_change.foobar, 2)
@@ -1509,6 +1512,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
                                           foo='bar')
             self.assertEqual(pipeline1.foo, 'bar')
 
+            compressed_size, uncompressed_size = pipeline1.estimateDataSize()
+            self.assertTrue(compressed_size != uncompressed_size != 0)
+
         # Load an object from ZK (that we don't already have)
         with tenant_write_lock(self.zk_client, tenant_name) as lock:
             context = ZKContext(self.zk_client, lock, stop_event, self.log,
@@ -1517,6 +1523,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
                                           '/zuul/pipeline/fake_tenant')
             self.assertEqual(pipeline2.foo, 'bar')
 
+            compressed_size, uncompressed_size = pipeline1.estimateDataSize()
+            self.assertTrue(compressed_size != uncompressed_size != 0)
+
         def get_ltime(obj):
             zstat = self.zk_client.client.exists(obj.getPath())
             return zstat.last_modified_transaction_id
@@ -336,3 +336,6 @@ class ZKChangeCacheMixin:
 
     def getChangeByKey(self, key):
         return self._change_cache.get(key)
+
+    def estimateCacheDataSize(self):
+        return self._change_cache.estimateDataSize()
@@ -5161,7 +5161,8 @@ class Bundle:
 
 # Cache info of a ref
 CacheStat = namedtuple("CacheStat",
-                       ["key", "uuid", "version", "mzxid", "last_modified"])
+                       ["key", "uuid", "version", "mzxid", "last_modified",
+                        "compressed_size", "uncompressed_size"])
 
 
 class Ref(object):
@@ -487,12 +487,22 @@ class Scheduler(threading.Thread):
         self.statsd.gauge('zuul.mergers.jobs_queued', merge_queue)
         self.statsd.gauge('zuul.scheduler.eventqueues.management',
                           self.reconfigure_event_queue.qsize())
-        base = 'zuul.scheduler.eventqueues.connection'
+        queue_base = 'zuul.scheduler.eventqueues.connection'
+        conn_base = 'zuul.connection'
         for connection in self.connections.connections.values():
             queue = connection.getEventQueue()
             if queue is not None:
-                self.statsd.gauge(f'{base}.{connection.connection_name}',
+                self.statsd.gauge(f'{queue_base}.{connection.connection_name}',
                                   len(queue))
+            if hasattr(connection, 'estimateCacheDataSize'):
+                compressed_size, uncompressed_size =\
+                    connection.estimateCacheDataSize()
+                self.statsd.gauge(f'{conn_base}.{connection.connection_name}.'
+                                  'cache.data_size_compressed',
+                                  compressed_size)
+                self.statsd.gauge(f'{conn_base}.{connection.connection_name}.'
+                                  'cache.data_size_uncompressed',
+                                  uncompressed_size)
 
         for tenant in self.abide.tenants.values():
             self.statsd.gauge(f"zuul.tenant.{tenant.name}.management_events",
@@ -512,6 +522,12 @@ class Scheduler(threading.Thread):
                                   len(result_event_queues[pipeline.name]))
                 self.statsd.gauge(f"{base}.management_events",
                                   len(management_event_queues[pipeline.name]))
+                compressed_size, uncompressed_size =\
+                    pipeline.state.estimateDataSize()
+                self.statsd.gauge(f'{base}.data_size_compressed',
+                                  compressed_size)
+                self.statsd.gauge(f'{base}.data_size_uncompressed',
+                                  uncompressed_size)
 
         self.nodepool.emitStatsTotals(self.abide)
 
@@ -205,6 +205,20 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
         key = ChangeKey.fromReference(data['key_reference'])
         return key, data['data_uuid']
 
+    def estimateDataSize(self):
+        """Return the data size of the changes in the cache.
+
+        :returns: (compressed_size, uncompressed_size)
+        """
+        compressed_size = 0
+        uncompressed_size = 0
+
+        for c in list(self._change_cache.values()):
+            compressed_size += c.cache_stat.compressed_size
+            uncompressed_size += c.cache_stat.uncompressed_size
+
+        return (compressed_size, uncompressed_size)
+
     def prune(self, relevant, max_age=3600):  # 1h
         # Relevant is the list of changes directly in a pipeline.
         # This method will take care of expanding that out to each
@@ -289,8 +303,15 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
             # Change in our local cache is up-to-date
             return change
 
+        compressed_size = 0
+        uncompressed_size = 0
         try:
-            raw_data = self._getData(data_uuid)
+            with sharding.BufferedShardReader(
+                self.kazoo_client, self._dataPath(data_uuid)
+            ) as stream:
+                raw_data = stream.read()
+                compressed_size = stream.compressed_bytes_read
+                uncompressed_size = len(raw_data)
         except NoNodeError:
             cache_path = self._cachePath(key._hash)
             self.log.error("Removing cache key %s with no data node uuid %s",
@@ -321,22 +342,27 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
 
         change.cache_stat = model.CacheStat(
             key, data_uuid, zstat.version,
-            zstat.last_modified_transaction_id, zstat.last_modified)
+            zstat.last_modified_transaction_id, zstat.last_modified,
+            compressed_size, uncompressed_size)
         # Use setdefault here so we only have a single instance of a change
         # around. In case of a concurrent get this might return a different
         # change instance than the one we just created.
         return self._change_cache.setdefault(key._hash, change)
 
-    def _getData(self, data_uuid):
-        with sharding.BufferedShardReader(
-                self.kazoo_client, self._dataPath(data_uuid)) as stream:
-            return stream.read()
-
     def set(self, key, change, version=-1):
         data = self._dataFromChange(change)
         raw_data = json.dumps(data, sort_keys=True).encode("utf8")
 
-        data_uuid = self._setData(raw_data)
+        compressed_size = 0
+        uncompressed_size = 0
+        data_uuid = uuid.uuid4().hex
+        with sharding.BufferedShardWriter(
+                self.kazoo_client, self._dataPath(data_uuid)) as stream:
+            stream.write(raw_data)
+            stream.flush()
+            compressed_size = stream.compressed_bytes_written
+            uncompressed_size = len(raw_data)
+
         # Add the change_key info here mostly for debugging since the
         # hash is non-reversible.
         cache_data = json.dumps(dict(
@@ -373,16 +399,10 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
 
         change.cache_stat = model.CacheStat(
             key, data_uuid, zstat.version,
-            zstat.last_modified_transaction_id, zstat.last_modified)
+            zstat.last_modified_transaction_id, zstat.last_modified,
+            compressed_size, uncompressed_size)
         self._change_cache[key._hash] = change
 
-    def _setData(self, data):
-        data_uuid = uuid.uuid4().hex
-        with sharding.BufferedShardWriter(
-                self.kazoo_client, self._dataPath(data_uuid)) as stream:
-            stream.write(data)
-        return data_uuid
-
     def updateChangeWithRetry(self, key, change, update_func, retry_count=5):
         for attempt in range(1, retry_count + 1):
             try:
@@ -28,6 +28,8 @@ class RawShardIO(io.RawIOBase):
     def __init__(self, client, path):
         self.client = client
         self.shard_base = path
+        self.compressed_bytes_read = 0
+        self.compressed_bytes_written = 0
 
     def readable(self):
         return True
@@ -50,6 +52,7 @@ class RawShardIO(io.RawIOBase):
 
     def _getData(self, path):
         data, _ = self.client.get(path)
+        self.compressed_bytes_read += len(data)
         return zlib.decompress(data)
 
     def readall(self):
@@ -72,14 +75,25 @@ class RawShardIO(io.RawIOBase):
             sequence=True,
             makepath=True,
         )
+        self.compressed_bytes_written += len(shard_bytes)
        return min(byte_count, NODE_BYTE_SIZE_LIMIT)
 
 
 class BufferedShardWriter(io.BufferedWriter):
     def __init__(self, client, path):
-        super().__init__(RawShardIO(client, path), NODE_BYTE_SIZE_LIMIT)
+        self.__raw = RawShardIO(client, path)
+        super().__init__(self.__raw, NODE_BYTE_SIZE_LIMIT)
+
+    @property
+    def compressed_bytes_written(self):
+        return self.__raw.compressed_bytes_written
 
 
 class BufferedShardReader(io.BufferedReader):
     def __init__(self, client, path):
-        super().__init__(RawShardIO(client, path), NODE_BYTE_SIZE_LIMIT)
+        self.__raw = RawShardIO(client, path)
+        super().__init__(self.__raw, NODE_BYTE_SIZE_LIMIT)
+
+    @property
+    def compressed_bytes_read(self):
+        return self.__raw.compressed_bytes_read
@@ -12,9 +12,10 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import contextlib
 import json
 import time
-import contextlib
+import types
 import zlib
 
 from kazoo.exceptions import (
@@ -62,6 +63,8 @@ class LocalZKContext:
 
 class ZKObject:
     _retry_interval = 5
+    _zkobject_compressed_size = 0
+    _zkobject_uncompressed_size = 0
 
     # Implementations of these two methods are required
     def getPath(self):
@@ -192,6 +195,46 @@ class ZKObject:
                 time.sleep(self._retry_interval)
         raise Exception("ZooKeeper session or lock not valid")
 
+    def estimateDataSize(self, seen=None):
+        """Attempt to find all ZKObjects below this one and sum their
+        compressed and uncompressed sizes.
+
+        :returns: (compressed_size, uncompressed_size)
+        """
+        compressed_size = self._zkobject_compressed_size
+        uncompressed_size = self._zkobject_uncompressed_size
+
+        if seen is None:
+            seen = {self}
+
+        def walk(obj):
+            compressed = 0
+            uncompressed = 0
+            if isinstance(obj, ZKObject):
+                if obj in seen:
+                    return 0, 0
+                seen.add(obj)
+                compressed, uncompressed = obj.estimateDataSize(seen)
+            elif (isinstance(obj, dict) or
+                  isinstance(obj, types.MappingProxyType)):
+                for sub in obj.values():
+                    c, u = walk(sub)
+                    compressed += c
+                    uncompressed += u
+            elif (isinstance(obj, list) or
+                  isinstance(obj, tuple)):
+                for sub in obj:
+                    c, u = walk(sub)
+                    compressed += c
+                    uncompressed += u
+            return compressed, uncompressed
+
+        c, u = walk(self.__dict__)
+        compressed_size += c
+        uncompressed_size += u
+
+        return (compressed_size, uncompressed_size)
+
     # Private methods below
 
     def __init__(self):
@@ -214,7 +257,10 @@ class ZKObject:
                     data = compressed_data
                 self._set(**self.deserialize(data, context))
                 self._set(_zstat=zstat,
-                          _zkobject_hash=hash(data))
+                          _zkobject_hash=hash(data),
+                          _zkobject_compressed_size=len(compressed_data),
+                          _zkobject_uncompressed_size=len(data),
+                          )
                 return
             except ZookeeperError:
                 # These errors come from the server and are not
@@ -251,7 +297,10 @@ class ZKObject:
                     zstat = context.client.set(path, compressed_data,
                                                version=self._zstat.version)
                 self._set(_zstat=zstat,
-                          _zkobject_hash=hash(data))
+                          _zkobject_hash=hash(data),
+                          _zkobject_compressed_size=len(compressed_data),
+                          _zkobject_uncompressed_size=len(data),
+                          )
                 return
             except ZookeeperError:
                 # These errors come from the server and are not
@@ -297,10 +346,14 @@ class ShardedZKObject(ZKObject):
                 with sharding.BufferedShardReader(
                         context.client, path) as stream:
                     data = stream.read()
+                    compressed_size = stream.compressed_bytes_read
                 if not data and context.client.exists(path) is None:
                     raise NoNodeError
                 self._set(**self.deserialize(data, context))
-                self._set(_zkobject_hash=hash(data))
+                self._set(_zkobject_hash=hash(data),
+                          _zkobject_compressed_size=compressed_size,
+                          _zkobject_uncompressed_size=len(data),
+                          )
                 return
             except ZookeeperError:
                 # These errors come from the server and are not
@@ -338,7 +391,12 @@ class ShardedZKObject(ZKObject):
                         context.client, path) as stream:
                     stream.truncate(0)
                     stream.write(data)
-                self._set(_zkobject_hash=hash(data))
+                    stream.flush()
+                    compressed_size = stream.compressed_bytes_written
+                self._set(_zkobject_hash=hash(data),
+                          _zkobject_compressed_size=compressed_size,
+                          _zkobject_uncompressed_size=len(data),
+                          )
                 return
             except ZookeeperError:
                 # These errors come from the server and are not