Refactor pipeline loading

Change-Id: I3a980ebb9946beaa1088e78dc2dff7997b7276eb
This commit is contained in:
James E. Blair 2015-12-22 10:24:40 -08:00
parent b97ed8022d
commit fb610ce274
1 changed files with 198 additions and 106 deletions

View File

@ -251,6 +251,200 @@ class ProjectParser(object):
return project
class PipelineParser(object):
log = logging.getLogger("zuul.PipelineParser")
# A set of reporter configuration keys to action mapping
reporter_actions = {
'start': 'start_actions',
'success': 'success_actions',
'failure': 'failure_actions',
'merge-failure': 'merge_failure_actions',
'disabled': 'disabled_actions',
}
@staticmethod
def getDriverSchema(dtype, connections):
# TODO(jhesketh): Make the driver discovery dynamic
connection_drivers = {
'trigger': {
'gerrit': 'zuul.trigger.gerrit',
},
'reporter': {
'gerrit': 'zuul.reporter.gerrit',
'smtp': 'zuul.reporter.smtp',
},
}
standard_drivers = {
'trigger': {
'timer': 'zuul.trigger.timer',
'zuul': 'zuul.trigger.zuultrigger',
}
}
schema = {}
# Add the configured connections as available layout options
for connection_name, connection in connections.connections.items():
for dname, dmod in connection_drivers.get(dtype, {}).items():
if connection.driver_name == dname:
schema[connection_name] = to_list(__import__(
connection_drivers[dtype][dname],
fromlist=['']).getSchema())
# Standard drivers are always available and don't require a unique
# (connection) name
for dname, dmod in standard_drivers.get(dtype, {}).items():
schema[dname] = to_list(__import__(
standard_drivers[dtype][dname], fromlist=['']).getSchema())
return schema
@staticmethod
def getSchema(layout, connections):
manager = vs.Any('independent',
'dependent')
precedence = vs.Any('normal', 'low', 'high')
approval = vs.Schema({'username': str,
'email-filter': str,
'email': str,
'older-than': str,
'newer-than': str,
}, extra=True)
require = {'approval': to_list(approval),
'open': bool,
'current-patchset': bool,
'status': to_list(str)}
reject = {'approval': to_list(approval)}
window = vs.All(int, vs.Range(min=0))
window_floor = vs.All(int, vs.Range(min=1))
window_type = vs.Any('linear', 'exponential')
window_factor = vs.All(int, vs.Range(min=1))
pipeline = {vs.Required('name'): str,
vs.Required('manager'): manager,
'source': str,
'precedence': precedence,
'description': str,
'require': require,
'reject': reject,
'success-message': str,
'failure-message': str,
'merge-failure-message': str,
'footer-message': str,
'dequeue-on-new-patchset': bool,
'ignore-dependencies': bool,
'disable-after-consecutive-failures':
vs.All(int, vs.Range(min=1)),
'window': window,
'window-floor': window_floor,
'window-increase-type': window_type,
'window-increase-factor': window_factor,
'window-decrease-type': window_type,
'window-decrease-factor': window_factor,
}
pipeline['trigger'] = vs.Required(
PipelineParser.getDriverSchema('trigger', connections))
for action in ['start', 'success', 'failure', 'merge-failure',
'disabled']:
pipeline[action] = PipelineParser.getDriverSchema('reporter',
connections)
return vs.Schema(pipeline)
@staticmethod
def fromYaml(layout, connections, scheduler, conf):
PipelineParser.getSchema(layout, connections)(conf)
pipeline = model.Pipeline(conf['name'], layout)
pipeline.description = conf.get('description')
pipeline.source = connections.getSource(conf['source'])
precedence = model.PRECEDENCE_MAP[conf.get('precedence')]
pipeline.precedence = precedence
pipeline.failure_message = conf.get('failure-message',
"Build failed.")
pipeline.merge_failure_message = conf.get(
'merge-failure-message', "Merge Failed.\n\nThis change or one "
"of its cross-repo dependencies was unable to be "
"automatically merged with the current state of its "
"repository. Please rebase the change and upload a new "
"patchset.")
pipeline.success_message = conf.get('success-message',
"Build succeeded.")
pipeline.footer_message = conf.get('footer-message', "")
pipeline.dequeue_on_new_patchset = conf.get(
'dequeue-on-new-patchset', True)
pipeline.ignore_dependencies = conf.get(
'ignore-dependencies', False)
for conf_key, action in PipelineParser.reporter_actions.items():
reporter_set = []
if conf.get(conf_key):
for reporter_name, params \
in conf.get(conf_key).items():
reporter = connections.getReporter(reporter_name,
params)
reporter.setAction(conf_key)
reporter_set.append(reporter)
setattr(pipeline, action, reporter_set)
# If merge-failure actions aren't explicit, use the failure actions
if not pipeline.merge_failure_actions:
pipeline.merge_failure_actions = pipeline.failure_actions
pipeline.disable_at = conf.get(
'disable-after-consecutive-failures', None)
pipeline.window = conf.get('window', 20)
pipeline.window_floor = conf.get('window-floor', 3)
pipeline.window_increase_type = conf.get(
'window-increase-type', 'linear')
pipeline.window_increase_factor = conf.get(
'window-increase-factor', 1)
pipeline.window_decrease_type = conf.get(
'window-decrease-type', 'exponential')
pipeline.window_decrease_factor = conf.get(
'window-decrease-factor', 2)
manager_name = conf['manager']
if manager_name == 'dependent':
manager = zuul.manager.dependent.DependentPipelineManager(
scheduler, pipeline)
elif manager_name == 'independent':
manager = zuul.manager.independent.IndependentPipelineManager(
scheduler, pipeline)
pipeline.setManager(manager)
layout.pipelines[conf['name']] = pipeline
if 'require' in conf or 'reject' in conf:
require = conf.get('require', {})
reject = conf.get('reject', {})
f = model.ChangeishFilter(
open=require.get('open'),
current_patchset=require.get('current-patchset'),
statuses=to_list(require.get('status')),
required_approvals=to_list(require.get('approval')),
reject_approvals=to_list(reject.get('approval'))
)
manager.changeish_filters.append(f)
for trigger_name, trigger_config\
in conf.get('trigger').items():
trigger = connections.getTrigger(trigger_name, trigger_config)
pipeline.triggers.append(trigger)
# TODO: move
manager.event_filters += trigger.getEventFilters(
conf['trigger'][trigger_name])
return pipeline
class AbideValidator(object):
tenant_source = vs.Schema({'repos': [str]})
@ -284,15 +478,6 @@ class AbideValidator(object):
class ConfigLoader(object):
log = logging.getLogger("zuul.ConfigLoader")
# A set of reporter configuration keys to action mapping
reporter_actions = {
'start': 'start_actions',
'success': 'success_actions',
'failure': 'failure_actions',
'merge-failure': 'merge_failure_actions',
'disabled': 'disabled_actions',
}
def loadConfig(self, config_path, scheduler, merger, connections):
abide = model.Abide()
@ -331,103 +516,10 @@ class ConfigLoader(object):
def _parseLayout(self, base, data, scheduler, connections):
layout = model.Layout()
# TODOv3(jeblair): add validation
# validator = layoutvalidator.LayoutValidator()
# validator.validate(data, connections)
config_env = {}
for include in data.get('includes', []):
if 'python-file' in include:
fn = include['python-file']
if not os.path.isabs(fn):
fn = os.path.join(base, fn)
fn = os.path.expanduser(fn)
execfile(fn, config_env)
for conf_pipeline in data.get('pipelines', []):
pipeline = model.Pipeline(conf_pipeline['name'], layout)
pipeline.description = conf_pipeline.get('description')
pipeline.source = connections.getSource(conf_pipeline['source'])
precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
pipeline.precedence = precedence
pipeline.failure_message = conf_pipeline.get('failure-message',
"Build failed.")
pipeline.merge_failure_message = conf_pipeline.get(
'merge-failure-message', "Merge Failed.\n\nThis change or one "
"of its cross-repo dependencies was unable to be "
"automatically merged with the current state of its "
"repository. Please rebase the change and upload a new "
"patchset.")
pipeline.success_message = conf_pipeline.get('success-message',
"Build succeeded.")
pipeline.footer_message = conf_pipeline.get('footer-message', "")
pipeline.dequeue_on_new_patchset = conf_pipeline.get(
'dequeue-on-new-patchset', True)
pipeline.ignore_dependencies = conf_pipeline.get(
'ignore-dependencies', False)
for conf_key, action in self.reporter_actions.items():
reporter_set = []
if conf_pipeline.get(conf_key):
for reporter_name, params \
in conf_pipeline.get(conf_key).items():
reporter = connections.getReporter(reporter_name,
params)
reporter.setAction(conf_key)
reporter_set.append(reporter)
setattr(pipeline, action, reporter_set)
# If merge-failure actions aren't explicit, use the failure actions
if not pipeline.merge_failure_actions:
pipeline.merge_failure_actions = pipeline.failure_actions
pipeline.disable_at = conf_pipeline.get(
'disable-after-consecutive-failures', None)
pipeline.window = conf_pipeline.get('window', 20)
pipeline.window_floor = conf_pipeline.get('window-floor', 3)
pipeline.window_increase_type = conf_pipeline.get(
'window-increase-type', 'linear')
pipeline.window_increase_factor = conf_pipeline.get(
'window-increase-factor', 1)
pipeline.window_decrease_type = conf_pipeline.get(
'window-decrease-type', 'exponential')
pipeline.window_decrease_factor = conf_pipeline.get(
'window-decrease-factor', 2)
manager_name = conf_pipeline['manager']
if manager_name == 'dependent':
manager = zuul.manager.dependent.DependentPipelineManager(
scheduler, pipeline)
elif manager_name == 'independent':
manager = zuul.manager.independent.IndependentPipelineManager(
scheduler, pipeline)
pipeline.setManager(manager)
layout.pipelines[conf_pipeline['name']] = pipeline
if 'require' in conf_pipeline or 'reject' in conf_pipeline:
require = conf_pipeline.get('require', {})
reject = conf_pipeline.get('reject', {})
f = model.ChangeishFilter(
open=require.get('open'),
current_patchset=require.get('current-patchset'),
statuses=to_list(require.get('status')),
required_approvals=to_list(require.get('approval')),
reject_approvals=to_list(reject.get('approval'))
)
manager.changeish_filters.append(f)
for trigger_name, trigger_config\
in conf_pipeline.get('trigger').items():
trigger = connections.getTrigger(trigger_name, trigger_config)
pipeline.triggers.append(trigger)
# TODO: move
manager.event_filters += trigger.getEventFilters(
conf_pipeline['trigger'][trigger_name])
for config_pipeline in data.get('pipelines', []):
layout.addPipeline(PipelineParser.fromYaml(layout, connections,
scheduler,
config_pipeline))
for config_job in data.get('jobs', []):
layout.addJob(JobParser.fromYaml(layout, config_job))