Always store zstat for sharded zkobjects
Traditionally a sharded zkobject did not store a zstat because it isn't represented by a single object. But some users or subclasses do use a zstat of a sharded zkobject. They rely on the fact that we always truncate the contents of the object before writing, and truncating means removing the base znode. That means that each time we write, we will create a new base znode, and that is sufficient to act as a zstat for the whole object. This change normalizes this and implements it in the ShardedZKObject class, and removes some now duplicated code. Change-Id: I4b973dcfb1175e882f7e3c33d61867424dd3f654
This commit is contained in:
parent
cce82a820c
commit
cb3942da81
@ -1786,6 +1786,9 @@ class TestZKObject(ZooKeeperBaseTestCase):
|
||||
raise OperationTimeoutError()
|
||||
return self._real_client.set(*args, **kw)
|
||||
|
||||
def exists(self, *args, **kw):
|
||||
return self._real_client.exists(*args, **kw)
|
||||
|
||||
# Fail an update
|
||||
with (tenant_write_lock(self.zk_client, tenant_name) as lock,
|
||||
ZKContext(
|
||||
|
@ -289,16 +289,6 @@ class BranchCacheZKObject(ShardedZKObject):
|
||||
data.clear()
|
||||
data['projects'] = projects
|
||||
|
||||
def _save(self, context, data, create=False):
|
||||
super()._save(context, data, create)
|
||||
zstat = context.client.exists(self.getPath())
|
||||
self._set(_zstat=zstat)
|
||||
|
||||
def _load(self, context, path=None):
|
||||
super()._load(context, path)
|
||||
zstat = context.client.exists(self.getPath())
|
||||
self._set(_zstat=zstat)
|
||||
|
||||
|
||||
class BranchCache:
|
||||
def __init__(self, zk_client, connection, component_registry):
|
||||
|
@ -114,6 +114,7 @@ class FilesCache(ZooKeeperSimpleBase, MutableMapping):
|
||||
with sharding.BufferedShardWriter(self.kazoo_client, path) as stream:
|
||||
stream.truncate(0)
|
||||
stream.write(zlib.compress(value.encode("utf8")))
|
||||
stream.flush()
|
||||
|
||||
def __delitem__(self, key):
|
||||
try:
|
||||
@ -219,9 +220,9 @@ class SystemConfigCache(ZooKeeperSimpleBase):
|
||||
self.kazoo_client, self.conf_path
|
||||
) as stream:
|
||||
data = json.loads(zlib.decompress(stream.read()))
|
||||
zstat = stream.zstat
|
||||
except Exception:
|
||||
raise RuntimeError("No valid system config")
|
||||
zstat = self.kazoo_client.exists(self.conf_path)
|
||||
return (model.UnparsedAbideConfig.fromDict(
|
||||
data["unparsed_abide"],
|
||||
ltime=zstat.last_modified_transaction_id
|
||||
@ -239,5 +240,6 @@ class SystemConfigCache(ZooKeeperSimpleBase):
|
||||
stream.truncate(0)
|
||||
stream.write(zlib.compress(
|
||||
json.dumps(data, sort_keys=True).encode("utf8")))
|
||||
zstat = self.kazoo_client.exists(self.conf_path)
|
||||
stream.flush()
|
||||
zstat = stream.zstat
|
||||
unparsed_abide.ltime = zstat.last_modified_transaction_id
|
||||
|
@ -82,6 +82,7 @@ class RawShardIO(RawZKIO):
|
||||
raise ValueError("Can only truncate to 0")
|
||||
with suppress(NoNodeError):
|
||||
self.client.delete(self.path, recursive=True)
|
||||
self.zstat = None
|
||||
|
||||
@property
|
||||
def _shards(self):
|
||||
@ -95,6 +96,7 @@ class RawShardIO(RawZKIO):
|
||||
for shard_name in sorted(self._shards):
|
||||
shard_path = "/".join((self.path, shard_name))
|
||||
read_buffer.write(self._getData(shard_path)[0])
|
||||
self.zstat = self.client.exists(self.path)
|
||||
return read_buffer.getvalue()
|
||||
|
||||
def write(self, data):
|
||||
@ -112,6 +114,8 @@ class RawShardIO(RawZKIO):
|
||||
self.cumulative_write_time += time.perf_counter() - start
|
||||
self.bytes_written += len(data_bytes)
|
||||
self.znodes_written += 1
|
||||
if self.zstat is None:
|
||||
self.zstat = self.client.exists(self.path)
|
||||
return len(data_bytes)
|
||||
|
||||
|
||||
|
@ -405,13 +405,10 @@ class ZKObject:
|
||||
self._set(_zkobject_hash=None)
|
||||
data = self._decompressData(raw_data)
|
||||
self._set(**self.deserialize(data, context))
|
||||
self._set(_zkobject_hash=hash(data),
|
||||
self._set(_zstat=zstat,
|
||||
_zkobject_hash=hash(data),
|
||||
_zkobject_compressed_size=len(raw_data),
|
||||
_zkobject_uncompressed_size=len(data))
|
||||
if zstat is not None:
|
||||
# Traditionally, a sharded zkobject does not have a
|
||||
# _zstat
|
||||
self._set(_zstat=zstat)
|
||||
except Exception:
|
||||
if self.delete_on_error:
|
||||
self.delete(context)
|
||||
@ -472,14 +469,11 @@ class ZKObject:
|
||||
context.log.error(
|
||||
"Exception saving ZKObject %s at %s", self, path)
|
||||
raise
|
||||
self._set(_zkobject_hash=hash(data),
|
||||
self._set(_zstat=zstat,
|
||||
_zkobject_hash=hash(data),
|
||||
_zkobject_compressed_size=len(compressed_data),
|
||||
_zkobject_uncompressed_size=len(data),
|
||||
)
|
||||
if zstat is not None:
|
||||
# Traditionally, a sharded zkobject does not have a
|
||||
# _zstat
|
||||
self._set(_zstat=zstat)
|
||||
|
||||
def __setattr__(self, name, value):
|
||||
if self._active_context:
|
||||
|
Loading…
Reference in New Issue
Block a user