Never externally delete change cache entries

The change cache depends on having singleton objects for entries.
If a scheduler ever ends up with 2 objects for the same change, the
cache will refuse to update the cache with new data for the object
which is not in the cache.  However, there is a simple series of
events which could lead to this:

1) Event from source populates cache with a change.
2) Change is enqueued into pipeline.
3) Event from source triggers a data refresh of same change.
4) Data refresh fails.
5) Exception handler for data refresh deletes change from cache.

Imagine that the pipeline processor is now attempting to refresh the
change to determine whether it has merged.  At this point, updates
to the cache will fail with this error:

2021-09-28 14:25:23,057 ERROR zuul.Scheduler: Exception in pipeline processing:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/zuul/scheduler.py", line 1615, in _process_pipeline
    while not self._stopped and pipeline.manager.processQueue():
  File "/usr/local/lib/python3.8/site-packages/zuul/manager/__init__.py", line 1418, in processQueue
    item_changed, nnfi = self._processOneItem(
  File "/usr/local/lib/python3.8/site-packages/zuul/manager/__init__.py", line 1356, in _processOneItem
    self.reportItem(item)
  File "/usr/local/lib/python3.8/site-packages/zuul/manager/__init__.py", line 1612, in reportItem
    merged = source.isMerged(item.change, item.change.branch)
  File "/usr/local/lib/python3.8/site-packages/zuul/driver/gerrit/gerritsource.py", line 47, in isMerged
    return self.connection.isMerged(change, head)
  File "/usr/local/lib/python3.8/site-packages/zuul/driver/gerrit/gerritconnection.py", line 1013, in isMerged
    self._change_cache.updateChangeWithRetry(key, change, _update_change)
  File "/usr/local/lib/python3.8/site-packages/zuul/zk/change_cache.py", line 330, in updateChangeWithRetry
    self.set(key, change, version)
  File "/usr/local/lib/python3.8/site-packages/zuul/zk/change_cache.py", line 302, in set
    if self._change_cache[key._hash] is not change:
KeyError: 'ef075359268c2f3ee7d52ccbcb6ac51a3a5922c709e634fdb2efcf97c57095b2'

The process may continue:

6) Event from source triggers a data refresh of same change.
7) Refresh succeeds and cache is populated with new change object.

Then the pipeline will fail with this error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/zuul/scheduler.py", line 1615, in _process_pipeline
    while not self._stopped and pipeline.manager.processQueue():
  File "/usr/local/lib/python3.8/site-packages/zuul/manager/__init__.py", line 1418, in processQueue
    item_changed, nnfi = self._processOneItem(
  File "/usr/local/lib/python3.8/site-packages/zuul/manager/__init__.py", line 1356, in _processOneItem
    self.reportItem(item)
  File "/usr/local/lib/python3.8/site-packages/zuul/manager/__init__.py", line 1612, in reportItem
    merged = source.isMerged(item.change, item.change.branch)
  File "/usr/local/lib/python3.8/site-packages/zuul/driver/gerrit/gerritsource.py", line 47, in isMerged
    return self.connection.isMerged(change, head)
  File "/usr/local/lib/python3.8/site-packages/zuul/driver/gerrit/gerritconnection.py", line 1013, in isMerged
    self._change_cache.updateChangeWithRetry(key, change, _update_change)
  File "/usr/local/lib/python3.8/site-packages/zuul/zk/change_cache.py", line 330, in updateChangeWithRetry
    self.set(key, change, version)
  File "/usr/local/lib/python3.8/site-packages/zuul/zk/change_cache.py", line 303, in set
    raise RuntimeError(
RuntimeError: Conflicting change objects (existing <Change 0x7f1405c188e0 starlingx/nfv 810014,2> vs. new <Change 0x7f148446c370 starlingx/nfv 810014,2> for key '{"connection_name": "gerrit", "project_name": null, "change_type": "GerritChange", "stable_id": "810014", "revision": "2"}'

To avoid this, we should never remove a change from the cache unless it is
completely unused (that is, we should only remove changes from the cache via
the prune method).  Even if it means that the change is out of date, it is
still the best information that we have, and a future event may succeed and
eventually update the change.

This removes the exception handling which deleted the change from all drivers.

Change-Id: Idbecdf717b517cce5c25975a40d9f42d57a26c9e
This commit is contained in:
James E. Blair 2021-09-28 10:09:29 -07:00
parent 82812c1bf7
commit 29d0534696
4 changed files with 47 additions and 64 deletions

View File

@@ -775,11 +775,7 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection):
change = GerritChange(None)
change.number = number
change.patchset = patchset
try:
return self._updateChange(key, change, event, history)
except Exception:
self._change_cache.delete(key)
raise
return self._updateChange(key, change, event, history)
def _getTag(self, event):
tag = event.ref[len('refs/tags/'):]

View File

@@ -1308,41 +1308,36 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection):
change.number = number
change.patchset = patchset
try:
# This can be called multi-threaded during github event
# preprocessing. In order to avoid data races perform locking
# by cached key. Try to acquire the lock non-blocking at first.
# If the lock is already taken we're currently updating the very
# same chnange right now and would likely get the same data again.
lock = self._change_update_lock.setdefault(key, threading.Lock())
if lock.acquire(blocking=False):
try:
pull = self.getPull(change.project.name, change.number,
event=event)
# This can be called multi-threaded during github event
# preprocessing. In order to avoid data races perform locking
# by cached key. Try to acquire the lock non-blocking at first.
# If the lock is already taken we're currently updating the very
# same chnange right now and would likely get the same data again.
lock = self._change_update_lock.setdefault(key, threading.Lock())
if lock.acquire(blocking=False):
try:
pull = self.getPull(change.project.name, change.number,
event=event)
def _update_change(c):
self._updateChange(c, event, pull)
def _update_change(c):
self._updateChange(c, event, pull)
change = self._change_cache.updateChangeWithRetry(
key, change, _update_change)
finally:
# We need to remove the lock here again so we don't leak
# them.
del self._change_update_lock[key]
lock.release()
else:
# We didn't get the lock so we don't need to update the same
# change again, but to be correct we should at least wait until
# the other thread is done updating the change.
log = get_annotated_logger(self.log, event)
log.debug("Change %s is currently being updated, "
"waiting for it to finish", change)
with lock:
log.debug('Finished updating change %s', change)
except Exception:
self.log.warning("Deleting cache key %s due to exception", key)
self._change_cache.delete(key)
raise
change = self._change_cache.updateChangeWithRetry(
key, change, _update_change)
finally:
# We need to remove the lock here again so we don't leak
# them.
del self._change_update_lock[key]
lock.release()
else:
# We didn't get the lock so we don't need to update the same
# change again, but to be correct we should at least wait until
# the other thread is done updating the change.
log = get_annotated_logger(self.log, event)
log.debug("Change %s is currently being updated, "
"waiting for it to finish", change)
with lock:
log.debug('Finished updating change %s', change)
return change
def _getTag(self, project, event):

View File

@@ -539,21 +539,17 @@ class GitlabConnection(ZKChangeCacheMixin, CachedBranchConnection):
change.patchset = patch_number
change.url = url or self.getMRUrl(project.name, number)
change.uris = [change.url.split('://', 1)[-1]] # remove scheme
try:
log.debug("Getting change mr#%s from project %s" % (
number, project.name))
log.info("Updating change from Gitlab %s" % change)
mr = self.getMR(change.project.name, change.number, event=event)
def _update_change(c):
self._updateChange(c, event, mr)
log.debug("Getting change mr#%s from project %s" % (
number, project.name))
log.info("Updating change from Gitlab %s" % change)
mr = self.getMR(change.project.name, change.number, event=event)
change = self._change_cache.updateChangeWithRetry(key, change,
_update_change)
except Exception:
self.log.warning("Deleting cache key %s due to exception", key)
self._change_cache.delete(key)
raise
def _update_change(c):
self._updateChange(c, event, mr)
change = self._change_cache.updateChangeWithRetry(key, change,
_update_change)
return change
def _updateChange(self, change, event, mr):

View File

@@ -617,21 +617,17 @@ class PagureConnection(ZKChangeCacheMixin, BaseConnection):
change.uris = [
'%s/%s/pull/%s' % (self.baseurl, project, number),
]
try:
self.log.debug("Getting change pr#%s from project %s" % (
number, project.name))
self.log.info("Updating change from pagure %s" % change)
pull = self.getPull(change.project.name, change.number)
def _update_change(c):
self._updateChange(c, event, pull)
self.log.debug("Getting change pr#%s from project %s" % (
number, project.name))
self.log.info("Updating change from pagure %s" % change)
pull = self.getPull(change.project.name, change.number)
change = self._change_cache.updateChangeWithRetry(key, change,
_update_change)
except Exception:
self.log.warning("Deleting cache key %s due to exception", key)
self._change_cache.delete(key)
raise
def _update_change(c):
self._updateChange(c, event, pull)
change = self._change_cache.updateChangeWithRetry(key, change,
_update_change)
return change
def _getNonPRRef(self, project, event):