diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 9a59289c2e..7a19342732 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -1492,10 +1492,7 @@ class PipelineManager(metaclass=ABCMeta): if item_changed: queue_changed = True self.reportStats(item) - if len(change_keys) < 1024: - # Only keep 1024 of these so we don't have to deal - # with sharding. - change_keys.add(item.change.cache_stat.key) + change_keys.add(item.change.cache_stat.key) if queue_changed: changed = True status = '' diff --git a/zuul/model.py b/zuul/model.py index b440e1cad1..d65a9f3e72 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -30,7 +30,7 @@ import textwrap import types import itertools -from kazoo.exceptions import NoNodeError +from kazoo.exceptions import NoNodeError, ZookeeperError from cachetools.func import lru_cache from zuul.lib import yamlutil as yaml @@ -748,13 +748,19 @@ class PipelineState(zkobject.ZKObject): recursive=True) -class PipelineChangeList(zkobject.ZKObject): +class PipelineChangeList(zkobject.ShardedZKObject): """A list of change references within a pipeline This is used by the scheduler to quickly decide if events which otherwise don't match the pipeline triggers should be nevertheless forwarded to the pipeline. + + It is also used to maintain the connection cache. """ + # We can read from this object without locking, and since it's + # sharded, that may produce an error. If that happens, don't + # delete the object, just retry. + delete_on_error = False def __init__(self): super().__init__() @@ -762,6 +768,21 @@ class PipelineChangeList(zkobject.ZKObject): changes=[], ) + def refresh(self, context): + # See comment above about reading without a lock. + while context.sessionIsValid(): + try: + super().refresh(context) + return + except ZookeeperError: + # These errors come from the server and are not + # retryable. Connection errors are KazooExceptions so + # they aren't caught here and we will retry. + raise + except Exception: + context.log.error("Failed to refresh change list") + time.sleep(self._retry_interval) + def getPath(self): return self.getChangeListPath(self.pipeline) @@ -789,7 +810,10 @@ class PipelineChangeList(zkobject.ZKObject): def deserialize(self, data, context): data = super().deserialize(data, context) change_keys = [] - for ref in data.get('changes', []): + # We must have a dictionary with a 'changes' key; otherwise we + # may be reading immediately after truncating. Allow the + # KeyError exception to propogate in that case. + for ref in data['changes']: change_keys.append(ChangeKey.fromReference(ref)) data['_change_keys'] = change_keys return data diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 169a33c3c1..4a97ae08d8 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1705,7 +1705,6 @@ class Scheduler(threading.Thread): ) as lock: ctx = self.createZKContext(lock, self.log) with pipeline.manager.currentContext(ctx): - pipeline.change_list.refresh(ctx) pipeline.state.refresh(ctx) if pipeline.state.old_queues: self._reenqueuePipeline(tenant, pipeline, ctx)