diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index 95e7bb0f09..2a3339f0c5 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -21,7 +21,11 @@ import testtools from zuul import model from zuul.model import BuildRequest, HoldRequest, MergeRequest from zuul.zk import ZooKeeperClient -from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError +from zuul.zk.change_cache import ( + AbstractChangeCache, + ChangeKey, + ConcurrentUpdateError, +) from zuul.zk.config_cache import SystemConfigCache, UnparsedConfigCache from zuul.zk.exceptions import LockException from zuul.zk.executor import ExecutorApi @@ -1231,7 +1235,9 @@ class DummyChange: return -1 if self.cache_stat is None else self.cache_stat.version def serialize(self): - return self.__dict__ + d = self.__dict__.copy() + d.pop('cache_stat') + return d def deserialize(self, data): self.__dict__.update(data) @@ -1266,49 +1272,56 @@ class TestChangeCache(ZooKeeperBaseTestCase): def test_insert(self): change_foo = DummyChange("project", {"foo": "bar"}) change_bar = DummyChange("project", {"bar": "foo"}) - self.cache.set("foo", change_foo) - self.cache.set("bar", change_bar) + key_foo = ChangeKey('conn', 'project', 'change', 'foo', '1') + key_bar = ChangeKey('conn', 'project', 'change', 'bar', '1') + self.cache.set(key_foo, change_foo) + self.cache.set(key_bar, change_bar) - self.assertEqual(self.cache.get("foo"), change_foo) - self.assertEqual(self.cache.get("bar"), change_bar) + self.assertEqual(self.cache.get(key_foo), change_foo) + self.assertEqual(self.cache.get(key_bar), change_bar) def test_update(self): change = DummyChange("project", {"foo": "bar"}) - self.cache.set("foo", change) + key = ChangeKey('conn', 'project', 'change', 'foo', '1') + self.cache.set(key, change) change.number = 123 - self.cache.set("foo", change, change.cache_version) + self.cache.set(key, change, change.cache_version) # The change instance must stay the same - updated_change = self.cache.get("foo") + updated_change = self.cache.get(key) self.assertIs(change, updated_change) self.assertEqual(change.number, 123) def test_delete(self): change = DummyChange("project", {"foo": "bar"}) - self.cache.set("foo", change) - self.cache.delete("foo") - self.assertIsNone(self.cache.get("foo")) + key = ChangeKey('conn', 'project', 'change', 'foo', '1') + self.cache.set(key, change) + self.cache.delete(key) + self.assertIsNone(self.cache.get(key)) # Deleting an non-existent key should not raise an exception - self.cache.delete("invalid") + invalid_key = ChangeKey('conn', 'project', 'change', 'invalid', '1') + self.cache.delete(invalid_key) def test_concurrent_update(self): change = DummyChange("project", {"foo": "bar"}) - self.cache.set("foo", change) + key = ChangeKey('conn', 'project', 'change', 'foo', '1') + self.cache.set(key, change) # Attempt to update with the old change stat with testtools.ExpectedException(ConcurrentUpdateError): - self.cache.set("foo", change, change.cache_version - 1) + self.cache.set(key, change, change.cache_version - 1) def test_change_update_retry(self): change = DummyChange("project", {"foobar": 0}) - self.cache.set("foobar", change) + key = ChangeKey('conn', 'project', 'change', 'foo', '1') + self.cache.set(key, change) # Update the change so we have a new cache stat. change.foobar = 1 - self.cache.set("foobar", change, change.cache_version) - self.assertEqual(self.cache.get("foobar").foobar, 1) + self.cache.set(key, change, change.cache_version) + self.assertEqual(self.cache.get(key).foobar, 1) def updater(c): c.foobar += 1 @@ -1320,13 +1333,12 @@ class TestChangeCache(ZooKeeperBaseTestCase): change.cache_version - 1, 0) updated_change = self.cache.updateChangeWithRetry( - "foobar", change, updater) + key, change, updater) self.assertEqual(updated_change.foobar, 2) def test_cache_sync(self): other_cache = DummyChangeCache(self.zk_client, DummyConnection()) - - key = "foo foo" # Use a key that needs escaping + key = ChangeKey('conn', 'project', 'change', 'foo', '1') change = DummyChange("project", {"foo": "bar"}) self.cache.set(key, change) self.assertIsNotNone(other_cache.get(key)) @@ -1344,7 +1356,8 @@ class TestChangeCache(ZooKeeperBaseTestCase): def test_cleanup(self): change = DummyChange("project", {"foo": "bar"}) - self.cache.set("foo", change) + key = ChangeKey('conn', 'project', 'change', 'foo', '1') + self.cache.set(key, change) self.cache.cleanup() self.assertEqual(len(self.cache._data_cleanup_candidates), 0) @@ -1352,7 +1365,7 @@ class TestChangeCache(ZooKeeperBaseTestCase): len(self.zk_client.client.get_children(self.cache.data_root)), 1) change.number = 123 - self.cache.set("foo", change, change.cache_version) + self.cache.set(key, change, change.cache_version) self.cache.cleanup() self.assertEqual(len(self.cache._data_cleanup_candidates), 1) @@ -1366,15 +1379,16 @@ class TestChangeCache(ZooKeeperBaseTestCase): def test_watch_cleanup(self): change = DummyChange("project", {"foo": "bar"}) - self.cache.set("foo", change) + key = ChangeKey('conn', 'project', 'change', 'foo', '1') + self.cache.set(key, change) for _ in iterate_timeout(10, "watch to be registered"): - if change.cache_stat.key in self.cache._watched_keys: + if change.cache_stat.key._hash in self.cache._watched_keys: break - self.cache.delete("foo") - self.assertIsNone(self.cache.get("foo")) + self.cache.delete(key) + self.assertIsNone(self.cache.get(key)) for _ in iterate_timeout(10, "watch to be removed"): - if change.cache_stat.key not in self.cache._watched_keys: + if change.cache_stat.key._hash not in self.cache._watched_keys: break diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index 7b7107b92c..bf71aba1f4 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -42,7 +42,11 @@ from zuul.driver.gerrit.gerritmodel import GerritChange, GerritTriggerEvent from zuul.driver.git.gitwatcher import GitWatcher from zuul.lib.logutil import get_annotated_logger from zuul.model import Ref, Tag, Branch, Project -from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError +from zuul.zk.change_cache import ( + AbstractChangeCache, + ChangeKey, + ConcurrentUpdateError, +) from zuul.zk.event_queues import ConnectionEventQueue, EventReceiverElection # HTTP timeout in seconds @@ -311,7 +315,9 @@ class GerritEventConnector(threading.Thread): # cache as it may be a dependency if event.change_number: refresh = True - key = str((event.change_number, event.patch_number)) + key = ChangeKey(self.connection.connection_name, None, + 'GerritChange', str(event.change_number), + str(event.patch_number)) if self.connection._change_cache.get(key) is None: refresh = False for tenant in self.connection.sched.abide.tenants.values(): @@ -760,7 +766,8 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection): # Ensure number and patchset are str number = str(number) patchset = str(patchset) - key = str((number, patchset)) + key = ChangeKey(self.connection_name, None, + 'GerritChange', number, patchset) change = self._change_cache.get(key) if change and not refresh: return change @@ -776,7 +783,8 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection): def _getTag(self, event): tag = event.ref[len('refs/tags/'):] - key = str((event.project_name, tag, event.newrev)) + key = ChangeKey(self.connection_name, None, + 'Tag', tag, event.newrev) change = self._change_cache.get(key) if change: return change @@ -794,7 +802,8 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection): return change def _getBranch(self, event, branch, ref): - key = str((event.project_name, branch, event.newrev)) + key = ChangeKey(self.connection_name, None, + 'Branch', branch, event.newrev) change = self._change_cache.get(key) if change: return change @@ -812,7 +821,8 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection): return change def _getRef(self, event): - key = str((event.project_name, event.ref, event.newrev)) + key = ChangeKey(self.connection_name, None, + 'Ref', event.ref, event.newrev) change = self._change_cache.get(key) if change: return change @@ -856,6 +866,7 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection): result.message): if match != change_id: continue + # Note: This is not a ChangeCache ChangeKey key = (result.number, result.current_patchset) if key in seen: continue @@ -992,7 +1003,9 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection): return True data = self.queryChange(change.number) - key = str((change.number, change.patchset)) + key = ChangeKey(self.connection_name, None, + 'GerritChange', str(change.number), + str(change.patchset)) def _update_change(c): c.update(data, self) diff --git a/zuul/driver/git/gitconnection.py b/zuul/driver/git/gitconnection.py index 83cc14d623..95dc0ecf8f 100644 --- a/zuul/driver/git/gitconnection.py +++ b/zuul/driver/git/gitconnection.py @@ -23,7 +23,11 @@ from zuul.connection import BaseConnection, ZKChangeCacheMixin from zuul.driver.git.gitmodel import GitTriggerEvent from zuul.driver.git.gitwatcher import GitWatcher from zuul.model import Ref, Branch -from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError +from zuul.zk.change_cache import ( + AbstractChangeCache, + ChangeKey, + ConcurrentUpdateError, +) class GitChangeCache(AbstractChangeCache): @@ -98,7 +102,8 @@ class GitConnection(ZKChangeCacheMixin, BaseConnection): return refs def getChange(self, event, refresh=False): - key = str((event.project_name, event.ref, event.newrev)) + key = ChangeKey(self.connection_name, event.project_name, + 'Ref', event.ref, event.newrev) change = self._change_cache.get(key) if change: return change diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index 334531cad6..8dac0992bd 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -48,7 +48,11 @@ from zuul.model import Ref, Branch, Tag, Project from zuul.exceptions import MergeFailure from zuul.driver.github.githubmodel import PullRequest, GithubTriggerEvent from zuul.model import DequeueEvent -from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError +from zuul.zk.change_cache import ( + AbstractChangeCache, + ChangeKey, + ConcurrentUpdateError, +) from zuul.zk.event_queues import ConnectionEventQueue GITHUB_BASE_URL = 'https://api.github.com' @@ -1281,7 +1285,9 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection): # because it can originate from different sources (github event, manual # enqueue event) where some might just parse the string and forward it. number = int(number) - key = str((project.name, number, patchset)) + key = ChangeKey(self.connection_name, project.name, + 'PullRequest', str(number), + str(patchset)) change = self._change_cache.get(key) if change and not refresh: return change @@ -1330,7 +1336,8 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection): def _getTag(self, project, event): tag = event.ref[len('refs/tags/'):] - key = str((event.project_name, tag, event.newrev)) + key = ChangeKey(self.connection_name, project.name, + 'Tag', tag, event.newrev) change = self._change_cache.get(key) if change: return change @@ -1351,7 +1358,8 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection): def _getBranch(self, project, event): branch = event.ref[len('refs/heads/'):] - key = str((event.project_name, branch, event.newrev)) + key = ChangeKey(self.connection_name, project.name, + 'Branch', branch, event.newrev) change = self._change_cache.get(key) if change: return change @@ -1370,7 +1378,8 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection): return change def _getRef(self, project, event): - key = str((event.project_name, event.ref, event.newrev)) + key = ChangeKey(self.connection_name, project.name, + 'Ref', event.ref, event.newrev) change = self._change_cache.get(key) if change: return change @@ -1421,6 +1430,7 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection): org, proj, _, num = pr.get('url').split('/')[-4:] proj = pr.get('base').get('repo').get('full_name') sha = pr.get('head').get('sha') + # This is not a ChangeKey key = (proj, num, sha) # A single tenant could have multiple projects with the same diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py index 5c5675ad69..4b2e684fc0 100644 --- a/zuul/driver/gitlab/gitlabconnection.py +++ b/zuul/driver/gitlab/gitlabconnection.py @@ -34,7 +34,11 @@ from zuul.lib.logutil import get_annotated_logger from zuul.exceptions import MergeFailure from zuul.model import Branch, Project, Ref, Tag from zuul.driver.gitlab.gitlabmodel import GitlabTriggerEvent, MergeRequest -from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError +from zuul.zk.change_cache import ( + AbstractChangeCache, + ChangeKey, + ConcurrentUpdateError, +) from zuul.zk.event_queues import ConnectionEventQueue # HTTP timeout in seconds @@ -520,7 +524,9 @@ class GitlabConnection(ZKChangeCacheMixin, CachedBranchConnection): def _getChange(self, project, number, patch_number=None, refresh=False, url=None, event=None): log = get_annotated_logger(self.log, event) - key = str((project.name, number, patch_number)) + key = ChangeKey(self.connection_name, project.name, + 'MergeRequest', str(number), + str(patch_number)) change = self._change_cache.get(key) if change and not refresh: log.debug("Getting change from cache %s" % str(key)) @@ -576,7 +582,8 @@ class GitlabConnection(ZKChangeCacheMixin, CachedBranchConnection): return change def _getNonMRRef(self, project, event): - key = str((project.name, event.ref, event.newrev)) + key = ChangeKey(self.connection_name, project.name, + 'Ref', event.ref, event.newrev) change = self._change_cache.get(key) if change: return change diff --git a/zuul/driver/pagure/pagureconnection.py b/zuul/driver/pagure/pagureconnection.py index 4de5435ca1..8d6edeeacd 100644 --- a/zuul/driver/pagure/pagureconnection.py +++ b/zuul/driver/pagure/pagureconnection.py @@ -28,7 +28,11 @@ from zuul.lib.logutil import get_annotated_logger from zuul.web.handler import BaseWebController from zuul.model import Ref, Branch, Tag from zuul.lib import dependson -from zuul.zk.change_cache import AbstractChangeCache, ConcurrentUpdateError +from zuul.zk.change_cache import ( + AbstractChangeCache, + ChangeKey, + ConcurrentUpdateError, +) from zuul.zk.event_queues import ConnectionEventQueue from zuul.driver.pagure.paguremodel import PagureTriggerEvent, PullRequest @@ -596,7 +600,9 @@ class PagureConnection(ZKChangeCacheMixin, BaseConnection): def _getChange(self, project, number, patchset=None, refresh=False, url=None, event=None): - key = str((project.name, number, patchset)) + key = ChangeKey(self.connection_name, project.name, + 'PullRequest', str(number), + str(patchset)) change = self._change_cache.get(key) if change and not refresh: self.log.debug("Getting change from cache %s" % str(key)) @@ -629,7 +635,8 @@ class PagureConnection(ZKChangeCacheMixin, BaseConnection): return change def _getNonPRRef(self, project, event): - key = str((project.name, event.ref, event.newrev)) + key = ChangeKey(self.connection_name, project.name, + 'Ref', event.ref, event.newrev) change = self._change_cache.get(key) if change: return change diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 9aeb5d8e65..7287acc5c2 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -23,6 +23,7 @@ from zuul.lib.dependson import find_dependency_headers from zuul.lib.logutil import get_annotated_logger from zuul.lib.tarjan import strongly_connected_components from zuul.model import QueueItem +from zuul.zk.change_cache import ChangeKey class DynamicChangeQueueContextManager(object): @@ -156,12 +157,12 @@ class PipelineManager(metaclass=ABCMeta): def resolveChangeKeys(self, change_keys): resolved_changes = [] - for key in change_keys: + for reference in change_keys: + key = ChangeKey.fromReference(reference) change = self._change_cache.get(key) if change is None: - connection_name, connection_key = key - source = self.sched.connections.getSource(connection_name) - change = source.getChangeByKey(connection_key) + source = self.sched.connections.getSource(key.connection_name) + change = source.getChangeByKey(key) self._change_cache[change.cache_key] = change resolved_changes.append(change) return resolved_changes @@ -202,7 +203,7 @@ class PipelineManager(metaclass=ABCMeta): def isAnyVersionOfChangeInPipeline(self, change): # Checks any items in the pipeline for item in self.pipeline.getAllItems(): - if change.stable_id == item.change.stable_id: + if change.cache_stat.key.isSameChange(item.change.cache_stat.key): return True return False @@ -218,13 +219,9 @@ class PipelineManager(metaclass=ABCMeta): return for item in self.pipeline.getAllItems(): - # TODO: with structured-data keys (so we can compare the - # stable_id), we might be able to do more of this without - # going to ZK. - for connection_name, key in item.change.commit_needs_changes: - source = self.sched.connections.getSource(connection_name) - dep = source.getChangeByKey(key) - if (dep.stable_id == change.stable_id): + for dep_change_ref in item.change.commit_needs_changes: + dep_change_key = ChangeKey.fromReference(dep_change_ref) + if dep_change_key.isSameChange(change.cache_stat.key): self.updateCommitDependencies(item.change, None, event) self.updateCommitDependencies(change, None, event) diff --git a/zuul/model.py b/zuul/model.py index 6b35204d4e..ad4e08b011 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -3589,23 +3589,12 @@ class Ref(object): @property def cache_key(self): - return (self.project.connection_name, self.cache_stat.key) + return self.cache_stat.key.reference @property def cache_version(self): return -1 if self.cache_stat is None else self.cache_stat.version - @property - def stable_id(self): - # The identifier for this ref/change/thing that doesn't change - # even if it's updated. Something like the ref itself, or the - # gerrit change id/number, or the github pull request number. - # Stable across patchsets and pushes. - return { - "project": self.project.canonical_name, - "ref": self.ref, - } - def serialize(self): return { "project": self.project.name, @@ -3809,17 +3798,6 @@ class Change(Branch): self.message = data.get("message") self.commit_id = data.get("commit_id") - @property - def stable_id(self): - # The identifier for this ref/change/thing that doesn't change - # even if it's updated. Something like the ref itself, or the - # gerrit change id/number, or the github pull request number. - # Stable across patchsets and pushes. - return { - "project": self.project.canonical_name, - "number": self.number, - } - def serialize(self): d = super().serialize() d.update({ diff --git a/zuul/zk/change_cache.py b/zuul/zk/change_cache.py index 2f77ef395c..6bb479f546 100644 --- a/zuul/zk/change_cache.py +++ b/zuul/zk/change_cache.py @@ -16,13 +16,12 @@ import abc import contextlib import json import logging -import os import threading import time import uuid +import hashlib from collections import defaultdict from collections.abc import Iterable -from urllib.parse import quote_plus, unquote_plus from kazoo.exceptions import BadVersionError, NodeExistsError, NoNodeError @@ -34,15 +33,75 @@ from zuul.zk.vendor.watchers import ExistingDataWatch CHANGE_CACHE_ROOT = "/zuul/cache/connection" -def _keyFromPath(path): - return unquote_plus(os.path.basename(path)) - - class ConcurrentUpdateError(ZuulZooKeeperException): pass +def str_or_none(d): + if d is None: + return d + return str(d) + + +class ChangeKey: + """Represents a change key + + This is used to look up a change in the change cache. + + It also contains enough basic information about a change in order + to determine if two entries in the change cache are related or + identical. + + There are two ways to refer to a Change in ZK. If one ZK object + refers to a change, it should use ChangeKey.reference. This is a + dictionary with structured information about the change. The + contents can be used to construct a ChangeKey, and that can be + used to pull the Change from the cache. + + The cache itself uses a sha256 digest of the reference as the + actual cache key in ZK. This reduces and stabilizes the length of + the cache keys themselves. Methods outside of the change_cache + should not use this directly. + + """ + + def __init__(self, connection_name, project_name, + change_type, stable_id, revision): + self.connection_name = str_or_none(connection_name) + self.project_name = str_or_none(project_name) + self.change_type = str_or_none(change_type) + self.stable_id = str_or_none(stable_id) + self.revision = str_or_none(revision) + + reference = dict( + connection_name=connection_name, + project_name=project_name, + change_type=change_type, + stable_id=stable_id, + revision=revision, + ) + + self.reference = json.dumps(reference) + msg = self.reference.encode('utf8') + self._hash = hashlib.sha256(msg).hexdigest() + + @classmethod + def fromReference(cls, data): + data = json.loads(data) + return cls(data['connection_name'], data['project_name'], + data['change_type'], data['stable_id'], data['revision']) + + def isSameChange(self, other): + return all([ + self.connection_name == str_or_none(other.connection_name), + self.project_name == str_or_none(other.project_name), + self.change_type == str_or_none(other.change_type), + self.stable_id == str_or_none(other.stable_id), + ]) + + class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC): + """Abstract class for caching change items in Zookeeper. In order to make updates atomic the change data is stored separate @@ -95,11 +154,12 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC): def _dataPath(self, data_uuid): return f"{self.data_root}/{data_uuid}" - def _cachePath(self, key): - return f"{self.cache_root}/{quote_plus(key)}" + def _cachePath(self, key_hash): + return f"{self.cache_root}/{key_hash}" def _cacheWatcher(self, cache_keys): - cache_keys = {unquote_plus(k) for k in cache_keys} + # This method deals with key hashes exclusively + cache_keys = set(cache_keys) existing_keys = set(self._change_cache.keys()) deleted_keys = existing_keys - cache_keys for key in deleted_keys: @@ -116,7 +176,7 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC): new_keys = cache_keys - self._watched_keys for key in new_keys: ExistingDataWatch(self.kazoo_client, - f"{self.cache_root}/{quote_plus(key)}", + f"{self.cache_root}/{key}", self._cacheItemWatcher) self._watched_keys.add(key) @@ -124,10 +184,14 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC): if not all((data, zstat, event)): return - key = _keyFromPath(event.path) - data_uuid = data.decode("utf8") + key, data_uuid = self._loadKey(data) self._get(key, data_uuid, zstat) + def _loadKey(self, data): + data = json.loads(data.decode("utf8")) + key = ChangeKey.fromReference(data['key_reference']) + return key, data['data_uuid'] + def prune(self, relevant, max_age=3600): # 1h cutoff_time = time.time() - max_age outdated = {c.cache_stat.key for c in list(self._change_cache.values()) @@ -152,23 +216,33 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC): except NoNodeError: return - for key in sorted(unquote_plus(c) for c in children): - change = self.get(key) + for key_hash in children: + change = self._get_from_key_hash(key_hash) if change is not None: yield change def get(self, key): - cache_path = self._cachePath(key) + cache_path = self._cachePath(key._hash) try: value, zstat = self.kazoo_client.get(cache_path) except NoNodeError: return None - data_uuid = value.decode("utf8") + _, data_uuid = self._loadKey(value) + return self._get(key, data_uuid, zstat) + + def _get_from_key_hash(self, key_hash): + cache_path = self._cachePath(key_hash) + try: + value, zstat = self.kazoo_client.get(cache_path) + except NoNodeError: + return None + + key, data_uuid = self._loadKey(value) return self._get(key, data_uuid, zstat) def _get(self, key, data_uuid, zstat): - change = self._change_cache.get(key) + change = self._change_cache.get(key._hash) if change and change.cache_stat.uuid == data_uuid: # Change in our local cache is up-to-date return change @@ -176,14 +250,14 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC): try: data = self._getData(data_uuid) except NoNodeError: - cache_path = self._cachePath(key) + cache_path = self._cachePath(key._hash) self.log.error("Removing cache entry %s without any data", cache_path) # TODO: handle no node + version mismatch self.kazoo_client.delete(cache_path, zstat.version) return None - with self._change_locks[key]: + with self._change_locks[key._hash]: if change: # While holding the lock check if we still need to update # the change and skip the update if we have the latest version. @@ -198,7 +272,7 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC): # Use setdefault here so we only have a single instance of a change # around. In case of a concurrent get this might return a different # change instance than the one we just created. - return self._change_cache.setdefault(key, change) + return self._change_cache.setdefault(key._hash, change) def _getData(self, data_uuid): with sharding.BufferedShardReader( @@ -208,30 +282,37 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC): def set(self, key, change, version=-1): data_uuid = self._setData(self._dataFromChange(change)) - cache_path = self._cachePath(key) - with self._change_locks[key]: + # Add the change_key info here mostly for debugging since the + # hash is non-reversible. + cache_data = json.dumps(dict( + data_uuid=data_uuid, + key_reference=key.reference, + )) + cache_path = self._cachePath(key._hash) + with self._change_locks[key._hash]: try: if version == -1: _, zstat = self.kazoo_client.create( cache_path, - data_uuid.encode("utf8"), + cache_data.encode("utf8"), include_data=True) else: # Sanity check that we only have a single change instance # for a key. - if self._change_cache[key] is not change: + if self._change_cache[key._hash] is not change: raise RuntimeError( "Conflicting change objects (existing " - f"{self._change_cache[key]} vs. new {change} " - f"for key '{key}'") + f"{self._change_cache[key._hash]} vs. " + f"new {change} " + f"for key '{key.reference}'") zstat = self.kazoo_client.set( - cache_path, data_uuid.encode("utf8"), version) + cache_path, cache_data.encode("utf8"), version) except (BadVersionError, NodeExistsError, NoNodeError) as exc: raise ConcurrentUpdateError from exc change.cache_stat = model.CacheStat( key, data_uuid, zstat.version, zstat.last_modified) - self._change_cache[key] = change + self._change_cache[key._hash] = change def _setData(self, data): data_uuid = uuid.uuid4().hex @@ -260,14 +341,14 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC): return change def delete(self, key): - cache_path = self._cachePath(key) + cache_path = self._cachePath(key._hash) # Only delete the cache entry and NOT the data node in order to # prevent race conditions with other consumers. The stale data # nodes will be removed by the periodic cleanup. self.kazoo_client.delete(cache_path, recursive=True) with contextlib.suppress(KeyError): - del self._change_cache[key] + del self._change_cache[key._hash] def _changeFromData(self, data): change_type, change_data = data["change_type"], data["change_data"]