Use kazoo.retry in zkobject

It appears that the division of retryable/non-retryable exceptions
is not exactly as asserted in the comments in the retry handlers
in zkobject.  The kazoo.retry module provides KazooRetry, a class
that implements retry handling with the correct set of retryable
exceptions, so let's use that instead.
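
As an illustration of that division, here is a minimal, hypothetical
sketch (not part of this change) of how KazooRetry behaves:
connection-level errors such as OperationTimeoutError are retried,
while server-side errors such as NoNodeError propagate on the first
call:

    from kazoo.exceptions import NoNodeError, OperationTimeoutError
    from kazoo.retry import KazooRetry

    attempts = []

    def flaky():
        # Fails twice with a retryable, connection-level error.
        attempts.append(1)
        if len(attempts) < 3:
            raise OperationTimeoutError()
        return "ok"

    retry = KazooRetry(max_tries=5, delay=0, backoff=0)
    assert retry(flaky) == "ok"
    assert len(attempts) == 3  # two retries, then success

    def missing():
        # A server-side error; KazooRetry does not retry it.
        raise NoNodeError

    try:
        retry(missing)
    except NoNodeError:
        pass  # raised immediately, no retries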

The main thing we lose here is the log messages indicating that we
are in a retry loop.  There doesn't appear to be a good way to hook
into KazooRetry for that (we could log in the interrupt callable,
but that is invoked every 0.1 seconds).
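
If that logging ever becomes important again, one hypothetical option
(purely a sketch, not part of this change; make_interrupt is an
invented name) would be to wrap sessionIsInvalid in an interrupt
callable that rate-limits its own log output:

    import time

    def make_interrupt(context, interval=5.0):
        # KazooRetry polls interrupt() roughly every 0.1s while it
        # sleeps between attempts, so the hook throttles its logging.
        last = [0.0]

        def interrupt():
            now = time.monotonic()
            if now - last[0] > interval:
                last[0] = now
                context.log.debug(
                    "Waiting to retry ZooKeeper operation")
            # Returning True makes KazooRetry raise InterruptedError.
            return context.sessionIsInvalid()

        return interrupt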

The InvalidObjectError exception appears to be unused, so it is
removed (since continuing to use it would make the exception handling
more complex).

Change-Id: I1278cd27873374b4efd90504d7166c74c6057b52
Author: James E. Blair
Date: 2022-02-24 11:57:25 -08:00
Parent: 61822ec737
Commit: 098442b95b
4 changed files with 160 additions and 197 deletions


@@ -53,7 +53,7 @@ from tests.base import (
 from zuul.zk.zkobject import ShardedZKObject, ZKObject, ZKContext
 from zuul.zk.locks import tenant_write_lock
-from kazoo.exceptions import KazooException, ZookeeperError
+from kazoo.exceptions import ZookeeperError, OperationTimeoutError


 class ZooKeeperBaseTestCase(BaseTestCase):
@@ -1603,13 +1603,13 @@ class TestZKObject(ZooKeeperBaseTestCase):
             def delete(self, *args, **kw):
                 self.count += 1
                 if self.count < 2:
-                    raise KazooException()
+                    raise OperationTimeoutError()
                 return self._real_client.delete(*args, **kw)

             def set(self, *args, **kw):
                 self.count += 1
                 if self.count < 2:
-                    raise KazooException()
+                    raise OperationTimeoutError()
                 return self._real_client.set(*args, **kw)

         # Fail an update


@@ -31,7 +31,7 @@ import textwrap
 import types
 import itertools

-from kazoo.exceptions import NodeExistsError, NoNodeError, ZookeeperError
+from kazoo.exceptions import NodeExistsError, NoNodeError
 from cachetools.func import lru_cache

 from zuul.lib import yamlutil as yaml
@@ -801,24 +801,8 @@ class PipelineChangeList(zkobject.ShardedZKObject):
         )

     def refresh(self, context):
         # See comment above about reading without a lock.
-        retry_count = 0
-        max_retries = 5
-        while context.sessionIsValid():
-            try:
-                super().refresh(context)
-                return
-            except ZookeeperError:
-                # These errors come from the server and are not
-                # retryable.  Connection errors are KazooExceptions so
-                # they aren't caught here and we will retry.
-                raise
-            except Exception:
-                context.log.error("Failed to refresh change list")
-                if retry_count >= max_retries:
-                    raise
-                retry_count += 1
-                time.sleep(self._retry_interval)
+        self._retry(context, super().refresh,
+                    context, max_tries=5)

     def getPath(self):
         return self.getChangeListPath(self.pipeline)


@@ -29,7 +29,3 @@ class NoClientException(ZuulZooKeeperException):

 class JobRequestNotFound(ZuulZooKeeperException):
     pass
-
-
-class InvalidObjectError(ZuulZooKeeperException):
-    pass


@@ -18,11 +18,10 @@ import time
 import types
 import zlib

-from kazoo.exceptions import (
-    KazooException, NodeExistsError, NoNodeError, ZookeeperError)
+from kazoo.exceptions import NodeExistsError, NoNodeError
+from kazoo.retry import KazooRetry

 from zuul.zk import sharding
-from zuul.zk.exceptions import InvalidObjectError


 class ZKContext:
@@ -44,6 +43,9 @@ class ZKContext:
         return ((not self.lock or self.lock.is_still_valid()) and
                 (not self.stop_event or not self.stop_event.is_set()))

+    def sessionIsInvalid(self):
+        return not self.sessionIsValid()
+

 class LocalZKContext:
     """A Local ZKContext that means don't actually write anything to ZK"""
@@ -57,6 +59,9 @@ class LocalZKContext:
     def sessionIsValid(self):
         return True

+    def sessionIsInvalid(self):
+        return False
+

 class ZKObject:
     _retry_interval = 5
@@ -175,22 +180,16 @@ class ZKObject:

     def delete(self, context):
         path = self.getPath()
-        while context.sessionIsValid():
-            try:
-                context.client.delete(path, recursive=True)
-                return
-            except ZookeeperError:
-                # These errors come from the server and are not
-                # retryable.  Connection errors are KazooExceptions so
-                # they aren't caught here and we will retry.
-                context.log.error(
-                    "Exception deleting ZKObject %s at %s", self, path)
-                raise
-            except KazooException:
-                context.log.exception(
-                    "Exception deleting ZKObject %s, will retry", self)
-                time.sleep(self._retry_interval)
-        raise Exception("ZooKeeper session or lock not valid")
+        if context.sessionIsInvalid():
+            raise Exception("ZooKeeper session or lock not valid")
+        try:
+            self._retry(context, context.client.delete,
+                        path, recursive=True)
+            return
+        except Exception:
+            context.log.error(
+                "Exception deleting ZKObject %s at %s", self, path)
+            raise

     def estimateDataSize(self, seen=None):
         """Attempt to find all ZKObjects below this one and sum their
@@ -234,96 +233,98 @@ class ZKObject:

     # Private methods below

+    def _retry(self, context, func, *args, max_tries=-1, **kw):
+        kazoo_retry = KazooRetry(max_tries=max_tries,
+                                 interrupt=context.sessionIsInvalid,
+                                 delay=self._retry_interval, backoff=0,
+                                 ignore_expire=False)
+        try:
+            return kazoo_retry(func, *args, **kw)
+        except InterruptedError:
+            pass
+
     def __init__(self):
         # Don't support any arguments in constructor to force us to go
         # through a save or restore path.
         super().__init__()
         self._set(_active_context=None)

+    @staticmethod
+    def _retryableLoad(context, path):
+        start = time.perf_counter()
+        compressed_data, zstat = context.client.get(path)
+        context.cumulative_read_time += time.perf_counter() - start
+        context.cumulative_read_objects += 1
+        context.cumulative_read_znodes += 1
+        context.cumulative_read_bytes += len(compressed_data)
+        return compressed_data, zstat
+
     def _load(self, context, path=None):
         if path is None:
             path = self.getPath()
-        while context.sessionIsValid():
-            try:
-                start = time.perf_counter()
-                compressed_data, zstat = context.client.get(path)
-                context.cumulative_read_time += time.perf_counter() - start
-                context.cumulative_read_objects += 1
-                context.cumulative_read_znodes += 1
-                context.cumulative_read_bytes += len(compressed_data)
+        if context.sessionIsInvalid():
+            raise Exception("ZooKeeper session or lock not valid")
+        try:
+            compressed_data, zstat = self._retry(context, self._retryableLoad,
+                                                 context, path)
+        except Exception:
+            context.log.error(
+                "Exception loading ZKObject %s at %s", self, path)
+            raise
-                self._set(_zkobject_hash=None)
-                try:
-                    data = zlib.decompress(compressed_data)
-                except zlib.error:
-                    # Fallback for old, uncompressed data
-                    data = compressed_data
-                self._set(**self.deserialize(data, context))
-                self._set(_zstat=zstat,
-                          _zkobject_hash=hash(data),
-                          _zkobject_compressed_size=len(compressed_data),
-                          _zkobject_uncompressed_size=len(data),
-                          )
+        self._set(_zkobject_hash=None)
+        try:
+            data = zlib.decompress(compressed_data)
+        except zlib.error:
+            # Fallback for old, uncompressed data
+            data = compressed_data
+        self._set(**self.deserialize(data, context))
+        self._set(_zstat=zstat,
+                  _zkobject_hash=hash(data),
+                  _zkobject_compressed_size=len(compressed_data),
+                  _zkobject_uncompressed_size=len(data),
+                  )
-                return
-            except ZookeeperError:
-                # These errors come from the server and are not
-                # retryable.  Connection errors are KazooExceptions so
-                # they aren't caught here and we will retry.
-                context.log.error(
-                    "Exception loading ZKObject %s at %s", self, path)
-                raise
-            except KazooException:
-                context.log.exception(
-                    "Exception loading ZKObject %s at %s, will retry",
-                    self, path)
-                time.sleep(5)
-            except Exception:
-                # A higher level must handle this exception, but log
-                # ourself here so we know what object triggered it.
-                context.log.error(
-                    "Exception loading ZKObject %s at %s", self, path)
-                raise
-        raise Exception("ZooKeeper session or lock not valid")

+    @staticmethod
+    def _retryableSave(context, create, path, compressed_data, version):
+        start = time.perf_counter()
+        if create:
+            real_path, zstat = context.client.create(
+                path, compressed_data, makepath=True,
+                include_data=True)
+        else:
+            zstat = context.client.set(path, compressed_data,
+                                       version=version)
+        context.cumulative_write_time += time.perf_counter() - start
+        context.cumulative_write_objects += 1
+        context.cumulative_write_znodes += 1
+        context.cumulative_write_bytes += len(compressed_data)
+        return zstat
+
     def _save(self, context, data, create=False):
         if isinstance(context, LocalZKContext):
             return
         path = self.getPath()
-        while context.sessionIsValid():
-            try:
-                compressed_data = zlib.compress(data)
-                start = time.perf_counter()
-                if create:
-                    real_path, zstat = context.client.create(
-                        path, compressed_data, makepath=True,
-                        include_data=True)
-                else:
-                    zstat = context.client.set(path, compressed_data,
-                                               version=self._zstat.version)
-                context.cumulative_write_time += time.perf_counter() - start
-                context.cumulative_write_objects += 1
-                context.cumulative_write_znodes += 1
-                context.cumulative_write_bytes += len(compressed_data)
+        if context.sessionIsInvalid():
+            raise Exception("ZooKeeper session or lock not valid")
+        compressed_data = zlib.compress(data)
-                self._set(_zstat=zstat,
-                          _zkobject_hash=hash(data),
-                          _zkobject_compressed_size=len(compressed_data),
-                          _zkobject_uncompressed_size=len(data),
-                          )
-                return
-            except ZookeeperError:
-                # These errors come from the server and are not
-                # retryable.  Connection errors are KazooExceptions so
-                # they aren't caught here and we will retry.
-                context.log.error(
-                    "Exception saving ZKObject %s at %s", self, path)
-                raise
-            except KazooException:
-                context.log.exception(
-                    "Exception saving ZKObject %s at %s, will retry",
-                    self, path)
-                time.sleep(self._retry_interval)
-        raise Exception("ZooKeeper session or lock not valid")
+        try:
+            if hasattr(self, '_zstat'):
+                version = self._zstat.version
+            else:
+                version = None
+            zstat = self._retry(context, self._retryableSave,
+                                context, create, path, compressed_data,
+                                version)
+        except Exception:
+            context.log.error(
+                "Exception saving ZKObject %s at %s", self, path)
+            raise
+        self._set(_zstat=zstat,
+                  _zkobject_hash=hash(data),
+                  _zkobject_compressed_size=len(compressed_data),
+                  _zkobject_uncompressed_size=len(data),
+                  )

     def __setattr__(self, name, value):
         if self._active_context:
@@ -346,91 +347,73 @@ class ShardedZKObject(ZKObject):
     # expected.  Don't delete them in that case.
     delete_on_error = True

+    @staticmethod
+    def _retryableLoad(context, path):
+        with sharding.BufferedShardReader(context.client, path) as stream:
+            data = stream.read()
+            compressed_size = stream.compressed_bytes_read
+            context.cumulative_read_time += stream.cumulative_read_time
+            context.cumulative_read_objects += 1
+            context.cumulative_read_znodes += stream.znodes_read
+            context.cumulative_read_bytes += compressed_size
+        if not data and context.client.exists(path) is None:
+            raise NoNodeError
+        return data, compressed_size
+
     def _load(self, context, path=None):
         if path is None:
             path = self.getPath()
-        while context.sessionIsValid():
-            try:
-                self._set(_zkobject_hash=None)
-                with sharding.BufferedShardReader(
-                        context.client, path) as stream:
-                    data = stream.read()
-                    compressed_size = stream.compressed_bytes_read
-                    context.cumulative_read_time += \
-                        stream.cumulative_read_time
-                    context.cumulative_read_objects += 1
-                    context.cumulative_read_znodes += \
-                        stream.znodes_read
-                    context.cumulative_read_bytes += compressed_size
+        if context.sessionIsInvalid():
+            raise Exception("ZooKeeper session or lock not valid")
+        try:
+            self._set(_zkobject_hash=None)
+            data, compressed_size = self._retry(context, self._retryableLoad,
+                                                context, path)
+            self._set(**self.deserialize(data, context))
+            self._set(_zkobject_hash=hash(data),
+                      _zkobject_compressed_size=compressed_size,
+                      _zkobject_uncompressed_size=len(data),
+                      )
+        except Exception:
+            # A higher level must handle this exception, but log
+            # ourself here so we know what object triggered it.
+            context.log.error(
+                "Exception loading ZKObject %s at %s", self, path)
+            if self.delete_on_error:
+                self.delete(context)
+            raise
-                if not data and context.client.exists(path) is None:
-                    raise NoNodeError
-                self._set(**self.deserialize(data, context))
-                self._set(_zkobject_hash=hash(data),
-                          _zkobject_compressed_size=compressed_size,
-                          _zkobject_uncompressed_size=len(data),
-                          )
-                return
-            except ZookeeperError:
-                # These errors come from the server and are not
-                # retryable.  Connection errors are KazooExceptions so
-                # they aren't caught here and we will retry.
-                context.log.error(
-                    "Exception loading ZKObject %s at %s", self, path)
-                raise
-            except KazooException:
-                context.log.exception(
-                    "Exception loading ZKObject %s at %s, will retry",
-                    self, path)
-                time.sleep(5)
-            except Exception as exc:
-                # A higher level must handle this exception, but log
-                # ourself here so we know what object triggered it.
-                context.log.error(
-                    "Exception loading ZKObject %s at %s", self, path)
-                if self.delete_on_error:
-                    self.delete(context)
-                raise InvalidObjectError from exc
-        raise Exception("ZooKeeper session or lock not valid")

+    @staticmethod
+    def _retryableSave(context, path, data):
+        with sharding.BufferedShardWriter(context.client, path) as stream:
+            stream.truncate(0)
+            stream.write(data)
+            stream.flush()
+            compressed_size = stream.compressed_bytes_written
+            context.cumulative_write_time += stream.cumulative_write_time
+            context.cumulative_write_objects += 1
+            context.cumulative_write_znodes += stream.znodes_written
+            context.cumulative_write_bytes += compressed_size
+        return compressed_size
+
     def _save(self, context, data, create=False):
         if isinstance(context, LocalZKContext):
             return
         path = self.getPath()
-        while context.sessionIsValid():
-            try:
-                if (create and
-                        not self.truncate_on_create and
-                        context.client.exists(path) is not None):
-                    raise NodeExistsError
+        if context.sessionIsInvalid():
+            raise Exception("ZooKeeper session or lock not valid")
+        try:
+            if create and not self.truncate_on_create:
+                exists = self._retry(context, context.client.exists, path)
+                if exists is not None:
+                    raise NodeExistsError
-                with sharding.BufferedShardWriter(
-                        context.client, path) as stream:
-                    stream.truncate(0)
-                    stream.write(data)
-                    stream.flush()
-                    compressed_size = stream.compressed_bytes_written
-                    context.cumulative_write_time += \
-                        stream.cumulative_write_time
-                    context.cumulative_write_objects += 1
-                    context.cumulative_write_znodes += \
-                        stream.znodes_written
-                    context.cumulative_write_bytes += compressed_size
-                self._set(_zkobject_hash=hash(data),
-                          _zkobject_compressed_size=compressed_size,
-                          _zkobject_uncompressed_size=len(data),
-                          )
-                return
-            except ZookeeperError:
-                # These errors come from the server and are not
-                # retryable.  Connection errors are KazooExceptions so
-                # they aren't caught here and we will retry.
-                context.log.error(
-                    "Exception saving ZKObject %s at %s", self, path)
-                raise
-            except KazooException:
-                context.log.exception(
-                    "Exception saving ZKObject %s at %s, will retry",
-                    self, path)
-                time.sleep(self._retry_interval)
-        raise Exception("ZooKeeper session or lock not valid")
+            compressed_size = self._retry(context, self._retryableSave,
+                                          context, path, data)
+            self._set(_zkobject_hash=hash(data),
+                      _zkobject_compressed_size=compressed_size,
+                      _zkobject_uncompressed_size=len(data),
+                      )
+        except Exception:
+            context.log.error(
+                "Exception saving ZKObject %s at %s", self, path)
+            raise