diff --git a/doc/source/zuul.rst b/doc/source/zuul.rst
index ad8ec2e103..b5b8d7bf19 100644
--- a/doc/source/zuul.rst
+++ b/doc/source/zuul.rst
@@ -704,6 +704,11 @@ each job as it builds a list from the project specification.
   would largely defeat the parallelization of dependent change
   testing that is the main feature of Zuul. Default: ``false``.
 
+**mutex (optional)**
+  This is a string that names a mutex that should be observed by this
+  job. Only one build of any job that references the same named mutex
+  will be enqueued at a time. This applies across all pipelines.
+
 **branch (optional)**
   This job should only be run on matching branches. This field is
   treated as a regular expression and multiple branches may be
diff --git a/tests/fixtures/layout-mutex.yaml b/tests/fixtures/layout-mutex.yaml
new file mode 100644
index 0000000000..fcd052973c
--- /dev/null
+++ b/tests/fixtures/layout-mutex.yaml
@@ -0,0 +1,25 @@
+pipelines:
+  - name: check
+    manager: IndependentPipelineManager
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        verified: 1
+    failure:
+      gerrit:
+        verified: -1
+
+jobs:
+  - name: mutex-one
+    mutex: test-mutex
+  - name: mutex-two
+    mutex: test-mutex
+
+projects:
+  - name: org/project
+    check:
+      - project-test1
+      - mutex-one
+      - mutex-two
diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml
index 1d2344370a..e8f035e5f4 100644
--- a/tests/fixtures/layout.yaml
+++ b/tests/fixtures/layout.yaml
@@ -116,6 +116,10 @@ jobs:
     parameter-function: select_debian_node
   - name: project1-project2-integration
     queue-name: integration
+  - name: mutex-one
+    mutex: test-mutex
+  - name: mutex-two
+    mutex: test-mutex
 
 project-templates:
   - name: test-one-and-two
diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py
index ead8c6ef5a..8960e3af75 100755
--- a/tests/test_scheduler.py
+++ b/tests/test_scheduler.py
@@ -2280,6 +2280,70 @@ class TestScheduler(ZuulTestCase):
         self.sched.reconfigure(self.config)
         self.assertEqual(len(self.sched.layout.pipelines['gate'].queues), 1)
 
+    def test_mutex(self):
+        "Test job mutexes"
+        self.config.set('zuul', 'layout_config',
+                        'tests/fixtures/layout-mutex.yaml')
+        self.sched.reconfigure(self.config)
+
+        self.worker.hold_jobs_in_build = True
+        A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
+        B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
+        self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
+
+        self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
+        self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
+        self.waitUntilSettled()
+        self.assertEqual(len(self.builds), 3)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'mutex-one')
+        self.assertEqual(self.builds[2].name, 'project-test1')
+
+        self.worker.release('mutex-one')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 3)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test1')
+        self.assertEqual(self.builds[2].name, 'mutex-two')
+        self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
+
+        self.worker.release('mutex-two')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 3)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test1')
+        self.assertEqual(self.builds[2].name, 'mutex-one')
+        self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
+
+        self.worker.release('mutex-one')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 3)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test1')
+        self.assertEqual(self.builds[2].name, 'mutex-two')
+        self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
+
+        self.worker.release('mutex-two')
+        self.waitUntilSettled()
+
+        self.assertEqual(len(self.builds), 2)
+        self.assertEqual(self.builds[0].name, 'project-test1')
+        self.assertEqual(self.builds[1].name, 'project-test1')
+        self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
+
+        self.worker.hold_jobs_in_build = False
+        self.worker.release()
+
+        self.waitUntilSettled()
+        self.assertEqual(len(self.builds), 0)
+
+        self.assertEqual(A.reported, 1)
+        self.assertEqual(B.reported, 1)
+        self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
+
     def test_node_label(self):
         "Test that a job runs on a specific node label"
         self.worker.registerFunction('build:node-project-test1:debian')
diff --git a/zuul/layoutvalidator.py b/zuul/layoutvalidator.py
index ba96ab7949..a01eed3e76 100644
--- a/zuul/layoutvalidator.py
+++ b/zuul/layoutvalidator.py
@@ -103,6 +103,7 @@ class LayoutSchema(object):
            'success-pattern': str,
            'hold-following-changes': bool,
            'voting': bool,
+           'mutex': str,
            'parameter-function': str,
            'branch': toList(str),
            'files': toList(str),
diff --git a/zuul/model.py b/zuul/model.py
index c555561148..75f727dfc3 100644
--- a/zuul/model.py
+++ b/zuul/model.py
@@ -134,7 +134,7 @@ class Pipeline(object):
             return []
         return item.change.filterJobs(tree.getJobs())
 
-    def _findJobsToRun(self, job_trees, item):
+    def _findJobsToRun(self, job_trees, item, mutex):
         torun = []
         if item.item_ahead:
             # Only run jobs if any 'hold' jobs on the change ahead
@@ -153,20 +153,23 @@
             else:
                 # There is no build for the root of this job tree,
                 # so we should run it.
-                torun.append(job)
+                if mutex.acquire(item, job):
+                    # If this job needs a mutex, either acquire it or make
+                    # sure that we have it before running the job.
+                    torun.append(job)
             # If there is no job, this is a null job tree, and we should
             # run all of its jobs.
             if result == 'SUCCESS' or not job:
-                torun.extend(self._findJobsToRun(tree.job_trees, item))
+                torun.extend(self._findJobsToRun(tree.job_trees, item, mutex))
         return torun
 
-    def findJobsToRun(self, item):
+    def findJobsToRun(self, item, mutex):
         if not item.live:
             return []
         tree = self.getJobTree(item.change.project)
         if not tree:
             return []
-        return self._findJobsToRun(tree.job_trees, item)
+        return self._findJobsToRun(tree.job_trees, item, mutex)
 
     def haveAllJobsStarted(self, item):
         for job in self.getJobs(item):
@@ -441,6 +444,7 @@ class Job(object):
         self.failure_pattern = None
         self.success_pattern = None
         self.parameter_function = None
+        self.mutex = None
         # A metajob should only supply values for attributes that have
         # been explicitly provided, so avoid setting boolean defaults.
         if self.is_metajob:
@@ -487,6 +491,8 @@
             self.skip_if_matcher = other.skip_if_matcher.copy()
         if other.swift:
             self.swift.update(other.swift)
+        if other.mutex:
+            self.mutex = other.mutex
         # Only non-None values should be copied for boolean attributes.
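A note on the model change above before the scheduler side: a False return from
mutex.acquire() in _findJobsToRun() does not fail or dequeue the job. The job is
merely left out of this pass's torun list, and because the scheduler re-invokes
findJobsToRun() on every pass, the job launches once the mutex frees up. Below is
a minimal sketch of that deferral behavior; StubJob and StubMutex are hypothetical
stand-ins for Zuul's Job and the MutexHandler introduced below, not part of the patch.

class StubJob(object):
    def __init__(self, name, mutex=None):
        self.name = name
        self.mutex = mutex  # named mutex this job observes, or None


class StubMutex(object):
    """Pretends some other item holds every named mutex."""
    def __init__(self):
        self.busy = True

    def acquire(self, item, job):
        # Mirrors MutexHandler.acquire(): jobs without a mutex always pass.
        return not (job.mutex and self.busy)


def find_jobs_to_run(jobs, mutex, item=None):
    # Same shape as Pipeline._findJobsToRun(): a blocked job is skipped,
    # not failed, and is reconsidered on the next scheduler pass.
    return [job for job in jobs if mutex.acquire(item, job)]


jobs = [StubJob('project-test1'), StubJob('mutex-one', 'test-mutex')]
mutex = StubMutex()
assert [j.name for j in find_jobs_to_run(jobs, mutex)] == ['project-test1']

mutex.busy = False  # the holder finished; the next pass picks the job up
assert [j.name for j in find_jobs_to_run(jobs, mutex)] == ['project-test1',
                                                           'mutex-one']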
         if other.hold_following_changes is not None:
             self.hold_following_changes = other.hold_following_changes
diff --git a/zuul/scheduler.py b/zuul/scheduler.py
index f93eca92b6..0e3fea0eeb 100644
--- a/zuul/scheduler.py
+++ b/zuul/scheduler.py
@@ -59,6 +59,68 @@ def deep_format(obj, paramdict):
     return ret
 
 
+class MutexHandler(object):
+    log = logging.getLogger("zuul.MutexHandler")
+
+    def __init__(self):
+        self.mutexes = {}
+
+    def acquire(self, item, job):
+        if not job.mutex:
+            return True
+        mutex_name = job.mutex
+        m = self.mutexes.get(mutex_name)
+        if not m:
+            # The mutex is not held, acquire it
+            self._acquire(mutex_name, item, job.name)
+            return True
+        held_item, held_job_name = m
+        if held_item is item and held_job_name == job.name:
+            # This item already holds the mutex
+            return True
+        held_build = held_item.current_build_set.getBuild(held_job_name)
+        if held_build and held_build.result:
+            # The build that held the mutex is complete, release it
+            # and let the new item have it.
+            self.log.error("Held mutex %s being released because "
+                           "the build that holds it is complete" %
+                           (mutex_name,))
+            self._release(mutex_name, item, job.name)
+            self._acquire(mutex_name, item, job.name)
+            return True
+        return False
+
+    def release(self, item, job):
+        if not job.mutex:
+            return
+        mutex_name = job.mutex
+        m = self.mutexes.get(mutex_name)
+        if not m:
+            # The mutex is not held, nothing to do
+            self.log.error("Mutex can not be released for %s "
+                           "because the mutex is not held" %
+                           (item,))
+            return
+        held_item, held_job_name = m
+        if held_item is item and held_job_name == job.name:
+            # This item holds the mutex
+            self._release(mutex_name, item, job.name)
+            return
+        self.log.error("Mutex can not be released for %s "
+                       "which does not hold it" %
+                       (item,))
+
+    def _acquire(self, mutex_name, item, job_name):
+        self.log.debug("Job %s of item %s acquiring mutex %s" %
+                       (job_name, item, mutex_name))
+        self.mutexes[mutex_name] = (item, job_name)
+
+    def _release(self, mutex_name, item, job_name):
+        self.log.debug("Job %s of item %s releasing mutex %s" %
+                       (job_name, item, mutex_name))
+        del self.mutexes[mutex_name]
+
+
 class ManagementEvent(object):
     """An event that should be processed within the main queue run loop"""
     def __init__(self):
@@ -185,6 +247,7 @@ class Scheduler(threading.Thread):
         self._stopped = False
         self.launcher = None
         self.merger = None
+        self.mutex = MutexHandler()
         self.connections = dict()
         # Despite triggers being part of the pipeline, there is one trigger set
         # per scheduler. The pipeline handles the trigger filters but since
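The MutexHandler above is the entire locking story: one scheduler-wide dict
mapping a mutex name to the (item, job name) pair that holds it, which is why
the documentation change says the mutex applies across all pipelines. Below is
a minimal sketch, not part of the patch, that exercises it directly; it assumes
zuul.scheduler is importable from a checkout, and FakeJob, FakeBuildSet and
FakeItem are hypothetical stubs carrying only the attributes MutexHandler
actually touches.

from zuul.scheduler import MutexHandler


class FakeJob(object):
    def __init__(self, name, mutex=None):
        self.name = name
        self.mutex = mutex  # name of the mutex this job observes, or None


class FakeBuildSet(object):
    def getBuild(self, job_name):
        return None  # the holder has no completed build recorded


class FakeItem(object):
    def __init__(self):
        self.current_build_set = FakeBuildSet()


mutex = MutexHandler()
job_one = FakeJob('mutex-one', mutex='test-mutex')
job_two = FakeJob('mutex-two', mutex='test-mutex')
plain = FakeJob('project-test1')  # no mutex at all
item_a, item_b = FakeItem(), FakeItem()

assert mutex.acquire(item_a, plain)        # jobs without a mutex always pass
assert mutex.acquire(item_a, job_one)      # 'test-mutex' is free, A takes it
assert mutex.acquire(item_a, job_one)      # idempotent for the current holder
assert not mutex.acquire(item_b, job_two)  # held by A, so B is deferred
mutex.release(item_a, job_one)             # as onBuildCompleted() does below
assert mutex.acquire(item_b, job_two)      # now B may launch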
@@ -461,6 +524,9 @@
             m = config_job.get('voting', None)
             if m is not None:
                 job.voting = m
+            m = config_job.get('mutex', None)
+            if m is not None:
+                job.mutex = m
             fname = config_job.get('parameter-function', None)
             if fname:
                 func = config_env.get(fname, None)
@@ -1086,14 +1152,16 @@ class BasePipelineManager(object):
                     efilters += str(tree.job.skip_if_matcher)
                 if efilters:
                     efilters = ' ' + efilters
-                hold = ''
+                tags = []
                 if tree.job.hold_following_changes:
-                    hold = ' [hold]'
-                voting = ''
+                    tags.append('[hold]')
                 if not tree.job.voting:
-                    voting = ' [nonvoting]'
-                self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
-                                              efilters, hold, voting))
+                    tags.append('[nonvoting]')
+                if tree.job.mutex:
+                    tags.append('[mutex: %s]' % tree.job.mutex)
+                tags = ' '.join(tags)
+                self.log.info("%s%s%s %s" % (istr, repr(tree.job),
+                                             efilters, tags))
             for x in tree.job_trees:
                 log_jobs(x, indent + 2)
 
@@ -1410,7 +1478,7 @@
                                "for change %s:" % (job, item.change))
 
     def launchJobs(self, item):
-        jobs = self.pipeline.findJobsToRun(item)
+        jobs = self.pipeline.findJobsToRun(item, self.sched.mutex)
         if jobs:
             self._launchJobs(item, jobs)
 
@@ -1566,6 +1634,7 @@
         item = build.build_set.item
 
         self.pipeline.setResult(item, build)
+        self.sched.mutex.release(item, build.job)
         self.log.debug("Item %s status is now:\n %s" %
                        (item, item.formatStatus()))
         return True
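One design choice worth noting at the end: launchJobs() acquires the mutex as a
side effect of findJobsToRun(), and onBuildCompleted() releases it
unconditionally (release() returns immediately for jobs without a mutex). If a
release is ever missed, acquire() has a recovery branch: when the holder's build
already has a result, the next acquirer logs an error and takes the mutex over
instead of deadlocking. A second sketch, with the same style of hypothetical
stubs as above, exercising that branch:

from zuul.scheduler import MutexHandler


class FakeJob(object):
    def __init__(self, name, mutex=None):
        self.name = name
        self.mutex = mutex


class FakeBuild(object):
    def __init__(self, result):
        self.result = result


class FakeBuildSet(object):
    def __init__(self):
        self.builds = {}  # job name -> FakeBuild

    def getBuild(self, job_name):
        return self.builds.get(job_name)


class FakeItem(object):
    def __init__(self):
        self.current_build_set = FakeBuildSet()


mutex = MutexHandler()
holder, waiter = FakeItem(), FakeItem()
job_one = FakeJob('mutex-one', mutex='test-mutex')
job_two = FakeJob('mutex-two', mutex='test-mutex')

assert mutex.acquire(holder, job_one)
# The holder's build completes, but suppose release() never ran:
holder.current_build_set.builds['mutex-one'] = FakeBuild('SUCCESS')
assert mutex.acquire(waiter, job_two)  # taken over rather than deadlocked
assert mutex.mutexes['test-mutex'] == (waiter, 'mutex-two')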