From e25795a1202a240118baf70f1015d28b068fa338 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Mon, 28 Nov 2022 16:17:26 -0800 Subject: [PATCH] Avoid acquiring pipeline locks in manager postConfig After creating a new tenant layout, we call _postConfig on every pipeline manager. That creates the shared change queues and then also attaches new PipelineState and PipelineChangeList objects to the new Pipeline objects on the new layout. If the routine detects that it is the first scheduler to deal with this pipeline under the new layout UUID, it also resets the pipeline state in ZK (resets flags and moves all queues to the old_queues attribute so they will be re-enqueued). It also drops a PostConfigEvent into the pipeline queues to trigger a run of the pipeline after the re-enqueues. The work in the above paragraph means that we must hold the pipeline lock for each pipeline in turn during the reconfiguration. Most pipelines should not be processed at this point since we do hold the tenant write lock, however, some cleanup routines can be operating, and so we would end up waiting for them to complete before completing the reconfiguration. This could end up adding minutes to a reconfiguration. Incidentally, these cleanup routines are written with the expectation that they may be running during a reconfiguration and handle missing data from refreshes. We can avoid this by moving the "reset" work into the PipelineState deserialization method, where we can determine at the moment we refresh the object whether we need to "reset" it and do so. We can tell that a reset needs to happen if the layout uuid of the state object does not match the running layout of the tenant. We still need to attach new state and change list objects to the pipeline in _postConfig (since our pipeline object is new). We also should make sure that the objects exist in ZK before we leave that method, so that if a new pipeline is created, other schedulers will be able to load the (potentially still empty) objects from ZK. As an alternative, we could avoid even this work in _postConfig, but then we would have to handle missing objects on refresh, and it would not be possible to tell if the object was missing due to it being new or due to an error. To avoid masking errors, we keep the current expectation that we will create these objects in ZK on the initial reconfiguration. Finally, we do still want to run the pipeline processing after a reconfiguration (at least for now -- we may be approaching a point where that will no longer be necessary). So we move the emission of the PostConfigEvent into the scheduler in the cases where we know it has just updated the tenant layout. Change-Id: Ib1e467b5adb907f93bab0de61da84d2efc22e2a7 --- zuul/manager/__init__.py | 38 +++----------- zuul/model.py | 106 +++++++++++++++++---------------------- zuul/scheduler.py | 9 +++- zuul/zk/zkobject.py | 5 ++ 4 files changed, 65 insertions(+), 93 deletions(-) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 538da2c104..0223ea4f29 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -25,11 +25,9 @@ from zuul.lib.tarjan import strongly_connected_components import zuul.lib.tracing as tracing from zuul.model import ( Change, DequeueEvent, PipelineState, PipelineChangeList, QueueItem, - PipelinePostConfigEvent, ) from zuul.zk.change_cache import ChangeKey from zuul.zk.components import COMPONENT_REGISTRY -from zuul.zk.locks import pipeline_lock from opentelemetry import trace @@ -96,41 +94,19 @@ class PipelineManager(metaclass=ABCMeta): def _postConfig(self): layout = self.pipeline.tenant.layout - # If our layout UUID already matches the UUID in ZK, we don't - # need to make any changes in ZK. But we do still need to - # update our local object pointers. Note that our local queue - # state may still be out of date after this because we skip - # the refresh. self.buildChangeQueues(layout) with self.sched.createZKContext(None, self.log) as ctx,\ self.currentContext(ctx): - if layout.uuid == PipelineState.peekLayoutUUID(self.pipeline): - self.pipeline.state = PipelineState() - self.pipeline.state._set(pipeline=self.pipeline) - self.pipeline.change_list = PipelineChangeList() - self.pipeline.change_list._set(pipeline=self.pipeline) - return - - with pipeline_lock( - self.sched.zk_client, self.pipeline.tenant.name, - self.pipeline.name) as lock,\ - self.sched.createZKContext(lock, self.log) as ctx,\ - self.currentContext(ctx): - # Since the layout UUID is new, this will move queues - # to "old_queues". Note that it will *not* refresh - # the contents, in fact, we will get a new - # PipelineState python object with no queues, just as - # above. Our state is guaranteed to be out of date - # now, but we don't need to do anything with it, we - # will let the next actor to use it refresh it then. - self.pipeline.state = PipelineState.resetOrCreate( + # Make sure we have state and change list objects, and + # ensure that they exist in ZK. We don't hold the + # pipeline lock, but if they don't exist, that means they + # are new, so no one else will either. These will not + # automatically refresh now, so they will be out of date + # until they are refreshed later. + self.pipeline.state = PipelineState.create( self.pipeline, layout.uuid) self.pipeline.change_list = PipelineChangeList.create( self.pipeline) - event = PipelinePostConfigEvent() - self.sched.pipeline_management_events[ - self.pipeline.tenant.name][self.pipeline.name].put( - event, needs_result=False) def buildChangeQueues(self, layout): self.log.debug("Building relative_priority queues") diff --git a/zuul/model.py b/zuul/model.py index c2da4f65cf..102919439a 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -20,7 +20,6 @@ import json import hashlib import logging import os -import zlib from functools import total_ordering import re2 @@ -613,6 +612,7 @@ class PipelineState(zkobject.ZKObject): layout_uuid=None, # Local pipeline reference (not persisted in Zookeeper) pipeline=None, + _read_only=False, ) @classmethod @@ -626,62 +626,17 @@ class PipelineState(zkobject.ZKObject): return obj @classmethod - def peekLayoutUUID(cls, pipeline): + def create(cls, pipeline, layout_uuid): + # If the object does not exist in ZK, create it with the + # default attributes and the supplied layout UUID. Otherwise, + # return an initialized object without loading any data so + # that data can be loaded on the next refresh. ctx = pipeline.manager.current_context - try: - path = cls.pipelinePath(pipeline) - compressed_data, zstat = ctx.client.get(path) - try: - raw = zlib.decompress(compressed_data) - except zlib.error: - # Fallback for old, uncompressed data - raw = compressed_data - data = json.loads(raw.decode("utf8")) - return data["layout_uuid"] - except NoNodeError: - return None - - @classmethod - def resetOrCreate(cls, pipeline, layout_uuid): - # If there is an object in ZK, all the queues will be moved to - # old_queues. Then this method will will create a new python - # object with no attached queues regardless of the contents of - # ZK (so refresh() will need to be called before use). - ctx = pipeline.manager.current_context - try: - state = cls() - state._set(pipeline=pipeline) - state._reset(ctx, layout_uuid) + state = cls() + state._set(pipeline=pipeline) + if state.exists(ctx): return state - except NoNodeError: - return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid) - - def _reset(self, context, layout_uuid): - # Deserialize will recursively load/refresh children, but we - # want to avoid that here, so we just load the raw data and - # manipulate the top level attributes. - raw = self._load(context, deserialize=False) - state = json.loads(raw.decode("utf8")) - - if state["layout_uuid"] == layout_uuid: - return - - # Note this differs from normal serialization in that we are - # dealing with queues only as string path references rather - # than objects. - reset_state = dict( - state=Pipeline.STATE_NORMAL, - queues=[], - old_queues=state["old_queues"] + state["queues"], - consecutive_failures=0, - disabled=False, - layout_uuid=layout_uuid, - ) - - raw = json.dumps(reset_state, sort_keys=True).encode("utf8") - # Since we only save the object when the layout UUID changes, we can - # skip the hash check that we have in other places. - self._save(context, raw) + return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid) def getPath(self): if hasattr(self, '_path'): @@ -725,6 +680,8 @@ class PipelineState(zkobject.ZKObject): self.old_queues.remove(queue) def serialize(self, context): + if self._read_only: + raise RuntimeError("Attempt to serialize read-only pipeline state") data = { "state": self.state, "consecutive_failures": self.consecutive_failures, @@ -735,6 +692,15 @@ class PipelineState(zkobject.ZKObject): } return json.dumps(data, sort_keys=True).encode("utf8") + def refresh(self, context, read_only=False): + # Set read_only to True to indicate that we should avoid + # "resetting" the pipeline state if the layout has changed. + # This is so that we can refresh the object in circumstances + # where we haven't verified that our local layout matches + # what's in ZK. + self._set(_read_only=read_only) + return super().refresh(context) + def deserialize(self, raw, context): # We may have old change objects in the pipeline cache, so # make sure they are the same objects we would get from the @@ -742,6 +708,23 @@ class PipelineState(zkobject.ZKObject): self.pipeline.manager.clearCache() data = super().deserialize(raw, context) + + if not self._read_only: + # Skip this check if we're in a context where we want to + # read the state without updating it (in case we're not + # certain that the layout is up to date). + if data['layout_uuid'] != self.pipeline.tenant.layout.uuid: + # The tenant layout has updated since our last state; we + # need to reset the state. + data = dict( + state=Pipeline.STATE_NORMAL, + queues=[], + old_queues=data["old_queues"] + data["queues"], + consecutive_failures=0, + disabled=False, + layout_uuid=self.pipeline.tenant.layout.uuid, + ) + existing_queues = { q.getPath(): q for q in self.queues + self.old_queues } @@ -914,13 +897,16 @@ class PipelineChangeList(zkobject.ShardedZKObject): @classmethod def create(cls, pipeline): + # If the object does not exist in ZK, create it with the + # default attributes. Otherwise, return an initialized object + # without loading any data so that data can be loaded on the + # next refresh. ctx = pipeline.manager.current_context - try: - change_list = cls.fromZK(ctx, cls.getChangeListPath(pipeline), - pipeline=pipeline) + change_list = cls() + change_list._set(pipeline=pipeline) + if change_list.exists(ctx): return change_list - except NoNodeError: - return cls.new(ctx, pipeline=pipeline) + return cls.new(ctx, pipeline=pipeline) def serialize(self, context): data = { diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 42f8afbb0d..a16bdbc20d 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -684,7 +684,7 @@ class Scheduler(threading.Thread): with self.createZKContext(lock, self.log) as ctx: with pipeline.manager.currentContext(ctx): pipeline.change_list.refresh(ctx) - pipeline.state.refresh(ctx) + pipeline.state.refresh(ctx, read_only=True) # In case we're in the middle of a reconfig, # include the old queue items. for item in pipeline.getAllItems(include_old=True): @@ -766,7 +766,7 @@ class Scheduler(threading.Thread): self.zk_client, tenant.name, pipeline.name) as lock,\ self.createZKContext(lock, self.log) as ctx: - pipeline.state.refresh(ctx) + pipeline.state.refresh(ctx, read_only=True) # add any blobstore references for item in pipeline.getAllItems(include_old=True): live_blobs.update(item.getBlobKeys()) @@ -1700,6 +1700,11 @@ class Scheduler(threading.Thread): trigger.postConfig(pipeline) for reporter in pipeline.actions: reporter.postConfig() + # Emit an event to trigger a pipeline run after the + # reconfiguration. + event = PipelinePostConfigEvent() + self.pipeline_management_events[tenant.name][pipeline.name].put( + event, needs_result=False) # Assemble a new list of min. ltimes of the project branch caches. branch_cache_min_ltimes = { diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index ba190f759a..73adf59549 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -237,6 +237,11 @@ class ZKObject: """Update data from ZK""" self._load(context) + def exists(self, context): + """Return whether the object exists in ZK""" + path = self.getPath() + return bool(context.client.exists(path)) + def _trySerialize(self, context): if isinstance(context, LocalZKContext): return b''