diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index f1d2d76ea3..d4c1c89c97 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -237,6 +237,14 @@ class TestSharding(ZooKeeperBaseTestCase): ) as shard_io: self.assertDictEqual(json.load(shard_io), data) + def test_no_makepath(self): + with testtools.ExpectedException(NoNodeError): + with BufferedShardWriter( + self.zk_client.client, "/test/shards", + makepath=False + ) as shard_writer: + shard_writer.write(b"foobar") + def _test_write_old_read_new(self, shard_count): # Write shards in the old format where each shard is # compressed individually @@ -1859,6 +1867,10 @@ class DummyZKObject(DummyZKObjectMixin, ZKObject): pass +class DummyNoMakepathZKObject(DummyZKObjectMixin, ZKObject): + makepath = False + + class DummyShardedZKObject(DummyZKObjectMixin, ShardedZKObject): pass @@ -1991,6 +2003,9 @@ class TestZKObject(ZooKeeperBaseTestCase): def exists(self, *args, **kw): return self._real_client.exists(*args, **kw) + def ensure_path(self, *args, **kw): + return self._real_client.ensure_path(*args, **kw) + # Fail an update with (tenant_write_lock(self.zk_client, tenant_name) as lock, ZKContext( @@ -2061,6 +2076,11 @@ class TestZKObject(ZooKeeperBaseTestCase): def test_zk_object(self): self._test_zk_object(DummyZKObject) + def test_zk_object_no_makepath(self): + with testtools.ExpectedException(NoNodeError): + with ZKContext(self.zk_client, None, None, self.log) as context: + DummyNoMakepathZKObject.new(context, name="tenant", foo="bar") + def test_sharded_zk_object(self): self._test_zk_object(DummyShardedZKObject) diff --git a/zuul/zk/sharding.py b/zuul/zk/sharding.py index de960ebdfa..6ca31b1d03 100644 --- a/zuul/zk/sharding.py +++ b/zuul/zk/sharding.py @@ -29,7 +29,7 @@ NODE_BYTE_SIZE_LIMIT = 1000000 class RawZKIO(io.RawIOBase): - def __init__(self, client, path, create=False, version=-1): + def __init__(self, client, path, create=False, makepath=True, version=-1): self.client = client self.path = path self.bytes_read = 0 @@ -39,6 +39,7 @@ class RawZKIO(io.RawIOBase): self.znodes_read = 0 self.znodes_written = 0 self.create = create + self.makepath = makepath self.version = version self.zstat = None @@ -72,7 +73,7 @@ class RawZKIO(io.RawIOBase): start = time.perf_counter() if self.create: _, self.zstat = self.client.create( - self.path, data, makepath=True, include_data=True) + self.path, data, makepath=self.makepath, include_data=True) else: self.zstat = self.client.set(self.path, data, version=self.version) @@ -83,10 +84,11 @@ class RawZKIO(io.RawIOBase): class RawShardIO(RawZKIO): - def __init__(self, *args, old_format=False, **kw): + def __init__(self, *args, old_format=False, makepath=True, **kw): # MODEL_API < 31 self.old_format = old_format - super().__init__(*args, **kw) + self.makepath = makepath + super().__init__(*args, makepath=makepath, **kw) def truncate(self, size=None): if size != 0: @@ -161,12 +163,23 @@ class RawShardIO(RawZKIO): if not (len(data_bytes) <= NODE_BYTE_SIZE_LIMIT): raise RuntimeError("Shard too large") start = time.perf_counter() - self.client.create( - "{}/".format(self.path), - data_bytes, - sequence=True, - makepath=True, - ) + # The path we pass to a shard writer is e.g. '/foo/bar'. Now, + # for shards the makepath argument should only apply to '/foo' + # but not the 'bar' subnode as it holds the individual shards + # and will also be deleted recursively on e.g. a truncate. + while True: + try: + self.client.create( + "{}/".format(self.path), + data_bytes, + sequence=True, + ) + break + except NoNodeError: + if not self.makepath: + raise + self.client.ensure_path(self.path) + self.cumulative_write_time += time.perf_counter() - start self.bytes_written += len(data_bytes) self.znodes_written += 1 @@ -220,10 +233,10 @@ class BufferedZKReader(io.BufferedReader): class BufferedShardWriter(io.BufferedWriter): - def __init__(self, client, path, create=False, version=-1): + def __init__(self, client, path, create=False, makepath=True, version=-1): self.__old_format = COMPONENT_REGISTRY.model_api < 31 - self.__raw = RawShardIO(client, path, create=create, version=version, - old_format=self.__old_format) + self.__raw = RawShardIO(client, path, create=create, makepath=makepath, + version=version, old_format=self.__old_format) super().__init__(self.__raw, NODE_BYTE_SIZE_LIMIT) @property diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index 9adb39d65b..6fa76ce838 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -155,6 +155,7 @@ class ZKObject: io_writer_class = sharding.RawZKIO truncate_on_create = False delete_on_error = False + makepath = True # Implementations of these two methods are required def getPath(self): @@ -442,11 +443,11 @@ class ZKObject: return raw_data @staticmethod - def _retryableSave(io_class, context, create, path, data, + def _retryableSave(io_class, context, create, makepath, path, data, version): zstat = None - with io_class(context.client, path, create=create, version=version - ) as stream: + with io_class(context.client, path, create=create, makepath=makepath, + version=version) as stream: stream.truncate(0) stream.write(data) stream.flush() @@ -477,8 +478,8 @@ class ZKObject: else: version = -1 zstat = self._retry(context, self._retryableSave, - self.io_writer_class, context, create, path, - compressed_data, version) + self.io_writer_class, context, create, + self.makepath, path, compressed_data, version) context.profileEvent('set', path) except Exception: context.log.error(