James E. Blair cce82a820c Refactor ZKObject to harmonize plain and sharded
Now that we have the following axes of zkobjects:
* Sharded/non-sharded
* Polymorphic/non-polymorphic

And both of those are done by sub- or mixin-classes, we are ending
up with a lot of repeated almost-identical code, and it's becoming
difficult to maintain, or add support for the 4th type of object:

To address this, let's refactor zkobject so that we use the same
load/save methods for sharded/non-sharded, and just replace the
underlying IO class.  We'll use a raw ZK io class for the non-sharded
case, and a sharded io reader/writer for the sharded case.

Part of the complexity of this comes from the fact that currently
the BufferedShard[Reader/Writer] classes also implement compression,
and the polymorphic zkobject needs to read some initial uncompressed
data.

To allow that to happen, we'll separate the concerns and remove
compression from the sharded io class.  That means the ZKObject itself
as a consumer of that class (and also the raw io class) is responsible
for compression.

We have been compressing the data one 1mb znode chunk at a time, and
now we will be compressing the entire data before writing.  That might
briefly use more memory.  However, while we've been doing the same
thing on the reading side, we do ultimately end up with one giant byte
string that we give to json.loads, so we're already incurring that
memory penalty on the read side.  Because of this, we shouldn't see
an overall memory increase.  If we do want to address that, we should
be able to refactor this again to pass in a zlib stream object instead
of the entire compressed data (and likewise the reverse on read).

Note that this may produce fewer ZK operations because by putting the
zlib compression in the writer as we did previously, we ended up
chunking the data based on the *uncompressed* size.  So if we got 4mb
of data, we would chunk it to 1mb, then compress it, which means we
might end up writing only 100k per chunk, using more znodes than
necessary.  Now we will chunk the compressed data and go much closer
to the znode size limit.

Change-Id: I513976a703f035545eaf5bd05a381dbc0c509cda
2024-07-17 11:17:24 -07:00

501 lines
19 KiB

# Copyright 2021 BMW Group
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import contextlib
import json
import logging
import threading
import time
import uuid
import hashlib
import zlib
from collections import defaultdict
from collections.abc import Iterable
from kazoo.exceptions import BadVersionError, NodeExistsError, NoNodeError
from zuul import model
from zuul.zk import sharding, ZooKeeperSimpleBase
from zuul.zk.exceptions import ZuulZooKeeperException
from zuul.zk.vendor.watchers import ExistingDataWatch
# Common ZooKeeper path prefix of all change caches; the connection name
# is appended to it to form the root path of one connection's cache.
CHANGE_CACHE_ROOT = "/zuul/cache/connection"
class ConcurrentUpdateError(ZuulZooKeeperException):
    """Signal that a cache entry was modified concurrently.

    The change cache raises this from its set() method when ZooKeeper
    rejects the write with BadVersionError, NodeExistsError or
    NoNodeError.
    """
def str_or_none(d):
    """Stringify ``d``, but pass ``None`` through untouched."""
    return None if d is None else str(d)
class ChangeKey:
    """Represents a change key

    This is used to look up a change in the change cache.

    It also contains enough basic information about a change in order
    to determine if two entries in the change cache are related
    (see isSameChange).

    There are two ways to refer to a Change in ZK. If one ZK object
    refers to a change, it should use ChangeKey.reference. This is a
    JSON-serialized dictionary with structured information about the
    change. The contents can be used to construct a ChangeKey (see
    fromReference), and that can be used to pull the Change from the
    cache. The reference is used by other objects in ZooKeeper to refer
    to changes, so the serialization format must be stable or backwards
    compatible.

    The cache itself uses a sha256 digest of the reference as the
    actual cache key in ZK. This reduces and stabilizes the length of
    the cache keys themselves. Methods outside of the change_cache
    should not use this directly.
    """
# Build a key from the five identifying fields of a change. Each field is
# passed through str_or_none(), so it is stored as a string or as None.
def __init__(self, connection_name, project_name,
change_type, stable_id, revision):
self.connection_name = str_or_none(connection_name)
self.project_name = str_or_none(project_name)
self.change_type = str_or_none(change_type)
self.stable_id = str_or_none(stable_id)
self.revision = str_or_none(revision)
# NOTE(review): the keyword arguments and the closing parenthesis of this
# dict(...) call are not visible in this view. fromReference() reads the
# keys connection_name, project_name, change_type, stable_id and revision
# from the serialized reference -- confirm the exact contents upstream.
reference = dict(
# The reference is the JSON form of that dict, serialized with sorted keys.
self.reference = json.dumps(reference, sort_keys=True)
# The sha256 hex digest of the UTF-8 encoded reference is kept as _hash.
msg = self.reference.encode('utf8')
self._hash = hashlib.sha256(msg).hexdigest()
def __hash__(self):
return hash(self.reference)
def __eq__(self, other):
return (isinstance(other, ChangeKey) and
self.reference == other.reference)
# Debug representation listing the five identifying fields of the key.
# NOTE(review): the tail of this expression (the end of the f-string and
# the closing parenthesis) is not visible in this view -- restore it from
# upstream before relying on this block.
def __repr__(self):
return (f'<ChangeKey {self.connection_name} {self.project_name} '
f'{self.change_type} {self.stable_id} {self.revision} '
@classmethod
def fromReference(cls, data):
    """Alternate constructor: build a key from a serialized reference.

    This must be a classmethod: callers invoke it on the class itself
    with a single argument (``ChangeKey.fromReference(reference)``).

    :param data: JSON string as stored in ``ChangeKey.reference``. It
        must decode to a mapping with the keys ``connection_name``,
        ``project_name``, ``change_type``, ``stable_id`` and ``revision``.
    :returns: a new instance of ``cls``
    :raises KeyError: if one of the five keys is missing
    """
    fields = json.loads(data)
    return cls(fields['connection_name'], fields['project_name'],
               fields['change_type'], fields['stable_id'],
               fields['revision'])
def isSameChange(self, other):
    """Return whether ``other`` identifies the same change as this key.

    Connection name, project name, change type and stable id are
    compared; the revision is not part of the comparison. The values
    of ``other`` are normalized with str_or_none() before comparing, so
    ``other`` only needs to provide these four attributes.

    :param other: object with connection_name, project_name,
        change_type and stable_id attributes
    :returns: True if all four fields match, False otherwise
    """
    return all([
        self.connection_name == str_or_none(other.connection_name),
        self.project_name == str_or_none(other.project_name),
        self.change_type == str_or_none(other.change_type),
        self.stable_id == str_or_none(other.stable_id),
    ])
# Convenience methods for drivers that encode old/newrev in
# revision. Revision is not guaranteed to use this format.
def oldrev(self):
    """Return the part of the revision in front of the first '..'.

    None is returned when the revision contains no '..' separator or
    when that part is the literal string 'None'.
    """
    if '..' not in self.revision:
        return None
    old = self.revision.split('..')[0]
    return None if old == 'None' else old
def newrev(self):
    """Return the part of the revision following the first '..'.

    A revision without a '..' separator is returned unchanged. None is
    returned when the part after the separator is the literal string
    'None'.
    """
    if '..' not in self.revision:
        return self.revision
    new = self.revision.split('..')[1]
    return None if new == 'None' else new
class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
    """Abstract class for caching change items in Zookeeper.

    In order to make updates atomic the change data is stored separate
    from the cache entry. The data uses a random UUID znode that is
    then referenced from the actual cache entry.

    The change data is immutable, which means that an update of a cached
    item will result in a new data node. The cache entry will then be
    changed to reference the new data.

    This approach also allows us to check if a given change is
    up-to-date by comparing the referenced UUID in Zookeeper with the
    one in the local cache without loading the whole change data.

    The change data is stored in the following Zookeeper path:

        /zuul/cache/connection/<connection-name>/data/<uuid>

    The cache entries that reference the change data use the following
    path:

        /zuul/cache/connection/<connection-name>/cache/<key-hash>

    Data nodes will not be directly removed when an entry is removed
    or updated in order to prevent race conditions with multiple
    consumers of the cache. The stale data nodes will be instead
    cleaned up in the cache's cleanup() method. This is expected to
    happen periodically.
    """
# Set up the ZooKeeper paths and the local bookkeeping of the change cache
# for one connection.
# NOTE(review): this view is incomplete -- the argument of getLogger( is
# missing and further statements may be missing as well. `client` is not
# used in the visible lines and kazoo_client is presumably provided by
# ZooKeeperSimpleBase -- confirm against upstream.
def __init__(self, client, connection):
self.log = logging.getLogger(
self.connection = connection
# All znodes of this cache live below <CHANGE_CACHE_ROOT>/<connection name>.
self.root_path = f"{CHANGE_CACHE_ROOT}/{connection.connection_name}"
self.cache_root = f"{self.root_path}/cache"
self.data_root = f"{self.root_path}/data"
# Local cache of change objects, keyed by the hash of their ChangeKey.
self._change_cache = {}
# Per change locks to serialize concurrent creation and update of
# local objects.
self._change_locks = defaultdict(threading.Lock)
# Key hashes that _cacheWatcher compares against the current children of
# the cache root.
self._watched_keys = set()
# Data UUIDs that are candidates to be removed on the next
# cleanup iteration.
self._data_cleanup_candidates = set()
# Have _cacheWatcher called with the children of the cache root.
self.kazoo_client.ChildrenWatch(self.cache_root, self._cacheWatcher)
def _dataPath(self, data_uuid):
return f"{self.data_root}/{data_uuid}"
def _cachePath(self, key_hash):
return f"{self.cache_root}/{key_hash}"
# Children-watch callback for the cache root (registered in __init__):
# drops the local state of cache keys that disappeared and determines the
# keys that are new.
def _cacheWatcher(self, cache_keys):
# This method deals with key hashes exclusively
cache_keys = set(cache_keys)
# Keys we know about that are no longer children of the cache root.
deleted_watches = self._watched_keys - cache_keys
for key in deleted_watches:
self.log.debug("Watcher removing %s from cache", key)
# Either entry may already be absent, hence the suppressed KeyError.
with contextlib.suppress(KeyError):
del self._change_cache[key]
with contextlib.suppress(KeyError):
del self._change_locks[key]
# Children of the cache root that are not known yet.
new_keys = cache_keys - self._watched_keys
# NOTE(review): the body of this loop is not visible in this view, and no
# visible line updates self._watched_keys -- confirm against upstream.
for key in new_keys:
def _cacheItemWatcher(self, data, zstat, event=None):
if not all((data, zstat)):
key, data_uuid = self._loadKey(data)
self.log.debug("Noticed update to key %s data uuid %s",
key, data_uuid)
self._get(key, data_uuid, zstat)
def _loadKey(self, data):
    """Decode the content of a cache entry znode.

    :param data: UTF-8 encoded JSON with the entries ``key_reference``
        and ``data_uuid``
    :returns: (ChangeKey, data_uuid)
    """
    entry = json.loads(data.decode("utf8"))
    change_key = ChangeKey.fromReference(entry['key_reference'])
    return (change_key, entry['data_uuid'])
def estimateDataSize(self):
    """Return the data size of the changes in the cache.

    The sizes recorded in the cache_stat of every locally cached change
    are summed up.

    :returns: (compressed_size, uncompressed_size)
    """
    compressed_size = 0
    uncompressed_size = 0
    # Iterate over a snapshot of the values so that a modification of
    # the dict while we are summing cannot break the iteration.
    for change in list(self._change_cache.values()):
        # Read cache_stat once so that both sizes are taken from the
        # same stat object even if it is replaced in the meantime.
        cache_stat = change.cache_stat
        compressed_size += cache_stat.compressed_size
        uncompressed_size += cache_stat.uncompressed_size
    return (compressed_size, uncompressed_size)
# Delete cache entries that were last modified more than max_age seconds
# ago and that are neither in `relevant` nor related to a change that is
# kept.
# NOTE(review): the statement(s) controlled by the age check below are not
# visible in this view (presumably the loop skips entries that are too
# recent) -- confirm against upstream.
def prune(self, relevant, max_age=3600): # 1h
# Relevant is the list of changes directly in a pipeline.
# This method will take care of expanding that out to each
# change's network of related changes.
self.log.debug("Pruning cache")
cutoff_time = time.time() - max_age
# Maps a change key to the cache entry version that was examined.
outdated_versions = dict()
to_keep = set(relevant)
sched = self.connection.sched
for c in list(self._change_cache.values()):
# Assign to a local variable so all 3 values we use are
# consistent in case the cache_stat is updated during this
# loop.
cache_stat = c.cache_stat
if cache_stat.last_modified >= cutoff_time:
# This entry isn't old enough to delete yet
# Save the version we examined so we can make sure to only
# delete that version.
outdated_versions[cache_stat.key] = cache_stat.version
# Changes we want to keep may have localized networks; keep
# them together even if one member hasn't been updated in a
# while. Only when the entire network hasn't been modified in
# max_age will any change in it be removed.
# Iterate over a copy; to_keep itself is handed to getRelatedChanges().
for key in to_keep.copy():
source = sched.connections.getSource(key.connection_name)
change = source.getChange(key)
change.getRelatedChanges(sched, to_keep)
# Only outdated entries that are not being kept get deleted, and only at
# the version examined above.
to_prune = set(outdated_versions.keys()) - to_keep
for key in to_prune:
self.delete(key, outdated_versions[key])
self.log.debug("Done pruning cache")
def cleanup(self):
    """Delete stale data nodes and select candidates for the next run.

    Data nodes that were recorded as candidates by the previous run and
    are still not referenced by a locally cached change are deleted.
    Afterwards all data nodes currently in ZooKeeper that are not
    referenced by a locally cached change become the new candidates.
    """
    self.log.debug("Cleaning cache")
    valid_uuids = set()
    for cached in list(self._change_cache.values()):
        valid_uuids.add(cached.cache_stat.uuid)

    for data_uuid in self._data_cleanup_candidates - valid_uuids:
        self.log.debug("Deleting stale data uuid %s", data_uuid)
        stale_path = self._dataPath(data_uuid)
        self.kazoo_client.delete(stale_path, recursive=True)

    existing = self.kazoo_client.get_children(self.data_root)
    self._data_cleanup_candidates = set(existing) - valid_uuids
    self.log.debug("Done cleaning cache")
def __iter__(self):
children = self.kazoo_client.get_children(self.cache_root)
except NoNodeError:
for key_hash in children:
change = self._get_from_key_hash(key_hash)
if change is not None:
yield change
def get(self, key):
    """Return the cached change for ``key``.

    :param key: the ChangeKey of the change to look up
    :returns: the result of _get() for the referenced data node, or
        None if there is no cache entry for the key in ZooKeeper
    """
    cache_path = self._cachePath(key._hash)
    try:
        value, zstat = self.kazoo_client.get(cache_path)
    except NoNodeError:
        return None

    # The key decoded from the entry is not needed; we already have it.
    _, data_uuid = self._loadKey(value)
    return self._get(key, data_uuid, zstat)
def _get_from_key_hash(self, key_hash):
cache_path = self._cachePath(key_hash)
value, zstat = self.kazoo_client.get(cache_path)
except NoNodeError:
return None
key, data_uuid = self._loadKey(value)
return self._get(key, data_uuid, zstat)
# Return the change for `key`, loading the data node `data_uuid` from
# ZooKeeper unless the locally cached object already refers to that UUID.
# Returns None, after deleting the cache entry, when the data node does not
# exist or does not contain valid JSON.
# NOTE(review): several structural lines are not visible in this view (the
# `try:` lines belonging to the two `except` clauses, and presumably an
# `else:` in front of the _changeFromData() call) -- confirm against
# upstream before relying on this block.
def _get(self, key, data_uuid, zstat):
change = self._change_cache.get(key._hash)
if change and change.cache_stat.uuid == data_uuid:
# Change in our local cache is up-to-date
return change
compressed_size = 0
uncompressed_size = 0
# Read the data node through the shard reader and decompress its content.
with sharding.BufferedShardReader(
self.kazoo_client, self._dataPath(data_uuid)
) as stream:
raw_data = zlib.decompress(stream.read())
compressed_size = stream.bytes_read
uncompressed_size = len(raw_data)
except NoNodeError:
cache_path = self._cachePath(key._hash)
self.log.error("Removing cache key %s with no data node uuid %s",
key, data_uuid)
# TODO: handle no node + version mismatch
self.kazoo_client.delete(cache_path, zstat.version)
return None
data = json.loads(raw_data)
except json.JSONDecodeError:
cache_path = self._cachePath(key._hash)
self.log.error("Removing cache key %s with corrupt data node "
"uuid %s data %s len %s",
key, data_uuid, repr(raw_data), len(raw_data))
# TODO: handle no node + version mismatch
self.kazoo_client.delete(cache_path, zstat.version)
return None
# Creation and update of the local object happen under the per-key lock.
with self._change_locks[key._hash]:
if change:
# While holding the lock check if we still need to update
# the change and skip the update if we have the latest version.
if change.cache_stat.mzxid >= zstat.mzxid:
return change
self._updateChange(change, data)
change = self._changeFromData(data)
# Record which data node and cache entry version the object reflects.
change.cache_stat = model.CacheStat(
key, data_uuid, zstat.version,
zstat.last_modified_transaction_id, zstat.last_modified,
compressed_size, uncompressed_size)
# Use setdefault here so we only have a single instance of a change
# around. In case of a concurrent get this might return a different
# change instance than the one we just created.
return self._change_cache.setdefault(key._hash, change)
# Store `change` under `key`: the serialized change goes into a new data
# node with a random UUID, then the cache entry is created (version == -1)
# or updated to reference that data node. ZooKeeper conflicts are reported
# as ConcurrentUpdateError.
# NOTE(review): many lines of this method are not visible in this view: the
# write to the shard stream, the arguments of dict( and create(, the heads
# of the two logging calls, and the `try:` / `else:` lines. _loadKey()
# reads the entries key_reference and data_uuid from the cache entry --
# confirm everything against upstream before relying on this block.
def set(self, key, change, version=-1):
data = self._dataFromChange(change)
raw_data = json.dumps(data, sort_keys=True).encode("utf8")
compressed_size = 0
uncompressed_size = 0
# Every call writes to a fresh data node; existing ones are not modified.
data_uuid = uuid.uuid4().hex
with sharding.BufferedShardWriter(
self.kazoo_client, self._dataPath(data_uuid)) as stream:
compressed_size = stream.bytes_written
uncompressed_size = len(raw_data)
# Add the change_key info here mostly for debugging since the
# hash is non-reversible.
cache_data = json.dumps(dict(
), sort_keys=True)
cache_path = self._cachePath(key._hash)
with self._change_locks[key._hash]:
if version == -1:
"Create cache key %s with data uuid %s len %s",
key, data_uuid, len(raw_data))
_, zstat = self.kazoo_client.create(
# Sanity check that we only have a single change instance
# for a key.
if self._change_cache[key._hash] is not change:
raise RuntimeError(
"Conflicting change objects (existing "
f"{self._change_cache[key._hash]} vs. "
f"new {change} "
f"for key '{key.reference}'")
"Update cache key %s with data uuid %s len %s",
key, data_uuid, len(raw_data))
zstat = self.kazoo_client.set(
cache_path, cache_data.encode("utf8"), version)
except (BadVersionError, NodeExistsError, NoNodeError) as exc:
raise ConcurrentUpdateError from exc
# Record which data node and cache entry version the object reflects.
change.cache_stat = model.CacheStat(
key, data_uuid, zstat.version,
zstat.last_modified_transaction_id, zstat.last_modified,
compressed_size, uncompressed_size)
self._change_cache[key._hash] = change
# Apply update_func to the change and store it with set(), handling
# ConcurrentUpdateError for up to retry_count attempts. Returns the change.
# NOTE(review): this view is incomplete -- the signature is cut off
# (allow_key_update is used below but its declaration is not visible), and
# the `try:` line, the head of the logging call and the statement
# controlled by `if attempt == retry_count:` are missing. Confirm against
# upstream before relying on this block.
def updateChangeWithRetry(self, key, change, update_func, retry_count=5,
for attempt in range(1, retry_count + 1):
# The version the change was loaded at is passed on to set().
version = change.cache_version
# update_func may return a replacement key; it is only honoured when
# allow_key_update is set.
newkey = update_func(change)
if allow_key_update and newkey:
key = newkey
self.set(key, change, version)
except ConcurrentUpdateError:
"Conflicting cache update of %s (attempt: %s/%s)",
change, attempt, retry_count)
if attempt == retry_count:
# Update the cache and get the change as it might have
# changed due to a concurrent create.
change = self.get(key)
return change
# Remove the cache entry for `key` from ZooKeeper and from the local cache.
# `version` is handed to the ZooKeeper delete call.
# NOTE(review): the `try:` line and the bodies of both `except` clauses are
# not visible in this view, so it cannot be told from here which of the
# error cases still drop the local entry -- confirm against upstream.
def delete(self, key, version=-1):
self.log.debug("Deleting %s from cache", key)
cache_path = self._cachePath(key._hash)
# Only delete the cache entry and NOT the data node in order to
# prevent race conditions with other consumers. The stale data
# nodes will be removed by the periodic cleanup.
self.kazoo_client.delete(cache_path, version)
except BadVersionError:
# Someone else may have written a new entry since we
# decided to delete this, so we should no longer delete
# the entry.
except NoNodeError:
# The key may already be absent from the local cache.
with contextlib.suppress(KeyError):
del self._change_cache[key._hash]
def _changeFromData(self, data):
change_type, change_data = data["change_type"], data["change_data"]
change_class = self._getChangeClass(change_type)
project = self.connection.source.getProject(change_data["project"])
change = change_class(project)
return change
def _dataFromChange(self, change):
return {
"change_type": self._getChangeType(change),
"change_data": change.serialize(),
# Bring an existing change object up to date with freshly loaded cache
# data; _get() calls this when the locally cached object is outdated.
# NOTE(review): the body of this method is not visible in this view.
def _updateChange(self, change, data):
def _getChangeClass(self, change_type):
"""Return the change class for the given type."""
return self.CHANGE_TYPE_MAP[change_type]
def _getChangeType(self, change):
"""Return the change type as a string for the given type."""
return type(change).__name__
def CHANGE_TYPE_MAP(self):
"""Return a mapping of change type as string to change class.
This property cann also be defined as a class attribute.