Add a timer trigger

Add a trigger that enqueues items based on a timer.

Change-Id: I024be871914c8272c237b1f818589be08cda04da
This commit is contained in:
James E. Blair 2013-07-29 17:14:51 -07:00
parent 6c358e72ea
commit 63bb0ef073
12 changed files with 256 additions and 11 deletions

View File

@ -18,11 +18,12 @@ Since 1.3.0:
functionality may be achieved with a custom parameter function that
matches all jobs).
* Multiple triggers are now supported, in principle (though only
Gerrit is defined currently). Your layout.yaml file will need to
* Multiple triggers are now supported (currently Gerrit and a simple
Timer trigger ar supported). Your layout.yaml file will need to
change to add the key "gerrit:" inside of the "triggers:" list to
specify a Gerrit trigger (and facilitate adding other kinds of
triggers later). See the sample layout.yaml.
triggers later). See the sample layout.yaml and Zuul section of the
documentation.
* The default behavior is now to immediately dequeue changes that have
merge conflicts, even those not at the head of the queue. To enable

View File

@ -4,11 +4,10 @@ Triggers
========
The process of merging a change starts with proposing a change to be
merged. Currently Zuul only supports Gerrit as a triggering system.
merged. Primarily, Zuul supports Gerrit as a triggering system, as
well as a facility for triggering jobs based on a timer.
Zuul's design is modular, so alternate triggering and reporting
systems can be supported. However, Gerrit has a particularly robust
data model, and Zuul does make some assumptions that include that data
model. Nonetheless, patches to support alternate systems are welcome.
systems can be supported.
Gerrit
------
@ -101,3 +100,9 @@ instance, a clone will produce a repository in an unpredictable state
depending on what the state of Zuul's repository is when the clone
happens). They are, however, suitable for automated systems that
respond to Zuul triggers.
Timer
-----
A simple timer trigger is available as well. It supports triggering
jobs in a pipeline based on cron-style time instructions.

View File

@ -304,6 +304,18 @@ explanation of each of the parameters::
containing 'retrigger' somewhere in the comment text are added to a
change.
**timer**
This trigger will run based on a cron-style time specification.
It will enqueue an event into its pipeline for every project
defined in the configuration. Any job associated with the
pipeline will run in response to that event.
*time*
The time specification in cron syntax. Only the 5 part syntax is
supported, not the symbolic names. Example: ``0 0 * * *`` runs
at midnight.
**dequeue-on-new-patchset**
Normally, if a new patchset is uploaded to a change that is in a
pipeline, the existing entry in the pipeline will be removed (with

View File

@ -13,3 +13,4 @@ extras
statsd>=1.0.0,<3.0
voluptuous>=0.6,<0.7
gear>=0.3.1,<0.4.0
apscheduler>=2.1.1,<3.0

26
tests/fixtures/layout-timer.yaml vendored Normal file
View File

@ -0,0 +1,26 @@
pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
verified: 1
failure:
verified: -1
- name: periodic
manager: IndependentPipelineManager
trigger:
timer:
- time: '* * * * * */10'
projects:
- name: org/project
check:
- project-merge:
- project-test1
- project-test2
periodic:
- project-bitrot-stable-old
- project-bitrot-stable-older

View File

@ -45,6 +45,7 @@ import zuul.scheduler
import zuul.webapp
import zuul.launcher.gearman
import zuul.trigger.gerrit
import zuul.trigger.timer
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
'fixtures')
@ -762,6 +763,8 @@ class TestScheduler(testtools.TestCase):
self.sched.setLauncher(self.launcher)
self.sched.registerTrigger(self.gerrit)
self.timer = zuul.trigger.timer.Timer(self.config, self.sched)
self.sched.registerTrigger(self.timer)
self.sched.start()
self.sched.reconfigure(self.config)
@ -786,6 +789,7 @@ class TestScheduler(testtools.TestCase):
self.worker.shutdown()
self.gearman_server.shutdown()
self.gerrit.stop()
self.timer.stop()
self.sched.stop()
self.sched.join()
self.statsd.stop()
@ -2346,6 +2350,7 @@ class TestScheduler(testtools.TestCase):
"Test that we can test the config"
sched = zuul.scheduler.Scheduler()
sched.registerTrigger(None, 'gerrit')
sched.registerTrigger(None, 'timer')
sched.testConfig(CONFIG.get('zuul', 'layout_config'))
def test_build_description(self):
@ -2481,3 +2486,50 @@ class TestScheduler(testtools.TestCase):
'SUCCESS')
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
def test_timer(self):
"Test that a periodic job is triggered"
self.worker.hold_jobs_in_build = True
self.config.set('zuul', 'layout_config',
'tests/fixtures/layout-timer.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
start = time.time()
failed = True
while ((time.time() - start) < 30):
if len(self.builds) == 2:
failed = False
break
else:
time.sleep(1)
if failed:
raise Exception("Expected jobs never ran")
self.waitUntilSettled()
port = self.webapp.server.socket.getsockname()[1]
f = urllib.urlopen("http://localhost:%s/status.json" % port)
data = f.read()
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()
self.assertEqual(self.getJobFromHistory(
'project-bitrot-stable-old').result, 'SUCCESS')
self.assertEqual(self.getJobFromHistory(
'project-bitrot-stable-older').result, 'SUCCESS')
data = json.loads(data)
status_jobs = set()
for p in data['pipelines']:
for q in p['change_queues']:
for head in q['heads']:
for change in head:
self.assertEqual(change['id'], 'None')
for job in change['jobs']:
status_jobs.add(job['name'])
self.assertIn('project-bitrot-stable-old', status_jobs)
self.assertIn('project-bitrot-stable-older', status_jobs)

View File

@ -106,6 +106,7 @@ class Server(object):
logging.basicConfig(level=logging.DEBUG)
self.sched = zuul.scheduler.Scheduler()
self.sched.registerTrigger(None, 'gerrit')
self.sched.registerTrigger(None, 'timer')
layout = self.sched.testConfig(self.config.get('zuul',
'layout_config'))
if not job_list_path:
@ -151,6 +152,7 @@ class Server(object):
import zuul.scheduler
import zuul.launcher.gearman
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.webapp
if (self.config.has_option('gearman_server', 'start') and
@ -163,10 +165,12 @@ class Server(object):
gearman = zuul.launcher.gearman.Gearman(self.config, self.sched)
gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
timer = zuul.trigger.timer.Timer(self.config, self.sched)
webapp = zuul.webapp.WebApp(self.sched)
self.sched.setLauncher(gearman)
self.sched.registerTrigger(gerrit)
self.sched.registerTrigger(timer)
self.sched.start()
self.sched.reconfigure(self.config)

View File

@ -49,7 +49,10 @@ class LayoutSchema(object):
'approval': toList(variable_dict),
}
trigger = v.Required(v.Any({'gerrit': toList(gerrit_trigger)}))
timer_trigger = {v.Required('time'): str}
trigger = v.Required(v.Any({'gerrit': toList(gerrit_trigger)},
{'timer': toList(timer_trigger)}))
pipeline = {v.Required('name'): str,
v.Required('manager'): manager,

View File

@ -719,6 +719,22 @@ class Ref(Changeish):
return False
class NullChange(Changeish):
is_reportable = False
def __init__(self, project):
super(NullChange, self).__init__(project)
def _id(self):
return 'None'
def equals(self, other):
return False
def isUpdateOf(self, other):
return False
class TriggerEvent(object):
def __init__(self):
self.data = None
@ -740,6 +756,8 @@ class TriggerEvent(object):
self.ref = None
self.oldrev = None
self.newrev = None
# timer
self.timespec = None
def __repr__(self):
ret = '<TriggerEvent %s %s' % (self.type, self.project_name)
@ -758,19 +776,21 @@ class TriggerEvent(object):
def getChange(self, project, trigger):
if self.change_number:
change = trigger.getChange(self.change_number, self.patch_number)
if self.ref:
elif self.ref:
change = Ref(project)
change.ref = self.ref
change.oldrev = self.oldrev
change.newrev = self.newrev
change.url = trigger.getGitwebUrl(project, sha=self.newrev)
else:
change = NullChange(project)
return change
class EventFilter(object):
def __init__(self, types=[], branches=[], refs=[], approvals={},
comment_filters=[], email_filters=[]):
comment_filters=[], email_filters=[], timespecs=[]):
self._types = types
self._branches = branches
self._refs = refs
@ -782,6 +802,7 @@ class EventFilter(object):
self.comment_filters = [re.compile(x) for x in comment_filters]
self.email_filters = [re.compile(x) for x in email_filters]
self.approvals = approvals
self.timespecs = timespecs
def __repr__(self):
ret = '<EventFilter'
@ -799,6 +820,8 @@ class EventFilter(object):
ret += ' comment_filters: %s' % ', '.join(self._comment_filters)
if self._email_filters:
ret += ' email_filters: %s' % ', '.join(self._email_filters)
if self.timespecs:
ret += ' timespecs: %s' % ', '.join(self.timespecs)
ret += '>'
return ret
@ -863,6 +886,15 @@ class EventFilter(object):
matches_approval = True
if not matches_approval:
return False
# timespecs are ORed
matches_timespec = False
for timespec in self.timespecs:
if (event.timespec == timespec):
matches_timespec = True
if self.timespecs and not matches_timespec:
return False
return True

View File

@ -159,6 +159,12 @@ class Scheduler(threading.Thread):
email_filters=
toList(trigger.get('email_filter')))
manager.event_filters.append(f)
elif 'timer' in conf_pipeline['trigger']:
pipeline.trigger = self.triggers['timer']
for trigger in toList(conf_pipeline['trigger']['timer']):
f = EventFilter(types=['timer'],
timespecs=toList(trigger['time']))
manager.event_filters.append(f)
for project_template in data.get('project-templates', []):
# Make sure the template only contains valid pipelines
@ -460,6 +466,8 @@ class Scheduler(threading.Thread):
old_pipeline.manager.building_jobs
self.layout = layout
self._setupMerger()
for trigger in self.triggers.values():
trigger.postConfig()
self._reconfigure = False
self.reconfigure_complete_event.set()
finally:
@ -1190,7 +1198,7 @@ class BasePipelineManager(object):
Branch: <b>{change.branch}</b><br/>
Pipeline: <b>{self.pipeline.name}</b>
</p>"""
else:
elif hasattr(change, 'ref'):
ret = """\
<p>
Triggered by reference:
@ -1199,6 +1207,8 @@ class BasePipelineManager(object):
New revision: <b>{change.newrev}</b><br/>
Pipeline: <b>{self.pipeline.name}</b>
</p>"""
else:
ret = ""
if concurrent_changes:
ret += """\

View File

@ -302,6 +302,9 @@ class Gerrit(object):
for key in remove:
del self._change_cache[key]
def postConfig(self):
pass
def getChange(self, number, patchset, refresh=False):
key = '%s,%s' % (number, patchset)
change = None

96
zuul/trigger/timer.py Normal file
View File

@ -0,0 +1,96 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import apscheduler.scheduler
import logging
from zuul.model import TriggerEvent
class Timer(object):
name = 'timer'
log = logging.getLogger("zuul.Timer")
def __init__(self, config, sched):
self.sched = sched
self.config = config
self.apsched = apscheduler.scheduler.Scheduler()
self.apsched.start()
def _onTrigger(self, timespec):
for project in self.sched.layout.projects.values():
event = TriggerEvent()
event.type = 'timer'
event.timespec = timespec
event.project_name = project.name
self.log.debug("Adding event %s" % event)
self.sched.addEvent(event)
def stop(self):
self.apsched.shutdown()
def report(self, change, message, action):
raise Exception("Timer trigger does not support reporting.")
def isMerged(self, change, head=None):
raise Exception("Timer trigger does not support checking if "
"a change is merged.")
def canMerge(self, change, allow_needs):
raise Exception("Timer trigger does not support checking if "
"a change can merge.")
def maintainCache(self, relevant):
return
def postConfig(self):
for job in self.apsched.get_jobs():
self.apsched.unschedule_job(job)
for pipeline in self.sched.layout.pipelines.values():
if pipeline.trigger != self:
continue
for ef in pipeline.manager.event_filters:
for timespec in ef.timespecs:
parts = timespec.split()
if len(parts) < 5 or len(parts) > 6:
self.log.error(
"Unable to parse time value '%s' "
"defined in pipeline %s" % (
timespec,
pipeline.name))
continue
minute, hour, dom, month, dow = parts[:5]
if len(parts) > 5:
second = parts[5]
else:
second = None
self.apsched.add_cron_job(self._onTrigger,
day=dom,
day_of_week=dow,
hour=hour,
minute=minute,
second=second,
args=(timespec,))
def getChange(self, number, patchset, refresh=False):
raise Exception("Timer trigger does not support changes.")
def getGitUrl(self, project):
pass
def getGitwebUrl(self, project, sha=None):
url = '%s/gitweb?p=%s.git' % (self.baseurl, project)
if sha:
url += ';a=commitdiff;h=' + sha
return url