Handle more than 1024 changes in the pipeline change list
We originally wrote the change list to be a best-effort service for the scheduler check for whether a change is in a pipeline (which must be fast and can't lock each of the pipelines to read in the full state). To make it even simpler, we avoided sharding and instead limited it to only the first 1024 changes. But scope creep happened, and it now also serves to provide the list of relevant changes to the change cache. If we have a pipeline with 1025 changes and delete one of them from the cache, that tenant will break, so this needs to be corrected. This change uses sharding to correct it. Since it's possible to attempt to read a sharded object mid-write, we retry reads in the case of exceptions until they succeed. In most cases this should still only be a single znode, but we do truncate sharded znodes, so there is a chance even in the case of a small number of changes of reading incorrect data. To resolve this for all cases, we retry reading until it succeeds. The scheduler no longer reads the state at the start of pipeline processing (it never needed to anyway), so if the data become corrupt, a scheduler will eventually be able to correct it. In other words, the main pipeline processing path only writes this, and the other paths only read it. (An alternative solution would be to leave this as it was and instead load the full pipeline state for maintaining the change cache; that runs infrequently enough that we can accept the cost. This method is chosen since it also makes other uses of this object more correct.) Change-Id: I132d67149c065df7343cbd3aea69988f547498f4
This commit is contained in:
parent
da9df89591
commit
61eea2169b
|
@ -1492,10 +1492,7 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
if item_changed:
|
||||
queue_changed = True
|
||||
self.reportStats(item)
|
||||
if len(change_keys) < 1024:
|
||||
# Only keep 1024 of these so we don't have to deal
|
||||
# with sharding.
|
||||
change_keys.add(item.change.cache_stat.key)
|
||||
change_keys.add(item.change.cache_stat.key)
|
||||
if queue_changed:
|
||||
changed = True
|
||||
status = ''
|
||||
|
|
|
@ -30,7 +30,7 @@ import textwrap
|
|||
import types
|
||||
import itertools
|
||||
|
||||
from kazoo.exceptions import NoNodeError
|
||||
from kazoo.exceptions import NoNodeError, ZookeeperError
|
||||
from cachetools.func import lru_cache
|
||||
|
||||
from zuul.lib import yamlutil as yaml
|
||||
|
@ -748,13 +748,19 @@ class PipelineState(zkobject.ZKObject):
|
|||
recursive=True)
|
||||
|
||||
|
||||
class PipelineChangeList(zkobject.ZKObject):
|
||||
class PipelineChangeList(zkobject.ShardedZKObject):
|
||||
"""A list of change references within a pipeline
|
||||
|
||||
This is used by the scheduler to quickly decide if events which
|
||||
otherwise don't match the pipeline triggers should be
|
||||
nevertheless forwarded to the pipeline.
|
||||
|
||||
It is also used to maintain the connection cache.
|
||||
"""
|
||||
# We can read from this object without locking, and since it's
|
||||
# sharded, that may produce an error. If that happens, don't
|
||||
# delete the object, just retry.
|
||||
delete_on_error = False
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
@ -762,6 +768,21 @@ class PipelineChangeList(zkobject.ZKObject):
|
|||
changes=[],
|
||||
)
|
||||
|
||||
def refresh(self, context):
|
||||
# See comment above about reading without a lock.
|
||||
while context.sessionIsValid():
|
||||
try:
|
||||
super().refresh(context)
|
||||
return
|
||||
except ZookeeperError:
|
||||
# These errors come from the server and are not
|
||||
# retryable. Connection errors are KazooExceptions so
|
||||
# they aren't caught here and we will retry.
|
||||
raise
|
||||
except Exception:
|
||||
context.log.error("Failed to refresh change list")
|
||||
time.sleep(self._retry_interval)
|
||||
|
||||
def getPath(self):
|
||||
return self.getChangeListPath(self.pipeline)
|
||||
|
||||
|
@ -789,7 +810,10 @@ class PipelineChangeList(zkobject.ZKObject):
|
|||
def deserialize(self, data, context):
|
||||
data = super().deserialize(data, context)
|
||||
change_keys = []
|
||||
for ref in data.get('changes', []):
|
||||
# We must have a dictionary with a 'changes' key; otherwise we
|
||||
# may be reading immediately after truncating. Allow the
|
||||
# KeyError exception to propogate in that case.
|
||||
for ref in data['changes']:
|
||||
change_keys.append(ChangeKey.fromReference(ref))
|
||||
data['_change_keys'] = change_keys
|
||||
return data
|
||||
|
|
|
@ -1705,7 +1705,6 @@ class Scheduler(threading.Thread):
|
|||
) as lock:
|
||||
ctx = self.createZKContext(lock, self.log)
|
||||
with pipeline.manager.currentContext(ctx):
|
||||
pipeline.change_list.refresh(ctx)
|
||||
pipeline.state.refresh(ctx)
|
||||
if pipeline.state.old_queues:
|
||||
self._reenqueuePipeline(tenant, pipeline, ctx)
|
||||
|
|
Loading…
Reference in New Issue