Add pipeline source

A TriggerEvent may originate from a trigger that does not represent
the canonical location of the project source.  For instance, the
timer trigger strangely depends on the gerrit trigger to actually
handle Git operations behind the scenes.  Instead, make an explicit
association between pipelines and their source triggers so that
their event trigger does not need to have that implicit association.

This is a step toward having pipelines support multiple triggers
(they already support multiple reporters).

Change-Id: Ie80ffde411fe40fddfc4496b7adb0004f660c48c
This commit is contained in:
James E. Blair 2014-08-06 09:37:52 -07:00
parent 8c5297ac15
commit c0dedf8b3f
8 changed files with 66 additions and 62 deletions

View File

@ -278,6 +278,7 @@ explanation of each of the parameters::
- name: check
manager: IndependentPipelineManager
source: gerrit
trigger:
gerrit:
- event: patchset-created
@ -294,6 +295,11 @@ explanation of each of the parameters::
This is an optional field that may be used to provide a textual
description of the pipeline.
**source**
A required field that specifies a trigger that provides access to
the change objects that this pipeline operates on. Currently only
the value ``gerrit`` is supported.
**success-message**
An optional field that supplies the introductory text in message
reported back to Gerrit when all the voting builds are successful.

View File

@ -651,19 +651,19 @@ class TestScheduler(ZuulTestCase):
"Test whether a change is ready to merge"
# TODO: move to test_gerrit (this is a unit test!)
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
trigger = self.sched.layout.pipelines['gate'].trigger
a = self.sched.triggers['gerrit'].getChange(1, 2)
source = self.sched.layout.pipelines['gate'].source
a = source._getChange(1, 2)
mgr = self.sched.layout.pipelines['gate'].manager
self.assertFalse(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
A.addApproval('CRVW', 2)
a = trigger.getChange(1, 2, refresh=True)
self.assertFalse(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
a = source._getChange(1, 2, refresh=True)
self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
A.addApproval('APRV', 1)
a = trigger.getChange(1, 2, refresh=True)
self.assertTrue(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
trigger.maintainCache([])
a = source._getChange(1, 2, refresh=True)
self.assertTrue(source.canMerge(a, mgr.getSubmitAllowNeeds()))
source.maintainCache([])
def test_build_configuration(self):
"Test that zuul merges the right commits for testing"

View File

@ -86,6 +86,7 @@ class LayoutSchema(object):
pipeline = {v.Required('name'): str,
v.Required('manager'): manager,
'source': v.Any('gerrit'),
'precedence': precedence,
'description': str,
'require': require,

View File

@ -77,7 +77,7 @@ class Pipeline(object):
self.manager = None
self.queues = []
self.precedence = PRECEDENCE_NORMAL
self.trigger = None
self.source = None
self.start_actions = None
self.success_actions = None
self.failure_actions = None
@ -965,20 +965,6 @@ class TriggerEvent(object):
return ret
def getChange(self, project, trigger):
if self.change_number:
change = trigger.getChange(self.change_number, self.patch_number)
elif self.ref:
change = Ref(project)
change.ref = self.ref
change.oldrev = self.oldrev
change.newrev = self.newrev
change.url = trigger.getGitwebUrl(project, sha=self.newrev)
else:
change = NullChange(project)
return change
class BaseFilter(object):
def __init__(self, required_approvals=[]):
@ -1038,11 +1024,12 @@ class BaseFilter(object):
class EventFilter(BaseFilter):
def __init__(self, types=[], branches=[], refs=[], event_approvals={},
comments=[], emails=[], usernames=[], timespecs=[],
required_approvals=[]):
def __init__(self, trigger, types=[], branches=[], refs=[],
event_approvals={}, comments=[], emails=[], usernames=[],
timespecs=[], required_approvals=[]):
super(EventFilter, self).__init__(
required_approvals=required_approvals)
self.trigger = trigger
self._types = types
self._branches = branches
self._refs = refs

View File

@ -109,7 +109,7 @@ class RPCListener(object):
if not errors:
event.change_number, event.patch_number = args['change'].split(',')
try:
event.getChange(project, trigger)
pipeline.source.getChange(event, project)
except Exception:
errors += 'Invalid change: %s\n' % (args['change'],)

View File

@ -235,6 +235,8 @@ class Scheduler(threading.Thread):
for conf_pipeline in data.get('pipelines', []):
pipeline = Pipeline(conf_pipeline['name'])
pipeline.description = conf_pipeline.get('description')
# TODO(jeblair): remove backwards compatibility:
pipeline.source = self.triggers[conf_pipeline.get('source', 'gerrit')]
precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
pipeline.precedence = precedence
pipeline.failure_message = conf_pipeline.get('failure-message',
@ -298,7 +300,6 @@ class Scheduler(threading.Thread):
# TODO: move this into triggers (may require pluggable
# configuration)
if 'gerrit' in conf_pipeline['trigger']:
pipeline.trigger = self.triggers['gerrit']
for trigger in toList(conf_pipeline['trigger']['gerrit']):
approvals = {}
for approval_dict in toList(trigger.get('approval')):
@ -314,7 +315,8 @@ class Scheduler(threading.Thread):
usernames = toList(trigger.get('username'))
if not usernames:
usernames = toList(trigger.get('username_filter'))
f = EventFilter(types=toList(trigger['event']),
f = EventFilter(trigger=self.triggers['gerrit'],
types=toList(trigger['event']),
branches=toList(trigger.get('branch')),
refs=toList(trigger.get('ref')),
event_approvals=approvals,
@ -325,9 +327,9 @@ class Scheduler(threading.Thread):
toList(trigger.get('require-approval')))
manager.event_filters.append(f)
elif 'timer' in conf_pipeline['trigger']:
pipeline.trigger = self.triggers['timer']
for trigger in toList(conf_pipeline['trigger']['timer']):
f = EventFilter(types=['timer'],
f = EventFilter(trigger=self.triggers['timer'],
types=['timer'],
timespecs=toList(trigger['time']))
manager.event_filters.append(f)
@ -714,8 +716,7 @@ class Scheduler(threading.Thread):
def _doEnqueueEvent(self, event):
project = self.layout.projects.get(event.project_name)
pipeline = self.layout.pipelines[event.forced_pipeline]
trigger = self.triggers.get(event.trigger_name)
change = event.getChange(project, trigger)
change = pipeline.source.getChange(event, project)
self.log.debug("Event %s for change %s was directly assigned "
"to pipeline %s" % (event, change, self))
self.log.info("Adding %s, %s to %s" %
@ -809,8 +810,7 @@ class Scheduler(threading.Thread):
return
for pipeline in self.layout.pipelines.values():
change = event.getChange(project,
self.triggers.get(event.trigger_name))
change = pipeline.source.getChange(event, project)
if event.type == 'patchset-created':
pipeline.manager.removeOldVersionsOfChange(change)
elif event.type == 'change-abandoned':
@ -944,6 +944,7 @@ class BasePipelineManager(object):
def _postConfig(self, layout):
self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
self.log.info(" Source: %s" % self.pipeline.source)
self.log.info(" Requirements:")
for f in self.changeish_filters:
self.log.info(" %s" % f)
@ -1188,7 +1189,7 @@ class BasePipelineManager(object):
oldrev = item.change.oldrev
newrev = item.change.newrev
return dict(project=item.change.project.name,
url=self.pipeline.trigger.getGitUrl(
url=self.pipeline.source.getGitUrl(
item.change.project),
merge_mode=item.change.project.merge_mode,
refspec=item.change.refspec,
@ -1220,7 +1221,7 @@ class BasePipelineManager(object):
item.current_build_set)
else:
self.log.debug("Preparing update repo for: %s" % item.change)
url = self.pipeline.trigger.getGitUrl(item.change.project)
url = self.pipeline.source.getGitUrl(item.change.project)
self.sched.merger.updateRepo(item.change.project.name,
url, build_set)
return False
@ -1410,8 +1411,8 @@ class BasePipelineManager(object):
succeeded = self.pipeline.didAllJobsSucceed(item)
merged = item.reported
if merged:
merged = self.pipeline.trigger.isMerged(item.change,
item.change.branch)
merged = self.pipeline.source.isMerged(item.change,
item.change.branch)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (item.change, succeeded, merged))
change_queue = self.pipeline.getQueue(item.change.project)
@ -1738,8 +1739,8 @@ class DependentPipelineManager(BasePipelineManager):
return new_change_queues
def isChangeReadyToBeEnqueued(self, change):
if not self.pipeline.trigger.canMerge(change,
self.getSubmitAllowNeeds()):
if not self.pipeline.source.canMerge(change,
self.getSubmitAllowNeeds()):
self.log.debug("Change %s can not merge, ignoring" % change)
return False
return True
@ -1751,8 +1752,8 @@ class DependentPipelineManager(BasePipelineManager):
self.log.debug(" Changeish does not support dependencies")
return
for needs in change.needed_by_changes:
if self.pipeline.trigger.canMerge(needs,
self.getSubmitAllowNeeds()):
if self.pipeline.source.canMerge(needs,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
(needs, change))
to_enqueue.append(needs)
@ -1791,8 +1792,8 @@ class DependentPipelineManager(BasePipelineManager):
if self.isChangeAlreadyInQueue(change.needs_change):
self.log.debug(" Needed change is already ahead in the queue")
return True
if self.pipeline.trigger.canMerge(change.needs_change,
self.getSubmitAllowNeeds()):
if self.pipeline.source.canMerge(change.needs_change,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s is needed" %
change.needs_change)
return change.needs_change

View File

@ -17,7 +17,7 @@ import threading
import time
import urllib2
from zuul.lib import gerrit
from zuul.model import TriggerEvent, Change
from zuul.model import TriggerEvent, Change, Ref, NullChange
class GerritEventConnector(threading.Thread):
@ -84,12 +84,12 @@ class GerritEventConnector(threading.Thread):
event.account = None
if event.change_number:
# Call getChange for the side effect of updating the
# Call _getChange for the side effect of updating the
# cache. Note that this modifies Change objects outside
# the main thread.
self.trigger.getChange(event.change_number,
event.patch_number,
refresh=True)
self.trigger._getChange(event.change_number,
event.patch_number,
refresh=True)
self.sched.addEvent(event)
self.gerrit.eventDone()
@ -290,7 +290,20 @@ class Gerrit(object):
def postConfig(self):
pass
def getChange(self, number, patchset, refresh=False):
def getChange(self, event, project):
if event.change_number:
change = self._getChange(event.change_number, event.patch_number)
elif event.ref:
change = Ref(project)
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.url = self.getGitwebUrl(project, sha=event.newrev)
else:
change = NullChange(project)
return change
def _getChange(self, number, patchset, refresh=False):
key = '%s,%s' % (number, patchset)
change = None
if key in self._change_cache:
@ -349,7 +362,7 @@ class Gerrit(object):
if 'dependsOn' in data:
parts = data['dependsOn'][0]['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = self.getChange(dep_num, dep_ps)
dep = self._getChange(dep_num, dep_ps)
if not dep.is_merged:
change.needs_change = dep
@ -358,7 +371,7 @@ class Gerrit(object):
for needed in data['neededBy']:
parts = needed['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = self.getChange(dep_num, dep_ps)
dep = self._getChange(dep_num, dep_ps)
if not dep.is_merged and dep.is_current_patchset:
change.needed_by_changes.append(dep)

View File

@ -56,9 +56,9 @@ class Timer(object):
for job in self.apsched.get_jobs():
self.apsched.unschedule_job(job)
for pipeline in self.sched.layout.pipelines.values():
if pipeline.trigger != self:
continue
for ef in pipeline.manager.event_filters:
if ef.trigger != self:
continue
for timespec in ef.timespecs:
parts = timespec.split()
if len(parts) < 5 or len(parts) > 6:
@ -82,15 +82,11 @@ class Timer(object):
args=(pipeline.name,
timespec,))
def getChange(self, number, patchset, refresh=False):
def getChange(self, event, project):
raise Exception("Timer trigger does not support changes.")
def getGitUrl(self, project):
# For the moment, the timer trigger requires gerrit.
return self.sched.triggers['gerrit'].getGitUrl(project)
raise Exception("Timer trigger does not support changes.")
def getGitwebUrl(self, project, sha=None):
url = '%s/gitweb?p=%s.git' % (self.baseurl, project)
if sha:
url += ';a=commitdiff;h=' + sha
return url
raise Exception("Timer trigger does not support changes.")