Support multiple triggers

Add the ability for Zuul to accept inputs from multiple trigger
sources simultaneously.

Pipelines are associated with exactly one trigger, which must now
be named in the configuration file.

Co-Authored-By: Monty Taylor <mordred@inaugust.com>

Change-Id: Ief2b31a7b8d85d30817f2747c1e2635f71ea24b9
This commit is contained in:
James E. Blair 2013-07-29 17:06:47 -07:00
parent 3cd25034b6
commit 6c358e72ea
20 changed files with 214 additions and 147 deletions

View File

@ -11,13 +11,19 @@ Since 1.3.0:
QueueItem has the full context for why the change is being run
(including the pipeline, items ahead and behind, etc.). The Change
is still available via the "change" attribute on the QueueItem. The
second argument is now the Job that is about to be run, ande the
second argument is now the Job that is about to be run, and the
parameter dictionary is shifted to the third position.
* The ZUUL_SHORT_* parameters have been removed (the same
functionality may be achieved with a custom parameter function that
matches all jobs).
* Multiple triggers are now supported, in principle (though only
Gerrit is defined currently). Your layout.yaml file will need to
change to add the key "gerrit:" inside of the "triggers:" list to
specify a Gerrit trigger (and facilitate adding other kinds of
triggers later). See the sample layout.yaml.
* The default behavior is now to immediately dequeue changes that have
merge conflicts, even those not at the head of the queue. To enable
the old behavior (which would wait until the conflicting change was

View File

@ -67,6 +67,19 @@ List Failing Tests
.tox/py27/bin/activate
testr failing --list
Hanging Tests
-------------
The following will run each test in turn and print the name of the
test as it is run::
. .tox/py27/bin/activate
testr run --subunit | subunit2pyunit
You can compare the output of that to::
python -m testtools.run discover --list
Need More Info?
---------------

View File

@ -178,6 +178,7 @@ explanation of each of the parameters::
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
verified: 1
@ -253,10 +254,14 @@ explanation of each of the parameters::
DependentPipelineManager, see: :doc:`gating`.
**trigger**
This describes what Gerrit events should be placed in the pipeline.
Exactly one trigger source must be supplied for each pipeline.
Triggers are not exclusive -- matching events may be placed in
multiple pipelines, and they will behave independently in each of the
pipelines they match. Multiple triggers may be listed. Further
multiple pipelines, and they will behave independently in each of
the pipelines they match. You may select from the following:
**gerrit**
This describes what Gerrit events should be placed in the
pipeline. Multiple gerrit triggers may be listed. Further
parameters describe the kind of events that match:
*event*

View File

@ -2,6 +2,7 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
verified: 1
@ -11,6 +12,7 @@ pipelines:
- name: tests
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
email_filter: ^.*@example.org$
success:
@ -21,12 +23,14 @@ pipelines:
- name: post
manager: IndependentPipelineManager
trigger:
gerrit:
- event: ref-updated
ref: ^(?!refs/).*$
- name: gate
manager: DependentPipelineManager
trigger:
gerrit:
- event: comment-added
approval:
- approved: 1

View File

@ -2,6 +2,7 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
verified: 1
@ -11,6 +12,7 @@ pipelines:
- name: post
manager: IndependentPipelineManager
trigger:
gerrit:
- event: ref-updated
ref: ^(?!refs/).*$
@ -18,6 +20,7 @@ pipelines:
manager: DependentPipelineManager
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
trigger:
gerrit:
- event: comment-added
approval:
- approved: 1

View File

@ -5,6 +5,7 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
verified: 1
@ -14,6 +15,7 @@ pipelines:
- name: post
manager: IndependentPipelineManager
trigger:
gerrit:
- event: ref-updated
ref: ^(?!refs/).*$
@ -21,6 +23,7 @@ pipelines:
manager: DependentPipelineManager
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
trigger:
gerrit:
- event: comment-added
approval:
- approved: 1
@ -37,6 +40,7 @@ pipelines:
manager: IndependentPipelineManager
dequeue-on-new-patchset: false
trigger:
gerrit:
- event: comment-added
approval:
- approved: 1
@ -44,6 +48,7 @@ pipelines:
- name: dup1
manager: IndependentPipelineManager
trigger:
gerrit:
- event: change-restored
success:
verified: 1
@ -53,6 +58,7 @@ pipelines:
- name: dup2
manager: IndependentPipelineManager
trigger:
gerrit:
- event: change-restored
success:
verified: 1
@ -64,6 +70,7 @@ pipelines:
manager: DependentPipelineManager
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
trigger:
gerrit:
- event: comment-added
approval:
- approved: 1

View File

@ -2,6 +2,7 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: non-event
projects:

View File

@ -2,6 +2,7 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- approval:
- approved: 1

View File

@ -2,6 +2,7 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: comment-added
approved: 1

View File

@ -4,6 +4,7 @@ pipelines:
- name: 'check'
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
project-templates:

View File

@ -4,6 +4,7 @@ pipelines:
- name: 'check'
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
project-templates:

View File

@ -5,6 +5,7 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
verified: 1
@ -14,6 +15,7 @@ pipelines:
- name: post
manager: IndependentPipelineManager
trigger:
gerrit:
- event: ref-updated
ref: ^(?!refs/).*$
@ -22,6 +24,7 @@ pipelines:
success-message: Your change is awesome.
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
trigger:
gerrit:
- event: comment-added
approval:
- approved: 1

View File

@ -2,6 +2,7 @@ pipelines:
- name: 'check'
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
project-templates:

View File

@ -28,7 +28,7 @@ FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
LAYOUT_RE = re.compile(r'^(good|bad)_.*\.yaml$')
class testScheduler(testtools.TestCase):
class TestLayoutValidator(testtools.TestCase):
def test_layouts(self):
"""Test layout file validation"""
print

View File

@ -761,7 +761,7 @@ class TestScheduler(testtools.TestCase):
self.webapp = zuul.webapp.WebApp(self.sched, port=0)
self.sched.setLauncher(self.launcher)
self.sched.setTrigger(self.gerrit)
self.sched.registerTrigger(self.gerrit)
self.sched.start()
self.sched.reconfigure(self.config)
@ -777,7 +777,7 @@ class TestScheduler(testtools.TestCase):
def assertFinalState(self):
# Make sure that the change cache is cleared
self.assertEqual(len(self.sched.trigger._change_cache.keys()), 0)
self.assertEqual(len(self.gerrit._change_cache.keys()), 0)
self.assertEmptyQueues()
def shutdown(self):
@ -1440,9 +1440,9 @@ class TestScheduler(testtools.TestCase):
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.waitUntilSettled()
self.log.debug("len %s " % self.sched.trigger._change_cache.keys())
self.log.debug("len %s " % self.gerrit._change_cache.keys())
# there should still be changes in the cache
self.assertNotEqual(len(self.sched.trigger._change_cache.keys()), 0)
self.assertNotEqual(len(self.gerrit._change_cache.keys()), 0)
self.worker.hold_jobs_in_build = False
self.worker.release()
@ -1457,21 +1457,19 @@ class TestScheduler(testtools.TestCase):
"Test whether a change is ready to merge"
# TODO: move to test_gerrit (this is a unit test!)
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
a = self.sched.trigger.getChange(1, 2)
trigger = self.sched.layout.pipelines['gate'].trigger
a = self.sched.triggers['gerrit'].getChange(1, 2)
mgr = self.sched.layout.pipelines['gate'].manager
self.assertFalse(
self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
self.assertFalse(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
A.addApproval('CRVW', 2)
a = self.sched.trigger.getChange(1, 2, refresh=True)
self.assertFalse(
self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
a = trigger.getChange(1, 2, refresh=True)
self.assertFalse(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
A.addApproval('APRV', 1)
a = self.sched.trigger.getChange(1, 2, refresh=True)
self.assertTrue(
self.sched.trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
self.sched.trigger.maintainCache([])
a = trigger.getChange(1, 2, refresh=True)
self.assertTrue(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
trigger.maintainCache([])
def test_build_configuration(self):
"Test that zuul merges the right commits for testing"
@ -2347,6 +2345,7 @@ class TestScheduler(testtools.TestCase):
def test_test_config(self):
"Test that we can test the config"
sched = zuul.scheduler.Scheduler()
sched.registerTrigger(None, 'gerrit')
sched.testConfig(CONFIG.get('zuul', 'layout_config'))
def test_build_description(self):

View File

@ -105,6 +105,7 @@ class Server(object):
logging.basicConfig(level=logging.DEBUG)
self.sched = zuul.scheduler.Scheduler()
self.sched.registerTrigger(None, 'gerrit')
layout = self.sched.testConfig(self.config.get('zuul',
'layout_config'))
if not job_list_path:
@ -165,7 +166,7 @@ class Server(object):
webapp = zuul.webapp.WebApp(self.sched)
self.sched.setLauncher(gearman)
self.sched.setTrigger(gerrit)
self.sched.registerTrigger(gerrit)
self.sched.start()
self.sched.reconfigure(self.config)

View File

@ -35,7 +35,8 @@ class LayoutSchema(object):
variable_dict = v.Schema({}, extra=True)
trigger = {v.Required('event'): toList(v.Any('patchset-created',
gerrit_trigger = {v.Required('event'):
toList(v.Any('patchset-created',
'change-abandoned',
'change-restored',
'change-merged',
@ -48,6 +49,8 @@ class LayoutSchema(object):
'approval': toList(variable_dict),
}
trigger = v.Required(v.Any({'gerrit': toList(gerrit_trigger)}))
pipeline = {v.Required('name'): str,
v.Required('manager'): manager,
'precedence': precedence,
@ -56,7 +59,7 @@ class LayoutSchema(object):
'failure-message': str,
'dequeue-on-new-patchset': bool,
'dequeue-on-conflict': bool,
'trigger': toList(trigger),
'trigger': trigger,
'success': variable_dict,
'failure': variable_dict,
'start': variable_dict,

View File

@ -47,6 +47,7 @@ class Pipeline(object):
self.manager = None
self.queues = []
self.precedence = PRECEDENCE_NORMAL
self.trigger = None
def __repr__(self):
return '<Pipeline %s>' % self.name
@ -724,6 +725,7 @@ class TriggerEvent(object):
# common
self.type = None
self.project_name = None
self.trigger_name = None
# Representation of the user account that performed the event.
self.account = None
# patchset-created, comment-added, etc.

View File

@ -74,7 +74,7 @@ class Scheduler(threading.Thread):
self._exit = False
self._stopped = False
self.launcher = None
self.trigger = None
self.triggers = dict()
self.config = None
self._maintain_trigger_cache = False
@ -141,7 +141,11 @@ class Scheduler(threading.Thread):
manager.success_action = conf_pipeline.get('success')
manager.failure_action = conf_pipeline.get('failure')
manager.start_action = conf_pipeline.get('start')
for trigger in toList(conf_pipeline['trigger']):
# TODO: move this into triggers (may require pluggable
# configuration)
if 'gerrit' in conf_pipeline['trigger']:
pipeline.trigger = self.triggers['gerrit']
for trigger in toList(conf_pipeline['trigger']['gerrit']):
approvals = {}
for approval_dict in toList(trigger.get('approval')):
for k, v in approval_dict.items():
@ -272,17 +276,23 @@ class Scheduler(threading.Thread):
else:
sshkey = None
self.merger = merger.Merger(self.trigger, merge_root, push_refs,
# TODO: The merger should have an upstream repo independent of
# triggers, and then each trigger should provide a fetch
# location.
self.merger = merger.Merger(self.triggers['gerrit'],
merge_root, push_refs,
sshkey, merge_email, merge_name)
for project in self.layout.projects.values():
url = self.trigger.getGitUrl(project)
url = self.triggers['gerrit'].getGitUrl(project)
self.merger.addProject(project, url)
def setLauncher(self, launcher):
self.launcher = launcher
def setTrigger(self, trigger):
self.trigger = trigger
def registerTrigger(self, trigger, name=None):
if name is None:
name = trigger.name
self.triggers[name] = trigger
def getProject(self, name):
self.layout_lock.acquire()
@ -518,7 +528,8 @@ class Scheduler(threading.Thread):
relevant.add(item.change)
relevant.update(item.change.getRelatedChanges())
self.log.debug("Trigger cache size: %s" % len(relevant))
self.trigger.maintainCache(relevant)
for trigger in self.triggers.values():
trigger.maintainCache(relevant)
def process_event_queue(self):
self.log.debug("Fetching trigger event")
@ -540,7 +551,8 @@ class Scheduler(threading.Thread):
self.merger.updateRepo(project)
for pipeline in self.layout.pipelines.values():
change = event.getChange(project, self.trigger)
change = event.getChange(project,
self.triggers.get(event.trigger_name))
if event.type == 'patchset-created':
pipeline.manager.removeOldVersionsOfChange(change)
if pipeline.manager.eventMatches(event):
@ -709,7 +721,7 @@ class BasePipelineManager(object):
msg = "Starting %s jobs." % self.pipeline.name
if self.sched.config.has_option('zuul', 'status_url'):
msg += "\n" + self.sched.config.get('zuul', 'status_url')
ret = self.sched.trigger.report(change, msg, self.start_action)
ret = self.pipeline.trigger.report(change, msg, self.start_action)
if ret:
self.log.error("Reporting change start %s received: %s" %
(change, ret))
@ -1025,7 +1037,7 @@ class BasePipelineManager(object):
succeeded = self.pipeline.didAllJobsSucceed(item)
merged = (not ret)
if merged:
merged = self.sched.trigger.isMerged(item.change,
merged = self.pipeline.trigger.isMerged(item.change,
item.change.branch)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (item.change, succeeded, merged))
@ -1054,7 +1066,7 @@ class BasePipelineManager(object):
try:
self.log.info("Reporting change %s, action: %s" %
(item.change, action))
ret = self.sched.trigger.report(item.change, report, action)
ret = self.pipeline.trigger.report(item.change, report, action)
if ret:
self.log.error("Reporting change %s received: %s" %
(item.change, ret))
@ -1307,7 +1319,7 @@ class DependentPipelineManager(BasePipelineManager):
self.log.info(" %s" % queue)
def isChangeReadyToBeEnqueued(self, change):
if not self.sched.trigger.canMerge(change,
if not self.pipeline.trigger.canMerge(change,
self.getSubmitAllowNeeds()):
self.log.debug("Change %s can not merge, ignoring" % change)
return False
@ -1320,7 +1332,7 @@ class DependentPipelineManager(BasePipelineManager):
self.log.debug(" Changeish does not support dependencies")
return
for needs in change.needed_by_changes:
if self.sched.trigger.canMerge(needs,
if self.pipeline.trigger.canMerge(needs,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
(needs, change))
@ -1358,7 +1370,7 @@ class DependentPipelineManager(BasePipelineManager):
if self.isChangeAlreadyInQueue(change.needs_change):
self.log.debug(" Needed change is already ahead in the queue")
return True
if self.sched.trigger.canMerge(change.needs_change,
if self.pipeline.trigger.canMerge(change.needs_change,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s is needed" %
change.needs_change)

View File

@ -25,11 +25,12 @@ class GerritEventConnector(threading.Thread):
log = logging.getLogger("zuul.GerritEventConnector")
def __init__(self, gerrit, sched):
def __init__(self, gerrit, sched, trigger):
super(GerritEventConnector, self).__init__()
self.daemon = True
self.gerrit = gerrit
self.sched = sched
self.trigger = trigger
self._stopped = False
def stop(self):
@ -42,6 +43,7 @@ class GerritEventConnector(threading.Thread):
return
event = TriggerEvent()
event.type = data.get('type')
event.trigger_name = self.trigger.name
change = data.get('change')
if change:
event.project_name = change.get('project')
@ -85,7 +87,7 @@ class GerritEventConnector(threading.Thread):
# Call getChange for the side effect of updating the
# cache. Note that this modifies Change objects outside
# the main thread.
self.sched.trigger.getChange(event.change_number,
self.trigger.getChange(event.change_number,
event.patch_number,
refresh=True)
@ -103,6 +105,7 @@ class GerritEventConnector(threading.Thread):
class Gerrit(object):
name = 'gerrit'
log = logging.getLogger("zuul.Gerrit")
replication_timeout = 60
replication_retry_interval = 5
@ -128,7 +131,7 @@ class Gerrit(object):
self.gerrit = gerrit.Gerrit(self.server, user, port, sshkey)
self.gerrit.startWatching()
self.gerrit_connector = GerritEventConnector(
self.gerrit, sched)
self.gerrit, sched, self)
self.gerrit_connector.start()
def stop(self):