Add job mutex support

This is so that jobs that interact with external resources can be
mutexed.

Change-Id: I94365e258cae30c5fe61981eccc879f400b02f7f
This commit is contained in:
James E. Blair 2016-02-03 15:07:18 -08:00
parent 995fc0fc5b
commit af17a978c4
7 changed files with 186 additions and 12 deletions

View File

@ -704,6 +704,11 @@ each job as it builds a list from the project specification.
would largely defeat the parallelization of dependent change testing
that is the main feature of Zuul. Default: ``false``.
**mutex (optional)**
This is a string that names a mutex that should be observed by this
job. Only one build of any job that references the same named mutex
will be enqueued at a time. This applies across all pipelines.
**branch (optional)**
This job should only be run on matching branches. This field is
treated as a regular expression and multiple branches may be

25
tests/fixtures/layout-mutex.yaml vendored Normal file
View File

@ -0,0 +1,25 @@
pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
verified: 1
failure:
gerrit:
verified: -1
jobs:
- name: mutex-one
mutex: test-mutex
- name: mutex-two
mutex: test-mutex
projects:
- name: org/project
check:
- project-test1
- mutex-one
- mutex-two

View File

@ -116,6 +116,10 @@ jobs:
parameter-function: select_debian_node
- name: project1-project2-integration
queue-name: integration
- name: mutex-one
mutex: test-mutex
- name: mutex-two
mutex: test-mutex
project-templates:
- name: test-one-and-two

View File

@ -2280,6 +2280,70 @@ class TestScheduler(ZuulTestCase):
self.sched.reconfigure(self.config)
self.assertEqual(len(self.sched.layout.pipelines['gate'].queues), 1)
def test_mutex(self):
    "Test job mutexes"
    # Load a layout in which mutex-one and mutex-two both declare the
    # mutex 'test-mutex', so at most one of them may build at a time
    # (see tests/fixtures/layout-mutex.yaml).
    self.config.set('zuul', 'layout_config',
                    'tests/fixtures/layout-mutex.yaml')
    self.sched.reconfigure(self.config)
    # Hold builds so we can step through the mutex hand-offs.
    self.worker.hold_jobs_in_build = True
    A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
    B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
    # Nothing has built yet, so the mutex must not be held.
    self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
    self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
    self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
    self.waitUntilSettled()
    # Both changes start project-test1, but only one mutexed job
    # (mutex-one) may run; the other three mutexed builds are deferred.
    self.assertEqual(len(self.builds), 3)
    self.assertEqual(self.builds[0].name, 'project-test1')
    self.assertEqual(self.builds[1].name, 'mutex-one')
    self.assertEqual(self.builds[2].name, 'project-test1')
    self.worker.release('mutex-one')
    self.waitUntilSettled()
    # Releasing the running mutexed build lets the next queued
    # mutexed job (mutex-two) acquire the mutex and start.
    self.assertEqual(len(self.builds), 3)
    self.assertEqual(self.builds[0].name, 'project-test1')
    self.assertEqual(self.builds[1].name, 'project-test1')
    self.assertEqual(self.builds[2].name, 'mutex-two')
    self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
    self.worker.release('mutex-two')
    self.waitUntilSettled()
    # The hand-off repeats: the remaining mutex-one build starts.
    self.assertEqual(len(self.builds), 3)
    self.assertEqual(self.builds[0].name, 'project-test1')
    self.assertEqual(self.builds[1].name, 'project-test1')
    self.assertEqual(self.builds[2].name, 'mutex-one')
    self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
    self.worker.release('mutex-one')
    self.waitUntilSettled()
    # And again for the final mutex-two build.
    self.assertEqual(len(self.builds), 3)
    self.assertEqual(self.builds[0].name, 'project-test1')
    self.assertEqual(self.builds[1].name, 'project-test1')
    self.assertEqual(self.builds[2].name, 'mutex-two')
    self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
    self.worker.release('mutex-two')
    self.waitUntilSettled()
    # All mutexed builds have completed: only the two project-test1
    # builds remain and the mutex is no longer held.
    self.assertEqual(len(self.builds), 2)
    self.assertEqual(self.builds[0].name, 'project-test1')
    self.assertEqual(self.builds[1].name, 'project-test1')
    self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
    self.worker.hold_jobs_in_build = False
    self.worker.release()
    self.waitUntilSettled()
    # Both changes should have reported exactly once, and the mutex
    # must end unheld.
    self.assertEqual(len(self.builds), 0)
    self.assertEqual(A.reported, 1)
    self.assertEqual(B.reported, 1)
    self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
def test_node_label(self):
"Test that a job runs on a specific node label"
self.worker.registerFunction('build:node-project-test1:debian')

View File

@ -103,6 +103,7 @@ class LayoutSchema(object):
'success-pattern': str,
'hold-following-changes': bool,
'voting': bool,
'mutex': str,
'parameter-function': str,
'branch': toList(str),
'files': toList(str),

View File

@ -134,7 +134,7 @@ class Pipeline(object):
return []
return item.change.filterJobs(tree.getJobs())
def _findJobsToRun(self, job_trees, item):
def _findJobsToRun(self, job_trees, item, mutex):
torun = []
if item.item_ahead:
# Only run jobs if any 'hold' jobs on the change ahead
@ -153,20 +153,23 @@ class Pipeline(object):
else:
# There is no build for the root of this job tree,
# so we should run it.
torun.append(job)
if mutex.acquire(item, job):
# If this job needs a mutex, either acquire it or make
# sure that we have it before running the job.
torun.append(job)
# If there is no job, this is a null job tree, and we should
# run all of its jobs.
if result == 'SUCCESS' or not job:
torun.extend(self._findJobsToRun(tree.job_trees, item))
torun.extend(self._findJobsToRun(tree.job_trees, item, mutex))
return torun
def findJobsToRun(self, item):
def findJobsToRun(self, item, mutex):
    """Return the jobs that are ready to launch for *item*.

    Non-live items never launch builds, and a project with no job
    tree in this pipeline has nothing to run.  Otherwise delegate to
    the recursive tree walk, threading *mutex* through so mutexed
    jobs are only returned when their mutex can be acquired.
    """
    if not item.live:
        return []
    tree = self.getJobTree(item.change.project)
    return self._findJobsToRun(tree.job_trees, item, mutex) if tree else []
def haveAllJobsStarted(self, item):
for job in self.getJobs(item):
@ -441,6 +444,7 @@ class Job(object):
self.failure_pattern = None
self.success_pattern = None
self.parameter_function = None
self.mutex = None
# A metajob should only supply values for attributes that have
# been explicitly provided, so avoid setting boolean defaults.
if self.is_metajob:
@ -487,6 +491,8 @@ class Job(object):
self.skip_if_matcher = other.skip_if_matcher.copy()
if other.swift:
self.swift.update(other.swift)
if other.mutex:
self.mutex = other.mutex
# Only non-None values should be copied for boolean attributes.
if other.hold_following_changes is not None:
self.hold_following_changes = other.hold_following_changes

View File

@ -59,6 +59,68 @@ def deep_format(obj, paramdict):
return ret
class MutexHandler(object):
    """Tracks named job mutexes across all pipelines.

    A mutex is held by a single (queue item, job name) pair at a
    time; only one build of any job referencing the same mutex name
    may run concurrently.  Holders are recorded in ``self.mutexes``,
    keyed by mutex name; absence of a key means "not held".
    """

    log = logging.getLogger("zuul.MutexHandler")

    def __init__(self):
        # mutex name -> (item, job_name) currently holding it
        self.mutexes = {}

    def acquire(self, item, job):
        """Attempt to take *job*'s mutex on behalf of *item*.

        Returns True if the job needs no mutex, the mutex was newly
        acquired, or this item/job pair already holds it; returns
        False while another live build holds the mutex.
        """
        if not job.mutex:
            # Jobs without a mutex are never blocked.
            return True
        mutex_name = job.mutex
        holder = self.mutexes.get(mutex_name)
        if not holder:
            # The mutex is not held, acquire it
            self._acquire(mutex_name, item, job.name)
            return True
        held_item, held_job_name = holder
        if held_item is item and held_job_name == job.name:
            # Re-entrant: this item already holds the mutex.
            return True
        # Another item holds it; if its build already finished without
        # releasing, recover by handing the mutex to the requester.
        held_build = held_item.current_build_set.getBuild(held_job_name)
        if held_build and held_build.result:
            self.log.error("Held mutex %s being released because "
                           "the build that holds it is complete" %
                           (mutex_name,))
            self._release(mutex_name, item, job.name)
            self._acquire(mutex_name, item, job.name)
            return True
        return False

    def release(self, item, job):
        """Release *job*'s mutex if *item* holds it; log errors otherwise."""
        if not job.mutex:
            # Nothing to release for un-mutexed jobs.
            return
        mutex_name = job.mutex
        holder = self.mutexes.get(mutex_name)
        if not holder:
            # The mutex is not held, nothing to do
            self.log.error("Mutex can not be released for %s "
                           "because the mutex is not held" %
                           (item,))
            return
        held_item, held_job_name = holder
        if held_item is not item or held_job_name != job.name:
            # Refuse to release on behalf of a non-holder.
            self.log.error("Mutex can not be released for %s "
                           "which does not hold it" %
                           (item,))
            return
        self._release(mutex_name, item, job.name)

    def _acquire(self, mutex_name, item, job_name):
        # Record the new holder.
        self.log.debug("Job %s of item %s acquiring mutex %s" %
                       (job_name, item, mutex_name))
        self.mutexes[mutex_name] = (item, job_name)

    def _release(self, mutex_name, item, job_name):
        # Drop the entry entirely; a missing key means "not held".
        self.log.debug("Job %s of item %s releasing mutex %s" %
                       (job_name, item, mutex_name))
        del self.mutexes[mutex_name]
class ManagementEvent(object):
"""An event that should be processed within the main queue run loop"""
def __init__(self):
@ -185,6 +247,7 @@ class Scheduler(threading.Thread):
self._stopped = False
self.launcher = None
self.merger = None
self.mutex = MutexHandler()
self.connections = dict()
# Despite triggers being part of the pipeline, there is one trigger set
# per scheduler. The pipeline handles the trigger filters but since
@ -461,6 +524,9 @@ class Scheduler(threading.Thread):
m = config_job.get('voting', None)
if m is not None:
job.voting = m
m = config_job.get('mutex', None)
if m is not None:
job.mutex = m
fname = config_job.get('parameter-function', None)
if fname:
func = config_env.get(fname, None)
@ -1086,14 +1152,16 @@ class BasePipelineManager(object):
efilters += str(tree.job.skip_if_matcher)
if efilters:
efilters = ' ' + efilters
hold = ''
tags = []
if tree.job.hold_following_changes:
hold = ' [hold]'
voting = ''
tags.append('[hold]')
if not tree.job.voting:
voting = ' [nonvoting]'
self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
efilters, hold, voting))
tags.append('[nonvoting]')
if tree.job.mutex:
tags.append('[mutex: %s]' % tree.job.mutex)
tags = ' '.join(tags)
self.log.info("%s%s%s %s" % (istr, repr(tree.job),
efilters, tags))
for x in tree.job_trees:
log_jobs(x, indent + 2)
@ -1410,7 +1478,7 @@ class BasePipelineManager(object):
"for change %s:" % (job, item.change))
def launchJobs(self, item):
jobs = self.pipeline.findJobsToRun(item)
jobs = self.pipeline.findJobsToRun(item, self.sched.mutex)
if jobs:
self._launchJobs(item, jobs)
@ -1566,6 +1634,7 @@ class BasePipelineManager(object):
item = build.build_set.item
self.pipeline.setResult(item, build)
self.sched.mutex.release(item, build.job)
self.log.debug("Item %s status is now:\n %s" %
(item, item.formatStatus()))
return True