Avoid acquiring pipeline locks in manager postConfig

After creating a new tenant layout, we call _postConfig on every
pipeline manager.  That creates the shared change queues and then
also attaches new PipelineState and PipelineChangeList objects
to the new Pipeline objects on the new layout.

If the routine detects that it is the first scheduler to deal with
this pipeline under the new layout UUID, it also resets the
pipeline state in ZK (resets flags and moves all queues to the
old_queues attribute so they will be re-enqueued).  It also drops
a PipelinePostConfigEvent into the pipeline's management event
queue to trigger a run of the pipeline after the re-enqueues.

The work in the above paragraph means that we must hold the
pipeline lock for each pipeline in turn during the reconfiguration.
Most pipelines should not be processed at this point since we
hold the tenant write lock; however, some cleanup routines may
be running, so we would end up waiting for them to complete
before we can finish the reconfiguration.  That can add minutes
to a reconfiguration.  Incidentally, these cleanup routines
are written with the expectation that they may be running during
a reconfiguration and handle missing data from refreshes.
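
For context, this is the lock acquisition in question, condensed
from the old code in the manager diff below:

    with pipeline_lock(
            self.sched.zk_client, self.pipeline.tenant.name,
            self.pipeline.name) as lock,\
            self.sched.createZKContext(lock, self.log) as ctx,\
            self.currentContext(ctx):
        self.pipeline.state = PipelineState.resetOrCreate(
            self.pipeline, layout.uuid)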

We can avoid this by moving the "reset" work into the PipelineState
deserialization method, where we can determine, at the moment we
refresh the object, whether we need to "reset" it, and do so.  We
can tell that a reset is needed if the layout UUID of the state
object does not match the running layout of the tenant.
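
Condensed from the model.py diff below, the refresh-time check
compares layout UUIDs and rewrites the state in place:

    data = super().deserialize(raw, context)
    if not self._read_only:
        if data['layout_uuid'] != self.pipeline.tenant.layout.uuid:
            # The tenant layout has updated since this state was
            # written; reset flags and move all queues to
            # old_queues so their items will be re-enqueued.
            data = dict(
                state=Pipeline.STATE_NORMAL,
                queues=[],
                old_queues=data["old_queues"] + data["queues"],
                consecutive_failures=0,
                disabled=False,
                layout_uuid=self.pipeline.tenant.layout.uuid,
            )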

We still need to attach new state and change list objects to the
pipeline in _postConfig (since our pipeline object is new).  We
should also make sure that the objects exist in ZK before we leave
that method, so that if a new pipeline is created, other schedulers
will be able to load the (potentially still empty) objects from ZK.
As an alternative, we could avoid even this work in _postConfig, but
then we would have to handle missing objects on refresh, and it
would not be possible to tell whether the object was missing
because it is new or because of an error.  To avoid masking
errors, we keep
the current expectation that we will create these objects in ZK
on the initial reconfiguration.
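
Condensed from the model.py diff below, the create-if-missing
pattern used for both objects is:

    @classmethod
    def create(cls, pipeline, layout_uuid):
        ctx = pipeline.manager.current_context
        state = cls()
        state._set(pipeline=pipeline)
        if state.exists(ctx):
            # The object already exists in ZK; return a skeleton
            # that the next refresh will fill in.
            return state
        # The object is new, so no other scheduler has it either;
        # create it in ZK with the default attributes.
        return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid)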

Finally, we do still want to run the pipeline processing after
a reconfiguration (at least for now -- we may be approaching a
point where that will no longer be necessary).  So we move the
emission of the PipelinePostConfigEvent into the scheduler in the
cases where we know it has just updated the tenant layout.
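
Condensed from the scheduler diff below, the emission now happens
right after the tenant layout is updated:

    # Emit an event to trigger a pipeline run after the
    # reconfiguration.
    event = PipelinePostConfigEvent()
    self.pipeline_management_events[tenant.name][pipeline.name].put(
        event, needs_result=False)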

Change-Id: Ib1e467b5adb907f93bab0de61da84d2efc22e2a7
James E. Blair 2022-11-28 16:17:26 -08:00
parent 7d91fb6010
commit e25795a120
4 changed files with 65 additions and 93 deletions

zuul/manager/__init__.py

@@ -25,11 +25,9 @@ from zuul.lib.tarjan import strongly_connected_components
 import zuul.lib.tracing as tracing
 from zuul.model import (
     Change, DequeueEvent, PipelineState, PipelineChangeList, QueueItem,
-    PipelinePostConfigEvent,
 )
 from zuul.zk.change_cache import ChangeKey
 from zuul.zk.components import COMPONENT_REGISTRY
-from zuul.zk.locks import pipeline_lock
 from opentelemetry import trace
@@ -96,41 +94,19 @@ class PipelineManager(metaclass=ABCMeta):
     def _postConfig(self):
         layout = self.pipeline.tenant.layout
-        # If our layout UUID already matches the UUID in ZK, we don't
-        # need to make any changes in ZK.  But we do still need to
-        # update our local object pointers.  Note that our local queue
-        # state may still be out of date after this because we skip
-        # the refresh.
         self.buildChangeQueues(layout)
         with self.sched.createZKContext(None, self.log) as ctx,\
                 self.currentContext(ctx):
-            if layout.uuid == PipelineState.peekLayoutUUID(self.pipeline):
-                self.pipeline.state = PipelineState()
-                self.pipeline.state._set(pipeline=self.pipeline)
-                self.pipeline.change_list = PipelineChangeList()
-                self.pipeline.change_list._set(pipeline=self.pipeline)
-                return
-
-        with pipeline_lock(
-                self.sched.zk_client, self.pipeline.tenant.name,
-                self.pipeline.name) as lock,\
-                self.sched.createZKContext(lock, self.log) as ctx,\
-                self.currentContext(ctx):
-            # Since the layout UUID is new, this will move queues
-            # to "old_queues".  Note that it will *not* refresh
-            # the contents, in fact, we will get a new
-            # PipelineState python object with no queues, just as
-            # above.  Our state is guaranteed to be out of date
-            # now, but we don't need to do anything with it, we
-            # will let the next actor to use it refresh it then.
-            self.pipeline.state = PipelineState.resetOrCreate(
+            # Make sure we have state and change list objects, and
+            # ensure that they exist in ZK.  We don't hold the
+            # pipeline lock, but if they don't exist, that means they
+            # are new, so no one else will either.  These will not
+            # automatically refresh now, so they will be out of date
+            # until they are refreshed later.
+            self.pipeline.state = PipelineState.create(
                 self.pipeline, layout.uuid)
             self.pipeline.change_list = PipelineChangeList.create(
                 self.pipeline)
-            event = PipelinePostConfigEvent()
-            self.sched.pipeline_management_events[
-                self.pipeline.tenant.name][self.pipeline.name].put(
-                    event, needs_result=False)

     def buildChangeQueues(self, layout):
         self.log.debug("Building relative_priority queues")

zuul/model.py

@@ -20,7 +20,6 @@ import json
 import hashlib
 import logging
 import os
-import zlib
 from functools import total_ordering
 import re2
@@ -613,6 +612,7 @@ class PipelineState(zkobject.ZKObject):
             layout_uuid=None,
             # Local pipeline reference (not persisted in Zookeeper)
             pipeline=None,
+            _read_only=False,
         )

     @classmethod
@@ -626,62 +626,17 @@ class PipelineState(zkobject.ZKObject):
         return obj

     @classmethod
-    def peekLayoutUUID(cls, pipeline):
+    def create(cls, pipeline, layout_uuid):
+        # If the object does not exist in ZK, create it with the
+        # default attributes and the supplied layout UUID.  Otherwise,
+        # return an initialized object without loading any data so
+        # that data can be loaded on the next refresh.
         ctx = pipeline.manager.current_context
-        try:
-            path = cls.pipelinePath(pipeline)
-            compressed_data, zstat = ctx.client.get(path)
-            try:
-                raw = zlib.decompress(compressed_data)
-            except zlib.error:
-                # Fallback for old, uncompressed data
-                raw = compressed_data
-            data = json.loads(raw.decode("utf8"))
-            return data["layout_uuid"]
-        except NoNodeError:
-            return None
-
-    @classmethod
-    def resetOrCreate(cls, pipeline, layout_uuid):
-        # If there is an object in ZK, all the queues will be moved to
-        # old_queues.  Then this method will create a new python
-        # object with no attached queues regardless of the contents of
-        # ZK (so refresh() will need to be called before use).
-        ctx = pipeline.manager.current_context
-        try:
-            state = cls()
-            state._set(pipeline=pipeline)
-            state._reset(ctx, layout_uuid)
+        state = cls()
+        state._set(pipeline=pipeline)
+        if state.exists(ctx):
             return state
-        except NoNodeError:
-            return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid)
-
-    def _reset(self, context, layout_uuid):
-        # Deserialize will recursively load/refresh children, but we
-        # want to avoid that here, so we just load the raw data and
-        # manipulate the top level attributes.
-        raw = self._load(context, deserialize=False)
-        state = json.loads(raw.decode("utf8"))
-
-        if state["layout_uuid"] == layout_uuid:
-            return
-
-        # Note this differs from normal serialization in that we are
-        # dealing with queues only as string path references rather
-        # than objects.
-        reset_state = dict(
-            state=Pipeline.STATE_NORMAL,
-            queues=[],
-            old_queues=state["old_queues"] + state["queues"],
-            consecutive_failures=0,
-            disabled=False,
-            layout_uuid=layout_uuid,
-        )
-
-        raw = json.dumps(reset_state, sort_keys=True).encode("utf8")
-        # Since we only save the object when the layout UUID changes, we can
-        # skip the hash check that we have in other places.
-        self._save(context, raw)
+        return cls.new(ctx, pipeline=pipeline, layout_uuid=layout_uuid)

     def getPath(self):
         if hasattr(self, '_path'):
@@ -725,6 +680,8 @@ class PipelineState(zkobject.ZKObject):
         self.old_queues.remove(queue)

     def serialize(self, context):
+        if self._read_only:
+            raise RuntimeError("Attempt to serialize read-only pipeline state")
         data = {
             "state": self.state,
             "consecutive_failures": self.consecutive_failures,
@@ -735,6 +692,15 @@ class PipelineState(zkobject.ZKObject):
         }
         return json.dumps(data, sort_keys=True).encode("utf8")

+    def refresh(self, context, read_only=False):
+        # Set read_only to True to indicate that we should avoid
+        # "resetting" the pipeline state if the layout has changed.
+        # This is so that we can refresh the object in circumstances
+        # where we haven't verified that our local layout matches
+        # what's in ZK.
+        self._set(_read_only=read_only)
+        return super().refresh(context)
+
     def deserialize(self, raw, context):
         # We may have old change objects in the pipeline cache, so
         # make sure they are the same objects we would get from the
@@ -742,6 +708,23 @@ class PipelineState(zkobject.ZKObject):
         self.pipeline.manager.clearCache()

         data = super().deserialize(raw, context)
+
+        if not self._read_only:
+            # Skip this check if we're in a context where we want to
+            # read the state without updating it (in case we're not
+            # certain that the layout is up to date).
+            if data['layout_uuid'] != self.pipeline.tenant.layout.uuid:
+                # The tenant layout has updated since our last state; we
+                # need to reset the state.
+                data = dict(
+                    state=Pipeline.STATE_NORMAL,
+                    queues=[],
+                    old_queues=data["old_queues"] + data["queues"],
+                    consecutive_failures=0,
+                    disabled=False,
+                    layout_uuid=self.pipeline.tenant.layout.uuid,
+                )
+
         existing_queues = {
             q.getPath(): q for q in self.queues + self.old_queues
         }
@@ -914,13 +897,16 @@ class PipelineChangeList(zkobject.ShardedZKObject):
     @classmethod
     def create(cls, pipeline):
+        # If the object does not exist in ZK, create it with the
+        # default attributes.  Otherwise, return an initialized object
+        # without loading any data so that data can be loaded on the
+        # next refresh.
         ctx = pipeline.manager.current_context
-        try:
-            change_list = cls.fromZK(ctx, cls.getChangeListPath(pipeline),
-                                     pipeline=pipeline)
+        change_list = cls()
+        change_list._set(pipeline=pipeline)
+        if change_list.exists(ctx):
             return change_list
-        except NoNodeError:
-            return cls.new(ctx, pipeline=pipeline)
+        return cls.new(ctx, pipeline=pipeline)

     def serialize(self, context):
         data = {

zuul/scheduler.py

@@ -684,7 +684,7 @@ class Scheduler(threading.Thread):
             with self.createZKContext(lock, self.log) as ctx:
                 with pipeline.manager.currentContext(ctx):
                     pipeline.change_list.refresh(ctx)
-                    pipeline.state.refresh(ctx)
+                    pipeline.state.refresh(ctx, read_only=True)
                     # In case we're in the middle of a reconfig,
                     # include the old queue items.
                     for item in pipeline.getAllItems(include_old=True):
@@ -766,7 +766,7 @@ class Scheduler(threading.Thread):
                         self.zk_client, tenant.name,
                         pipeline.name) as lock,\
                         self.createZKContext(lock, self.log) as ctx:
-                    pipeline.state.refresh(ctx)
+                    pipeline.state.refresh(ctx, read_only=True)
                     # add any blobstore references
                     for item in pipeline.getAllItems(include_old=True):
                         live_blobs.update(item.getBlobKeys())
@@ -1700,6 +1700,11 @@ class Scheduler(threading.Thread):
                 trigger.postConfig(pipeline)
             for reporter in pipeline.actions:
                 reporter.postConfig()
+            # Emit an event to trigger a pipeline run after the
+            # reconfiguration.
+            event = PipelinePostConfigEvent()
+            self.pipeline_management_events[tenant.name][pipeline.name].put(
+                event, needs_result=False)

         # Assemble a new list of min. ltimes of the project branch caches.
         branch_cache_min_ltimes = {

zuul/zk/zkobject.py

@@ -237,6 +237,11 @@ class ZKObject:
         """Update data from ZK"""
         self._load(context)

+    def exists(self, context):
+        """Return whether the object exists in ZK"""
+        path = self.getPath()
+        return bool(context.client.exists(path))
+
     def _trySerialize(self, context):
         if isinstance(context, LocalZKContext):
             return b''