505 lines
18 KiB
Python
505 lines
18 KiB
Python
# Copyright 2021-2022 Acme Gating, LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
import contextlib
|
|
import json
|
|
import logging
|
|
import sys
|
|
import time
|
|
import types
|
|
import zlib
|
|
import collections
|
|
|
|
from kazoo.exceptions import NodeExistsError, NoNodeError
|
|
from kazoo.retry import KazooRetry
|
|
|
|
from zuul.zk import sharding
|
|
from zuul.zk import ZooKeeperClient
|
|
|
|
|
|
class BaseZKContext:
|
|
profile_logger = logging.getLogger('zuul.profile')
|
|
profile_default = False
|
|
# Only changed by unit tests.
|
|
# The default scales with number of procs.
|
|
_max_workers = None
|
|
|
|
def __init__(self):
|
|
# We create the executor dict in enter to make sure that this
|
|
# is used as a context manager and cleaned up properly.
|
|
self.executor = None
|
|
|
|
def __enter__(self):
|
|
if self.executor:
|
|
raise RuntimeError("ZKContext entered multiple times")
|
|
# This is a dictionary keyed by class. ZKObject subclasses
|
|
# can request a dedicated ThreadPoolExecutor for their class
|
|
# so that deserialize methods that use it can avoid deadlocks
|
|
# with child class deserialize methods.
|
|
self.executor = collections.defaultdict(
|
|
lambda: ThreadPoolExecutor(
|
|
max_workers=self._max_workers,
|
|
thread_name_prefix="ZKContext",
|
|
))
|
|
return self
|
|
|
|
def __exit__(self, etype, value, tb):
|
|
if self.executor:
|
|
for executor in self.executor.values():
|
|
if sys.version_info >= (3, 9):
|
|
executor.shutdown(wait=False, cancel_futures=True)
|
|
else:
|
|
executor.shutdown(wait=False)
|
|
self.executor = None
|
|
|
|
|
|
class ZKContext(BaseZKContext):
|
|
def __init__(self, zk_client, lock, stop_event, log):
|
|
super().__init__()
|
|
if isinstance(zk_client, ZooKeeperClient):
|
|
client = zk_client.client
|
|
else:
|
|
client = zk_client
|
|
self.client = client
|
|
self.lock = lock
|
|
self.stop_event = stop_event
|
|
self.log = log
|
|
self.cumulative_read_time = 0.0
|
|
self.cumulative_write_time = 0.0
|
|
self.cumulative_read_objects = 0
|
|
self.cumulative_write_objects = 0
|
|
self.cumulative_read_znodes = 0
|
|
self.cumulative_write_znodes = 0
|
|
self.cumulative_read_bytes = 0
|
|
self.cumulative_write_bytes = 0
|
|
self.build_references = False
|
|
self.profile = self.profile_default
|
|
|
|
def sessionIsValid(self):
|
|
return ((not self.lock or self.lock.is_still_valid()) and
|
|
(not self.stop_event or not self.stop_event.is_set()))
|
|
|
|
def sessionIsInvalid(self):
|
|
return not self.sessionIsValid()
|
|
|
|
def updateStatsFromOtherContext(self, other):
|
|
self.cumulative_read_time += other.cumulative_read_time
|
|
self.cumulative_write_time += other.cumulative_write_time
|
|
self.cumulative_read_objects += other.cumulative_read_objects
|
|
self.cumulative_write_objects += other.cumulative_write_objects
|
|
self.cumulative_read_znodes += other.cumulative_read_znodes
|
|
self.cumulative_write_znodes += other.cumulative_write_znodes
|
|
self.cumulative_read_bytes += other.cumulative_read_bytes
|
|
self.cumulative_write_bytes += other.cumulative_write_bytes
|
|
|
|
def profileEvent(self, etype, path):
|
|
if not self.profile:
|
|
return
|
|
self.profile_logger.debug(
|
|
'ZK 0x%x %s %s '
|
|
'rt=%s wt=%s ro=%s wo=%s rn=%s wn=%s rb=%s wb=%s',
|
|
id(self), etype, path,
|
|
self.cumulative_read_time, self.cumulative_write_time,
|
|
self.cumulative_read_objects, self.cumulative_write_objects,
|
|
self.cumulative_read_znodes, self.cumulative_write_znodes,
|
|
self.cumulative_read_bytes, self.cumulative_write_bytes)
|
|
|
|
|
|
class LocalZKContext(BaseZKContext):
|
|
"""A Local ZKContext that means don't actually write anything to ZK"""
|
|
|
|
def __init__(self, log):
|
|
super().__init__()
|
|
self.client = None
|
|
self.lock = None
|
|
self.stop_event = None
|
|
self.log = log
|
|
|
|
def sessionIsValid(self):
|
|
return True
|
|
|
|
def sessionIsInvalid(self):
|
|
return False
|
|
|
|
|
|
class ZKObject:
|
|
_retry_interval = 5
|
|
_zkobject_compressed_size = 0
|
|
_zkobject_uncompressed_size = 0
|
|
|
|
# Implementations of these two methods are required
|
|
def getPath(self):
|
|
"""Return the path to save this object in ZK
|
|
|
|
:returns: A string representation of the Znode path
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
def serialize(self, context):
|
|
"""Implement this method to return the data to save in ZK.
|
|
|
|
:returns: A byte string
|
|
"""
|
|
raise NotImplementedError()
|
|
|
|
# This should work for most classes
|
|
def deserialize(self, data, context):
|
|
"""Implement this method to convert serialized data into object
|
|
attributes.
|
|
|
|
:param bytes data: A byte string to deserialize
|
|
:param ZKContext context: A ZKContext object with the current
|
|
ZK session and lock.
|
|
|
|
:returns: A dictionary of attributes and values to be set on
|
|
the object.
|
|
"""
|
|
return json.loads(data.decode('utf-8'))
|
|
|
|
# These methods are public and shouldn't need to be overridden
|
|
def updateAttributes(self, context, **kw):
|
|
"""Update attributes on this object and save to ZooKeeper
|
|
|
|
Instead of using attribute assignment, call this method to
|
|
update attributes on this object. It will update the local
|
|
values and also write out the updated object to ZooKeeper.
|
|
|
|
:param ZKContext context: A ZKContext object with the current
|
|
ZK session and lock. Be sure to acquire the lock before
|
|
calling methods on this object. This object will validate
|
|
that the lock is still valid before writing to ZooKeeper.
|
|
|
|
All other parameters are keyword arguments which are
|
|
attributes to be set. Set as many attributes in one method
|
|
call as possible for efficient network use.
|
|
"""
|
|
old = self.__dict__.copy()
|
|
self._set(**kw)
|
|
serial = self._trySerialize(context)
|
|
if hash(serial) != getattr(self, '_zkobject_hash', None):
|
|
try:
|
|
self._save(context, serial)
|
|
except Exception:
|
|
# Roll back our old values if we aren't able to update ZK.
|
|
self._set(**old)
|
|
raise
|
|
|
|
@contextlib.contextmanager
|
|
def activeContext(self, context):
|
|
if self._active_context:
|
|
raise RuntimeError(
|
|
f"Another context is already active {self._active_context}")
|
|
try:
|
|
old = self.__dict__.copy()
|
|
self._set(_active_context=context)
|
|
yield
|
|
serial = self._trySerialize(context)
|
|
if hash(serial) != getattr(self, '_zkobject_hash', None):
|
|
try:
|
|
self._save(context, serial)
|
|
except Exception:
|
|
# Roll back our old values if we aren't able to update ZK.
|
|
self._set(**old)
|
|
raise
|
|
finally:
|
|
self._set(_active_context=None)
|
|
|
|
@classmethod
|
|
def new(klass, context, **kw):
|
|
"""Create a new instance and save it in ZooKeeper"""
|
|
obj = klass()
|
|
obj._set(**kw)
|
|
data = obj._trySerialize(context)
|
|
obj._save(context, data, create=True)
|
|
return obj
|
|
|
|
@classmethod
|
|
def fromZK(klass, context, path, **kw):
|
|
"""Instantiate a new object from data in ZK"""
|
|
obj = klass()
|
|
obj._set(**kw)
|
|
obj._load(context, path=path)
|
|
return obj
|
|
|
|
def refresh(self, context):
|
|
"""Update data from ZK"""
|
|
self._load(context)
|
|
|
|
def exists(self, context):
|
|
"""Return whether the object exists in ZK"""
|
|
path = self.getPath()
|
|
return bool(context.client.exists(path))
|
|
|
|
def _trySerialize(self, context):
|
|
if isinstance(context, LocalZKContext):
|
|
return b''
|
|
try:
|
|
return self.serialize(context)
|
|
except Exception:
|
|
# A higher level must handle this exception, but log
|
|
# ourself here so we know what object triggered it.
|
|
context.log.error(
|
|
"Exception serializing ZKObject %s", self)
|
|
raise
|
|
|
|
def delete(self, context):
|
|
path = self.getPath()
|
|
if context.sessionIsInvalid():
|
|
raise Exception("ZooKeeper session or lock not valid")
|
|
try:
|
|
self._retry(context, context.client.delete,
|
|
path, recursive=True)
|
|
context.profileEvent('delete', path)
|
|
return
|
|
except Exception:
|
|
context.log.error(
|
|
"Exception deleting ZKObject %s at %s", self, path)
|
|
raise
|
|
|
|
def estimateDataSize(self, seen=None):
|
|
"""Attempt to find all ZKObjects below this one and sum their
|
|
compressed and uncompressed sizes.
|
|
|
|
:returns: (compressed_size, uncompressed_size)
|
|
"""
|
|
compressed_size = self._zkobject_compressed_size
|
|
uncompressed_size = self._zkobject_uncompressed_size
|
|
|
|
if seen is None:
|
|
seen = {self}
|
|
|
|
def walk(obj):
|
|
compressed = 0
|
|
uncompressed = 0
|
|
if isinstance(obj, ZKObject):
|
|
if obj in seen:
|
|
return 0, 0
|
|
seen.add(obj)
|
|
compressed, uncompressed = obj.estimateDataSize(seen)
|
|
elif (isinstance(obj, dict) or
|
|
isinstance(obj, types.MappingProxyType)):
|
|
for sub in obj.values():
|
|
c, u = walk(sub)
|
|
compressed += c
|
|
uncompressed += u
|
|
elif (isinstance(obj, list) or
|
|
isinstance(obj, tuple)):
|
|
for sub in obj:
|
|
c, u = walk(sub)
|
|
compressed += c
|
|
uncompressed += u
|
|
return compressed, uncompressed
|
|
|
|
c, u = walk(self.__dict__)
|
|
compressed_size += c
|
|
uncompressed_size += u
|
|
|
|
return (compressed_size, uncompressed_size)
|
|
|
|
# Private methods below
|
|
|
|
def _retry(self, context, func, *args, max_tries=-1, **kw):
|
|
kazoo_retry = KazooRetry(max_tries=max_tries,
|
|
interrupt=context.sessionIsInvalid,
|
|
delay=self._retry_interval, backoff=0,
|
|
ignore_expire=False)
|
|
try:
|
|
return kazoo_retry(func, *args, **kw)
|
|
except InterruptedError:
|
|
pass
|
|
|
|
def __init__(self):
|
|
# Don't support any arguments in constructor to force us to go
|
|
# through a save or restore path.
|
|
super().__init__()
|
|
self._set(_active_context=None)
|
|
|
|
@staticmethod
|
|
def _retryableLoad(context, path):
|
|
start = time.perf_counter()
|
|
compressed_data, zstat = context.client.get(path)
|
|
context.cumulative_read_time += time.perf_counter() - start
|
|
context.cumulative_read_objects += 1
|
|
context.cumulative_read_znodes += 1
|
|
context.cumulative_read_bytes += len(compressed_data)
|
|
return compressed_data, zstat
|
|
|
|
def _load(self, context, path=None, deserialize=True):
|
|
if path is None:
|
|
path = self.getPath()
|
|
if context.sessionIsInvalid():
|
|
raise Exception("ZooKeeper session or lock not valid")
|
|
try:
|
|
compressed_data, zstat = self._retry(context, self._retryableLoad,
|
|
context, path)
|
|
context.profileEvent('get', path)
|
|
except Exception:
|
|
context.log.error(
|
|
"Exception loading ZKObject %s at %s", self, path)
|
|
raise
|
|
if deserialize:
|
|
self._set(_zkobject_hash=None)
|
|
try:
|
|
data = zlib.decompress(compressed_data)
|
|
except zlib.error:
|
|
# Fallback for old, uncompressed data
|
|
data = compressed_data
|
|
if not deserialize:
|
|
return data
|
|
self._set(**self.deserialize(data, context))
|
|
self._set(_zstat=zstat,
|
|
_zkobject_hash=hash(data),
|
|
_zkobject_compressed_size=len(compressed_data),
|
|
_zkobject_uncompressed_size=len(data),
|
|
)
|
|
|
|
@staticmethod
|
|
def _retryableSave(context, create, path, compressed_data, version):
|
|
start = time.perf_counter()
|
|
if create:
|
|
real_path, zstat = context.client.create(
|
|
path, compressed_data, makepath=True,
|
|
include_data=True)
|
|
else:
|
|
zstat = context.client.set(path, compressed_data,
|
|
version=version)
|
|
context.cumulative_write_time += time.perf_counter() - start
|
|
context.cumulative_write_objects += 1
|
|
context.cumulative_write_znodes += 1
|
|
context.cumulative_write_bytes += len(compressed_data)
|
|
return zstat
|
|
|
|
def _save(self, context, data, create=False):
|
|
if isinstance(context, LocalZKContext):
|
|
return
|
|
path = self.getPath()
|
|
if context.sessionIsInvalid():
|
|
raise Exception("ZooKeeper session or lock not valid")
|
|
compressed_data = zlib.compress(data)
|
|
|
|
try:
|
|
if hasattr(self, '_zstat'):
|
|
version = self._zstat.version
|
|
else:
|
|
version = -1
|
|
zstat = self._retry(context, self._retryableSave,
|
|
context, create, path, compressed_data,
|
|
version)
|
|
context.profileEvent('set', path)
|
|
except Exception:
|
|
context.log.error(
|
|
"Exception saving ZKObject %s at %s", self, path)
|
|
raise
|
|
self._set(_zstat=zstat,
|
|
_zkobject_hash=hash(data),
|
|
_zkobject_compressed_size=len(compressed_data),
|
|
_zkobject_uncompressed_size=len(data),
|
|
)
|
|
|
|
def __setattr__(self, name, value):
|
|
if self._active_context:
|
|
super().__setattr__(name, value)
|
|
else:
|
|
raise Exception("Unable to modify ZKObject %s" %
|
|
(repr(self),))
|
|
|
|
def _set(self, **kw):
|
|
for name, value in kw.items():
|
|
super().__setattr__(name, value)
|
|
|
|
|
|
class ShardedZKObject(ZKObject):
|
|
# If the node exists when we create we normally error, unless this
|
|
# is set, in which case we proceed and truncate.
|
|
truncate_on_create = False
|
|
# Normally we delete nodes which have syntax errors, but the
|
|
# pipeline summary is read without a write lock, so those are
|
|
# expected. Don't delete them in that case.
|
|
delete_on_error = True
|
|
|
|
@staticmethod
|
|
def _retryableLoad(context, path):
|
|
with sharding.BufferedShardReader(context.client, path) as stream:
|
|
data = stream.read()
|
|
compressed_size = stream.compressed_bytes_read
|
|
context.cumulative_read_time += stream.cumulative_read_time
|
|
context.cumulative_read_objects += 1
|
|
context.cumulative_read_znodes += stream.znodes_read
|
|
context.cumulative_read_bytes += compressed_size
|
|
if not data and context.client.exists(path) is None:
|
|
raise NoNodeError
|
|
return data, compressed_size
|
|
|
|
def _load(self, context, path=None):
|
|
if path is None:
|
|
path = self.getPath()
|
|
if context.sessionIsInvalid():
|
|
raise Exception("ZooKeeper session or lock not valid")
|
|
try:
|
|
self._set(_zkobject_hash=None)
|
|
data, compressed_size = self._retry(context, self._retryableLoad,
|
|
context, path)
|
|
context.profileEvent('get', path)
|
|
self._set(**self.deserialize(data, context))
|
|
self._set(_zkobject_hash=hash(data),
|
|
_zkobject_compressed_size=compressed_size,
|
|
_zkobject_uncompressed_size=len(data),
|
|
)
|
|
except Exception:
|
|
# A higher level must handle this exception, but log
|
|
# ourself here so we know what object triggered it.
|
|
context.log.error(
|
|
"Exception loading ZKObject %s at %s", self, path)
|
|
if self.delete_on_error:
|
|
self.delete(context)
|
|
raise
|
|
|
|
@staticmethod
|
|
def _retryableSave(context, path, data):
|
|
with sharding.BufferedShardWriter(context.client, path) as stream:
|
|
stream.truncate(0)
|
|
stream.write(data)
|
|
stream.flush()
|
|
compressed_size = stream.compressed_bytes_written
|
|
context.cumulative_write_time += stream.cumulative_write_time
|
|
context.cumulative_write_objects += 1
|
|
context.cumulative_write_znodes += stream.znodes_written
|
|
context.cumulative_write_bytes += compressed_size
|
|
return compressed_size
|
|
|
|
def _save(self, context, data, create=False):
|
|
if isinstance(context, LocalZKContext):
|
|
return
|
|
path = self.getPath()
|
|
if context.sessionIsInvalid():
|
|
raise Exception("ZooKeeper session or lock not valid")
|
|
try:
|
|
if create and not self.truncate_on_create:
|
|
exists = self._retry(context, context.client.exists, path)
|
|
context.profileEvent('exists', path)
|
|
if exists is not None:
|
|
raise NodeExistsError
|
|
compressed_size = self._retry(context, self._retryableSave,
|
|
context, path, data)
|
|
context.profileEvent('set', path)
|
|
self._set(_zkobject_hash=hash(data),
|
|
_zkobject_compressed_size=compressed_size,
|
|
_zkobject_uncompressed_size=len(data),
|
|
)
|
|
except Exception:
|
|
context.log.error(
|
|
"Exception saving ZKObject %s at %s", self, path)
|
|
raise
|