From 54964b0386e8bea1116521f91c54b9743aba4d83 Mon Sep 17 00:00:00 2001 From: Simon Westphahl Date: Wed, 12 Mar 2025 12:21:52 +0100 Subject: [PATCH] Allow creating (sharded) ZKObjects w/o makepath So far ZKObject and its sharded equivalent also created the parent path in case it did not exist. Since we are sometimes working with cached data, the information we have could be stale. If we now want to create ZKObjects as subnodes of existing nodes we could cause problems when e.g. the parent node was deleted in the meantime and we re-create as part of the sub-ZKObject without any content. To prevent this data race, we make this behavior configurable per ZKObject by introducing a `makepath` attribute which is set to True by default. Change-Id: Ib17e40cd5d664bfc625d83c742d76dabd2dd7a8c --- tests/unit/test_zk.py | 20 ++++++++++++++++++++ zuul/zk/sharding.py | 39 ++++++++++++++++++++++++++------------- zuul/zk/zkobject.py | 11 ++++++----- 3 files changed, 52 insertions(+), 18 deletions(-) diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index f1d2d76ea3..d4c1c89c97 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -237,6 +237,14 @@ class TestSharding(ZooKeeperBaseTestCase): ) as shard_io: self.assertDictEqual(json.load(shard_io), data) + def test_no_makepath(self): + with testtools.ExpectedException(NoNodeError): + with BufferedShardWriter( + self.zk_client.client, "/test/shards", + makepath=False + ) as shard_writer: + shard_writer.write(b"foobar") + def _test_write_old_read_new(self, shard_count): # Write shards in the old format where each shard is # compressed individually @@ -1859,6 +1867,10 @@ class DummyZKObject(DummyZKObjectMixin, ZKObject): pass +class DummyNoMakepathZKObject(DummyZKObjectMixin, ZKObject): + makepath = False + + class DummyShardedZKObject(DummyZKObjectMixin, ShardedZKObject): pass @@ -1991,6 +2003,9 @@ class TestZKObject(ZooKeeperBaseTestCase): def exists(self, *args, **kw): return self._real_client.exists(*args, **kw) + def ensure_path(self, *args, **kw): + return self._real_client.ensure_path(*args, **kw) + # Fail an update with (tenant_write_lock(self.zk_client, tenant_name) as lock, ZKContext( @@ -2061,6 +2076,11 @@ class TestZKObject(ZooKeeperBaseTestCase): def test_zk_object(self): self._test_zk_object(DummyZKObject) + def test_zk_object_no_makepath(self): + with testtools.ExpectedException(NoNodeError): + with ZKContext(self.zk_client, None, None, self.log) as context: + DummyNoMakepathZKObject.new(context, name="tenant", foo="bar") + def test_sharded_zk_object(self): self._test_zk_object(DummyShardedZKObject) diff --git a/zuul/zk/sharding.py b/zuul/zk/sharding.py index de960ebdfa..6ca31b1d03 100644 --- a/zuul/zk/sharding.py +++ b/zuul/zk/sharding.py @@ -29,7 +29,7 @@ NODE_BYTE_SIZE_LIMIT = 1000000 class RawZKIO(io.RawIOBase): - def __init__(self, client, path, create=False, version=-1): + def __init__(self, client, path, create=False, makepath=True, version=-1): self.client = client self.path = path self.bytes_read = 0 @@ -39,6 +39,7 @@ class RawZKIO(io.RawIOBase): self.znodes_read = 0 self.znodes_written = 0 self.create = create + self.makepath = makepath self.version = version self.zstat = None @@ -72,7 +73,7 @@ class RawZKIO(io.RawIOBase): start = time.perf_counter() if self.create: _, self.zstat = self.client.create( - self.path, data, makepath=True, include_data=True) + self.path, data, makepath=self.makepath, include_data=True) else: self.zstat = self.client.set(self.path, data, version=self.version) @@ -83,10 +84,11 @@ class RawZKIO(io.RawIOBase): class RawShardIO(RawZKIO): - def __init__(self, *args, old_format=False, **kw): + def __init__(self, *args, old_format=False, makepath=True, **kw): # MODEL_API < 31 self.old_format = old_format - super().__init__(*args, **kw) + self.makepath = makepath + super().__init__(*args, makepath=makepath, **kw) def truncate(self, size=None): if size != 0: @@ -161,12 +163,23 @@ class RawShardIO(RawZKIO): if not (len(data_bytes) <= NODE_BYTE_SIZE_LIMIT): raise RuntimeError("Shard too large") start = time.perf_counter() - self.client.create( - "{}/".format(self.path), - data_bytes, - sequence=True, - makepath=True, - ) + # The path we pass to a shard writer is e.g. '/foo/bar'. Now, + # for shards the makepath argument should only apply to '/foo' + # but not the 'bar' subnode as it holds the individual shards + # and will also be deleted recursively on e.g. a truncate. + while True: + try: + self.client.create( + "{}/".format(self.path), + data_bytes, + sequence=True, + ) + break + except NoNodeError: + if not self.makepath: + raise + self.client.ensure_path(self.path) + self.cumulative_write_time += time.perf_counter() - start self.bytes_written += len(data_bytes) self.znodes_written += 1 @@ -220,10 +233,10 @@ class BufferedZKReader(io.BufferedReader): class BufferedShardWriter(io.BufferedWriter): - def __init__(self, client, path, create=False, version=-1): + def __init__(self, client, path, create=False, makepath=True, version=-1): self.__old_format = COMPONENT_REGISTRY.model_api < 31 - self.__raw = RawShardIO(client, path, create=create, version=version, - old_format=self.__old_format) + self.__raw = RawShardIO(client, path, create=create, makepath=makepath, + version=version, old_format=self.__old_format) super().__init__(self.__raw, NODE_BYTE_SIZE_LIMIT) @property diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index 9adb39d65b..6fa76ce838 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -155,6 +155,7 @@ class ZKObject: io_writer_class = sharding.RawZKIO truncate_on_create = False delete_on_error = False + makepath = True # Implementations of these two methods are required def getPath(self): @@ -442,11 +443,11 @@ class ZKObject: return raw_data @staticmethod - def _retryableSave(io_class, context, create, path, data, + def _retryableSave(io_class, context, create, makepath, path, data, version): zstat = None - with io_class(context.client, path, create=create, version=version - ) as stream: + with io_class(context.client, path, create=create, makepath=makepath, + version=version) as stream: stream.truncate(0) stream.write(data) stream.flush() @@ -477,8 +478,8 @@ class ZKObject: else: version = -1 zstat = self._retry(context, self._retryableSave, - self.io_writer_class, context, create, path, - compressed_data, version) + self.io_writer_class, context, create, + self.makepath, path, compressed_data, version) context.profileEvent('set', path) except Exception: context.log.error(