Merge "Handle more than 1024 changes in the pipeline change list"
This commit is contained in:
commit
599a228a40
|
@ -1492,10 +1492,7 @@ class PipelineManager(metaclass=ABCMeta):
|
|||
if item_changed:
|
||||
queue_changed = True
|
||||
self.reportStats(item)
|
||||
if len(change_keys) < 1024:
|
||||
# Only keep 1024 of these so we don't have to deal
|
||||
# with sharding.
|
||||
change_keys.add(item.change.cache_stat.key)
|
||||
change_keys.add(item.change.cache_stat.key)
|
||||
if queue_changed:
|
||||
changed = True
|
||||
status = ''
|
||||
|
|
|
@ -30,7 +30,7 @@ import textwrap
|
|||
import types
|
||||
import itertools
|
||||
|
||||
from kazoo.exceptions import NoNodeError
|
||||
from kazoo.exceptions import NoNodeError, ZookeeperError
|
||||
from cachetools.func import lru_cache
|
||||
|
||||
from zuul.lib import yamlutil as yaml
|
||||
|
@ -748,13 +748,19 @@ class PipelineState(zkobject.ZKObject):
|
|||
recursive=True)
|
||||
|
||||
|
||||
class PipelineChangeList(zkobject.ZKObject):
|
||||
class PipelineChangeList(zkobject.ShardedZKObject):
|
||||
"""A list of change references within a pipeline
|
||||
|
||||
This is used by the scheduler to quickly decide if events which
|
||||
otherwise don't match the pipeline triggers should be
|
||||
nevertheless forwarded to the pipeline.
|
||||
|
||||
It is also used to maintain the connection cache.
|
||||
"""
|
||||
# We can read from this object without locking, and since it's
|
||||
# sharded, that may produce an error. If that happens, don't
|
||||
# delete the object, just retry.
|
||||
delete_on_error = False
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
@ -762,6 +768,21 @@ class PipelineChangeList(zkobject.ZKObject):
|
|||
changes=[],
|
||||
)
|
||||
|
||||
def refresh(self, context):
|
||||
# See comment above about reading without a lock.
|
||||
while context.sessionIsValid():
|
||||
try:
|
||||
super().refresh(context)
|
||||
return
|
||||
except ZookeeperError:
|
||||
# These errors come from the server and are not
|
||||
# retryable. Connection errors are KazooExceptions so
|
||||
# they aren't caught here and we will retry.
|
||||
raise
|
||||
except Exception:
|
||||
context.log.error("Failed to refresh change list")
|
||||
time.sleep(self._retry_interval)
|
||||
|
||||
def getPath(self):
|
||||
return self.getChangeListPath(self.pipeline)
|
||||
|
||||
|
@ -789,7 +810,10 @@ class PipelineChangeList(zkobject.ZKObject):
|
|||
def deserialize(self, data, context):
|
||||
data = super().deserialize(data, context)
|
||||
change_keys = []
|
||||
for ref in data.get('changes', []):
|
||||
# We must have a dictionary with a 'changes' key; otherwise we
|
||||
# may be reading immediately after truncating. Allow the
|
||||
# KeyError exception to propogate in that case.
|
||||
for ref in data['changes']:
|
||||
change_keys.append(ChangeKey.fromReference(ref))
|
||||
data['_change_keys'] = change_keys
|
||||
return data
|
||||
|
|
|
@ -1705,7 +1705,6 @@ class Scheduler(threading.Thread):
|
|||
) as lock:
|
||||
ctx = self.createZKContext(lock, self.log)
|
||||
with pipeline.manager.currentContext(ctx):
|
||||
pipeline.change_list.refresh(ctx)
|
||||
pipeline.state.refresh(ctx)
|
||||
if pipeline.state.old_queues:
|
||||
self._reenqueuePipeline(tenant, pipeline, ctx)
|
||||
|
|
Loading…
Reference in New Issue