diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index c948997826..c1f61e5fad 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -25,11 +25,9 @@ from zuul.lib.tarjan import strongly_connected_components import zuul.lib.tracing as tracing from zuul.model import ( Change, DequeueEvent, PipelineState, PipelineChangeList, QueueItem, - PipelinePostConfigEvent, ) from zuul.zk.change_cache import ChangeKey from zuul.zk.components import COMPONENT_REGISTRY -from zuul.zk.locks import pipeline_lock from opentelemetry import trace @@ -96,41 +94,19 @@ class PipelineManager(metaclass=ABCMeta): def _postConfig(self): layout = self.pipeline.tenant.layout - # If our layout UUID already matches the UUID in ZK, we don't - # need to make any changes in ZK. But we do still need to - # update our local object pointers. Note that our local queue - # state may still be out of date after this because we skip - # the refresh. self.buildChangeQueues(layout) with self.sched.createZKContext(None, self.log) as ctx,\ self.currentContext(ctx): - if layout.uuid == PipelineState.peekLayoutUUID(self.pipeline): - self.pipeline.state = PipelineState() - self.pipeline.state._set(pipeline=self.pipeline) - self.pipeline.change_list = PipelineChangeList() - self.pipeline.change_list._set(pipeline=self.pipeline) - return - - with pipeline_lock( - self.sched.zk_client, self.pipeline.tenant.name, - self.pipeline.name) as lock,\ - self.sched.createZKContext(lock, self.log) as ctx,\ - self.currentContext(ctx): - # Since the layout UUID is new, this will move queues - # to "old_queues". Note that it will *not* refresh - # the contents, in fact, we will get a new - # PipelineState python object with no queues, just as - # above. Our state is guaranteed to be out of date - # now, but we don't need to do anything with it, we - # will let the next actor to use it refresh it then. - self.pipeline.state = PipelineState.resetOrCreate( + # Make sure we have state and change list objects, and + # ensure that they exist in ZK. We don't hold the + # pipeline lock, but if they don't exist, that means they + # are new, so no one else will either. These will not + # automatically refresh now, so they will be out of date + # until they are refreshed later. + self.pipeline.state = PipelineState.create( self.pipeline, layout.uuid) self.pipeline.change_list = PipelineChangeList.create( self.pipeline) - event = PipelinePostConfigEvent() - self.sched.pipeline_management_events[ - self.pipeline.tenant.name][self.pipeline.name].put( - event, needs_result=False) def buildChangeQueues(self, layout): self.log.debug("Building relative_priority queues") diff --git a/zuul/model.py b/zuul/model.py index 3658a02897..0daa0d434c 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -20,7 +20,6 @@ import json import hashlib import logging import os -import zlib from functools import total_ordering import re2 @@ -613,6 +612,7 @@ class PipelineState(zkobject.ZKObject): layout_uuid=None, # Local pipeline reference (not persisted in Zookeeper) pipeline=None, + _read_only=False, ) @classmethod @@ -626,62 +626,17 @@ class PipelineState(zkobject.ZKObject): return obj @classmethod - def peekLayoutUUID(cls, pipeline): + def create(cls, pipeline, layout_uuid): + # If the object does not exist in ZK, create it with the + # default attributes and the supplied layout UUID. Otherwise, + # return an initialized object without loading any data so + # that data can be loaded on the next refresh. ctx = pipeline.manager.current_context - try: - path = cls.pipelinePath(pipeline) - compressed_data, zstat = ctx.client.get(path) - try: - raw = zlib.decompress(compressed_data) - except zlib.error: - # Fallback for old, uncompressed data - raw = compressed_data - data = json.loads(raw.decode("utf8")) - return data["layout_uuid"] - except NoNodeError: - return None - - @classmethod - def resetOrCreate(cls, pipeline, layout_uuid): - # If there is an object in ZK, all the queues will be moved to - # old_queues. Then this method will will create a new python - # object with no attached queues regardless of the contents of - # ZK (so refresh() will need to be called before use). - ctx = pipeline.manager.current_context - try: - state = cls() - state._set(pipeline=pipeline) - state._reset(ctx, layout_uuid) + state = cls() + state._set(pipeline=pipeline) + if state.exists(ctx): return state - except NoNodeError: - return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid) - - def _reset(self, context, layout_uuid): - # Deserialize will recursively load/refresh children, but we - # want to avoid that here, so we just load the raw data and - # manipulate the top level attributes. - raw = self._load(context, deserialize=False) - state = json.loads(raw.decode("utf8")) - - if state["layout_uuid"] == layout_uuid: - return - - # Note this differs from normal serialization in that we are - # dealing with queues only as string path references rather - # than objects. - reset_state = dict( - state=Pipeline.STATE_NORMAL, - queues=[], - old_queues=state["old_queues"] + state["queues"], - consecutive_failures=0, - disabled=False, - layout_uuid=layout_uuid, - ) - - raw = json.dumps(reset_state, sort_keys=True).encode("utf8") - # Since we only save the object when the layout UUID changes, we can - # skip the hash check that we have in other places. - self._save(context, raw) + return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid) def getPath(self): if hasattr(self, '_path'): @@ -725,6 +680,8 @@ class PipelineState(zkobject.ZKObject): self.old_queues.remove(queue) def serialize(self, context): + if self._read_only: + raise RuntimeError("Attempt to serialize read-only pipeline state") data = { "state": self.state, "consecutive_failures": self.consecutive_failures, @@ -735,6 +692,15 @@ class PipelineState(zkobject.ZKObject): } return json.dumps(data, sort_keys=True).encode("utf8") + def refresh(self, context, read_only=False): + # Set read_only to True to indicate that we should avoid + # "resetting" the pipeline state if the layout has changed. + # This is so that we can refresh the object in circumstances + # where we haven't verified that our local layout matches + # what's in ZK. + self._set(_read_only=read_only) + return super().refresh(context) + def deserialize(self, raw, context): # We may have old change objects in the pipeline cache, so # make sure they are the same objects we would get from the @@ -742,6 +708,23 @@ class PipelineState(zkobject.ZKObject): self.pipeline.manager.clearCache() data = super().deserialize(raw, context) + + if not self._read_only: + # Skip this check if we're in a context where we want to + # read the state without updating it (in case we're not + # certain that the layout is up to date). + if data['layout_uuid'] != self.pipeline.tenant.layout.uuid: + # The tenant layout has updated since our last state; we + # need to reset the state. + data = dict( + state=Pipeline.STATE_NORMAL, + queues=[], + old_queues=data["old_queues"] + data["queues"], + consecutive_failures=0, + disabled=False, + layout_uuid=self.pipeline.tenant.layout.uuid, + ) + existing_queues = { q.getPath(): q for q in self.queues + self.old_queues } @@ -914,13 +897,16 @@ class PipelineChangeList(zkobject.ShardedZKObject): @classmethod def create(cls, pipeline): + # If the object does not exist in ZK, create it with the + # default attributes. Otherwise, return an initialized object + # without loading any data so that data can be loaded on the + # next refresh. ctx = pipeline.manager.current_context - try: - change_list = cls.fromZK(ctx, cls.getChangeListPath(pipeline), - pipeline=pipeline) + change_list = cls() + change_list._set(pipeline=pipeline) + if change_list.exists(ctx): return change_list - except NoNodeError: - return cls.new(ctx, pipeline=pipeline) + return cls.new(ctx, pipeline=pipeline) def serialize(self, context): data = { diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 83d74fb437..a748bf4945 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -685,7 +685,7 @@ class Scheduler(threading.Thread): with self.createZKContext(lock, self.log) as ctx: with pipeline.manager.currentContext(ctx): pipeline.change_list.refresh(ctx) - pipeline.state.refresh(ctx) + pipeline.state.refresh(ctx, read_only=True) # In case we're in the middle of a reconfig, # include the old queue items. for item in pipeline.getAllItems(include_old=True): @@ -767,7 +767,7 @@ class Scheduler(threading.Thread): self.zk_client, tenant.name, pipeline.name) as lock,\ self.createZKContext(lock, self.log) as ctx: - pipeline.state.refresh(ctx) + pipeline.state.refresh(ctx, read_only=True) # add any blobstore references for item in pipeline.getAllItems(include_old=True): live_blobs.update(item.getBlobKeys()) @@ -1701,6 +1701,11 @@ class Scheduler(threading.Thread): trigger.postConfig(pipeline) for reporter in pipeline.actions: reporter.postConfig() + # Emit an event to trigger a pipeline run after the + # reconfiguration. + event = PipelinePostConfigEvent() + self.pipeline_management_events[tenant.name][pipeline.name].put( + event, needs_result=False) # Assemble a new list of min. ltimes of the project branch caches. branch_cache_min_ltimes = { diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index ba190f759a..73adf59549 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -237,6 +237,11 @@ class ZKObject: """Update data from ZK""" self._load(context) + def exists(self, context): + """Return whether the object exists in ZK""" + path = self.getPath() + return bool(context.client.exists(path)) + def _trySerialize(self, context): if isinstance(context, LocalZKContext): return b''