Use structured change cache keys

This adds a ChangeKey class which is essentially a structured universal
identifier for a change-like object (Ref, Branch, Change, PR, whatever).

We can use this in ZK objects to reference changes, and by doing so, we
can in many cases avoid actually referencing the change objects
themselves.

This also updates the actual keys in ZK to be sha256sums of the structured
key (for brevity and simplicity of encoding).

Change-Id: I6cd62973d48ad3515f6aa8a8172b9e9c19fcda55
This commit is contained in:
James E. Blair
2021-09-23 19:29:05 -07:00
parent 27b677df91
commit c4268b1b46
9 changed files with 225 additions and 113 deletions

View File

@@ -21,7 +21,11 @@ import testtools
from zuul import model
from zuul.model import BuildRequest, HoldRequest, MergeRequest
from zuul.zk import ZooKeeperClient
from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError
from zuul.zk.change_cache import (
AbstractChangeCache,
ChangeKey,
ConcurrentUpdateError,
)
from zuul.zk.config_cache import SystemConfigCache, UnparsedConfigCache
from zuul.zk.exceptions import LockException
from zuul.zk.executor import ExecutorApi
@@ -1231,7 +1235,9 @@ class DummyChange:
return -1 if self.cache_stat is None else self.cache_stat.version
def serialize(self):
return self.__dict__
d = self.__dict__.copy()
d.pop('cache_stat')
return d
def deserialize(self, data):
self.__dict__.update(data)
@@ -1266,49 +1272,56 @@ class TestChangeCache(ZooKeeperBaseTestCase):
def test_insert(self):
change_foo = DummyChange("project", {"foo": "bar"})
change_bar = DummyChange("project", {"bar": "foo"})
self.cache.set("foo", change_foo)
self.cache.set("bar", change_bar)
key_foo = ChangeKey('conn', 'project', 'change', 'foo', '1')
key_bar = ChangeKey('conn', 'project', 'change', 'bar', '1')
self.cache.set(key_foo, change_foo)
self.cache.set(key_bar, change_bar)
self.assertEqual(self.cache.get("foo"), change_foo)
self.assertEqual(self.cache.get("bar"), change_bar)
self.assertEqual(self.cache.get(key_foo), change_foo)
self.assertEqual(self.cache.get(key_bar), change_bar)
def test_update(self):
change = DummyChange("project", {"foo": "bar"})
self.cache.set("foo", change)
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
change.number = 123
self.cache.set("foo", change, change.cache_version)
self.cache.set(key, change, change.cache_version)
# The change instance must stay the same
updated_change = self.cache.get("foo")
updated_change = self.cache.get(key)
self.assertIs(change, updated_change)
self.assertEqual(change.number, 123)
def test_delete(self):
change = DummyChange("project", {"foo": "bar"})
self.cache.set("foo", change)
self.cache.delete("foo")
self.assertIsNone(self.cache.get("foo"))
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
self.cache.delete(key)
self.assertIsNone(self.cache.get(key))
# Deleting an non-existent key should not raise an exception
self.cache.delete("invalid")
invalid_key = ChangeKey('conn', 'project', 'change', 'invalid', '1')
self.cache.delete(invalid_key)
def test_concurrent_update(self):
change = DummyChange("project", {"foo": "bar"})
self.cache.set("foo", change)
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
# Attempt to update with the old change stat
with testtools.ExpectedException(ConcurrentUpdateError):
self.cache.set("foo", change, change.cache_version - 1)
self.cache.set(key, change, change.cache_version - 1)
def test_change_update_retry(self):
change = DummyChange("project", {"foobar": 0})
self.cache.set("foobar", change)
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
# Update the change so we have a new cache stat.
change.foobar = 1
self.cache.set("foobar", change, change.cache_version)
self.assertEqual(self.cache.get("foobar").foobar, 1)
self.cache.set(key, change, change.cache_version)
self.assertEqual(self.cache.get(key).foobar, 1)
def updater(c):
c.foobar += 1
@@ -1320,13 +1333,12 @@ class TestChangeCache(ZooKeeperBaseTestCase):
change.cache_version - 1,
0)
updated_change = self.cache.updateChangeWithRetry(
"foobar", change, updater)
key, change, updater)
self.assertEqual(updated_change.foobar, 2)
def test_cache_sync(self):
other_cache = DummyChangeCache(self.zk_client, DummyConnection())
key = "foo foo" # Use a key that needs escaping
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
change = DummyChange("project", {"foo": "bar"})
self.cache.set(key, change)
self.assertIsNotNone(other_cache.get(key))
@@ -1344,7 +1356,8 @@ class TestChangeCache(ZooKeeperBaseTestCase):
def test_cleanup(self):
change = DummyChange("project", {"foo": "bar"})
self.cache.set("foo", change)
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
self.cache.cleanup()
self.assertEqual(len(self.cache._data_cleanup_candidates), 0)
@@ -1352,7 +1365,7 @@ class TestChangeCache(ZooKeeperBaseTestCase):
len(self.zk_client.client.get_children(self.cache.data_root)), 1)
change.number = 123
self.cache.set("foo", change, change.cache_version)
self.cache.set(key, change, change.cache_version)
self.cache.cleanup()
self.assertEqual(len(self.cache._data_cleanup_candidates), 1)
@@ -1366,15 +1379,16 @@ class TestChangeCache(ZooKeeperBaseTestCase):
def test_watch_cleanup(self):
change = DummyChange("project", {"foo": "bar"})
self.cache.set("foo", change)
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
self.cache.set(key, change)
for _ in iterate_timeout(10, "watch to be registered"):
if change.cache_stat.key in self.cache._watched_keys:
if change.cache_stat.key._hash in self.cache._watched_keys:
break
self.cache.delete("foo")
self.assertIsNone(self.cache.get("foo"))
self.cache.delete(key)
self.assertIsNone(self.cache.get(key))
for _ in iterate_timeout(10, "watch to be removed"):
if change.cache_stat.key not in self.cache._watched_keys:
if change.cache_stat.key._hash not in self.cache._watched_keys:
break

View File

@@ -42,7 +42,11 @@ from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent
from zuul.driver.git.gitwatcher import GitWatcher
from zuul.lib.logutil import get_annotated_logger
from zuul.model import Ref, Tag, Branch, Project
from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError
from zuul.zk.change_cache import (
AbstractChangeCache,
ChangeKey,
ConcurrentUpdateError,
)
from zuul.zk.event_queues import ConnectionEventQueue, EventReceiverElection
# HTTP timeout in seconds
@@ -311,7 +315,9 @@ class GerritEventConnector(threading.Thread):
# cache as it may be a dependency
if event.change_number:
refresh = True
key = str((event.change_number, event.patch_number))
key = ChangeKey(self.connection.connection_name, None,
'GerritChange', str(event.change_number),
str(event.patch_number))
if self.connection._change_cache.get(key) is None:
refresh = False
for tenant in self.connection.sched.abide.tenants.values():
@@ -760,7 +766,8 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection):
# Ensure number and patchset are str
number = str(number)
patchset = str(patchset)
key = str((number, patchset))
key = ChangeKey(self.connection_name, None,
'GerritChange', number, patchset)
change = self._change_cache.get(key)
if change and not refresh:
return change
@@ -776,7 +783,8 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection):
def _getTag(self, event):
tag = event.ref[len('refs/tags/'):]
key = str((event.project_name, tag, event.newrev))
key = ChangeKey(self.connection_name, None,
'Tag', tag, event.newrev)
change = self._change_cache.get(key)
if change:
return change
@@ -794,7 +802,8 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection):
return change
def _getBranch(self, event, branch, ref):
key = str((event.project_name, branch, event.newrev))
key = ChangeKey(self.connection_name, None,
'Branch', branch, event.newrev)
change = self._change_cache.get(key)
if change:
return change
@@ -812,7 +821,8 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection):
return change
def _getRef(self, event):
key = str((event.project_name, event.ref, event.newrev))
key = ChangeKey(self.connection_name, None,
'Ref', event.ref, event.newrev)
change = self._change_cache.get(key)
if change:
return change
@@ -856,6 +866,7 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection):
result.message):
if match != change_id:
continue
# Note: This is not a ChangeCache ChangeKey
key = (result.number, result.current_patchset)
if key in seen:
continue
@@ -992,7 +1003,9 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection):
return True
data = self.queryChange(change.number)
key = str((change.number, change.patchset))
key = ChangeKey(self.connection_name, None,
'GerritChange', str(change.number),
str(change.patchset))
def _update_change(c):
c.update(data, self)

View File

@@ -23,7 +23,11 @@ from zuul.connection import BaseConnection, ZKChangeCacheMixin
from zuul.driver.git.gitmodel import GitTriggerEvent
from zuul.driver.git.gitwatcher import GitWatcher
from zuul.model import Ref, Branch
from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError
from zuul.zk.change_cache import (
AbstractChangeCache,
ChangeKey,
ConcurrentUpdateError,
)
class GitChangeCache(AbstractChangeCache):
@@ -98,7 +102,8 @@ class GitConnection(ZKChangeCacheMixin, BaseConnection):
return refs
def getChange(self, event, refresh=False):
key = str((event.project_name, event.ref, event.newrev))
key = ChangeKey(self.connection_name, event.project_name,
'Ref', event.ref, event.newrev)
change = self._change_cache.get(key)
if change:
return change

View File

@@ -48,7 +48,11 @@ from zuul.model import Ref, Branch, Tag, Project
from zuul.exceptions import MergeFailure
from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent
from zuul.model import DequeueEvent
from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError
from zuul.zk.change_cache import (
AbstractChangeCache,
ChangeKey,
ConcurrentUpdateError,
)
from zuul.zk.event_queues import ConnectionEventQueue
GITHUB_BASE_URL = 'https://api.github.com'
@@ -1281,7 +1285,9 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection):
# because it can originate from different sources (github event, manual
# enqueue event) where some might just parse the string and forward it.
number = int(number)
key = str((project.name, number, patchset))
key = ChangeKey(self.connection_name, project.name,
'PullRequest', str(number),
str(patchset))
change = self._change_cache.get(key)
if change and not refresh:
return change
@@ -1330,7 +1336,8 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection):
def _getTag(self, project, event):
tag = event.ref[len('refs/tags/'):]
key = str((event.project_name, tag, event.newrev))
key = ChangeKey(self.connection_name, project.name,
'Tag', tag, event.newrev)
change = self._change_cache.get(key)
if change:
return change
@@ -1351,7 +1358,8 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection):
def _getBranch(self, project, event):
branch = event.ref[len('refs/heads/'):]
key = str((event.project_name, branch, event.newrev))
key = ChangeKey(self.connection_name, project.name,
'Branch', branch, event.newrev)
change = self._change_cache.get(key)
if change:
return change
@@ -1370,7 +1378,8 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection):
return change
def _getRef(self, project, event):
key = str((event.project_name, event.ref, event.newrev))
key = ChangeKey(self.connection_name, project.name,
'Ref', event.ref, event.newrev)
change = self._change_cache.get(key)
if change:
return change
@@ -1421,6 +1430,7 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection):
org, proj, _, num = pr.get('url').split('/')[-4:]
proj = pr.get('base').get('repo').get('full_name')
sha = pr.get('head').get('sha')
# This is not a ChangeKey
key = (proj, num, sha)
# A single tenant could have multiple projects with the same

View File

@@ -34,7 +34,11 @@ from zuul.lib.logutil import get_annotated_logger
from zuul.exceptions import MergeFailure
from zuul.model import Branch, Project, Ref, Tag
from zuul.driver.gitlab.gitlabmodel import GitlabTriggerEvent, MergeRequest
from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError
from zuul.zk.change_cache import (
AbstractChangeCache,
ChangeKey,
ConcurrentUpdateError,
)
from zuul.zk.event_queues import ConnectionEventQueue
# HTTP timeout in seconds
@@ -520,7 +524,9 @@ class GitlabConnection(ZKChangeCacheMixin, CachedBranchConnection):
def _getChange(self, project, number, patch_number=None,
refresh=False, url=None, event=None):
log = get_annotated_logger(self.log, event)
key = str((project.name, number, patch_number))
key = ChangeKey(self.connection_name, project.name,
'MergeRequest', str(number),
str(patch_number))
change = self._change_cache.get(key)
if change and not refresh:
log.debug("Getting change from cache %s" % str(key))
@@ -576,7 +582,8 @@ class GitlabConnection(ZKChangeCacheMixin, CachedBranchConnection):
return change
def _getNonMRRef(self, project, event):
key = str((project.name, event.ref, event.newrev))
key = ChangeKey(self.connection_name, project.name,
'Ref', event.ref, event.newrev)
change = self._change_cache.get(key)
if change:
return change

View File

@@ -28,7 +28,11 @@ from zuul.lib.logutil import get_annotated_logger
from zuul.web.handler import BaseWebController
from zuul.model import Ref, Branch, Tag
from zuul.lib import dependson
from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError
from zuul.zk.change_cache import (
AbstractChangeCache,
ChangeKey,
ConcurrentUpdateError,
)
from zuul.zk.event_queues import ConnectionEventQueue
from zuul.driver.pagure.paguremodel import PagureTriggerEvent, PullRequest
@@ -596,7 +600,9 @@ class PagureConnection(ZKChangeCacheMixin, BaseConnection):
def _getChange(self, project, number, patchset=None,
refresh=False, url=None, event=None):
key = str((project.name, number, patchset))
key = ChangeKey(self.connection_name, project.name,
'PullRequest', str(number),
str(patchset))
change = self._change_cache.get(key)
if change and not refresh:
self.log.debug("Getting change from cache %s" % str(key))
@@ -629,7 +635,8 @@ class PagureConnection(ZKChangeCacheMixin, BaseConnection):
return change
def _getNonPRRef(self, project, event):
key = str((project.name, event.ref, event.newrev))
key = ChangeKey(self.connection_name, project.name,
'Ref', event.ref, event.newrev)
change = self._change_cache.get(key)
if change:
return change

View File

@@ -23,6 +23,7 @@ from zuul.lib.dependson import find_dependency_headers
from zuul.lib.logutil import get_annotated_logger
from zuul.lib.tarjan import strongly_connected_components
from zuul.model import QueueItem
from zuul.zk.change_cache import ChangeKey
class DynamicChangeQueueContextManager(object):
@@ -156,12 +157,12 @@ class PipelineManager(metaclass=ABCMeta):
def resolveChangeKeys(self, change_keys):
resolved_changes = []
for key in change_keys:
for reference in change_keys:
key = ChangeKey.fromReference(reference)
change = self._change_cache.get(key)
if change is None:
connection_name, connection_key = key
source = self.sched.connections.getSource(connection_name)
change = source.getChangeByKey(connection_key)
source = self.sched.connections.getSource(key.connection_name)
change = source.getChangeByKey(key)
self._change_cache[change.cache_key] = change
resolved_changes.append(change)
return resolved_changes
@@ -202,7 +203,7 @@ class PipelineManager(metaclass=ABCMeta):
def isAnyVersionOfChangeInPipeline(self, change):
# Checks any items in the pipeline
for item in self.pipeline.getAllItems():
if change.stable_id == item.change.stable_id:
if change.cache_stat.key.isSameChange(item.change.cache_stat.key):
return True
return False
@@ -218,13 +219,9 @@ class PipelineManager(metaclass=ABCMeta):
return
for item in self.pipeline.getAllItems():
# TODO: with structured-data keys (so we can compare the
# stable_id), we might be able to do more of this without
# going to ZK.
for connection_name, key in item.change.commit_needs_changes:
source = self.sched.connections.getSource(connection_name)
dep = source.getChangeByKey(key)
if (dep.stable_id == change.stable_id):
for dep_change_ref in item.change.commit_needs_changes:
dep_change_key = ChangeKey.fromReference(dep_change_ref)
if dep_change_key.isSameChange(change.cache_stat.key):
self.updateCommitDependencies(item.change, None, event)
self.updateCommitDependencies(change, None, event)

View File

@@ -3589,23 +3589,12 @@ class Ref(object):
@property
def cache_key(self):
return (self.project.connection_name, self.cache_stat.key)
return self.cache_stat.key.reference
@property
def cache_version(self):
return -1 if self.cache_stat is None else self.cache_stat.version
@property
def stable_id(self):
# The identifier for this ref/change/thing that doesn't change
# even if it's updated. Something like the ref itself, or the
# gerrit change id/number, or the github pull request number.
# Stable across patchsets and pushes.
return {
"project": self.project.canonical_name,
"ref": self.ref,
}
def serialize(self):
return {
"project": self.project.name,
@@ -3809,17 +3798,6 @@ class Change(Branch):
self.message = data.get("message")
self.commit_id = data.get("commit_id")
@property
def stable_id(self):
# The identifier for this ref/change/thing that doesn't change
# even if it's updated. Something like the ref itself, or the
# gerrit change id/number, or the github pull request number.
# Stable across patchsets and pushes.
return {
"project": self.project.canonical_name,
"number": self.number,
}
def serialize(self):
d = super().serialize()
d.update({

View File

@@ -16,13 +16,12 @@ import abc
import contextlib
import json
import logging
import os
import threading
import time
import uuid
import hashlib
from collections import defaultdict
from collections.abc import Iterable
from urllib.parse import quote_plus, unquote_plus
from kazoo.exceptions import BadVersionError, NodeExistsError, NoNodeError
@@ -34,15 +33,75 @@ from zuul.zk.vendor.watchers import ExistingDataWatch
CHANGE_CACHE_ROOT = "/zuul/cache/connection"
def _keyFromPath(path):
return unquote_plus(os.path.basename(path))
class ConcurrentUpdateError(ZuulZooKeeperException):
pass
def str_or_none(d):
if d is None:
return d
return str(d)
class ChangeKey:
"""Represents a change key
This is used to look up a change in the change cache.
It also contains enough basic information about a change in order
to determine if two entries in the change cache are related or
identical.
There are two ways to refer to a Change in ZK. If one ZK object
refers to a change, it should use ChangeKey.reference. This is a
dictionary with structured information about the change. The
contents can be used to construct a ChangeKey, and that can be
used to pull the Change from the cache.
The cache itself uses a sha256 digest of the reference as the
actual cache key in ZK. This reduces and stabilizes the length of
the cache keys themselves. Methods outside of the change_cache
should not use this directly.
"""
def __init__(self, connection_name, project_name,
change_type, stable_id, revision):
self.connection_name = str_or_none(connection_name)
self.project_name = str_or_none(project_name)
self.change_type = str_or_none(change_type)
self.stable_id = str_or_none(stable_id)
self.revision = str_or_none(revision)
reference = dict(
connection_name=connection_name,
project_name=project_name,
change_type=change_type,
stable_id=stable_id,
revision=revision,
)
self.reference = json.dumps(reference)
msg = self.reference.encode('utf8')
self._hash = hashlib.sha256(msg).hexdigest()
@classmethod
def fromReference(cls, data):
data = json.loads(data)
return cls(data['connection_name'], data['project_name'],
data['change_type'], data['stable_id'], data['revision'])
def isSameChange(self, other):
return all([
self.connection_name == str_or_none(other.connection_name),
self.project_name == str_or_none(other.project_name),
self.change_type == str_or_none(other.change_type),
self.stable_id == str_or_none(other.stable_id),
])
class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
"""Abstract class for caching change items in Zookeeper.
In order to make updates atomic the change data is stored separate
@@ -95,11 +154,12 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
def _dataPath(self, data_uuid):
return f"{self.data_root}/{data_uuid}"
def _cachePath(self, key):
return f"{self.cache_root}/{quote_plus(key)}"
def _cachePath(self, key_hash):
return f"{self.cache_root}/{key_hash}"
def _cacheWatcher(self, cache_keys):
cache_keys = {unquote_plus(k) for k in cache_keys}
# This method deals with key hashes exclusively
cache_keys = set(cache_keys)
existing_keys = set(self._change_cache.keys())
deleted_keys = existing_keys - cache_keys
for key in deleted_keys:
@@ -116,7 +176,7 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
new_keys = cache_keys - self._watched_keys
for key in new_keys:
ExistingDataWatch(self.kazoo_client,
f"{self.cache_root}/{quote_plus(key)}",
f"{self.cache_root}/{key}",
self._cacheItemWatcher)
self._watched_keys.add(key)
@@ -124,10 +184,14 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
if not all((data, zstat, event)):
return
key = _keyFromPath(event.path)
data_uuid = data.decode("utf8")
key, data_uuid = self._loadKey(data)
self._get(key, data_uuid, zstat)
def _loadKey(self, data):
data = json.loads(data.decode("utf8"))
key = ChangeKey.fromReference(data['key_reference'])
return key, data['data_uuid']
def prune(self, relevant, max_age=3600): # 1h
cutoff_time = time.time() - max_age
outdated = {c.cache_stat.key for c in list(self._change_cache.values())
@@ -152,23 +216,33 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
except NoNodeError:
return
for key in sorted(unquote_plus(c) for c in children):
change = self.get(key)
for key_hash in children:
change = self._get_from_key_hash(key_hash)
if change is not None:
yield change
def get(self, key):
cache_path = self._cachePath(key)
cache_path = self._cachePath(key._hash)
try:
value, zstat = self.kazoo_client.get(cache_path)
except NoNodeError:
return None
data_uuid = value.decode("utf8")
_, data_uuid = self._loadKey(value)
return self._get(key, data_uuid, zstat)
def _get_from_key_hash(self, key_hash):
cache_path = self._cachePath(key_hash)
try:
value, zstat = self.kazoo_client.get(cache_path)
except NoNodeError:
return None
key, data_uuid = self._loadKey(value)
return self._get(key, data_uuid, zstat)
def _get(self, key, data_uuid, zstat):
change = self._change_cache.get(key)
change = self._change_cache.get(key._hash)
if change and change.cache_stat.uuid == data_uuid:
# Change in our local cache is up-to-date
return change
@@ -176,14 +250,14 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
try:
data = self._getData(data_uuid)
except NoNodeError:
cache_path = self._cachePath(key)
cache_path = self._cachePath(key._hash)
self.log.error("Removing cache entry %s without any data",
cache_path)
# TODO: handle no node + version mismatch
self.kazoo_client.delete(cache_path, zstat.version)
return None
with self._change_locks[key]:
with self._change_locks[key._hash]:
if change:
# While holding the lock check if we still need to update
# the change and skip the update if we have the latest version.
@@ -198,7 +272,7 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
# Use setdefault here so we only have a single instance of a change
# around. In case of a concurrent get this might return a different
# change instance than the one we just created.
return self._change_cache.setdefault(key, change)
return self._change_cache.setdefault(key._hash, change)
def _getData(self, data_uuid):
with sharding.BufferedShardReader(
@@ -208,30 +282,37 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
def set(self, key, change, version=-1):
data_uuid = self._setData(self._dataFromChange(change))
cache_path = self._cachePath(key)
with self._change_locks[key]:
# Add the change_key info here mostly for debugging since the
# hash is non-reversible.
cache_data = json.dumps(dict(
data_uuid=data_uuid,
key_reference=key.reference,
))
cache_path = self._cachePath(key._hash)
with self._change_locks[key._hash]:
try:
if version == -1:
_, zstat = self.kazoo_client.create(
cache_path,
data_uuid.encode("utf8"),
cache_data.encode("utf8"),
include_data=True)
else:
# Sanity check that we only have a single change instance
# for a key.
if self._change_cache[key] is not change:
if self._change_cache[key._hash] is not change:
raise RuntimeError(
"Conflicting change objects (existing "
f"{self._change_cache[key]} vs. new {change} "
f"for key '{key}'")
f"{self._change_cache[key._hash]} vs. "
f"new {change} "
f"for key '{key.reference}'")
zstat = self.kazoo_client.set(
cache_path, data_uuid.encode("utf8"), version)
cache_path, cache_data.encode("utf8"), version)
except (BadVersionError, NodeExistsError, NoNodeError) as exc:
raise ConcurrentUpdateError from exc
change.cache_stat = model.CacheStat(
key, data_uuid, zstat.version, zstat.last_modified)
self._change_cache[key] = change
self._change_cache[key._hash] = change
def _setData(self, data):
data_uuid = uuid.uuid4().hex
@@ -260,14 +341,14 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
return change
def delete(self, key):
cache_path = self._cachePath(key)
cache_path = self._cachePath(key._hash)
# Only delete the cache entry and NOT the data node in order to
# prevent race conditions with other consumers. The stale data
# nodes will be removed by the periodic cleanup.
self.kazoo_client.delete(cache_path, recursive=True)
with contextlib.suppress(KeyError):
del self._change_cache[key]
del self._change_cache[key._hash]
def _changeFromData(self, data):
change_type, change_data = data["change_type"], data["change_data"]