Store pipeline state in Zookeeper

This splits out the runtime state of a pipeline into a PipelineState
class that inherits from ZKObject.

The pipeline state is stored in the following Zookeeper path:

    /zuul/<tenant>/pipeline/<pipeline>

Change-Id: Ifd7af1885872def762a5d7ed46021f33ac132ae6
This commit is contained in:
Simon Westphahl
2021-09-23 14:36:38 +02:00
committed by James E. Blair
parent 3706d676e3
commit 36f558fdbe
4 changed files with 105 additions and 41 deletions

View File

@@ -5907,8 +5907,8 @@ For CI problems and help debugging, contact ci@example.org"""
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(3, tenant.layout.pipelines['check'].disable_at)
self.assertEqual(
0, tenant.layout.pipelines['check']._consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check']._disabled)
0, tenant.layout.pipelines['check'].state.consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check'].state.disabled)
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
@@ -5939,15 +5939,15 @@ For CI problems and help debugging, contact ci@example.org"""
self.waitUntilSettled()
self.assertEqual(
2, tenant.layout.pipelines['check']._consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check']._disabled)
2, tenant.layout.pipelines['check'].state.consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check'].state.disabled)
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(
0, tenant.layout.pipelines['check']._consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check']._disabled)
0, tenant.layout.pipelines['check'].state.consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check'].state.disabled)
self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(E.getPatchsetCreatedEvent(1))
@@ -5956,8 +5956,8 @@ For CI problems and help debugging, contact ci@example.org"""
# We should be disabled now
self.assertEqual(
3, tenant.layout.pipelines['check']._consecutive_failures)
self.assertTrue(tenant.layout.pipelines['check']._disabled)
3, tenant.layout.pipelines['check'].state.consecutive_failures)
self.assertTrue(tenant.layout.pipelines['check'].state.disabled)
# We need to wait between each of these patches to make sure the
# smtp messages come back in an expected order
@@ -6006,16 +6006,16 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(3, tenant.layout.pipelines['check'].disable_at)
self.assertEqual(
0, tenant.layout.pipelines['check']._consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check']._disabled)
0, tenant.layout.pipelines['check'].state.consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check'].state.disabled)
self.fake_gerrit.addEvent(J.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(K.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(
2, tenant.layout.pipelines['check']._consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check']._disabled)
2, tenant.layout.pipelines['check'].state.consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check'].state.disabled)
# J and K went back to gerrit
self.assertEqual(1, len(J.messages))

View File

@@ -22,8 +22,9 @@ from zuul import model
from zuul.lib.dependson import find_dependency_headers
from zuul.lib.logutil import get_annotated_logger
from zuul.lib.tarjan import strongly_connected_components
from zuul.model import Change, DequeueEvent, QueueItem
from zuul.model import Change, DequeueEvent, PipelineState, QueueItem
from zuul.zk.change_cache import ChangeKey
from zuul.zk.locks import pipeline_lock
class DynamicChangeQueueContextManager(object):
@@ -84,7 +85,14 @@ class PipelineManager(metaclass=ABCMeta):
# All pipelines support shared queues for setting
# relative_priority; only the dependent pipeline uses them for
# pipeline queuing.
self.buildChangeQueues(layout)
with pipeline_lock(
self.sched.zk_client, self.pipeline.tenant.name, self.pipeline.name
) as lock:
ctx = self.sched.createZKContext(lock, self.log)
with self.currentContext(ctx):
self.pipeline.state = PipelineState.resetOrCreate(
self.pipeline)
self.buildChangeQueues(layout)
def buildChangeQueues(self, layout):
self.log.debug("Building relative_priority queues")
@@ -241,7 +249,7 @@ class PipelineManager(metaclass=ABCMeta):
self.updateCommitDependencies(change, None, event)
def reportEnqueue(self, item):
if not self.pipeline._disabled:
if not self.pipeline.state.disabled:
self.log.info("Reporting enqueue, action %s item %s" %
(self.pipeline.enqueue_actions, item))
ret = self.sendReport(self.pipeline.enqueue_actions, item)
@@ -250,7 +258,7 @@ class PipelineManager(metaclass=ABCMeta):
(item, ret))
def reportStart(self, item):
if not self.pipeline._disabled:
if not self.pipeline.state.disabled:
self.log.info("Reporting start, action %s item %s" %
(self.pipeline.start_actions, item))
ret = self.sendReport(self.pipeline.start_actions, item)
@@ -259,7 +267,7 @@ class PipelineManager(metaclass=ABCMeta):
(item, ret))
def reportDequeue(self, item):
if not self.pipeline._disabled:
if not self.pipeline.state.disabled:
self.log.info(
"Reporting dequeue, action %s item%s",
self.pipeline.dequeue_actions,
@@ -1751,26 +1759,31 @@ class PipelineManager(metaclass=ABCMeta):
actions = self.pipeline.failure_actions
item.setReportedResult("FAILURE")
if not item.didAllJobsSucceed():
self.pipeline._consecutive_failures += 1
with self.pipeline.state.activeContext(self.current_context):
self.pipeline.state.consecutive_failures += 1
elif item.didAllJobsSucceed() and not item.isBundleFailing():
log.debug("success %s", self.pipeline.success_actions)
action = 'success'
actions = self.pipeline.success_actions
item.setReportedResult('SUCCESS')
self.pipeline._consecutive_failures = 0
with self.pipeline.state.activeContext(self.current_context):
self.pipeline.state.consecutive_failures = 0
else:
action = 'failure'
actions = self.pipeline.failure_actions
item.setReportedResult('FAILURE')
self.pipeline._consecutive_failures += 1
if project_in_pipeline and self.pipeline._disabled:
with self.pipeline.state.activeContext(self.current_context):
self.pipeline.state.consecutive_failures += 1
if project_in_pipeline and self.pipeline.state.disabled:
actions = self.pipeline.disabled_actions
# Check here if we should disable so that we only use the disabled
# reporters /after/ the last disable_at failure is still reported as
# normal.
if (self.pipeline.disable_at and not self.pipeline._disabled and
self.pipeline._consecutive_failures >= self.pipeline.disable_at):
self.pipeline._disabled = True
if (self.pipeline.disable_at and not self.pipeline.state.disabled and
self.pipeline.state.consecutive_failures
>= self.pipeline.disable_at):
self.pipeline.state.updateAttributes(
self.current_context, disabled=True)
if actions:
log.info("Reporting item %s, actions: %s", item, actions)
ret = self.sendReport(actions, item)

View File

@@ -29,6 +29,7 @@ import textwrap
import types
import itertools
from kazoo.exceptions import NoNodeError
from cachetools.func import lru_cache
from zuul.lib import yamlutil as yaml
@@ -282,8 +283,6 @@ class Pipeline(object):
self.dequeue_on_new_patchset = True
self.ignore_dependencies = False
self.manager = None
self.queues = []
self.relative_priority_queues = {}
self.precedence = PRECEDENCE_NORMAL
self.supercedes = []
self.triggers = []
@@ -296,24 +295,21 @@ class Pipeline(object):
self.disabled_actions = []
self.dequeue_actions = []
self.disable_at = None
self._consecutive_failures = 0
self._disabled = False
self.window = None
self.window_floor = None
self.window_increase_type = None
self.window_increase_factor = None
self.window_decrease_type = None
self.window_decrease_factor = None
self.state = self.STATE_NORMAL
self.state = None
def getPath(self):
    # ZooKeeper path of this pipeline's state node:
    #   /zuul/<tenant>/pipeline/<pipeline>
    # quote_plus makes tenant/pipeline names safe as path components
    # (e.g. names containing '/' or spaces).
    safe_tenant = urllib.parse.quote_plus(self.tenant.name)
    safe_pipeline = urllib.parse.quote_plus(self.name)
    return f"/zuul/{safe_tenant}/pipeline/{safe_pipeline}"
@property
def queues(self):
    # Change queues now live on the ZK-backed PipelineState; this
    # property preserves the old `pipeline.queues` access for callers.
    return self.state.queues
def refresh(self, context):
    """Refresh every change queue of this pipeline from ZooKeeper."""
    for queue in self.queues:
        queue.refresh(context)
@property
def relative_priority_queues(self):
    # Delegates to the ZK-backed PipelineState so existing callers of
    # `pipeline.relative_priority_queues` keep working unchanged.
    return self.state.relative_priority_queues
@property
def actions(self):
@@ -384,7 +380,7 @@ class Pipeline(object):
def formatStatusJSON(self, websocket_url=None):
j_pipeline = dict(name=self.name,
description=self.description,
state=self.state)
state=self.state.state)
j_queues = []
j_pipeline['change_queues'] = j_queues
for queue in self.queues:
@@ -416,6 +412,57 @@ class Pipeline(object):
return j_pipeline
class PipelineState(zkobject.ZKObject):
    """Runtime state of a pipeline, persisted in ZooKeeper.

    Stored at /zuul/<tenant>/pipeline/<pipeline> (see pipelinePath).
    Holds the pipeline's operational state (normal/error), its change
    queues, and the consecutive-failure/disabled bookkeeping that was
    previously kept as private attributes on Pipeline itself.
    """

    def __init__(self):
        super().__init__()
        # Seed all attributes with defaults so the object is usable
        # before its first ZK read/write.  Uses ZKObject._set rather
        # than plain assignment — presumably to bypass the base class's
        # change tracking; TODO confirm against zkobject.ZKObject.
        self._set(**self.defaultState())

    @classmethod
    def defaultState(cls):
        # Baseline values for a fresh (or freshly reset) pipeline state.
        return dict(
            state=Pipeline.STATE_NORMAL,
            queues=[],
            relative_priority_queues={},
            consecutive_failures=0,
            disabled=False,
            # Runtime back-reference to the owning Pipeline; not
            # serialized (see serialize()).
            pipeline=None,
        )

    @classmethod
    def resetOrCreate(cls, pipeline):
        """Return the pipeline's state object, reset to defaults.

        If a state node already exists in ZK, load it and reset every
        attribute to its default (keeping only the pipeline reference);
        otherwise create a new node.
        """
        ctx = pipeline.manager.current_context
        try:
            state = cls.fromZK(ctx, cls.pipelinePath(pipeline))
            # Reset to defaults but keep the back-reference to the
            # owning pipeline object.
            reset_state = {**state.defaultState(), "pipeline": pipeline}
            state.updateAttributes(ctx, **reset_state)
            return state
        except NoNodeError:
            # No state node yet for this pipeline — create one.
            return cls.new(ctx, pipeline=pipeline)

    def getPath(self):
        # ZKObject hook: where this object lives in ZooKeeper.
        return self.pipelinePath(self.pipeline)

    @classmethod
    def pipelinePath(cls, pipeline):
        # quote_plus guards against tenant/pipeline names containing
        # characters that are unsafe in a ZK path (e.g. '/').
        safe_tenant = urllib.parse.quote_plus(pipeline.tenant.name)
        safe_pipeline = urllib.parse.quote_plus(pipeline.name)
        return f"/zuul/{safe_tenant}/pipeline/{safe_pipeline}"

    def serialize(self):
        # Only the scalar fields are stored in this node.  The queues
        # are separate ZK objects refreshed independently, and
        # 'pipeline' is a runtime-only back-reference.
        data = {
            "state": self.state,
            "consecutive_failures": self.consecutive_failures,
            "disabled": self.disabled,
        }
        return json.dumps(data).encode("utf8")

    def refresh(self, context):
        # Refresh our own ZK data first, then each change queue
        # beneath us.
        super().refresh(context)
        for queue in self.queues:
            queue.refresh(context)
class ChangeQueue(object):
"""A ChangeQueue contains Changes to be processed for related projects.
@@ -2703,7 +2750,7 @@ class QueueItem(zkobject.ZKObject):
return obj
def getPath(self):
return self.itemPath(self.pipeline.getPath(), self.uuid)
return self.itemPath(self.pipeline.state.getPath(), self.uuid)
@classmethod
def itemPath(cls, pipeline_path, item_uuid):

View File

@@ -1611,7 +1611,7 @@ class Scheduler(threading.Thread):
) as lock:
ctx = self.createZKContext(lock, self.log)
with pipeline.manager.currentContext(ctx):
pipeline.refresh(ctx)
pipeline.state.refresh(ctx)
self._process_pipeline(tenant, pipeline)
except LockException:
@@ -1629,10 +1629,14 @@ class Scheduler(threading.Thread):
pass
except Exception:
self.log.exception("Exception in pipeline processing:")
pipeline.state = pipeline.STATE_ERROR
pipeline.state.updateAttributes(
pipeline.manager.current_context,
state=pipeline.STATE_ERROR)
# Continue processing other pipelines+tenants
else:
pipeline.state = pipeline.STATE_NORMAL
pipeline.state.updateAttributes(
pipeline.manager.current_context,
state=pipeline.STATE_NORMAL)
def _gatherConnectionCacheKeys(self):
relevant = set()