diff --git a/doc/source/reference/job_def.rst b/doc/source/reference/job_def.rst index e509a9b873..beb320edc5 100644 --- a/doc/source/reference/job_def.rst +++ b/doc/source/reference/job_def.rst @@ -220,12 +220,26 @@ Here is an example of two job definitions: .. attr:: semaphore - The name of a :ref:`semaphore` which should be acquired and - released when the job begins and ends. If the semaphore is at - maximum capacity, then Zuul will wait until it can be acquired - before starting the job. The format is either a string or a - dictionary. If it's a string it references a semaphore using the - default value for :attr:`job.semaphore.resources-first`. + A deprecated alias of :attr:`job.semaphores`. + + .. attr:: semaphores + + The name of a :ref:`semaphore` (or list of them) which should be + acquired and released when the job begins and ends. If the + semaphore is at maximum capacity, then Zuul will wait until it + can be acquired before starting the job. The format is either a + string, a dictionary, or a list of either of those in the case + of multiple semaphores. If it's a string it references a + semaphore using the default value for + :attr:`job.semaphores.resources-first`. + + If multiple semaphores are requested, the job will not start + until all have been acquired, and Zuul will wait until all are + available before acquiring any. + + When inheriting jobs or applying variants, the list of + semaphores is extended (semaphores specified in a job definition + are added to any supplied by their parents). .. attr:: name :required: @@ -236,12 +250,12 @@ Here is an example of two job definitions: :default: False By default a semaphore is acquired before the resources are - requested. However in some cases the user wants to run cheap - jobs as quickly as possible in a consecutive manner. In this - case :attr:`job.semaphore.resources-first` can be enabled to - request the resources before locking the semaphore. This can - lead to some amount of blocked resources while waiting for the - semaphore so this should be used with caution. + requested. However in some cases the user may want to run + cheap jobs as quickly as possible in a consecutive manner. In + this case `resources-first` can be enabled to request the + resources before locking the semaphore. This can lead to some + amount of blocked resources while waiting for the semaphore + so this should be used with caution. .. attr:: tags diff --git a/releasenotes/notes/semaphore-resources-295dceaf7ddbab0d.yaml b/releasenotes/notes/semaphore-resources-295dceaf7ddbab0d.yaml index 9d3b7d417f..c2fff5018b 100644 --- a/releasenotes/notes/semaphore-resources-295dceaf7ddbab0d.yaml +++ b/releasenotes/notes/semaphore-resources-295dceaf7ddbab0d.yaml @@ -10,5 +10,5 @@ upgrade: to run. However this could lead to a high amount of resource waste. Instead jobs now acquire the semaphore before requesting the resources by default. This behavior can be overridden by jobs using - :attr:`job.semaphore.resources-first` if some waste of resources is + job.semaphores.resources-first if some waste of resources is acceptable. diff --git a/releasenotes/notes/semaphores-53ff91bf6f1ad0f5.yaml b/releasenotes/notes/semaphores-53ff91bf6f1ad0f5.yaml new file mode 100644 index 0000000000..d8b6d10b70 --- /dev/null +++ b/releasenotes/notes/semaphores-53ff91bf6f1ad0f5.yaml @@ -0,0 +1,23 @@ +--- +features: + - | + Jobs may now request multiple semaphores and they will not start until + all semaphores are acquired. + + Use the new :attr:`job.semaphores` (plural) attribute to specify them. + + Note that the new attribute is additive when considering + inheritance and job variants. That is to say that a job + definition with a `semaphores` attribute will extend the list of + semaphores supplied by its parent rather than overriding it (which + is the behavior for the deprecated attribute). +deprecations: + - | + The job attribute :attr:`job.semaphore` (note the singular rather + than plural form) job attribute is now deprecated. Use the plural + form :attr:`job.semaphores` instead. As with most list items in + Zuul configuration, it also accepts a single item without the + wrapping list, so to convert existing jobs, simply change the + spelling of the attribute, no change to the value is required. + + The singular form will be removed in Zuul 5.0. diff --git a/tests/fixtures/layouts/multiple-semaphores.yaml b/tests/fixtures/layouts/multiple-semaphores.yaml new file mode 100644 index 0000000000..95882ae87e --- /dev/null +++ b/tests/fixtures/layouts/multiple-semaphores.yaml @@ -0,0 +1,40 @@ +- pipeline: + name: check + manager: independent + trigger: + gerrit: + - event: patchset-created + success: + gerrit: + Verified: 1 + failure: + gerrit: + Verified: -1 + +- job: + name: base + parent: null + run: playbooks/base.yaml + +- job: + name: job1 + semaphores: + - sem1 + +- job: + name: job2 + semaphores: + - sem1 + - sem2 + +- project: + name: org/project1 + check: + jobs: + - job1 + +- project: + name: org/project2 + check: + jobs: + - job2 diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index e80f4d8bf7..748e85cdda 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -7269,7 +7269,7 @@ class TestSemaphore(ZuulTestCase): self.assertIsNone(jobs[0]["waiting_status"]) self.assertIsNone(jobs[1]["waiting_status"]) self.assertEqual(jobs[2]["waiting_status"], - 'semaphore: test-semaphore') + 'semaphores: test-semaphore') # By default we first lock the semaphore and then get the nodes # so at this point the semaphore needs to be aquired. @@ -7833,6 +7833,43 @@ class TestSemaphore(ZuulTestCase): len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), 0) + @simple_layout('layouts/multiple-semaphores.yaml') + def test_multiple_semaphores(self): + # Test a job with multiple semaphores + self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # One job should be running, and hold sem1 + self.assertBuilds([dict(name='job1')]) + + B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B') + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # Job2 requires sem1 and sem2; it hasn't started because job + # is still holding sem1. + self.assertBuilds([dict(name='job1')]) + + self.executor_server.release('job1') + self.waitUntilSettled() + + # Job1 is finished, so job2 can acquire both semaphores. + self.assertBuilds([dict(name='job2')]) + + self.executor_server.release() + self.waitUntilSettled() + self.assertHistory([ + dict(name='job1', result='SUCCESS', changes='1,1'), + dict(name='job2', result='SUCCESS', changes='2,1'), + ]) + # TODO(corvus): Consider a version of this test which launches + # 2 jobs with the same multiple-semaphore requirements + # simultaneously to test the behavior with contention (at + # least one should be able to start on each pass through the + # loop). + class TestSemaphoreMultiTenant(ZuulTestCase): tenant_config_file = 'config/multi-tenant-semaphore/main.yaml' diff --git a/tests/unit/test_web.py b/tests/unit/test_web.py index fcee9ca0c5..17db4b727e 100644 --- a/tests/unit/test_web.py +++ b/tests/unit/test_web.py @@ -365,7 +365,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': source_ctx, 'tags': [], 'timeout': None, @@ -410,7 +410,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': source_ctx, 'tags': [], 'timeout': None, @@ -453,7 +453,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': source_ctx, 'tags': [], 'timeout': None, @@ -572,7 +572,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': { 'branch': 'master', 'path': 'zuul.yaml', @@ -610,7 +610,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': { 'branch': 'master', 'path': 'zuul.yaml', @@ -648,7 +648,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': { 'branch': 'master', 'path': 'zuul.yaml', @@ -686,7 +686,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': { 'branch': 'master', 'path': 'zuul.yaml', @@ -745,7 +745,7 @@ class TestWeb(BaseTestWeb): 'requires': [], 'roles': [], 'run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': {'branch': 'master', 'path': 'zuul.yaml', 'project': 'common-config'}, diff --git a/web/src/containers/job/JobVariant.jsx b/web/src/containers/job/JobVariant.jsx index 5e33e028d5..79b14c87eb 100644 --- a/web/src/containers/job/JobVariant.jsx +++ b/web/src/containers/job/JobVariant.jsx @@ -84,7 +84,7 @@ class JobVariant extends React.Component { const jobInfos = [ 'description', 'context', 'builds', 'status', - 'parent', 'attempts', 'timeout', 'semaphore', + 'parent', 'attempts', 'timeout', 'semaphores', 'nodeset', 'variables', 'override_checkout', ] jobInfos.forEach(key => { diff --git a/zuul/configloader.py b/zuul/configloader.py index 8566da6979..7a4597a5b0 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -619,6 +619,7 @@ class JobParser(object): 'hold-following-changes': bool, 'voting': bool, 'semaphore': vs.Any(semaphore, str), + 'semaphores': to_list(vs.Any(semaphore, str)), 'tags': to_list(str), 'branches': to_list(str), 'files': to_list(str), @@ -883,14 +884,22 @@ class JobParser(object): new_dependencies.append(job_dependency) job.dependencies = new_dependencies - if 'semaphore' in conf: - semaphore = conf.get('semaphore') + semaphores = as_list(conf.get('semaphores', conf.get('semaphore', []))) + job_semaphores = [] + for semaphore in semaphores: if isinstance(semaphore, str): - job.semaphore = model.JobSemaphore(semaphore) + job_semaphores.append(model.JobSemaphore(semaphore)) else: - job.semaphore = model.JobSemaphore( + job_semaphores.append(model.JobSemaphore( semaphore.get('name'), - semaphore.get('resources-first', False)) + semaphore.get('resources-first', False))) + if job_semaphores: + # Sort the list of semaphores to avoid issues with + # contention (where two jobs try to start at the same time + # and fail due to acquiring the same semaphores but in + # reverse order. + job.semaphores = tuple(sorted(job_semaphores, + key=lambda x: x.name)) for k in ('tags', 'requires', 'provides'): v = frozenset(as_list(conf.get(k))) diff --git a/zuul/model.py b/zuul/model.py index 2b5204bbe7..86ea67f81c 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -1230,7 +1230,7 @@ class Job(ConfigObject): cleanup_run=(), run=(), ansible_version=None, - semaphore=None, + semaphores=(), attempts=3, final=False, abstract=False, @@ -1295,12 +1295,7 @@ class Job(ConfigObject): d['required_projects'] = [] for project in self.required_projects.values(): d['required_projects'].append(project.toDict()) - if self.semaphore: - # For now just leave the semaphore name here until we really need - # more information in zuul-web about this - d['semaphore'] = self.semaphore.name - else: - d['semaphore'] = None + d['semaphores'] = [s.toDict() for s in self.semaphores] d['variables'] = self.variables d['extra_variables'] = self.extra_variables d['host_variables'] = self.host_variables @@ -1621,7 +1616,8 @@ class Job(ConfigObject): if k not in set(['pre_run', 'run', 'post_run', 'cleanup_run', 'roles', 'variables', 'extra_variables', 'host_variables', 'group_variables', - 'required_projects', 'allowed_projects']): + 'required_projects', 'allowed_projects', + 'semaphores']): setattr(self, k, other._get(k)) # Don't set final above so that we don't trip an error halfway @@ -1715,6 +1711,14 @@ class Job(ConfigObject): other.allowed_projects)) elif other._get('allowed_projects') is not None: self.allowed_projects = other.allowed_projects + if other._get('semaphores') is not None: + # Sort the list of semaphores to avoid issues with + # contention (where two jobs try to start at the same time + # and fail due to acquiring the same semaphores but in + # reverse order. + self.semaphores = tuple( + sorted(other.semaphores + self.semaphores, + key=lambda x: x.name)) for k in self.context_attributes: if (other._get(k) is not None and @@ -2814,8 +2818,8 @@ class QueueItem(object): toreq.append(job) job.queued = True else: - job.waiting_status = 'semaphore: {}'.format( - job.semaphore.name) + sem_names = ','.join([s.name for s in job.semaphores]) + job.waiting_status = 'semaphores: {}'.format(sem_names) return toreq def setResult(self, build): diff --git a/zuul/zk/semaphore.py b/zuul/zk/semaphore.py index 37d89a052e..ec872196c6 100644 --- a/zuul/zk/semaphore.py +++ b/zuul/zk/semaphore.py @@ -60,11 +60,23 @@ class SemaphoreHandler(ZooKeeperSimpleBase): self.log.exception("Unable to send semaphore stats:") def acquire(self, item, job, request_resources): - if not job.semaphore: + if not job.semaphores: return True log = get_annotated_logger(self.log, item.event) - if job.semaphore.resources_first and request_resources: + all_acquired = True + for semaphore in job.semaphores: + if not self._acquire_one(log, item, job, request_resources, + semaphore): + all_acquired = False + break + if not all_acquired: + self.release(item, job) + return False + return True + + def _acquire_one(self, log, item, job, request_resources, semaphore): + if semaphore.resources_first and request_resources: # We're currently in the resource request phase and want to get the # resources before locking. So we don't need to do anything here. return True @@ -75,7 +87,7 @@ class SemaphoreHandler(ZooKeeperSimpleBase): # the resources phase. pass - semaphore_key = quote_plus(job.semaphore.name) + semaphore_key = quote_plus(semaphore.name) semaphore_path = f"{self.tenant_root}/{semaphore_key}" semaphore_handle = f"{item.uuid}-{job.name}" @@ -86,7 +98,7 @@ class SemaphoreHandler(ZooKeeperSimpleBase): return True # semaphore is there, check max - while len(semaphore_holders) < self._max_count(job.semaphore.name): + while len(semaphore_holders) < self._max_count(semaphore.name): semaphore_holders.append(semaphore_handle) try: @@ -96,12 +108,12 @@ class SemaphoreHandler(ZooKeeperSimpleBase): except BadVersionError: log.debug( "Retrying semaphore %s acquire due to concurrent update", - job.semaphore.name) + semaphore.name) semaphore_holders, zstat = self.getHolders(semaphore_path) continue log.info("Semaphore %s acquired: job %s, item %s", - job.semaphore.name, job.name, item) + semaphore.name, job.name, item) self._emitStats(semaphore_path, len(semaphore_holders)) return True @@ -145,11 +157,16 @@ class SemaphoreHandler(ZooKeeperSimpleBase): break def release(self, item, job): - if not job.semaphore: + if not job.semaphores: return log = get_annotated_logger(self.log, item.event) - semaphore_key = quote_plus(job.semaphore.name) + + for semaphore in job.semaphores: + self._release_one(log, item, job, semaphore) + + def _release_one(self, log, item, job, semaphore): + semaphore_key = quote_plus(semaphore.name) semaphore_path = f"{self.tenant_root}/{semaphore_key}" semaphore_handle = f"{item.uuid}-{job.name}"