Merge "Add pipeline source"

This commit is contained in:
Jenkins 2014-08-15 17:15:27 +00:00 committed by Gerrit Code Review
commit 4e0cdeb2db
8 changed files with 66 additions and 62 deletions

View File

@ -295,6 +295,7 @@ explanation of each of the parameters::
- name: check
manager: IndependentPipelineManager
source: gerrit
trigger:
gerrit:
- event: patchset-created
@ -311,6 +312,11 @@ explanation of each of the parameters::
This is an optional field that may be used to provide a textual
description of the pipeline.
**source**
A required field that specifies a trigger that provides access to
the change objects that this pipeline operates on. Currently only
the value ``gerrit`` is supported.
**success-message**
An optional field that supplies the introductory text in message
reported back to Gerrit when all the voting builds are successful.

View File

@ -651,19 +651,19 @@ class TestScheduler(ZuulTestCase):
"Test whether a change is ready to merge"
# TODO: move to test_gerrit (this is a unit test!)
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
trigger = self.sched.layout.pipelines['gate'].trigger
a = self.sched.triggers['gerrit'].getChange(1, 2)
source = self.sched.layout.pipelines['gate'].source
a = source._getChange(1, 2)
mgr = self.sched.layout.pipelines['gate'].manager
self.assertFalse(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
A.addApproval('CRVW', 2)
a = trigger.getChange(1, 2, refresh=True)
self.assertFalse(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
a = source._getChange(1, 2, refresh=True)
self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
A.addApproval('APRV', 1)
a = trigger.getChange(1, 2, refresh=True)
self.assertTrue(trigger.canMerge(a, mgr.getSubmitAllowNeeds()))
trigger.maintainCache([])
a = source._getChange(1, 2, refresh=True)
self.assertTrue(source.canMerge(a, mgr.getSubmitAllowNeeds()))
source.maintainCache([])
def test_build_configuration(self):
"Test that zuul merges the right commits for testing"

View File

@ -87,6 +87,7 @@ class LayoutSchema(object):
pipeline = {v.Required('name'): str,
v.Required('manager'): manager,
'source': v.Any('gerrit'),
'precedence': precedence,
'description': str,
'require': require,

View File

@ -77,7 +77,7 @@ class Pipeline(object):
self.manager = None
self.queues = []
self.precedence = PRECEDENCE_NORMAL
self.trigger = None
self.source = None
self.start_actions = None
self.success_actions = None
self.failure_actions = None
@ -965,20 +965,6 @@ class TriggerEvent(object):
return ret
def getChange(self, project, trigger):
if self.change_number:
change = trigger.getChange(self.change_number, self.patch_number)
elif self.ref:
change = Ref(project)
change.ref = self.ref
change.oldrev = self.oldrev
change.newrev = self.newrev
change.url = trigger.getGitwebUrl(project, sha=self.newrev)
else:
change = NullChange(project)
return change
class BaseFilter(object):
def __init__(self, required_approvals=[]):
@ -1038,11 +1024,12 @@ class BaseFilter(object):
class EventFilter(BaseFilter):
def __init__(self, types=[], branches=[], refs=[], event_approvals={},
comments=[], emails=[], usernames=[], timespecs=[],
required_approvals=[]):
def __init__(self, trigger, types=[], branches=[], refs=[],
event_approvals={}, comments=[], emails=[], usernames=[],
timespecs=[], required_approvals=[]):
super(EventFilter, self).__init__(
required_approvals=required_approvals)
self.trigger = trigger
self._types = types
self._branches = branches
self._refs = refs

View File

@ -109,7 +109,7 @@ class RPCListener(object):
if not errors:
event.change_number, event.patch_number = args['change'].split(',')
try:
event.getChange(project, trigger)
pipeline.source.getChange(event, project)
except Exception:
errors += 'Invalid change: %s\n' % (args['change'],)

View File

@ -235,6 +235,8 @@ class Scheduler(threading.Thread):
for conf_pipeline in data.get('pipelines', []):
pipeline = Pipeline(conf_pipeline['name'])
pipeline.description = conf_pipeline.get('description')
# TODO(jeblair): remove backwards compatibility:
pipeline.source = self.triggers[conf_pipeline.get('source', 'gerrit')]
precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
pipeline.precedence = precedence
pipeline.failure_message = conf_pipeline.get('failure-message',
@ -298,7 +300,6 @@ class Scheduler(threading.Thread):
# TODO: move this into triggers (may require pluggable
# configuration)
if 'gerrit' in conf_pipeline['trigger']:
pipeline.trigger = self.triggers['gerrit']
for trigger in toList(conf_pipeline['trigger']['gerrit']):
approvals = {}
for approval_dict in toList(trigger.get('approval')):
@ -314,7 +315,8 @@ class Scheduler(threading.Thread):
usernames = toList(trigger.get('username'))
if not usernames:
usernames = toList(trigger.get('username_filter'))
f = EventFilter(types=toList(trigger['event']),
f = EventFilter(trigger=self.triggers['gerrit'],
types=toList(trigger['event']),
branches=toList(trigger.get('branch')),
refs=toList(trigger.get('ref')),
event_approvals=approvals,
@ -325,9 +327,9 @@ class Scheduler(threading.Thread):
toList(trigger.get('require-approval')))
manager.event_filters.append(f)
elif 'timer' in conf_pipeline['trigger']:
pipeline.trigger = self.triggers['timer']
for trigger in toList(conf_pipeline['trigger']['timer']):
f = EventFilter(types=['timer'],
f = EventFilter(trigger=self.triggers['timer'],
types=['timer'],
timespecs=toList(trigger['time']))
manager.event_filters.append(f)
@ -714,8 +716,7 @@ class Scheduler(threading.Thread):
def _doEnqueueEvent(self, event):
project = self.layout.projects.get(event.project_name)
pipeline = self.layout.pipelines[event.forced_pipeline]
trigger = self.triggers.get(event.trigger_name)
change = event.getChange(project, trigger)
change = pipeline.source.getChange(event, project)
self.log.debug("Event %s for change %s was directly assigned "
"to pipeline %s" % (event, change, self))
self.log.info("Adding %s, %s to %s" %
@ -809,8 +810,7 @@ class Scheduler(threading.Thread):
return
for pipeline in self.layout.pipelines.values():
change = event.getChange(project,
self.triggers.get(event.trigger_name))
change = pipeline.source.getChange(event, project)
if event.type == 'patchset-created':
pipeline.manager.removeOldVersionsOfChange(change)
elif event.type == 'change-abandoned':
@ -944,6 +944,7 @@ class BasePipelineManager(object):
def _postConfig(self, layout):
self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
self.log.info(" Source: %s" % self.pipeline.source)
self.log.info(" Requirements:")
for f in self.changeish_filters:
self.log.info(" %s" % f)
@ -1188,7 +1189,7 @@ class BasePipelineManager(object):
oldrev = item.change.oldrev
newrev = item.change.newrev
return dict(project=item.change.project.name,
url=self.pipeline.trigger.getGitUrl(
url=self.pipeline.source.getGitUrl(
item.change.project),
merge_mode=item.change.project.merge_mode,
refspec=item.change.refspec,
@ -1220,7 +1221,7 @@ class BasePipelineManager(object):
item.current_build_set)
else:
self.log.debug("Preparing update repo for: %s" % item.change)
url = self.pipeline.trigger.getGitUrl(item.change.project)
url = self.pipeline.source.getGitUrl(item.change.project)
self.sched.merger.updateRepo(item.change.project.name,
url, build_set)
return False
@ -1410,8 +1411,8 @@ class BasePipelineManager(object):
succeeded = self.pipeline.didAllJobsSucceed(item)
merged = item.reported
if merged:
merged = self.pipeline.trigger.isMerged(item.change,
item.change.branch)
merged = self.pipeline.source.isMerged(item.change,
item.change.branch)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (item.change, succeeded, merged))
change_queue = self.pipeline.getQueue(item.change.project)
@ -1737,8 +1738,8 @@ class DependentPipelineManager(BasePipelineManager):
return new_change_queues
def isChangeReadyToBeEnqueued(self, change):
if not self.pipeline.trigger.canMerge(change,
self.getSubmitAllowNeeds()):
if not self.pipeline.source.canMerge(change,
self.getSubmitAllowNeeds()):
self.log.debug("Change %s can not merge, ignoring" % change)
return False
return True
@ -1750,8 +1751,8 @@ class DependentPipelineManager(BasePipelineManager):
self.log.debug(" Changeish does not support dependencies")
return
for needs in change.needed_by_changes:
if self.pipeline.trigger.canMerge(needs,
self.getSubmitAllowNeeds()):
if self.pipeline.source.canMerge(needs,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
(needs, change))
to_enqueue.append(needs)
@ -1790,8 +1791,8 @@ class DependentPipelineManager(BasePipelineManager):
if self.isChangeAlreadyInQueue(change.needs_change):
self.log.debug(" Needed change is already ahead in the queue")
return True
if self.pipeline.trigger.canMerge(change.needs_change,
self.getSubmitAllowNeeds()):
if self.pipeline.source.canMerge(change.needs_change,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s is needed" %
change.needs_change)
return change.needs_change

View File

@ -17,7 +17,7 @@ import threading
import time
import urllib2
from zuul.lib import gerrit
from zuul.model import TriggerEvent, Change
from zuul.model import TriggerEvent, Change, Ref, NullChange
class GerritEventConnector(threading.Thread):
@ -84,12 +84,12 @@ class GerritEventConnector(threading.Thread):
event.account = None
if event.change_number:
# Call getChange for the side effect of updating the
# Call _getChange for the side effect of updating the
# cache. Note that this modifies Change objects outside
# the main thread.
self.trigger.getChange(event.change_number,
event.patch_number,
refresh=True)
self.trigger._getChange(event.change_number,
event.patch_number,
refresh=True)
self.sched.addEvent(event)
self.gerrit.eventDone()
@ -290,7 +290,20 @@ class Gerrit(object):
def postConfig(self):
pass
def getChange(self, number, patchset, refresh=False):
def getChange(self, event, project):
if event.change_number:
change = self._getChange(event.change_number, event.patch_number)
elif event.ref:
change = Ref(project)
change.ref = event.ref
change.oldrev = event.oldrev
change.newrev = event.newrev
change.url = self.getGitwebUrl(project, sha=event.newrev)
else:
change = NullChange(project)
return change
def _getChange(self, number, patchset, refresh=False):
key = '%s,%s' % (number, patchset)
change = None
if key in self._change_cache:
@ -349,7 +362,7 @@ class Gerrit(object):
if 'dependsOn' in data:
parts = data['dependsOn'][0]['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = self.getChange(dep_num, dep_ps)
dep = self._getChange(dep_num, dep_ps)
if not dep.is_merged:
change.needs_change = dep
@ -358,7 +371,7 @@ class Gerrit(object):
for needed in data['neededBy']:
parts = needed['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4]
dep = self.getChange(dep_num, dep_ps)
dep = self._getChange(dep_num, dep_ps)
if not dep.is_merged and dep.is_current_patchset:
change.needed_by_changes.append(dep)

View File

@ -56,9 +56,9 @@ class Timer(object):
for job in self.apsched.get_jobs():
self.apsched.unschedule_job(job)
for pipeline in self.sched.layout.pipelines.values():
if pipeline.trigger != self:
continue
for ef in pipeline.manager.event_filters:
if ef.trigger != self:
continue
for timespec in ef.timespecs:
parts = timespec.split()
if len(parts) < 5 or len(parts) > 6:
@ -82,15 +82,11 @@ class Timer(object):
args=(pipeline.name,
timespec,))
def getChange(self, number, patchset, refresh=False):
def getChange(self, event, project):
raise Exception("Timer trigger does not support changes.")
def getGitUrl(self, project):
# For the moment, the timer trigger requires gerrit.
return self.sched.triggers['gerrit'].getGitUrl(project)
raise Exception("Timer trigger does not support changes.")
def getGitwebUrl(self, project, sha=None):
url = '%s/gitweb?p=%s.git' % (self.baseurl, project)
if sha:
url += ';a=commitdiff;h=' + sha
return url
raise Exception("Timer trigger does not support changes.")