Merge master into feature/zuulv3

Conflicts:
	zuul/connection/gerrit.py
	zuul/lib/connections.py
	zuul/model.py
	zuul/scheduler.py

Change-Id: If1c8ac3bf26bd8c4496ac130958b966d9937519e
commit 89b67f617c
Joshua Hesketh, 2016-02-11 21:22:14 +11:00
19 changed files with 280 additions and 55 deletions

.gitignore

@@ -1,3 +1,4 @@
*.sw?
*.egg
*.egg-info
*.pyc


@@ -239,7 +239,7 @@ the Git plugin to prepare them, or you may choose to use a shell script
instead. As an example, the OpenStack project uses the following
script to prepare the workspace for its integration testing:
https://github.com/openstack-infra/devstack-gate/blob/master/devstack-vm-gate-wrap.sh
https://git.openstack.org/cgit/openstack-infra/devstack-gate/tree/devstack-vm-gate-wrap.sh
Turbo Hipster Worker
~~~~~~~~~~~~~~~~~~~~


@@ -568,8 +568,8 @@ file. The first is called a *check* pipeline::
my_gerrit:
verified: 1
failure:
gerrit:
my_gerrit: -1
my_gerrit:
verified: -1
This will trigger jobs each time a new patchset (or change) is
uploaded to Gerrit, and report +/-1 values to Gerrit in the
@@ -704,6 +704,11 @@ each job as it builds a list from the project specification.
would largely defeat the parallelization of dependent change testing
that is the main feature of Zuul. Default: ``false``.
**mutex (optional)**
This is a string that names a mutex that should be observed by this
job. Only one build of any job that references the same named mutex
will be enqueued at a time. This applies across all pipelines.
**branch (optional)**
This job should only be run on matching branches. This field is
treated as a regular expression and multiple branches may be
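A minimal sketch of how the **mutex** attribute above appears in a layout
(job names are illustrative; it mirrors the tests/fixtures/layout-mutex.yaml
fixture added by this commit). Only one build of the two jobs below will be
enqueued at a time, across all pipelines::

    jobs:
      - name: mutex-one
        mutex: test-mutex
      - name: mutex-two
        mutex: test-mutex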


@@ -1,9 +1,9 @@
pbr>=0.5.21,<1.0
pbr>=1.1.0
argparse
PyYAML>=3.1.0
Paste
WebOb>=1.2.3,<1.3
WebOb>=1.2.3
paramiko>=1.8.0
GitPython>=0.3.3
ordereddict
@@ -12,7 +12,7 @@ extras
statsd>=1.0.0,<3.0
voluptuous>=0.7
gear>=0.5.7,<1.0.0
apscheduler>=2.1.1,<3.0
apscheduler>=3.0
PrettyTable>=0.6,<0.8
babel>=1.0
six>=1.6.0

tests/fixtures/layout-mutex.yaml (new file)

@@ -0,0 +1,25 @@
pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
verified: 1
failure:
gerrit:
verified: -1
jobs:
- name: mutex-one
mutex: test-mutex
- name: mutex-two
mutex: test-mutex
projects:
- name: org/project
check:
- project-test1
- mutex-one
- mutex-two


@@ -132,6 +132,10 @@ jobs:
parameter-function: select_debian_node
- name: project1-project2-integration
queue-name: integration
- name: mutex-one
mutex: test-mutex
- name: mutex-two
mutex: test-mutex
project-templates:
- name: test-one-and-two


@@ -2286,6 +2286,70 @@ class TestScheduler(ZuulTestCase):
self.sched.reconfigure(self.config)
self.assertEqual(len(self.sched.layout.pipelines['gate'].queues), 1)
def test_mutex(self):
"Test job mutexes"
self.config.set('zuul', 'layout_config',
'tests/fixtures/layout-mutex.yaml')
self.sched.reconfigure(self.config)
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'mutex-one')
self.assertEqual(self.builds[2].name, 'project-test1')
self.worker.release('mutex-one')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'mutex-two')
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
self.worker.release('mutex-two')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'mutex-one')
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
self.worker.release('mutex-one')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'mutex-two')
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
self.worker.release('mutex-two')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 2)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()
self.assertEqual(len(self.builds), 0)
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 1)
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
def test_node_label(self):
"Test that a job runs on a specific node label"
self.worker.registerFunction('build:node-project-test1:debian')
@@ -2742,11 +2806,11 @@ class TestScheduler(ZuulTestCase):
'tests/fixtures/layout-idle.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
self.waitUntilSettled()
# The pipeline triggers every second, so we should have seen
# several by now.
time.sleep(5)
self.waitUntilSettled()
# Stop queuing timer triggered jobs so that the assertions
# below don't race against more jobs being queued.
@@ -2754,6 +2818,7 @@ class TestScheduler(ZuulTestCase):
'tests/fixtures/layout-no-timer.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
self.waitUntilSettled()
self.assertEqual(len(self.builds), 2)
self.worker.release('.*')
@@ -3412,6 +3477,31 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual('The merge failed! For more information...',
self.smtp_messages[0]['body'])
def test_default_merge_failure_reports(self):
"""Check that the default merge failure reports are correct."""
# A should report success, B should report merge failure.
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addPatchset(['conflict'])
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
B.addPatchset(['conflict'])
A.addApproval('CRVW', 2)
B.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.waitUntilSettled()
self.assertEqual(3, len(self.history)) # A jobs
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 2)
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(B.data['status'], 'NEW')
self.assertIn('Build succeeded', A.messages[1])
self.assertIn('Merge Failed', B.messages[1])
self.assertIn('automatically merged', B.messages[1])
self.assertNotIn('logs.example.com', B.messages[1])
self.assertNotIn('SKIPPED', B.messages[1])
def test_swift_instructions(self):
"Test that the correct swift instructions are sent to the workers"
self.updateConfigLayout(


@@ -17,9 +17,6 @@ deps = -r{toxinidir}/requirements.txt
commands =
python setup.py testr --slowest --testr-args='{posargs}'
[tox:jenkins]
downloadcache = ~/cache/pip
[testenv:pep8]
commands = flake8 {posargs}


@@ -60,6 +60,7 @@ class JobParser(object):
'failure-url': str,
'success-url': str,
'voting': bool,
'mutex': str,
'branches': to_list(str),
'files': to_list(str),
'swift': to_list(swift),
@@ -81,6 +82,7 @@ class JobParser(object):
job.pre_run = as_list(conf.get('pre-run', job.pre_run))
job.post_run = as_list(conf.get('post-run', job.post_run))
job.voting = conf.get('voting', True)
job.mutex = conf.get('mutex', None)
job.failure_message = conf.get('failure-message', job.failure_message)
job.success_message = conf.get('success-message', job.success_message)


@@ -43,6 +43,14 @@ class BaseConnection(object):
self.connection_name = connection_name
self.connection_config = connection_config
# Keep track of the sources, triggers and reporters using this
# connection
self.attached_to = {
'source': [],
'trigger': [],
'reporter': [],
}
def onLoad(self):
pass
@@ -51,3 +59,6 @@ class BaseConnection(object):
def registerScheduler(self, sched):
self.sched = sched
def registerUse(self, what, instance):
self.attached_to[what].append(instance)
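The new attached_to bookkeeping gives a connection a view of which sources,
triggers, and reporters use it; ConnectionRegistry calls registerUse() as it
builds each driver. A minimal standalone sketch of the mechanism
(FakeConnection is a hypothetical stand-in, not a Zuul class):

    class FakeConnection(object):
        """Stand-in mimicking BaseConnection's new bookkeeping."""
        def __init__(self, connection_name):
            self.connection_name = connection_name
            # Track the sources, triggers and reporters using this
            # connection.
            self.attached_to = {
                'source': [],
                'trigger': [],
                'reporter': [],
            }

        def registerUse(self, what, instance):
            self.attached_to[what].append(instance)

    conn = FakeConnection('gerrit')
    fake_source = object()  # stands in for a GerritSource instance
    conn.registerUse('source', fake_source)
    # The Gerrit event connector can now reach the first attached source
    # to refresh a change in the shared cache:
    assert conn.attached_to['source'][0] is fake_source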


@@ -47,7 +47,6 @@ class GerritEventConnector(threading.Thread):
def _handleEvent(self):
ts, data = self.connection.getEvent()
if self._stopped:
self.connection.eventDone()
return
# Gerrit can produce inconsistent data immediately after an
# event, so ensure that we do not deliver the event to Zuul
@@ -99,16 +98,27 @@
Can not get account information." % event.type)
event.account = None
# TODOv3(jeblair,jhesketh): this is broken in the main branch and
# the fix needs to be merged here
# if (event.change_number and
# self.connection.sched.getProject(event.project_name)):
if event.change_number:
# Mark the change as needing a refresh in the cache
event._needs_refresh = True
# TODO(jhesketh): Check if the project exists?
# and self.connection.sched.getProject(event.project_name):
# Call _getChange for the side effect of updating the
# cache. Note that this modifies Change objects outside
# the main thread.
# NOTE(jhesketh): Ideally we'd just remove the change from the
# cache to denote that it needs updating. However, the change
# object is already used by Items and hence BuildSets etc., and
# we need to update those objects by reference so that they have
# the correct/new information and also avoid hitting gerrit
# multiple times.
if self.connection.attached_to['source']:
self.connection.attached_to['source'][0]._getChange(
event.change_number, event.patch_number, refresh=True)
# We only need to do this once since the connection maintains
# the cache (which is shared between all the sources)
# NOTE(jhesketh): We may couple sources and connections again
# at which point this becomes more sensible.
self.connection.sched.addEvent(event)
self.connection.eventDone()
def run(self):
while True:
@@ -118,6 +128,8 @@
self._handleEvent()
except:
self.log.exception("Exception moving Gerrit event:")
finally:
self.connection.eventDone()
class GerritWatcher(threading.Thread):
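The run-loop change above moves eventDone() into a finally block so the
event-queue accounting happens even when handling raises. A generic
standalone sketch of that pattern using the standard library (names are
illustrative, not Zuul's event classes):

    import threading
    try:
        import queue            # Python 3
    except ImportError:
        import Queue as queue   # Python 2, matching Zuul's era

    q = queue.Queue()

    def handle(item):
        print("handled %s" % item)    # may raise in real code

    def worker():
        while True:
            item = q.get()
            try:
                if item is None:
                    return            # sentinel: stop the thread
                handle(item)
            except Exception:
                pass                  # log it; keep consuming
            finally:
                q.task_done()         # always acknowledge, like eventDone()

    t = threading.Thread(target=worker)
    t.start()
    q.put('an-event')
    q.join()   # returns even if handle() raised, thanks to finally
    q.put(None)
    t.join()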


@@ -127,6 +127,7 @@ class LayoutSchema(object):
'success-pattern': str,
'hold-following-changes': bool,
'voting': bool,
'mutex': str,
'parameter-function': str,
'branch': toList(str),
'files': toList(str),


@@ -71,12 +71,12 @@ class ConnectionRegistry(object):
if 'gerrit' in config.sections():
connections['gerrit'] = \
zuul.connection.gerrit.GerritConnection(
'_legacy_gerrit', dict(config.items('gerrit')))
'gerrit', dict(config.items('gerrit')))
if 'smtp' in config.sections():
connections['smtp'] = \
zuul.connection.smtp.SMTPConnection(
'_legacy_smtp', dict(config.items('smtp')))
'smtp', dict(config.items('smtp')))
self.connections = connections
@@ -118,6 +118,9 @@ class ConnectionRegistry(object):
driver_config, self.sched, connection
)
if connection:
connection.registerUse(dtype, driver_instance)
return driver_instance
def getSource(self, connection_name):


@@ -77,14 +77,16 @@ class BasePipelineManager(object):
efilters += str(tree.job.skip_if_matcher)
if efilters:
efilters = ' ' + efilters
hold = ''
tags = []
if tree.job.hold_following_changes:
hold = ' [hold]'
voting = ''
tags.append('[hold]')
if not tree.job.voting:
voting = ' [nonvoting]'
self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
efilters, hold, voting))
tags.append('[nonvoting]')
if tree.job.mutex:
tags.append('[mutex: %s]' % tree.job.mutex)
tags = ' '.join(tags)
self.log.info("%s%s%s %s" % (istr, repr(tree.job),
efilters, tags))
for x in tree.job_trees:
log_jobs(x, indent + 2)
@@ -348,7 +350,7 @@ class BasePipelineManager(object):
def launchJobs(self, item):
# TODO(jeblair): This should return a value indicating a job
# was launched. Appears to be a longstanding bug.
jobs = self.pipeline.findJobsToRun(item)
jobs = self.pipeline.findJobsToRun(item, self.sched.mutex)
if jobs:
self._launchJobs(item, jobs)
@@ -474,13 +476,23 @@ class BasePipelineManager(object):
def updateBuildDescriptions(self, build_set):
for build in build_set.getBuilds():
try:
desc = self.formatDescription(build)
self.sched.launcher.setBuildDescription(build, desc)
except:
# Log the failure and let loop continue
self.log.error("Failed to update description for build %s" %
(build))
if build_set.previous_build_set:
for build in build_set.previous_build_set.getBuilds():
try:
desc = self.formatDescription(build)
self.sched.launcher.setBuildDescription(build, desc)
except:
# Log the failure and let loop continue
self.log.error("Failed to update description for "
"build %s in previous build set" % (build))
def onBuildStarted(self, build):
self.log.debug("Build %s started" % build)
@@ -491,6 +503,7 @@ class BasePipelineManager(object):
item = build.build_set.item
self.pipeline.setResult(item, build)
self.sched.mutex.release(item, build.job)
self.log.debug("Item %s status is now:\n %s" %
(item, item.formatStatus()))
return True
@@ -503,9 +516,9 @@ class BasePipelineManager(object):
if event.merged:
build_set.commit = event.commit
elif event.updated:
if not isinstance(item, NullChange):
if not isinstance(item.change, NullChange):
build_set.commit = item.change.newrev
if not build_set.commit:
if not build_set.commit and not isinstance(item.change, NullChange):
self.log.info("Unable to merge change %s" % item.change)
self.pipeline.setUnableToMerge(item)


@@ -146,7 +146,7 @@ class Pipeline(object):
return []
return item.change.filterJobs(tree.getJobs())
def _findJobsToRun(self, job_trees, item):
def _findJobsToRun(self, job_trees, item, mutex):
torun = []
for tree in job_trees:
job = tree.job
@@ -160,20 +160,23 @@
else:
# There is no build for the root of this job tree,
# so we should run it.
if mutex.acquire(item, job):
# If this job needs a mutex, either acquire it or make
# sure that we have it before running the job.
torun.append(job)
# If there is no job, this is a null job tree, and we should
# run all of its jobs.
if result == 'SUCCESS' or not job:
torun.extend(self._findJobsToRun(tree.job_trees, item))
torun.extend(self._findJobsToRun(tree.job_trees, item, mutex))
return torun
def findJobsToRun(self, item):
def findJobsToRun(self, item, mutex):
if not item.live:
return []
tree = item.job_tree
if not tree:
return []
return self._findJobsToRun(tree.job_trees, item)
return self._findJobsToRun(tree.job_trees, item, mutex)
def haveAllJobsStarted(self, item):
for job in self.getJobs(item):
@@ -464,6 +467,7 @@ class Job(object):
swift=None, # TODOv3(jeblair): move to auth
parameter_function=None, # TODOv3(jeblair): remove
success_pattern=None, # TODOv3(jeblair): remove
mutex=None,
)
def __init__(self, name):
@@ -1051,9 +1055,6 @@ class TriggerEvent(object):
# an admin command, etc):
self.forced_pipeline = None
# Internal mechanism to track if the change needs a refresh from cache
self._needs_refresh = False
def __repr__(self):
ret = '<TriggerEvent %s %s' % (self.type, self.project_name)


@@ -81,6 +81,8 @@ class BaseReporter(object):
def _formatItemReportFailure(self, pipeline, item):
if item.dequeued_needing_change:
msg = 'This change depends on a change that failed to merge.\n'
elif not pipeline.didMergerSucceed(item):
msg = pipeline.merge_failure_message
else:
msg = (pipeline.failure_message + '\n\n' +
self._formatItemReportJobs(pipeline, item))


@@ -34,6 +34,68 @@ from zuul import version as zuul_version
statsd = extras.try_import('statsd.statsd')
class MutexHandler(object):
log = logging.getLogger("zuul.MutexHandler")
def __init__(self):
self.mutexes = {}
def acquire(self, item, job):
if not job.mutex:
return True
mutex_name = job.mutex
m = self.mutexes.get(mutex_name)
if not m:
# The mutex is not held, acquire it
self._acquire(mutex_name, item, job.name)
return True
held_item, held_job_name = m
if held_item is item and held_job_name == job.name:
# This item already holds the mutex
return True
held_build = held_item.current_build_set.getBuild(held_job_name)
if held_build and held_build.result:
# The build that held the mutex is complete, release it
# and let the new item have it.
self.log.error("Held mutex %s being released because "
"the build that holds it is complete" %
(mutex_name,))
self._release(mutex_name, item, job.name)
self._acquire(mutex_name, item, job.name)
return True
return False
def release(self, item, job):
if not job.mutex:
return
mutex_name = job.mutex
m = self.mutexes.get(mutex_name)
if not m:
# The mutex is not held, nothing to do
self.log.error("Mutex can not be released for %s "
"because the mutex is not held" %
(item,))
return
held_item, held_job_name = m
if held_item is item and held_job_name == job.name:
# This item holds the mutex
self._release(mutex_name, item, job.name)
return
self.log.error("Mutex can not be released for %s "
"which does not hold it" %
(item,))
def _acquire(self, mutex_name, item, job_name):
self.log.debug("Job %s of item %s acquiring mutex %s" %
(job_name, item, mutex_name))
self.mutexes[mutex_name] = (item, job_name)
def _release(self, mutex_name, item, job_name):
self.log.debug("Job %s of item %s releasing mutex %s" %
(job_name, item, mutex_name))
del self.mutexes[mutex_name]
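A short usage sketch of the MutexHandler semantics defined above. FakeItem,
FakeBuildSet and FakeJob are hypothetical stand-ins for Zuul's queue items
and jobs, not Zuul classes; MutexHandler is the class just shown:

    class FakeBuildSet(object):
        def getBuild(self, job_name):
            return None  # pretend no completed build holds the mutex

    class FakeItem(object):
        def __init__(self):
            self.current_build_set = FakeBuildSet()

    class FakeJob(object):
        def __init__(self, name, mutex=None):
            self.name = name
            self.mutex = mutex

    handler = MutexHandler()
    item_a, item_b = FakeItem(), FakeItem()
    one = FakeJob('mutex-one', mutex='test-mutex')
    two = FakeJob('mutex-two', mutex='test-mutex')

    assert handler.acquire(item_a, one)      # free, so it is acquired
    assert handler.acquire(item_a, one)      # same holder: still True
    assert not handler.acquire(item_b, two)  # held elsewhere: blocked
    handler.release(item_a, one)
    assert handler.acquire(item_b, two)      # available again

Jobs that set no mutex pass straight through: acquire() returns True
immediately, so only mutex-carrying jobs are serialized.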
class ManagementEvent(object):
"""An event that should be processed within the main queue run loop"""
def __init__(self):
@@ -162,6 +224,7 @@ class Scheduler(threading.Thread):
self.merger = None
self.connections = None
# TODO(jeblair): fix this
self.mutex = MutexHandler()
# Despite triggers being part of the pipeline, there is one trigger set
# per scheduler. The pipeline handles the trigger filters but since
# the events are handled by the scheduler itself it needs to handle


@@ -132,9 +132,6 @@ class GerritSource(BaseSource):
def getChange(self, event):
if event.change_number:
refresh = False
if event._needs_refresh:
refresh = True
event._needs_refresh = False
change = self._getChange(event.change_number, event.patch_number,
refresh=refresh)
elif event.ref:


@@ -13,7 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import apscheduler.scheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
import logging
import voluptuous as v
from zuul.model import EventFilter, TriggerEvent
@@ -26,7 +27,7 @@ class TimerTrigger(BaseTrigger):
def __init__(self, trigger_config={}, sched=None, connection=None):
super(TimerTrigger, self).__init__(trigger_config, sched, connection)
self.apsched = apscheduler.scheduler.Scheduler()
self.apsched = BackgroundScheduler()
self.apsched.start()
def _onTrigger(self, pipeline_name, timespec):
@@ -62,7 +63,7 @@
def postConfig(self):
for job in self.apsched.get_jobs():
self.apsched.unschedule_job(job)
job.remove()
for pipeline in self.sched.layout.pipelines.values():
for ef in pipeline.manager.event_filters:
if ef.trigger != self:
@@ -81,14 +82,11 @@
second = parts[5]
else:
second = None
self.apsched.add_cron_job(self._onTrigger,
day=dom,
day_of_week=dow,
hour=hour,
minute=minute,
second=second,
args=(pipeline.name,
timespec,))
trigger = CronTrigger(day=dom, day_of_week=dow, hour=hour,
minute=minute, second=second)
self.apsched.add_job(self._onTrigger, trigger=trigger,
args=(pipeline.name, timespec,))
def getSchema():
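
For reference, a minimal standalone sketch of the APScheduler 3.x API this
change adopts (BackgroundScheduler plus an explicit CronTrigger, with
job.remove() replacing the 2.x unschedule_job()); the tick function and the
every-second schedule are illustrative:

    import time
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.triggers.cron import CronTrigger

    def tick(pipeline_name):
        print("timer trigger fired for pipeline %s" % pipeline_name)

    apsched = BackgroundScheduler()
    apsched.start()

    # Equivalent of a "* * * * * *" timespec: fire every second.
    trigger = CronTrigger(second='*')
    apsched.add_job(tick, trigger=trigger, args=('periodic',))

    time.sleep(3)

    # On reconfiguration, jobs are now removed individually:
    for job in apsched.get_jobs():
        job.remove()
    apsched.shutdown()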