Contain pipeline exceptions

This will continue processing other pipelines (and other tenants)
if one pipeline has an exception.  This shouldn't happen except in
the case of a Zuul bug, but if it does, it can contain the fallout.

Story: 2007761
Change-Id: Idecffb71b1897cd2269a9c7edc40d7d7e62614b9
This commit is contained in:
James E. Blair 2020-06-05 13:57:42 -07:00
parent b79a83f323
commit ec18f479e5
2 changed files with 17 additions and 5 deletions

View File

@ -245,6 +245,9 @@ class Pipeline(object):
Reporter Reporter
Communicates success and failure results somewhere Communicates success and failure results somewhere
""" """
STATE_NORMAL = 'normal'
STATE_ERROR = 'error'
def __init__(self, name, tenant): def __init__(self, name, tenant):
self.name = name self.name = name
# Note that pipelines are not portable across tenants (new # Note that pipelines are not portable across tenants (new
@ -287,6 +290,7 @@ class Pipeline(object):
self.window_increase_factor = None self.window_increase_factor = None
self.window_decrease_type = None self.window_decrease_type = None
self.window_decrease_factor = None self.window_decrease_factor = None
self.state = self.STATE_NORMAL
@property @property
def actions(self): def actions(self):
@ -355,7 +359,8 @@ class Pipeline(object):
def formatStatusJSON(self, websocket_url=None): def formatStatusJSON(self, websocket_url=None):
j_pipeline = dict(name=self.name, j_pipeline = dict(name=self.name,
description=self.description) description=self.description,
state=self.state)
j_queues = [] j_queues = []
j_pipeline['change_queues'] = j_queues j_pipeline['change_queues'] = j_queues
for queue in self.queues: for queue in self.queues:

View File

@ -1232,10 +1232,17 @@ class Scheduler(threading.Thread):
for tenant in self.abide.tenants.values(): for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values(): for pipeline in tenant.layout.pipelines.values():
while (pipeline.manager.processQueue() and try:
not self._stopped): while (pipeline.manager.processQueue() and
pass not self._stopped):
pass
except Exception:
self.log.exception(
"Exception in pipeline processing:")
pipeline.state = pipeline.STATE_ERROR
# Continue processing other pipelines+tenants
else:
pipeline.state = pipeline.STATE_NORMAL
except Exception: except Exception:
self.log.exception("Exception in run handler:") self.log.exception("Exception in run handler:")
# There may still be more events to process # There may still be more events to process