Fix change cache test races more

Commit 9fd5160558 attempted to fix
races where the change cache watchers might still be running
after test shutdown.  This was specific to some very short tests.
Unfortunately, one of those tests (test_concurrent_delete) exercises
a behavior where if no watch events ever reach the cache during
the test, then the cache key will still remain in the cache.

It is not clear whether this is a behavior that could occur
in reality, or if it is a side-effect of simulating two cache
actors with the same cache object.  If it could happen in reality,
it may indicate a minor bug/leak in the change cache.  The best
way to resolve this might be to switch the change cache to use
a persistent recursive watch (possibly via the ZuulTreeCache)
in a future change.

In the mean time, let's try a second approach to make these tests
robust: give the change cache a stop method that will abort any
calls to ZK after it is called.  This should avoid the underlying
problem of delayed watch callbacks attempting to access zk after
shutdown.

Change-Id: I15133bd5b612e492ddb51e903e9ee7944279cfc3
This commit is contained in:
James E. Blair
2025-04-03 16:50:48 -07:00
parent fac8f8c886
commit 62ac6d4309
7 changed files with 19 additions and 21 deletions

View File

@@ -1701,17 +1701,7 @@ class TestChangeCache(ZooKeeperBaseTestCase):
def setUp(self):
super().setUp()
self.cache = DummyChangeCache(self.zk_client, DummyConnection())
def _deleteAndSync(self, key):
# Run this at the end of the tests to ensure that the cache is
# in sync before shutdown.
try:
self.zk_client.client.delete(self.cache._cachePath(key._hash))
except NoNodeError:
pass
for _ in iterate_timeout(10, "cache to sync"):
if key._hash not in self.cache._change_cache:
break
self.addCleanup(self.cache.stop)
def test_insert(self):
change_foo = DummyChange("project", {"foo": "bar"})
@@ -1726,7 +1716,6 @@ class TestChangeCache(ZooKeeperBaseTestCase):
compressed_size, uncompressed_size = self.cache.estimateDataSize()
self.assertTrue(compressed_size != uncompressed_size != 0)
self._deleteAndSync(key_bar)
def test_update(self):
change = DummyChange("project", {"foo": "bar"})
@@ -1740,7 +1729,6 @@ class TestChangeCache(ZooKeeperBaseTestCase):
updated_change = self.cache.get(key)
self.assertIs(change, updated_change)
self.assertEqual(change.number, 123)
self._deleteAndSync(key)
def test_delete(self):
change = DummyChange("project", {"foo": "bar"})
@@ -1752,7 +1740,6 @@ class TestChangeCache(ZooKeeperBaseTestCase):
# Deleting an non-existent key should not raise an exception
invalid_key = ChangeKey('conn', 'project', 'change', 'invalid', '1')
self.cache.delete(invalid_key)
self._deleteAndSync(key)
def test_concurrent_delete(self):
change = DummyChange("project", {"foo": "bar"})
@@ -1766,7 +1753,6 @@ class TestChangeCache(ZooKeeperBaseTestCase):
self.cache.delete(key, old_version)
# The change should still be in the cache
self.assertIsNotNone(self.cache.get(key))
self._deleteAndSync(key)
def test_prune(self):
change1 = DummyChange("project", {"foo": "bar"})
@@ -1778,7 +1764,6 @@ class TestChangeCache(ZooKeeperBaseTestCase):
self.cache.prune([key1], max_age=0)
self.assertIsNotNone(self.cache.get(key1))
self.assertIsNone(self.cache.get(key2))
self._deleteAndSync(key2)
def test_concurrent_update(self):
change = DummyChange("project", {"foo": "bar"})
@@ -1788,7 +1773,6 @@ class TestChangeCache(ZooKeeperBaseTestCase):
# Attempt to update with the old change stat
with testtools.ExpectedException(ConcurrentUpdateError):
self.cache.set(key, change, change.cache_version - 1)
self._deleteAndSync(key)
def test_change_update_retry(self):
change = DummyChange("project", {"foobar": 0})
@@ -1813,7 +1797,6 @@ class TestChangeCache(ZooKeeperBaseTestCase):
updated_change = self.cache.updateChangeWithRetry(
key, change, updater)
self.assertEqual(updated_change.foobar, 2)
self._deleteAndSync(key)
def test_cache_sync(self):
other_cache = DummyChangeCache(self.zk_client, DummyConnection())
@@ -1832,7 +1815,6 @@ class TestChangeCache(ZooKeeperBaseTestCase):
other_cache.delete(key)
self.assertIsNone(self.cache.get(key))
self._deleteAndSync(key)
def test_cache_sync_on_start(self):
key = ChangeKey('conn', 'project', 'change', 'foo', '1')
@@ -1845,7 +1827,6 @@ class TestChangeCache(ZooKeeperBaseTestCase):
other_cache.cleanup()
other_cache.cleanup()
self.assertIsNotNone(other_cache.get(key))
self._deleteAndSync(key)
def test_cleanup(self):
change = DummyChange("project", {"foo": "bar"})
@@ -1869,7 +1850,6 @@ class TestChangeCache(ZooKeeperBaseTestCase):
self.assertEqual(len(self.cache._data_cleanup_candidates), 0)
self.assertEqual(
len(self.zk_client.client.get_children(self.cache.data_root)), 1)
self._deleteAndSync(key)
def test_watch_cleanup(self):
change = DummyChange("project", {"foo": "bar"})

View File

@@ -1995,6 +1995,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
self.stopPollerThread()
self.stopRefWatcherThread()
self.stopEventConnector()
if self._change_cache:
self._change_cache.stop()
def getEventQueue(self):
return getattr(self, "event_queue", None)

View File

@@ -178,6 +178,8 @@ class GitConnection(ZKChangeCacheMixin, BaseConnection):
def onStop(self):
self.log.debug("Stopping Git Watcher")
self._stop_watcher_thread()
if self._change_cache:
self._change_cache.stop()
def _stop_watcher_thread(self):
if self.watcher_thread:

View File

@@ -1325,6 +1325,8 @@ class GithubConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
# connection.
if hasattr(self, 'github_event_connector'):
self._stop_event_connector()
if self._change_cache:
self._change_cache.stop()
def _start_event_connector(self):
self.github_event_connector = self._event_connector_class(self)

View File

@@ -585,6 +585,8 @@ class GitlabConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def onStop(self):
if hasattr(self, 'gitlab_event_connector'):
self._stop_event_connector()
if self._change_cache:
self._change_cache.stop()
def getWebController(self, zuul_web):
return GitlabWebController(zuul_web, self)

View File

@@ -541,6 +541,8 @@ class PagureConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
def onStop(self):
if hasattr(self, 'pagure_event_connector'):
self._stop_event_connector()
if self._change_cache:
self._change_cache.stop()
def set_my_username(self, client):
self.log.debug("Fetching my username ...")

View File

@@ -172,6 +172,7 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
f"zuul.ChangeCache.{connection.connection_name}")
super().__init__(client)
self._stopped = False
self.connection = connection
self.root_path = f"{CHANGE_CACHE_ROOT}/{connection.connection_name}"
self.cache_root = f"{self.root_path}/cache"
@@ -188,6 +189,9 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
self._data_cleanup_candidates = set()
self.kazoo_client.ChildrenWatch(self.cache_root, self._cacheWatcher)
def stop(self):
self._stopped = True
def _dataPath(self, data_uuid):
return f"{self.data_root}/{data_uuid}"
@@ -209,6 +213,8 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
new_keys = cache_keys - self._watched_keys
for key in new_keys:
if self._stopped:
return
ExistingDataWatch(self.kazoo_client,
f"{self.cache_root}/{key}",
self._cacheItemWatcher)
@@ -221,6 +227,8 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
key, data_uuid = self._loadKey(data)
self.log.debug("Noticed update to key %s data uuid %s",
key, data_uuid)
if self._stopped:
return
self._get(key, data_uuid, zstat)
def _loadKey(self, data):