Merge "Add some ZK data size stats"
@@ -32,6 +32,22 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
    Zuul will report counters for each type of event it receives from
    each of its configured drivers.
 
+.. stat:: zuul.connection.<connection>
+
+   Holds metrics specific to connections. This hierarchy includes:
+
+   .. stat:: cache.data_size_compressed
+      :type: gauge
+
+      The number of bytes stored in ZooKeeper for all items in this
+      connection's change cache.
+
+   .. stat:: cache.data_size_uncompressed
+      :type: gauge
+
+      The number of bytes required for the change cache (the
+      decompressed value of ``data_size_compressed``).
+
 .. stat:: zuul.tenant.<tenant>.event_enqueue_processing_time
    :type: timer
@@ -84,6 +100,19 @@ These metrics are emitted by the Zuul :ref:`scheduler`:
 
       The time taken to process the pipeline.
 
+   .. stat:: data_size_compressed
+      :type: gauge
+
+      The number of bytes stored in ZooKeeper to represent the
+      serialized state of the pipeline.
+
+   .. stat:: data_size_uncompressed
+      :type: gauge
+
+      The number of bytes required to represent the serialized
+      state of the pipeline (the decompressed value of
+      ``data_size_compressed``).
+
    .. stat:: project
 
       This hierarchy holds more specific metrics for each project
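
For reference, ``data_size_uncompressed`` is the size of the serialized payload before compression and ``data_size_compressed`` is the size of the zlib-compressed form that is actually written to ZooKeeper (zlib is what the sharding layer further down uses). A minimal standalone illustration, not Zuul code, with a made-up payload:

    import json
    import zlib

    # Stand-in for a serialized pipeline state or change-cache entry.
    payload = json.dumps(
        {"change": 1234, "patchset": 3, "status": "NEW"}).encode("utf8")

    compressed = zlib.compress(payload)

    # These correspond to the *_uncompressed and *_compressed gauges.
    print("uncompressed bytes:", len(payload))
    print("compressed bytes:", len(compressed))
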
@@ -448,6 +448,19 @@ class TestScheduler(ZuulTestCase):
             'zuul.tenant.tenant-one.event_enqueue_time', kind='ms')
         self.assertTrue(0.0 < float(val) < 60000.0)
 
+        self.assertReportedStat('zuul.tenant.tenant-one.pipeline.gate.'
+                                'data_size_compressed',
+                                kind='g')
+        self.assertReportedStat('zuul.tenant.tenant-one.pipeline.gate.'
+                                'data_size_uncompressed',
+                                kind='g')
+        self.assertReportedStat('zuul.connection.gerrit.cache.'
+                                'data_size_compressed',
+                                kind='g')
+        self.assertReportedStat('zuul.connection.gerrit.cache.'
+                                'data_size_uncompressed',
+                                kind='g')
+
         for build in self.history:
             self.assertTrue(build.parameters['zuul']['voting'])
@@ -1391,7 +1404,7 @@ class TestScheduler(ZuulTestCase):
             change1.cache_stat.uuid,
             change1.cache_stat.version,
             change1.cache_stat.mzxid,
-            0.0)
+            0.0, 0, 0)
         # We should not delete change1 since it's needed by change2
         # which we want to keep.
         for connection in sched.connections.connections.values():
@@ -1403,7 +1416,7 @@ class TestScheduler(ZuulTestCase):
             change2.cache_stat.uuid,
             change2.cache_stat.version,
             change1.cache_stat.mzxid,
-            0.0)
+            0.0, 0, 0)
         for connection in sched.connections.connections.values():
             connection.maintainCache([], max_age=7200)
         # The master branch change remains
@@ -1324,6 +1324,9 @@ class TestChangeCache(ZooKeeperBaseTestCase):
         self.assertEqual(self.cache.get(key_foo), change_foo)
         self.assertEqual(self.cache.get(key_bar), change_bar)
 
+        compressed_size, uncompressed_size = self.cache.estimateDataSize()
+        self.assertTrue(compressed_size != uncompressed_size != 0)
+
     def test_update(self):
         change = DummyChange("project", {"foo": "bar"})
         key = ChangeKey('conn', 'project', 'change', 'foo', '1')
@@ -1400,7 +1403,7 @@ class TestChangeCache(ZooKeeperBaseTestCase):
             uuid.uuid4().hex,
             change.cache_version - 1,
             change.cache_stat.mzxid - 1,
-            0)
+            0, 0, 0)
         updated_change = self.cache.updateChangeWithRetry(
             key, change, updater)
         self.assertEqual(updated_change.foobar, 2)
@@ -1509,6 +1512,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
                                           foo='bar')
             self.assertEqual(pipeline1.foo, 'bar')
 
+            compressed_size, uncompressed_size = pipeline1.estimateDataSize()
+            self.assertTrue(compressed_size != uncompressed_size != 0)
+
         # Load an object from ZK (that we don't already have)
         with tenant_write_lock(self.zk_client, tenant_name) as lock:
             context = ZKContext(self.zk_client, lock, stop_event, self.log,
@@ -1517,6 +1523,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
                                               '/zuul/pipeline/fake_tenant')
             self.assertEqual(pipeline2.foo, 'bar')
 
+            compressed_size, uncompressed_size = pipeline1.estimateDataSize()
+            self.assertTrue(compressed_size != uncompressed_size != 0)
+
         def get_ltime(obj):
             zstat = self.zk_client.client.exists(obj.getPath())
             return zstat.last_modified_transaction_id
@@ -336,3 +336,6 @@ class ZKChangeCacheMixin:
 
     def getChangeByKey(self, key):
         return self._change_cache.get(key)
+
+    def estimateCacheDataSize(self):
+        return self._change_cache.estimateDataSize()
@@ -5174,7 +5174,8 @@ class Bundle:
 
 
 # Cache info of a ref
 CacheStat = namedtuple("CacheStat",
-                       ["key", "uuid", "version", "mzxid", "last_modified"])
+                       ["key", "uuid", "version", "mzxid", "last_modified",
+                        "compressed_size", "uncompressed_size"])
 
 
 class Ref(object):
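
The widened ``CacheStat`` tuple is why the test updates above now pass two extra trailing zeros. A small sketch of constructing it; every value here is illustrative, in Zuul they come from the ZooKeeper zstat and from the sharded reader/writer byte counters:

    from collections import namedtuple

    CacheStat = namedtuple("CacheStat",
                           ["key", "uuid", "version", "mzxid", "last_modified",
                            "compressed_size", "uncompressed_size"])

    stat = CacheStat(key="<change key>", uuid="abc123", version=0, mzxid=42,
                     last_modified=0.0, compressed_size=512,
                     uncompressed_size=2048)
    print(stat.compressed_size, stat.uncompressed_size)
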
@@ -520,12 +520,22 @@ class Scheduler(threading.Thread):
         self.statsd.gauge('zuul.mergers.jobs_queued', merge_queue)
         self.statsd.gauge('zuul.scheduler.eventqueues.management',
                           self.reconfigure_event_queue.qsize())
-        base = 'zuul.scheduler.eventqueues.connection'
+        queue_base = 'zuul.scheduler.eventqueues.connection'
+        conn_base = 'zuul.connection'
         for connection in self.connections.connections.values():
             queue = connection.getEventQueue()
             if queue is not None:
-                self.statsd.gauge(f'{base}.{connection.connection_name}',
+                self.statsd.gauge(f'{queue_base}.{connection.connection_name}',
                                   len(queue))
+            if hasattr(connection, 'estimateCacheDataSize'):
+                compressed_size, uncompressed_size =\
+                    connection.estimateCacheDataSize()
+                self.statsd.gauge(f'{conn_base}.{connection.connection_name}.'
+                                  'cache.data_size_compressed',
+                                  compressed_size)
+                self.statsd.gauge(f'{conn_base}.{connection.connection_name}.'
+                                  'cache.data_size_uncompressed',
+                                  uncompressed_size)
 
         for tenant in self.abide.tenants.values():
             self.statsd.gauge(f"zuul.tenant.{tenant.name}.management_events",
@@ -545,6 +555,12 @@ class Scheduler(threading.Thread):
                               len(result_event_queues[pipeline.name]))
             self.statsd.gauge(f"{base}.management_events",
                               len(management_event_queues[pipeline.name]))
+            compressed_size, uncompressed_size =\
+                pipeline.state.estimateDataSize()
+            self.statsd.gauge(f'{base}.data_size_compressed',
+                              compressed_size)
+            self.statsd.gauge(f'{base}.data_size_uncompressed',
+                              uncompressed_size)
 
         self.nodepool.emitStatsTotals(self.abide)
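
Taken together, the two hunks above emit four new gauge families. A sketch that only assembles the statsd key names for an assumed connection ``gerrit``, tenant ``example-tenant`` and pipeline ``gate``; the values themselves come from ``estimateCacheDataSize()`` and ``estimateDataSize()``:

    connection_name = "gerrit"        # assumed example connection
    tenant_name = "example-tenant"    # assumed example tenant
    pipeline_name = "gate"

    conn_base = "zuul.connection"
    pipe_base = f"zuul.tenant.{tenant_name}.pipeline.{pipeline_name}"

    for key in (
        f"{conn_base}.{connection_name}.cache.data_size_compressed",
        f"{conn_base}.{connection_name}.cache.data_size_uncompressed",
        f"{pipe_base}.data_size_compressed",
        f"{pipe_base}.data_size_uncompressed",
    ):
        print(key)
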
@@ -205,6 +205,20 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
         key = ChangeKey.fromReference(data['key_reference'])
         return key, data['data_uuid']
 
+    def estimateDataSize(self):
+        """Return the data size of the changes in the cache.
+
+        :returns: (compressed_size, uncompressed_size)
+        """
+        compressed_size = 0
+        uncompressed_size = 0
+
+        for c in list(self._change_cache.values()):
+            compressed_size += c.cache_stat.compressed_size
+            uncompressed_size += c.cache_stat.uncompressed_size
+
+        return (compressed_size, uncompressed_size)
+
     def prune(self, relevant, max_age=3600):  # 1h
         # Relevant is the list of changes directly in a pipeline.
         # This method will take care of expanding that out to each
@@ -289,8 +303,15 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
             # Change in our local cache is up-to-date
             return change
 
+        compressed_size = 0
+        uncompressed_size = 0
         try:
-            raw_data = self._getData(data_uuid)
+            with sharding.BufferedShardReader(
+                self.kazoo_client, self._dataPath(data_uuid)
+            ) as stream:
+                raw_data = stream.read()
+                compressed_size = stream.compressed_bytes_read
+                uncompressed_size = len(raw_data)
         except NoNodeError:
             cache_path = self._cachePath(key._hash)
             self.log.error("Removing cache key %s with no data node uuid %s",
@@ -321,22 +342,27 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
 
         change.cache_stat = model.CacheStat(
             key, data_uuid, zstat.version,
-            zstat.last_modified_transaction_id, zstat.last_modified)
+            zstat.last_modified_transaction_id, zstat.last_modified,
+            compressed_size, uncompressed_size)
         # Use setdefault here so we only have a single instance of a change
         # around. In case of a concurrent get this might return a different
         # change instance than the one we just created.
         return self._change_cache.setdefault(key._hash, change)
 
-    def _getData(self, data_uuid):
-        with sharding.BufferedShardReader(
-                self.kazoo_client, self._dataPath(data_uuid)) as stream:
-            return stream.read()
-
     def set(self, key, change, version=-1):
         data = self._dataFromChange(change)
         raw_data = json.dumps(data, sort_keys=True).encode("utf8")
 
-        data_uuid = self._setData(raw_data)
+        compressed_size = 0
+        uncompressed_size = 0
+        data_uuid = uuid.uuid4().hex
+        with sharding.BufferedShardWriter(
+                self.kazoo_client, self._dataPath(data_uuid)) as stream:
+            stream.write(raw_data)
+            stream.flush()
+            compressed_size = stream.compressed_bytes_written
+            uncompressed_size = len(raw_data)
 
         # Add the change_key info here mostly for debugging since the
         # hash is non-reversible.
         cache_data = json.dumps(dict(
@@ -373,16 +399,10 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
 
         change.cache_stat = model.CacheStat(
             key, data_uuid, zstat.version,
-            zstat.last_modified_transaction_id, zstat.last_modified)
+            zstat.last_modified_transaction_id, zstat.last_modified,
+            compressed_size, uncompressed_size)
         self._change_cache[key._hash] = change
 
-    def _setData(self, data):
-        data_uuid = uuid.uuid4().hex
-        with sharding.BufferedShardWriter(
-                self.kazoo_client, self._dataPath(data_uuid)) as stream:
-            stream.write(data)
-        return data_uuid
-
     def updateChangeWithRetry(self, key, change, update_func, retry_count=5):
         for attempt in range(1, retry_count + 1):
             try:
@@ -28,6 +28,8 @@ class RawShardIO(io.RawIOBase):
     def __init__(self, client, path):
         self.client = client
         self.shard_base = path
+        self.compressed_bytes_read = 0
+        self.compressed_bytes_written = 0
 
     def readable(self):
         return True
@@ -50,6 +52,7 @@ class RawShardIO(io.RawIOBase):
 
     def _getData(self, path):
         data, _ = self.client.get(path)
+        self.compressed_bytes_read += len(data)
         return zlib.decompress(data)
 
     def readall(self):
@@ -72,14 +75,25 @@ class RawShardIO(io.RawIOBase):
             sequence=True,
             makepath=True,
         )
+        self.compressed_bytes_written += len(shard_bytes)
         return min(byte_count, NODE_BYTE_SIZE_LIMIT)
 
 
 class BufferedShardWriter(io.BufferedWriter):
     def __init__(self, client, path):
-        super().__init__(RawShardIO(client, path), NODE_BYTE_SIZE_LIMIT)
+        self.__raw = RawShardIO(client, path)
+        super().__init__(self.__raw, NODE_BYTE_SIZE_LIMIT)
+
+    @property
+    def compressed_bytes_written(self):
+        return self.__raw.compressed_bytes_written
 
 
 class BufferedShardReader(io.BufferedReader):
     def __init__(self, client, path):
-        super().__init__(RawShardIO(client, path), NODE_BYTE_SIZE_LIMIT)
+        self.__raw = RawShardIO(client, path)
+        super().__init__(self.__raw, NODE_BYTE_SIZE_LIMIT)
+
+    @property
+    def compressed_bytes_read(self):
+        return self.__raw.compressed_bytes_read
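
The byte counters added to ``RawShardIO`` are what the change cache and ZKObject code read back after a sharded write or read. A usage sketch, assuming a ZooKeeper server is reachable and that ``zuul.zk.sharding`` is importable; the path and payload are made up:

    import json

    from kazoo.client import KazooClient
    from zuul.zk.sharding import BufferedShardWriter

    # Assumes a ZooKeeper server reachable on localhost (illustrative only).
    client = KazooClient(hosts="127.0.0.1:2181")
    client.start()

    path = "/zuul/example/data"  # illustrative path
    raw_data = json.dumps({"foo": "bar"}).encode("utf8")

    # Write the payload in compressed shards and capture both sizes,
    # mirroring what AbstractChangeCache.set() does above.
    with BufferedShardWriter(client, path) as stream:
        stream.write(raw_data)
        stream.flush()
        compressed_size = stream.compressed_bytes_written
    uncompressed_size = len(raw_data)

    print(compressed_size, uncompressed_size)
    client.stop()
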
@@ -12,9 +12,10 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import contextlib
 import json
 import time
-import contextlib
+import types
 import zlib
 
 from kazoo.exceptions import (
@@ -62,6 +63,8 @@ class LocalZKContext:
 
 class ZKObject:
     _retry_interval = 5
+    _zkobject_compressed_size = 0
+    _zkobject_uncompressed_size = 0
 
     # Implementations of these two methods are required
     def getPath(self):
@@ -192,6 +195,46 @@ class ZKObject:
             time.sleep(self._retry_interval)
         raise Exception("ZooKeeper session or lock not valid")
 
+    def estimateDataSize(self, seen=None):
+        """Attempt to find all ZKObjects below this one and sum their
+        compressed and uncompressed sizes.
+
+        :returns: (compressed_size, uncompressed_size)
+        """
+        compressed_size = self._zkobject_compressed_size
+        uncompressed_size = self._zkobject_uncompressed_size
+
+        if seen is None:
+            seen = {self}
+
+        def walk(obj):
+            compressed = 0
+            uncompressed = 0
+            if isinstance(obj, ZKObject):
+                if obj in seen:
+                    return 0, 0
+                seen.add(obj)
+                compressed, uncompressed = obj.estimateDataSize(seen)
+            elif (isinstance(obj, dict) or
+                  isinstance(obj, types.MappingProxyType)):
+                for sub in obj.values():
+                    c, u = walk(sub)
+                    compressed += c
+                    uncompressed += u
+            elif (isinstance(obj, list) or
+                  isinstance(obj, tuple)):
+                for sub in obj:
+                    c, u = walk(sub)
+                    compressed += c
+                    uncompressed += u
+            return compressed, uncompressed
+
+        c, u = walk(self.__dict__)
+        compressed_size += c
+        uncompressed_size += u
+
+        return (compressed_size, uncompressed_size)
+
     # Private methods below
 
     def __init__(self):
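
``walk()`` descends through dicts, ``MappingProxyType`` views, lists and tuples, and the ``seen`` set keeps a ZKObject that is reachable via several attributes from being counted twice. A standalone sketch of the same aggregation over plain containers; no ZKObject is involved and the sizes are stand-ins:

    import types

    class FakeSized:
        """Stand-in for a ZKObject that knows its own sizes (illustrative)."""
        def __init__(self, compressed, uncompressed):
            self.sizes = (compressed, uncompressed)

    def estimate(obj, seen=None):
        if seen is None:
            seen = set()
        compressed = uncompressed = 0
        if isinstance(obj, FakeSized):
            if id(obj) in seen:
                return 0, 0  # already counted via another reference
            seen.add(id(obj))
            return obj.sizes
        elif isinstance(obj, (dict, types.MappingProxyType)):
            children = obj.values()
        elif isinstance(obj, (list, tuple)):
            children = obj
        else:
            return 0, 0
        for sub in children:
            c, u = estimate(sub, seen)
            compressed += c
            uncompressed += u
        return compressed, uncompressed

    shared = FakeSized(100, 400)
    tree = {"a": [shared, FakeSized(10, 40)], "b": (shared,)}
    print(estimate(tree))  # (110, 440): the shared object is counted once
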
@@ -214,7 +257,10 @@ class ZKObject:
                 data = compressed_data
             self._set(**self.deserialize(data, context))
             self._set(_zstat=zstat,
-                      _zkobject_hash=hash(data))
+                      _zkobject_hash=hash(data),
+                      _zkobject_compressed_size=len(compressed_data),
+                      _zkobject_uncompressed_size=len(data),
+                      )
             return
         except ZookeeperError:
             # These errors come from the server and are not
@@ -251,7 +297,10 @@ class ZKObject:
             zstat = context.client.set(path, compressed_data,
                                        version=self._zstat.version)
             self._set(_zstat=zstat,
-                      _zkobject_hash=hash(data))
+                      _zkobject_hash=hash(data),
+                      _zkobject_compressed_size=len(compressed_data),
+                      _zkobject_uncompressed_size=len(data),
+                      )
             return
         except ZookeeperError:
             # These errors come from the server and are not
@@ -297,10 +346,14 @@ class ShardedZKObject(ZKObject):
             with sharding.BufferedShardReader(
                     context.client, path) as stream:
                 data = stream.read()
+                compressed_size = stream.compressed_bytes_read
             if not data and context.client.exists(path) is None:
                 raise NoNodeError
             self._set(**self.deserialize(data, context))
-            self._set(_zkobject_hash=hash(data))
+            self._set(_zkobject_hash=hash(data),
+                      _zkobject_compressed_size=compressed_size,
+                      _zkobject_uncompressed_size=len(data),
+                      )
             return
         except ZookeeperError:
             # These errors come from the server and are not
@@ -338,7 +391,12 @@ class ShardedZKObject(ZKObject):
                     context.client, path) as stream:
                 stream.truncate(0)
                 stream.write(data)
-                self._set(_zkobject_hash=hash(data))
+                stream.flush()
+                compressed_size = stream.compressed_bytes_written
+            self._set(_zkobject_hash=hash(data),
+                      _zkobject_compressed_size=compressed_size,
+                      _zkobject_uncompressed_size=len(data),
+                      )
             return
         except ZookeeperError:
             # These errors come from the server and are not