From 29d0534696b3b541701b863bef626f7c804b90f2 Mon Sep 17 00:00:00 2001
From: "James E. Blair"
Date: Tue, 28 Sep 2021 10:09:29 -0700
Subject: [PATCH] Never externally delete change cache entries

The change cache depends on having singleton objects for its entries.
If a scheduler ever ends up with two objects for the same change, the
cache will refuse to accept updates made through the object which is
not in the cache.  However, there is a simple series of events which
could lead to this:

1) An event from the source populates the cache with a change.
2) The change is enqueued into a pipeline.
3) An event from the source triggers a data refresh of the same change.
4) The data refresh fails.
5) The exception handler for the data refresh deletes the change from
   the cache.

Imagine that the pipeline processor is now attempting to refresh the
change to determine whether it has merged.  At this point, updates to
the cache will fail with this error:

2021-09-28 14:25:23,057 ERROR zuul.Scheduler: Exception in pipeline processing:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/zuul/scheduler.py", line 1615, in _process_pipeline
    while not self._stopped and pipeline.manager.processQueue():
  File "/usr/local/lib/python3.8/site-packages/zuul/manager/__init__.py", line 1418, in processQueue
    item_changed, nnfi = self._processOneItem(
  File "/usr/local/lib/python3.8/site-packages/zuul/manager/__init__.py", line 1356, in _processOneItem
    self.reportItem(item)
  File "/usr/local/lib/python3.8/site-packages/zuul/manager/__init__.py", line 1612, in reportItem
    merged = source.isMerged(item.change, item.change.branch)
  File "/usr/local/lib/python3.8/site-packages/zuul/driver/gerrit/gerritsource.py", line 47, in isMerged
    return self.connection.isMerged(change, head)
  File "/usr/local/lib/python3.8/site-packages/zuul/driver/gerrit/gerritconnection.py", line 1013, in isMerged
    self._change_cache.updateChangeWithRetry(key, change, _update_change)
  File "/usr/local/lib/python3.8/site-packages/zuul/zk/change_cache.py", line 330, in updateChangeWithRetry
    self.set(key, change, version)
  File "/usr/local/lib/python3.8/site-packages/zuul/zk/change_cache.py", line 302, in set
    if self._change_cache[key._hash] is not change:
KeyError: 'ef075359268c2f3ee7d52ccbcb6ac51a3a5922c709e634fdb2efcf97c57095b2'

The process may continue:

6) An event from the source triggers a data refresh of the same change.
7) The refresh succeeds and the cache is populated with a new change
   object.
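Both the KeyError above and the RuntimeError shown next come from the
same identity check in the change cache.  The sketch below is only an
illustration paraphrasing the set() frames visible in the tracebacks;
the class name, the dict storage, and the final write step are
assumptions made for the example, not zuul's actual implementation.

    class ChangeCacheSketch:
        def __init__(self):
            # key hash -> the single cached change object (singleton per key)
            self._change_cache = {}

        def set(self, key, change, version):
            # KeyError here matches step 5: the entry was deleted externally
            # while a pipeline item still holds the old object.
            existing = self._change_cache[key._hash]
            if existing is not change:
                # RuntimeError matches steps 6-7: a second object now exists
                # for the same change, breaking the singleton invariant.
                raise RuntimeError(
                    "Conflicting change objects (existing vs. new) "
                    "for key %r" % key)
            # Persisting the update (ZooKeeper write, version bump) elided.
            self._change_cache[key._hash] = change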
With a second object now in the cache, the pipeline will fail with this
error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/zuul/scheduler.py", line 1615, in _process_pipeline
    while not self._stopped and pipeline.manager.processQueue():
  File "/usr/local/lib/python3.8/site-packages/zuul/manager/__init__.py", line 1418, in processQueue
    item_changed, nnfi = self._processOneItem(
  File "/usr/local/lib/python3.8/site-packages/zuul/manager/__init__.py", line 1356, in _processOneItem
    self.reportItem(item)
  File "/usr/local/lib/python3.8/site-packages/zuul/manager/__init__.py", line 1612, in reportItem
    merged = source.isMerged(item.change, item.change.branch)
  File "/usr/local/lib/python3.8/site-packages/zuul/driver/gerrit/gerritsource.py", line 47, in isMerged
    return self.connection.isMerged(change, head)
  File "/usr/local/lib/python3.8/site-packages/zuul/driver/gerrit/gerritconnection.py", line 1013, in isMerged
    self._change_cache.updateChangeWithRetry(key, change, _update_change)
  File "/usr/local/lib/python3.8/site-packages/zuul/zk/change_cache.py", line 330, in updateChangeWithRetry
    self.set(key, change, version)
  File "/usr/local/lib/python3.8/site-packages/zuul/zk/change_cache.py", line 303, in set
    raise RuntimeError(
RuntimeError: Conflicting change objects (existing vs. new) for key '{"connection_name": "gerrit", "project_name": null, "change_type": "GerritChange", "stable_id": "810014", "revision": "2"}'

To avoid this, we should never remove a change from the cache unless it
is completely unused; that is, we should only remove changes from the
cache via the prune method.  Even if that means the cached change is
out of date, it is still the best information we have, and a future
event may succeed and eventually update it.

This change removes, from all of the drivers, the exception handlers
that deleted the change from the cache.  (A minimal illustration of the
prune-only policy follows the diff below.)

Change-Id: Idbecdf717b517cce5c25975a40d9f42d57a26c9e
---
 zuul/driver/gerrit/gerritconnection.py |  6 +--
 zuul/driver/github/githubconnection.py | 61 ++++++++++++--------------
 zuul/driver/gitlab/gitlabconnection.py | 22 ++++------
 zuul/driver/pagure/pagureconnection.py | 22 ++++------
 4 files changed, 47 insertions(+), 64 deletions(-)

diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py
index bf71aba1f4..6d55299750 100644
--- a/zuul/driver/gerrit/gerritconnection.py
+++ b/zuul/driver/gerrit/gerritconnection.py
@@ -775,11 +775,7 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection):
             change = GerritChange(None)
             change.number = number
             change.patchset = patchset
-        try:
-            return self._updateChange(key, change, event, history)
-        except Exception:
-            self._change_cache.delete(key)
-            raise
+        return self._updateChange(key, change, event, history)
 
     def _getTag(self, event):
         tag = event.ref[len('refs/tags/'):]
diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py
index 5d749cbae9..3689f8c735 100644
--- a/zuul/driver/github/githubconnection.py
+++ b/zuul/driver/github/githubconnection.py
@@ -1308,41 +1308,36 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection):
             change.number = number
             change.patchset = patchset
 
-        try:
-            # This can be called multi-threaded during github event
-            # preprocessing. In order to avoid data races perform locking
-            # by cached key. Try to acquire the lock non-blocking at first.
-            # If the lock is already taken we're currently updating the very
-            # same chnange right now and would likely get the same data again.
-            lock = self._change_update_lock.setdefault(key, threading.Lock())
-            if lock.acquire(blocking=False):
-                try:
-                    pull = self.getPull(change.project.name, change.number,
-                                        event=event)
+        # This can be called multi-threaded during github event
+        # preprocessing. In order to avoid data races perform locking
+        # by cached key. Try to acquire the lock non-blocking at first.
+        # If the lock is already taken we're currently updating the very
+        # same chnange right now and would likely get the same data again.
+        lock = self._change_update_lock.setdefault(key, threading.Lock())
+        if lock.acquire(blocking=False):
+            try:
+                pull = self.getPull(change.project.name, change.number,
+                                    event=event)
 
-                    def _update_change(c):
-                        self._updateChange(c, event, pull)
+                def _update_change(c):
+                    self._updateChange(c, event, pull)
 
-                    change = self._change_cache.updateChangeWithRetry(
-                        key, change, _update_change)
-                finally:
-                    # We need to remove the lock here again so we don't leak
-                    # them.
-                    del self._change_update_lock[key]
-                    lock.release()
-            else:
-                # We didn't get the lock so we don't need to update the same
-                # change again, but to be correct we should at least wait until
-                # the other thread is done updating the change.
-                log = get_annotated_logger(self.log, event)
-                log.debug("Change %s is currently being updated, "
-                          "waiting for it to finish", change)
-                with lock:
-                    log.debug('Finished updating change %s', change)
-        except Exception:
-            self.log.warning("Deleting cache key %s due to exception", key)
-            self._change_cache.delete(key)
-            raise
+                change = self._change_cache.updateChangeWithRetry(
+                    key, change, _update_change)
+            finally:
+                # We need to remove the lock here again so we don't leak
+                # them.
+                del self._change_update_lock[key]
+                lock.release()
+        else:
+            # We didn't get the lock so we don't need to update the same
+            # change again, but to be correct we should at least wait until
+            # the other thread is done updating the change.
+            log = get_annotated_logger(self.log, event)
+            log.debug("Change %s is currently being updated, "
+                      "waiting for it to finish", change)
+            with lock:
+                log.debug('Finished updating change %s', change)
         return change
 
     def _getTag(self, project, event):
diff --git a/zuul/driver/gitlab/gitlabconnection.py b/zuul/driver/gitlab/gitlabconnection.py
index 4b2e684fc0..62d0641d25 100644
--- a/zuul/driver/gitlab/gitlabconnection.py
+++ b/zuul/driver/gitlab/gitlabconnection.py
@@ -539,21 +539,17 @@ class GitlabConnection(ZKChangeCacheMixin, CachedBranchConnection):
             change.patchset = patch_number
             change.url = url or self.getMRUrl(project.name, number)
             change.uris = [change.url.split('://', 1)[-1]]  # remove scheme
-        try:
-            log.debug("Getting change mr#%s from project %s" % (
-                number, project.name))
-            log.info("Updating change from Gitlab %s" % change)
-            mr = self.getMR(change.project.name, change.number, event=event)
 
-            def _update_change(c):
-                self._updateChange(c, event, mr)
+        log.debug("Getting change mr#%s from project %s" % (
+            number, project.name))
+        log.info("Updating change from Gitlab %s" % change)
+        mr = self.getMR(change.project.name, change.number, event=event)
 
-            change = self._change_cache.updateChangeWithRetry(key, change,
-                                                              _update_change)
-        except Exception:
-            self.log.warning("Deleting cache key %s due to exception", key)
-            self._change_cache.delete(key)
-            raise
+        def _update_change(c):
+            self._updateChange(c, event, mr)
+
+        change = self._change_cache.updateChangeWithRetry(key, change,
+                                                          _update_change)
         return change
 
     def _updateChange(self, change, event, mr):
diff --git a/zuul/driver/pagure/pagureconnection.py b/zuul/driver/pagure/pagureconnection.py
index 8d6edeeacd..b211db3e48 100644
--- a/zuul/driver/pagure/pagureconnection.py
+++ b/zuul/driver/pagure/pagureconnection.py
@@ -617,21 +617,17 @@ class PagureConnection(ZKChangeCacheMixin, BaseConnection):
             change.uris = [
                 '%s/%s/pull/%s' % (self.baseurl, project, number),
             ]
-        try:
-            self.log.debug("Getting change pr#%s from project %s" % (
-                number, project.name))
-            self.log.info("Updating change from pagure %s" % change)
-            pull = self.getPull(change.project.name, change.number)
 
-            def _update_change(c):
-                self._updateChange(c, event, pull)
+        self.log.debug("Getting change pr#%s from project %s" % (
+            number, project.name))
+        self.log.info("Updating change from pagure %s" % change)
+        pull = self.getPull(change.project.name, change.number)
 
-            change = self._change_cache.updateChangeWithRetry(key, change,
-                                                              _update_change)
-        except Exception:
-            self.log.warning("Deleting cache key %s due to exception", key)
-            self._change_cache.delete(key)
-            raise
+        def _update_change(c):
+            self._updateChange(c, event, pull)
+
+        change = self._change_cache.updateChangeWithRetry(key, change,
+                                                          _update_change)
         return change
 
     def _getNonPRRef(self, project, event):
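As mentioned in the commit message, entries are now removed only via
the prune method once nothing references them.  The following is a
minimal, self-contained sketch of that prune-only policy;
SketchChangeCache and its methods are illustrative stand-ins, not
zuul's real change cache API.

    class SketchChangeCache:
        def __init__(self):
            self._entries = {}   # key -> the single cached change object

        def get(self, key):
            return self._entries.get(key)

        def set(self, key, change):
            self._entries[key] = change

        def prune(self, referenced_keys):
            # The only sanctioned deletion path: drop entries that no
            # pipeline item references any more.  A merely stale entry
            # (e.g. after a failed refresh) stays put; it is still the best
            # data available and a later event may refresh it.
            for key in list(self._entries):
                if key not in referenced_keys:
                    del self._entries[key]


    if __name__ == "__main__":
        cache = SketchChangeCache()
        cache.set("gerrit/810014,2", {"number": 810014, "patchset": 2})
        # A failed refresh is no longer a reason to delete the entry.
        cache.prune(referenced_keys={"gerrit/810014,2"})   # still referenced: kept
        assert cache.get("gerrit/810014,2") is not None
        cache.prune(referenced_keys=set())                 # unused now: removed
        assert cache.get("gerrit/810014,2") is None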