Allow creating (sharded) ZKObjects w/o makepath
So far ZKObject and its sharded equivalent also created the parent path in case it did not exist. Since we are sometimes working with cached data, the information we have could be stale. If we now want to create ZKObjects as subnodes of existing nodes we could cause problems when e.g. the parent node was deleted in the meantime and we re-create as part of the sub-ZKObject without any content. To prevent this data race, we make this behavior configurable per ZKObject by introducing a `makepath` attribute which is set to True by default. Change-Id: Ib17e40cd5d664bfc625d83c742d76dabd2dd7a8c
This commit is contained in:
@@ -237,6 +237,14 @@ class TestSharding(ZooKeeperBaseTestCase):
|
||||
) as shard_io:
|
||||
self.assertDictEqual(json.load(shard_io), data)
|
||||
|
||||
def test_no_makepath(self):
|
||||
with testtools.ExpectedException(NoNodeError):
|
||||
with BufferedShardWriter(
|
||||
self.zk_client.client, "/test/shards",
|
||||
makepath=False
|
||||
) as shard_writer:
|
||||
shard_writer.write(b"foobar")
|
||||
|
||||
def _test_write_old_read_new(self, shard_count):
|
||||
# Write shards in the old format where each shard is
|
||||
# compressed individually
|
||||
@@ -1859,6 +1867,10 @@ class DummyZKObject(DummyZKObjectMixin, ZKObject):
|
||||
pass
|
||||
|
||||
|
||||
class DummyNoMakepathZKObject(DummyZKObjectMixin, ZKObject):
|
||||
makepath = False
|
||||
|
||||
|
||||
class DummyShardedZKObject(DummyZKObjectMixin, ShardedZKObject):
|
||||
pass
|
||||
|
||||
@@ -1991,6 +2003,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
|
||||
def exists(self, *args, **kw):
|
||||
return self._real_client.exists(*args, **kw)
|
||||
|
||||
def ensure_path(self, *args, **kw):
|
||||
return self._real_client.ensure_path(*args, **kw)
|
||||
|
||||
# Fail an update
|
||||
with (tenant_write_lock(self.zk_client, tenant_name) as lock,
|
||||
ZKContext(
|
||||
@@ -2061,6 +2076,11 @@ class TestZKObject(ZooKeeperBaseTestCase):
|
||||
def test_zk_object(self):
|
||||
self._test_zk_object(DummyZKObject)
|
||||
|
||||
def test_zk_object_no_makepath(self):
|
||||
with testtools.ExpectedException(NoNodeError):
|
||||
with ZKContext(self.zk_client, None, None, self.log) as context:
|
||||
DummyNoMakepathZKObject.new(context, name="tenant", foo="bar")
|
||||
|
||||
def test_sharded_zk_object(self):
|
||||
self._test_zk_object(DummyShardedZKObject)
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ NODE_BYTE_SIZE_LIMIT = 1000000
|
||||
|
||||
|
||||
class RawZKIO(io.RawIOBase):
|
||||
def __init__(self, client, path, create=False, version=-1):
|
||||
def __init__(self, client, path, create=False, makepath=True, version=-1):
|
||||
self.client = client
|
||||
self.path = path
|
||||
self.bytes_read = 0
|
||||
@@ -39,6 +39,7 @@ class RawZKIO(io.RawIOBase):
|
||||
self.znodes_read = 0
|
||||
self.znodes_written = 0
|
||||
self.create = create
|
||||
self.makepath = makepath
|
||||
self.version = version
|
||||
self.zstat = None
|
||||
|
||||
@@ -72,7 +73,7 @@ class RawZKIO(io.RawIOBase):
|
||||
start = time.perf_counter()
|
||||
if self.create:
|
||||
_, self.zstat = self.client.create(
|
||||
self.path, data, makepath=True, include_data=True)
|
||||
self.path, data, makepath=self.makepath, include_data=True)
|
||||
else:
|
||||
self.zstat = self.client.set(self.path, data,
|
||||
version=self.version)
|
||||
@@ -83,10 +84,11 @@ class RawZKIO(io.RawIOBase):
|
||||
|
||||
|
||||
class RawShardIO(RawZKIO):
|
||||
def __init__(self, *args, old_format=False, **kw):
|
||||
def __init__(self, *args, old_format=False, makepath=True, **kw):
|
||||
# MODEL_API < 31
|
||||
self.old_format = old_format
|
||||
super().__init__(*args, **kw)
|
||||
self.makepath = makepath
|
||||
super().__init__(*args, makepath=makepath, **kw)
|
||||
|
||||
def truncate(self, size=None):
|
||||
if size != 0:
|
||||
@@ -161,12 +163,23 @@ class RawShardIO(RawZKIO):
|
||||
if not (len(data_bytes) <= NODE_BYTE_SIZE_LIMIT):
|
||||
raise RuntimeError("Shard too large")
|
||||
start = time.perf_counter()
|
||||
self.client.create(
|
||||
"{}/".format(self.path),
|
||||
data_bytes,
|
||||
sequence=True,
|
||||
makepath=True,
|
||||
)
|
||||
# The path we pass to a shard writer is e.g. '/foo/bar'. Now,
|
||||
# for shards the makepath argument should only apply to '/foo'
|
||||
# but not the 'bar' subnode as it holds the individual shards
|
||||
# and will also be deleted recursively on e.g. a truncate.
|
||||
while True:
|
||||
try:
|
||||
self.client.create(
|
||||
"{}/".format(self.path),
|
||||
data_bytes,
|
||||
sequence=True,
|
||||
)
|
||||
break
|
||||
except NoNodeError:
|
||||
if not self.makepath:
|
||||
raise
|
||||
self.client.ensure_path(self.path)
|
||||
|
||||
self.cumulative_write_time += time.perf_counter() - start
|
||||
self.bytes_written += len(data_bytes)
|
||||
self.znodes_written += 1
|
||||
@@ -220,10 +233,10 @@ class BufferedZKReader(io.BufferedReader):
|
||||
|
||||
|
||||
class BufferedShardWriter(io.BufferedWriter):
|
||||
def __init__(self, client, path, create=False, version=-1):
|
||||
def __init__(self, client, path, create=False, makepath=True, version=-1):
|
||||
self.__old_format = COMPONENT_REGISTRY.model_api < 31
|
||||
self.__raw = RawShardIO(client, path, create=create, version=version,
|
||||
old_format=self.__old_format)
|
||||
self.__raw = RawShardIO(client, path, create=create, makepath=makepath,
|
||||
version=version, old_format=self.__old_format)
|
||||
super().__init__(self.__raw, NODE_BYTE_SIZE_LIMIT)
|
||||
|
||||
@property
|
||||
|
||||
@@ -155,6 +155,7 @@ class ZKObject:
|
||||
io_writer_class = sharding.RawZKIO
|
||||
truncate_on_create = False
|
||||
delete_on_error = False
|
||||
makepath = True
|
||||
|
||||
# Implementations of these two methods are required
|
||||
def getPath(self):
|
||||
@@ -442,11 +443,11 @@ class ZKObject:
|
||||
return raw_data
|
||||
|
||||
@staticmethod
|
||||
def _retryableSave(io_class, context, create, path, data,
|
||||
def _retryableSave(io_class, context, create, makepath, path, data,
|
||||
version):
|
||||
zstat = None
|
||||
with io_class(context.client, path, create=create, version=version
|
||||
) as stream:
|
||||
with io_class(context.client, path, create=create, makepath=makepath,
|
||||
version=version) as stream:
|
||||
stream.truncate(0)
|
||||
stream.write(data)
|
||||
stream.flush()
|
||||
@@ -477,8 +478,8 @@ class ZKObject:
|
||||
else:
|
||||
version = -1
|
||||
zstat = self._retry(context, self._retryableSave,
|
||||
self.io_writer_class, context, create, path,
|
||||
compressed_data, version)
|
||||
self.io_writer_class, context, create,
|
||||
self.makepath, path, compressed_data, version)
|
||||
context.profileEvent('set', path)
|
||||
except Exception:
|
||||
context.log.error(
|
||||
|
||||
Reference in New Issue
Block a user