From 153f8a90ccb88fde55c01a99f17453be46f69ad8 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 6 May 2021 16:40:00 -0700 Subject: [PATCH] Support multiple semaphores This allows jobs to request multiple semaphores. Zuul will wait until all are available before acquiring them* and will not start a job unless all have been acquired. This is useful for jobs which require access to mulitple limited resources (especially if other jobs require access to a subset or superset of those same resources). * Implementation note: for efficiency, we actually do acquire them one-by-one but then release any which have been acquired if they are not all available. This all happens very quickly within a single attempt to start a job. We don't hold semaphores while we wait for others as that could cause deadlocks. To be consistent with other job attributes which accept lists, this deprecates job.semaphore and replaces it with job.semaphores. Change-Id: I295a891a2d02b904820d8f60afe8ef862693b75d --- doc/source/reference/job_def.rst | 38 ++++++++++++------ .../semaphore-resources-295dceaf7ddbab0d.yaml | 2 +- .../notes/semaphores-53ff91bf6f1ad0f5.yaml | 23 +++++++++++ .../fixtures/layouts/multiple-semaphores.yaml | 40 +++++++++++++++++++ tests/unit/test_scheduler.py | 39 +++++++++++++++++- tests/unit/test_web.py | 16 ++++---- web/src/containers/job/JobVariant.jsx | 2 +- zuul/configloader.py | 19 ++++++--- zuul/model.py | 24 ++++++----- zuul/zk/semaphore.py | 33 +++++++++++---- 10 files changed, 190 insertions(+), 46 deletions(-) create mode 100644 releasenotes/notes/semaphores-53ff91bf6f1ad0f5.yaml create mode 100644 tests/fixtures/layouts/multiple-semaphores.yaml diff --git a/doc/source/reference/job_def.rst b/doc/source/reference/job_def.rst index e509a9b873..beb320edc5 100644 --- a/doc/source/reference/job_def.rst +++ b/doc/source/reference/job_def.rst @@ -220,12 +220,26 @@ Here is an example of two job definitions: .. attr:: semaphore - The name of a :ref:`semaphore` which should be acquired and - released when the job begins and ends. If the semaphore is at - maximum capacity, then Zuul will wait until it can be acquired - before starting the job. The format is either a string or a - dictionary. If it's a string it references a semaphore using the - default value for :attr:`job.semaphore.resources-first`. + A deprecated alias of :attr:`job.semaphores`. + + .. attr:: semaphores + + The name of a :ref:`semaphore` (or list of them) which should be + acquired and released when the job begins and ends. If the + semaphore is at maximum capacity, then Zuul will wait until it + can be acquired before starting the job. The format is either a + string, a dictionary, or a list of either of those in the case + of multiple semaphores. If it's a string it references a + semaphore using the default value for + :attr:`job.semaphores.resources-first`. + + If multiple semaphores are requested, the job will not start + until all have been acquired, and Zuul will wait until all are + available before acquiring any. + + When inheriting jobs or applying variants, the list of + semaphores is extended (semaphores specified in a job definition + are added to any supplied by their parents). .. attr:: name :required: @@ -236,12 +250,12 @@ Here is an example of two job definitions: :default: False By default a semaphore is acquired before the resources are - requested. However in some cases the user wants to run cheap - jobs as quickly as possible in a consecutive manner. In this - case :attr:`job.semaphore.resources-first` can be enabled to - request the resources before locking the semaphore. This can - lead to some amount of blocked resources while waiting for the - semaphore so this should be used with caution. + requested. However in some cases the user may want to run + cheap jobs as quickly as possible in a consecutive manner. In + this case `resources-first` can be enabled to request the + resources before locking the semaphore. This can lead to some + amount of blocked resources while waiting for the semaphore + so this should be used with caution. .. attr:: tags diff --git a/releasenotes/notes/semaphore-resources-295dceaf7ddbab0d.yaml b/releasenotes/notes/semaphore-resources-295dceaf7ddbab0d.yaml index 9d3b7d417f..c2fff5018b 100644 --- a/releasenotes/notes/semaphore-resources-295dceaf7ddbab0d.yaml +++ b/releasenotes/notes/semaphore-resources-295dceaf7ddbab0d.yaml @@ -10,5 +10,5 @@ upgrade: to run. However this could lead to a high amount of resource waste. Instead jobs now acquire the semaphore before requesting the resources by default. This behavior can be overridden by jobs using - :attr:`job.semaphore.resources-first` if some waste of resources is + job.semaphores.resources-first if some waste of resources is acceptable. diff --git a/releasenotes/notes/semaphores-53ff91bf6f1ad0f5.yaml b/releasenotes/notes/semaphores-53ff91bf6f1ad0f5.yaml new file mode 100644 index 0000000000..d8b6d10b70 --- /dev/null +++ b/releasenotes/notes/semaphores-53ff91bf6f1ad0f5.yaml @@ -0,0 +1,23 @@ +--- +features: + - | + Jobs may now request multiple semaphores and they will not start until + all semaphores are acquired. + + Use the new :attr:`job.semaphores` (plural) attribute to specify them. + + Note that the new attribute is additive when considering + inheritance and job variants. That is to say that a job + definition with a `semaphores` attribute will extend the list of + semaphores supplied by its parent rather than overriding it (which + is the behavior for the deprecated attribute). +deprecations: + - | + The job attribute :attr:`job.semaphore` (note the singular rather + than plural form) job attribute is now deprecated. Use the plural + form :attr:`job.semaphores` instead. As with most list items in + Zuul configuration, it also accepts a single item without the + wrapping list, so to convert existing jobs, simply change the + spelling of the attribute, no change to the value is required. + + The singular form will be removed in Zuul 5.0. diff --git a/tests/fixtures/layouts/multiple-semaphores.yaml b/tests/fixtures/layouts/multiple-semaphores.yaml new file mode 100644 index 0000000000..95882ae87e --- /dev/null +++ b/tests/fixtures/layouts/multiple-semaphores.yaml @@ -0,0 +1,40 @@ +- pipeline: + name: check + manager: independent + trigger: + gerrit: + - event: patchset-created + success: + gerrit: + Verified: 1 + failure: + gerrit: + Verified: -1 + +- job: + name: base + parent: null + run: playbooks/base.yaml + +- job: + name: job1 + semaphores: + - sem1 + +- job: + name: job2 + semaphores: + - sem1 + - sem2 + +- project: + name: org/project1 + check: + jobs: + - job1 + +- project: + name: org/project2 + check: + jobs: + - job2 diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index e80f4d8bf7..748e85cdda 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -7269,7 +7269,7 @@ class TestSemaphore(ZuulTestCase): self.assertIsNone(jobs[0]["waiting_status"]) self.assertIsNone(jobs[1]["waiting_status"]) self.assertEqual(jobs[2]["waiting_status"], - 'semaphore: test-semaphore') + 'semaphores: test-semaphore') # By default we first lock the semaphore and then get the nodes # so at this point the semaphore needs to be aquired. @@ -7833,6 +7833,43 @@ class TestSemaphore(ZuulTestCase): len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), 0) + @simple_layout('layouts/multiple-semaphores.yaml') + def test_multiple_semaphores(self): + # Test a job with multiple semaphores + self.executor_server.hold_jobs_in_build = True + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # One job should be running, and hold sem1 + self.assertBuilds([dict(name='job1')]) + + B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B') + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # Job2 requires sem1 and sem2; it hasn't started because job + # is still holding sem1. + self.assertBuilds([dict(name='job1')]) + + self.executor_server.release('job1') + self.waitUntilSettled() + + # Job1 is finished, so job2 can acquire both semaphores. + self.assertBuilds([dict(name='job2')]) + + self.executor_server.release() + self.waitUntilSettled() + self.assertHistory([ + dict(name='job1', result='SUCCESS', changes='1,1'), + dict(name='job2', result='SUCCESS', changes='2,1'), + ]) + # TODO(corvus): Consider a version of this test which launches + # 2 jobs with the same multiple-semaphore requirements + # simultaneously to test the behavior with contention (at + # least one should be able to start on each pass through the + # loop). + class TestSemaphoreMultiTenant(ZuulTestCase): tenant_config_file = 'config/multi-tenant-semaphore/main.yaml' diff --git a/tests/unit/test_web.py b/tests/unit/test_web.py index fcee9ca0c5..17db4b727e 100644 --- a/tests/unit/test_web.py +++ b/tests/unit/test_web.py @@ -365,7 +365,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': source_ctx, 'tags': [], 'timeout': None, @@ -410,7 +410,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': source_ctx, 'tags': [], 'timeout': None, @@ -453,7 +453,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': source_ctx, 'tags': [], 'timeout': None, @@ -572,7 +572,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': { 'branch': 'master', 'path': 'zuul.yaml', @@ -610,7 +610,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': { 'branch': 'master', 'path': 'zuul.yaml', @@ -648,7 +648,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': { 'branch': 'master', 'path': 'zuul.yaml', @@ -686,7 +686,7 @@ class TestWeb(BaseTestWeb): 'pre_run': [], 'post_run': [], 'cleanup_run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': { 'branch': 'master', 'path': 'zuul.yaml', @@ -745,7 +745,7 @@ class TestWeb(BaseTestWeb): 'requires': [], 'roles': [], 'run': [], - 'semaphore': None, + 'semaphores': [], 'source_context': {'branch': 'master', 'path': 'zuul.yaml', 'project': 'common-config'}, diff --git a/web/src/containers/job/JobVariant.jsx b/web/src/containers/job/JobVariant.jsx index 5e33e028d5..79b14c87eb 100644 --- a/web/src/containers/job/JobVariant.jsx +++ b/web/src/containers/job/JobVariant.jsx @@ -84,7 +84,7 @@ class JobVariant extends React.Component { const jobInfos = [ 'description', 'context', 'builds', 'status', - 'parent', 'attempts', 'timeout', 'semaphore', + 'parent', 'attempts', 'timeout', 'semaphores', 'nodeset', 'variables', 'override_checkout', ] jobInfos.forEach(key => { diff --git a/zuul/configloader.py b/zuul/configloader.py index 8566da6979..7a4597a5b0 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -619,6 +619,7 @@ class JobParser(object): 'hold-following-changes': bool, 'voting': bool, 'semaphore': vs.Any(semaphore, str), + 'semaphores': to_list(vs.Any(semaphore, str)), 'tags': to_list(str), 'branches': to_list(str), 'files': to_list(str), @@ -883,14 +884,22 @@ class JobParser(object): new_dependencies.append(job_dependency) job.dependencies = new_dependencies - if 'semaphore' in conf: - semaphore = conf.get('semaphore') + semaphores = as_list(conf.get('semaphores', conf.get('semaphore', []))) + job_semaphores = [] + for semaphore in semaphores: if isinstance(semaphore, str): - job.semaphore = model.JobSemaphore(semaphore) + job_semaphores.append(model.JobSemaphore(semaphore)) else: - job.semaphore = model.JobSemaphore( + job_semaphores.append(model.JobSemaphore( semaphore.get('name'), - semaphore.get('resources-first', False)) + semaphore.get('resources-first', False))) + if job_semaphores: + # Sort the list of semaphores to avoid issues with + # contention (where two jobs try to start at the same time + # and fail due to acquiring the same semaphores but in + # reverse order. + job.semaphores = tuple(sorted(job_semaphores, + key=lambda x: x.name)) for k in ('tags', 'requires', 'provides'): v = frozenset(as_list(conf.get(k))) diff --git a/zuul/model.py b/zuul/model.py index 2b5204bbe7..86ea67f81c 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -1230,7 +1230,7 @@ class Job(ConfigObject): cleanup_run=(), run=(), ansible_version=None, - semaphore=None, + semaphores=(), attempts=3, final=False, abstract=False, @@ -1295,12 +1295,7 @@ class Job(ConfigObject): d['required_projects'] = [] for project in self.required_projects.values(): d['required_projects'].append(project.toDict()) - if self.semaphore: - # For now just leave the semaphore name here until we really need - # more information in zuul-web about this - d['semaphore'] = self.semaphore.name - else: - d['semaphore'] = None + d['semaphores'] = [s.toDict() for s in self.semaphores] d['variables'] = self.variables d['extra_variables'] = self.extra_variables d['host_variables'] = self.host_variables @@ -1621,7 +1616,8 @@ class Job(ConfigObject): if k not in set(['pre_run', 'run', 'post_run', 'cleanup_run', 'roles', 'variables', 'extra_variables', 'host_variables', 'group_variables', - 'required_projects', 'allowed_projects']): + 'required_projects', 'allowed_projects', + 'semaphores']): setattr(self, k, other._get(k)) # Don't set final above so that we don't trip an error halfway @@ -1715,6 +1711,14 @@ class Job(ConfigObject): other.allowed_projects)) elif other._get('allowed_projects') is not None: self.allowed_projects = other.allowed_projects + if other._get('semaphores') is not None: + # Sort the list of semaphores to avoid issues with + # contention (where two jobs try to start at the same time + # and fail due to acquiring the same semaphores but in + # reverse order. + self.semaphores = tuple( + sorted(other.semaphores + self.semaphores, + key=lambda x: x.name)) for k in self.context_attributes: if (other._get(k) is not None and @@ -2814,8 +2818,8 @@ class QueueItem(object): toreq.append(job) job.queued = True else: - job.waiting_status = 'semaphore: {}'.format( - job.semaphore.name) + sem_names = ','.join([s.name for s in job.semaphores]) + job.waiting_status = 'semaphores: {}'.format(sem_names) return toreq def setResult(self, build): diff --git a/zuul/zk/semaphore.py b/zuul/zk/semaphore.py index 37d89a052e..ec872196c6 100644 --- a/zuul/zk/semaphore.py +++ b/zuul/zk/semaphore.py @@ -60,11 +60,23 @@ class SemaphoreHandler(ZooKeeperSimpleBase): self.log.exception("Unable to send semaphore stats:") def acquire(self, item, job, request_resources): - if not job.semaphore: + if not job.semaphores: return True log = get_annotated_logger(self.log, item.event) - if job.semaphore.resources_first and request_resources: + all_acquired = True + for semaphore in job.semaphores: + if not self._acquire_one(log, item, job, request_resources, + semaphore): + all_acquired = False + break + if not all_acquired: + self.release(item, job) + return False + return True + + def _acquire_one(self, log, item, job, request_resources, semaphore): + if semaphore.resources_first and request_resources: # We're currently in the resource request phase and want to get the # resources before locking. So we don't need to do anything here. return True @@ -75,7 +87,7 @@ class SemaphoreHandler(ZooKeeperSimpleBase): # the resources phase. pass - semaphore_key = quote_plus(job.semaphore.name) + semaphore_key = quote_plus(semaphore.name) semaphore_path = f"{self.tenant_root}/{semaphore_key}" semaphore_handle = f"{item.uuid}-{job.name}" @@ -86,7 +98,7 @@ class SemaphoreHandler(ZooKeeperSimpleBase): return True # semaphore is there, check max - while len(semaphore_holders) < self._max_count(job.semaphore.name): + while len(semaphore_holders) < self._max_count(semaphore.name): semaphore_holders.append(semaphore_handle) try: @@ -96,12 +108,12 @@ class SemaphoreHandler(ZooKeeperSimpleBase): except BadVersionError: log.debug( "Retrying semaphore %s acquire due to concurrent update", - job.semaphore.name) + semaphore.name) semaphore_holders, zstat = self.getHolders(semaphore_path) continue log.info("Semaphore %s acquired: job %s, item %s", - job.semaphore.name, job.name, item) + semaphore.name, job.name, item) self._emitStats(semaphore_path, len(semaphore_holders)) return True @@ -145,11 +157,16 @@ class SemaphoreHandler(ZooKeeperSimpleBase): break def release(self, item, job): - if not job.semaphore: + if not job.semaphores: return log = get_annotated_logger(self.log, item.event) - semaphore_key = quote_plus(job.semaphore.name) + + for semaphore in job.semaphores: + self._release_one(log, item, job, semaphore) + + def _release_one(self, log, item, job, semaphore): + semaphore_key = quote_plus(semaphore.name) semaphore_path = f"{self.tenant_root}/{semaphore_key}" semaphore_handle = f"{item.uuid}-{job.name}"