From 61eea2169b677d54e733fb8ae11301f4d0b2c766 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Wed, 1 Dec 2021 15:29:41 -0800 Subject: [PATCH] Handle more than 1024 changes in the pipeline change list We originally wrote the change list to be a best-effort service for the scheduler check for whether a change is in a pipeline (which must be fast and can't lock each of the pipelines to read in the full state). To make it even simpler, we avoided sharding and instead limited it to only the first 1024 changes. But scope creep happened, and it now also serves to provide the list of relevant changes to the change cache. If we have a pipeline with 1025 changes and delete one of them from the cache, that tenant will break, so this needs to be corrected. This change uses sharding to correct it. Since it's possible to attempt to read a sharded object mid-write, we retry reads in the case of exceptions until they succeed. In most cases this should still only be a single znode, but we do truncate sharded znodes, so there is a chance even in the case of a small number of changes of reading incorrect data. To resolve this for all cases, we retry reading until it succeeds. The scheduler no longer reads the state at the start of pipeline processing (it never needed to anyway), so if the data become corrupt, a scheduler will eventually be able to correct it. In other words, the main pipeline processing path only writes this, and the other paths only read it. (An alternative solution would be to leave this as it was and instead load the full pipeline state for maintaining the change cache; that runs infrequently enough that we can accept the cost. This method is chosen since it also makes other uses of this object more correct.) Change-Id: I132d67149c065df7343cbd3aea69988f547498f4 --- zuul/manager/__init__.py | 5 +---- zuul/model.py | 30 +++++++++++++++++++++++++++--- zuul/scheduler.py | 1 - 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 9a59289c2e..7a19342732 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -1492,10 +1492,7 @@ class PipelineManager(metaclass=ABCMeta): if item_changed: queue_changed = True self.reportStats(item) - if len(change_keys) < 1024: - # Only keep 1024 of these so we don't have to deal - # with sharding. - change_keys.add(item.change.cache_stat.key) + change_keys.add(item.change.cache_stat.key) if queue_changed: changed = True status = '' diff --git a/zuul/model.py b/zuul/model.py index b440e1cad1..d65a9f3e72 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -30,7 +30,7 @@ import textwrap import types import itertools -from kazoo.exceptions import NoNodeError +from kazoo.exceptions import NoNodeError, ZookeeperError from cachetools.func import lru_cache from zuul.lib import yamlutil as yaml @@ -748,13 +748,19 @@ class PipelineState(zkobject.ZKObject): recursive=True) -class PipelineChangeList(zkobject.ZKObject): +class PipelineChangeList(zkobject.ShardedZKObject): """A list of change references within a pipeline This is used by the scheduler to quickly decide if events which otherwise don't match the pipeline triggers should be nevertheless forwarded to the pipeline. + + It is also used to maintain the connection cache. """ + # We can read from this object without locking, and since it's + # sharded, that may produce an error. If that happens, don't + # delete the object, just retry. + delete_on_error = False def __init__(self): super().__init__() @@ -762,6 +768,21 @@ class PipelineChangeList(zkobject.ZKObject): changes=[], ) + def refresh(self, context): + # See comment above about reading without a lock. + while context.sessionIsValid(): + try: + super().refresh(context) + return + except ZookeeperError: + # These errors come from the server and are not + # retryable. Connection errors are KazooExceptions so + # they aren't caught here and we will retry. + raise + except Exception: + context.log.error("Failed to refresh change list") + time.sleep(self._retry_interval) + def getPath(self): return self.getChangeListPath(self.pipeline) @@ -789,7 +810,10 @@ class PipelineChangeList(zkobject.ZKObject): def deserialize(self, data, context): data = super().deserialize(data, context) change_keys = [] - for ref in data.get('changes', []): + # We must have a dictionary with a 'changes' key; otherwise we + # may be reading immediately after truncating. Allow the + # KeyError exception to propogate in that case. + for ref in data['changes']: change_keys.append(ChangeKey.fromReference(ref)) data['_change_keys'] = change_keys return data diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 169a33c3c1..4a97ae08d8 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1705,7 +1705,6 @@ class Scheduler(threading.Thread): ) as lock: ctx = self.createZKContext(lock, self.log) with pipeline.manager.currentContext(ctx): - pipeline.change_list.refresh(ctx) pipeline.state.refresh(ctx) if pipeline.state.old_queues: self._reenqueuePipeline(tenant, pipeline, ctx)