diff --git a/doc/source/config/job.rst b/doc/source/config/job.rst
index e27e6d205a..435d68e7e7 100644
--- a/doc/source/config/job.rst
+++ b/doc/source/config/job.rst
@@ -483,48 +483,164 @@ Here is an example of two job definitions:
    .. attr:: pre-run
 
       The name of a playbook or list of playbooks to run before the
-      main body of a job. The full path to the playbook in the repo
-      where the job is defined is expected.
+      main body of a job. Values are either a string describing the
+      full path to the playbook in the repo where the job is defined,
+      or a dictionary described below.
 
       When a job inherits from a parent, the child's pre-run playbooks
       are run after the parent's. See :ref:`job` for more
       information.
 
+      If the value is a dictionary, the following attributes are
+      available:
+
+      .. attr:: name
+
+         The path to the playbook relative to the root of the repo.
+
+      .. attr:: semaphores
+
+         The name of a :ref:`semaphore` (or list of them) or
+         :ref:`global_semaphore` which should be acquired and released
+         when the playbook begins and ends. If the semaphore is at
+         maximum capacity, then Zuul will wait until it can be
+         acquired before starting the playbook. The format is either a
+         string, or a list of strings.
+
+         If multiple semaphores are requested, the playbook will not
+         start until all have been acquired, and Zuul will wait until
+         all are available before acquiring any. The time spent
+         waiting for pre-run playbook semaphores is counted against
+         the :attr:`job.timeout`.
+
+         None of the semaphores specified for a playbook may also be
+         specified in the same job.
+
    .. attr:: post-run
 
       The name of a playbook or list of playbooks to run after the
-      main body of a job. The full path to the playbook in the repo
-      where the job is defined is expected.
+      main body of a job. Values are either a string describing the
+      full path to the playbook in the repo where the job is defined,
+      or a dictionary described below.
 
-      When a job inherits from a parent, the child's post-run
-      playbooks are run before the parent's. See :ref:`job` for more
+      When a job inherits from a parent, the child's post-run playbooks
+      are run before the parent's. See :ref:`job` for more
       information.
 
+      If the value is a dictionary, the following attributes are
+      available:
+
+      .. attr:: name
+
+         The path to the playbook relative to the root of the repo.
+
+      .. attr:: semaphores
+
+         The name of a :ref:`semaphore` (or list of them) or
+         :ref:`global_semaphore` which should be acquired and released
+         when the playbook begins and ends. If the semaphore is at
+         maximum capacity, then Zuul will wait until it can be
+         acquired before starting the playbook. The format is either a
+         string, or a list of strings.
+
+         If multiple semaphores are requested, the playbook will not
+         start until all have been acquired, and Zuul will wait until
+         all are available before acquiring any. The time spent
+         waiting for post-run playbook semaphores is counted against
+         the :attr:`job.post-timeout`.
+
+         None of the semaphores specified for a playbook may also be
+         specified in the same job.
+
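To make the dictionary form documented above concrete, here is a brief sketch; the job name, playbook paths, and the ``mirror-access`` semaphore are invented for illustration and are not part of this change:

.. code-block:: yaml

   - job:
       name: publish-artifacts
       pre-run:
         - name: playbooks/setup-mirror.yaml
           semaphores: mirror-access
       post-run:
         - name: playbooks/upload-to-mirror.yaml
           semaphores:
             - mirror-access

Each playbook acquires ``mirror-access`` immediately before it starts and releases it when it ends, so only the guarded phases serialize against other holders of the semaphore; the main body of the job runs unrestricted.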
    .. attr:: cleanup-run
 
-      The name of a playbook or list of playbooks to run after a job
-      execution. The full path to the playbook in the repo
-      where the job is defined is expected.
+      The name of a playbook or list of playbooks to run after job
+      execution. Values are either a string describing the full path
+      to the playbook in the repo where the job is defined, or a
+      dictionary described below.
 
-      The cleanup phase is performed unconditionally of the job's result,
-      even when the job is canceled. Cleanup results are not taken into
-      account.
+      The cleanup phase is performed regardless of the job's result,
+      even when the job is canceled. Cleanup results are not taken
+      into account when reporting the job result.
 
-      When a job inherits from a parent, the child's cleanup-run
-      playbooks are run before the parent's. See :ref:`job` for more
+      When a job inherits from a parent, the child's cleanup-run playbooks
+      are run before the parent's. See :ref:`job` for more
       information.
 
+      There is a hard-coded five-minute timeout for cleanup playbooks.
+
+      If the value is a dictionary, the following attributes are
+      available:
+
+      .. attr:: name
+
+         The path to the playbook relative to the root of the repo.
+
+      .. attr:: semaphores
+
+         The name of a :ref:`semaphore` (or list of them) or
+         :ref:`global_semaphore` which should be acquired and released
+         when the playbook begins and ends. If the semaphore is at
+         maximum capacity, then Zuul will wait until it can be
+         acquired before starting the playbook. The format is either a
+         string, or a list of strings.
+
+         If multiple semaphores are requested, the playbook will not
+         start until all have been acquired, and Zuul will wait until
+         all are available before acquiring any. The time spent
+         waiting for cleanup-run playbook semaphores is counted against
+         the cleanup phase timeout.
+
+         None of the semaphores specified for a playbook may also be
+         specified in the same job.
+
    .. attr:: run
 
       The name of a playbook or list of playbooks for this job. If it
-      is not supplied, the parent's playbook will be used (and likewise
-      up the inheritance chain). The full path within the repo is
-      required. Example:
+      is not supplied, the parent's playbook will be used (and
+      likewise up the inheritance chain). Values are either a string
+      describing the full path to the playbook in the repo where the
+      job is defined, or a dictionary described below.
+
+      If the value is a dictionary, the following attributes are
+      available:
+
+      .. attr:: name
+
+         The path to the playbook relative to the root of the repo.
+
+      .. attr:: semaphores
+
+         The name of a :ref:`semaphore` (or list of them) or
+         :ref:`global_semaphore` which should be acquired and released
+         when the playbook begins and ends. If the semaphore is at
+         maximum capacity, then Zuul will wait until it can be
+         acquired before starting the playbook. The format is either a
+         string, or a list of strings.
+
+         If multiple semaphores are requested, the playbook will not
+         start until all have been acquired, and Zuul will wait until
+         all are available before acquiring any. The time spent
+         waiting for run playbook semaphores is counted against
+         the :attr:`job.timeout`.
+
+         None of the semaphores specified for a playbook may also be
+         specified in the same job.
+
+      Example:
 
       .. code-block:: yaml
 
         run: playbooks/job-playbook.yaml
 
+      Or:
+
+      .. code-block:: yaml
+
+        run:
+          - name: playbooks/job-playbook.yaml
+            semaphores: playbook-semaphore
+
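The exclusivity rule stated above (and enforced by the configloader change below) can be illustrated with a hedged sketch; the job and semaphore names are invented for the example:

.. code-block:: yaml

   # Accepted: the job semaphore and the playbook semaphore differ.
   - job:
       name: deploy
       semaphore: deploy-lock
       run:
         - name: playbooks/deploy.yaml
           semaphores: registry-lock

   # Rejected: deploy-lock may not be both a job semaphore and a
   # playbook semaphore within the same job.
   - job:
       name: broken-deploy
       semaphore: deploy-lock
       run:
         - name: playbooks/deploy.yaml
           semaphores: deploy-lock

The second definition fails configuration loading with the ``specified as both job and playbook semaphores`` error added in ``zuul/configloader.py`` below.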
    .. attr:: ansible-version
 
       The ansible version to use for all playbooks of the job. This can be
diff --git a/releasenotes/notes/playbook-semaphore-0dc1ee618ef77678.yaml b/releasenotes/notes/playbook-semaphore-0dc1ee618ef77678.yaml
new file mode 100644
index 0000000000..77515f3a0b
--- /dev/null
+++ b/releasenotes/notes/playbook-semaphore-0dc1ee618ef77678.yaml
@@ -0,0 +1,11 @@
+---
+features:
+  - |
+    Individual playbooks may now be wrapped by a semaphore. Zuul will
+    start the job and proceed up to the point of a playbook which
+    requires a semaphore and then wait until it is able to acquire the
+    semaphore before proceeding. It releases the semaphore after the
+    end of that individual playbook.
+
+    The same semaphore may be used for both jobs and playbooks, but an
+    individual job may not use the same semaphore for both purposes.
diff --git a/tests/base.py b/tests/base.py
index e052e37297..50de84d610 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -3257,6 +3257,7 @@ class FakeBuild(object):
 
 class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
     result = None
+    semaphore_sleep_time = 5
 
     def _execute(self):
         for _ in iterate_timeout(60, 'wait for merge'):
diff --git a/tests/fixtures/config/playbook-semaphore/git/common-config/playbooks/run.yaml b/tests/fixtures/config/playbook-semaphore/git/common-config/playbooks/run.yaml
new file mode 100644
index 0000000000..9d92e1c60f
--- /dev/null
+++ b/tests/fixtures/config/playbook-semaphore/git/common-config/playbooks/run.yaml
@@ -0,0 +1,13 @@
+- hosts: all
+  tasks:
+    - debug: var=waitpath
+    - file:
+        path: "{{zuul._test.test_root}}/builds/{{zuul.build}}.run_start.flag"
+        state: touch
+    # Do not finish until test creates the flag file
+    - wait_for:
+        state: present
+        path: "{{waitpath}}"
+    - file:
+        path: "{{zuul._test.test_root}}/builds/{{zuul.build}}.run_end.flag"
+        state: touch
diff --git a/tests/fixtures/config/playbook-semaphore/git/common-config/zuul.yaml b/tests/fixtures/config/playbook-semaphore/git/common-config/zuul.yaml
new file mode 100644
index 0000000000..c8a85d7af6
--- /dev/null
+++ b/tests/fixtures/config/playbook-semaphore/git/common-config/zuul.yaml
@@ -0,0 +1,32 @@
+- pipeline:
+    name: check
+    manager: independent
+    post-review: true
+    trigger:
+      gerrit:
+        - event: patchset-created
+    success:
+      gerrit:
+        Verified: 1
+    failure:
+      gerrit:
+        Verified: -1
+
+- job:
+    name: base
+    parent: null
+    nodeset:
+      nodes:
+        - name: test_node
+          label: test_label
+
+- semaphore:
+    name: test-semaphore
+
+- job:
+    name: test-job
+    vars:
+      waitpath: '{{zuul._test.test_root}}/builds/{{zuul.build}}/test_wait'
+    run:
+      - name: playbooks/run.yaml
+        semaphores: test-semaphore
diff --git a/tests/fixtures/config/playbook-semaphore/git/org_project/.zuul.yaml b/tests/fixtures/config/playbook-semaphore/git/org_project/.zuul.yaml
new file mode 100644
index 0000000000..f8c86c5bb4
--- /dev/null
+++ b/tests/fixtures/config/playbook-semaphore/git/org_project/.zuul.yaml
@@ -0,0 +1,5 @@
+- project:
+    name: org/project
+    check:
+      jobs:
+        - test-job
diff --git a/tests/fixtures/config/playbook-semaphore/git/org_project/README b/tests/fixtures/config/playbook-semaphore/git/org_project/README
new file mode 100644
index 0000000000..9daeafb986
--- /dev/null
+++ b/tests/fixtures/config/playbook-semaphore/git/org_project/README
@@ -0,0 +1 @@
+test
diff --git a/tests/fixtures/config/playbook-semaphore/main.yaml b/tests/fixtures/config/playbook-semaphore/main.yaml
new file mode 100644
index 0000000000..208e274b13
--- /dev/null
+++ b/tests/fixtures/config/playbook-semaphore/main.yaml
@@ -0,0 +1,8 @@
+- tenant:
+    name: tenant-one
+    source:
+      gerrit:
+        config-projects:
+          - common-config
+        untrusted-projects:
+          - org/project
diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py
index c88a7bd95d..261d08af40 100644
--- a/tests/unit/test_model.py
+++ b/tests/unit/test_model.py
@@ -53,8 +53,10 @@ class TestJob(BaseTestCase):
         self.connection = Dummy(connection_name='dummy_connection')
         self.source = 
Dummy(canonical_hostname='git.example.com', connection=self.connection) + self.abide = model.Abide() self.tenant = model.Tenant('tenant') self.tenant.default_ansible_version = AnsibleManager().default_version + self.tenant.semaphore_handler = Dummy(abide=self.abide) self.layout = model.Layout(self.tenant) self.tenant.layout = self.layout self.project = model.Project('project', self.source) @@ -129,16 +131,18 @@ class TestJob(BaseTestCase): # This simulates freezing a job. secrets = ['foo'] - py27_pre = model.PlaybookContext(self.context, 'py27-pre', [], secrets) - py27_run = model.PlaybookContext(self.context, 'py27-run', [], secrets) - py27_post = model.PlaybookContext(self.context, 'py27-post', [], - secrets) + py27_pre = model.PlaybookContext( + self.context, 'py27-pre', [], secrets, []) + py27_run = model.PlaybookContext( + self.context, 'py27-run', [], secrets, []) + py27_post = model.PlaybookContext( + self.context, 'py27-post', [], secrets, []) py27 = model.Job('py27') py27.timeout = 30 - py27.pre_run = [py27_pre] - py27.run = [py27_run] - py27.post_run = [py27_post] + py27.pre_run = (py27_pre,) + py27.run = (py27_run,) + py27.post_run = (py27_post,) job = py27.copy() self.assertEqual(30, job.timeout) @@ -146,7 +150,7 @@ class TestJob(BaseTestCase): # Apply the diablo variant diablo = model.Job('py27') diablo.timeout = 40 - job.applyVariant(diablo, self.layout) + job.applyVariant(diablo, self.layout, None) self.assertEqual(40, job.timeout) self.assertEqual(['py27-pre'], @@ -165,7 +169,7 @@ class TestJob(BaseTestCase): good_final = model.Job('py27') good_final.voting = False - job.applyVariant(good_final, self.layout) + job.applyVariant(good_final, self.layout, None) self.assertFalse(job.voting) bad_final = model.Job('py27') @@ -173,7 +177,7 @@ class TestJob(BaseTestCase): with testtools.ExpectedException( Exception, "Unable to modify final job"): - job.applyVariant(bad_final, self.layout) + job.applyVariant(bad_final, self.layout, None) @mock.patch("zuul.model.zkobject.ZKObject._save") def test_job_inheritance_job_tree(self, save_mock): diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py index aa00e733fc..52c596bd80 100644 --- a/tests/unit/test_v3.py +++ b/tests/unit/test_v3.py @@ -4325,6 +4325,245 @@ class TestCleanupPlaybooks(AnsibleZuulTestCase): self.assertFalse(os.path.exists(post_end)) +class TestPlaybookSemaphore(AnsibleZuulTestCase): + tenant_config_file = 'config/playbook-semaphore/main.yaml' + + def test_playbook_semaphore(self): + self.executor_server.verbose = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + + for _ in iterate_timeout(60, 'job started'): + if len(self.builds) == 1: + break + build1 = self.builds[0] + + # Wait for the first job to be running the mutexed playbook + run1_start = os.path.join(self.jobdir_root, build1.uuid + + '.run_start.flag') + for _ in iterate_timeout(60, 'job1 running'): + if os.path.exists(run1_start): + break + + # Start a second build which should wait for the playbook + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + + # Wait until we are waiting for the playbook + for _ in iterate_timeout(60, 'job2 waiting for semaphore'): + found = False + if len(self.builds) == 2: + build2 = self.builds[1] + for job_worker in self.executor_server.job_workers.values(): + if job_worker.build_request.uuid == build2.uuid: + if job_worker.waiting_for_semaphores: + found = True + if found: 
+ break + + # Wait for build1 to finish + with open(os.path.join(self.jobdir_root, build1.uuid, 'test_wait'), + "w") as of: + of.write("continue") + + # Wait for the second job to be running the mutexed playbook + run2_start = os.path.join(self.jobdir_root, build2.uuid + + '.run_start.flag') + for _ in iterate_timeout(60, 'job2 running'): + if os.path.exists(run2_start): + break + + # Release build2 and wait to finish + with open(os.path.join(self.jobdir_root, build2.uuid, 'test_wait'), + "w") as of: + of.write("continue") + self.waitUntilSettled() + + self.assertHistory([ + dict(name='test-job', result='SUCCESS', changes='1,1'), + dict(name='test-job', result='SUCCESS', changes='2,1'), + ]) + + def test_playbook_and_job_semaphore_runtime(self): + # Test that a playbook does not specify the same semaphore as + # the job. Test via inheritance which is a runtime check. + in_repo_conf = textwrap.dedent( + """ + - job: + name: test-job2 + parent: test-job + semaphore: test-semaphore + + - project: + check: + jobs: + - test-job2 + """) + + file_dict = {'.zuul.yaml': in_repo_conf} + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A', + files=file_dict) + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + self.assertHistory([]) + self.assertEqual(A.reported, 1) + self.assertEqual(A.patchsets[0]['approvals'][0]['value'], "-1") + self.assertIn('both job and playbook', A.messages[0]) + + def test_playbook_and_job_semaphore_def(self): + # Test that a playbook does not specify the same semaphore as + # the job. Static configuration test. + in_repo_conf = textwrap.dedent( + """ + - job: + name: test-job2 + semaphore: test-semaphore + run: + - name: playbooks/run.yaml + semaphores: test-semaphore + + - project: + check: + jobs: + - test-job2 + """) + + file_dict = {'.zuul.yaml': in_repo_conf} + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A', + files=file_dict) + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + self.assertHistory([]) + self.assertEqual(A.reported, 1) + self.assertEqual(A.patchsets[0]['approvals'][0]['value'], "-1") + self.assertIn('both job and playbook', A.messages[0]) + + def test_playbook_semaphore_timeout(self): + self.wait_timeout = 300 + self.executor_server.verbose = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + + for _ in iterate_timeout(60, 'job started'): + if len(self.builds) == 1: + break + build1 = self.builds[0] + + # Wait for the first job to be running the mutexed playbook + run1_start = os.path.join(self.jobdir_root, build1.uuid + + '.run_start.flag') + for _ in iterate_timeout(60, 'job1 running'): + if os.path.exists(run1_start): + break + + # Start a second build which should wait for the playbook + in_repo_conf = textwrap.dedent( + """ + - project: + check: + jobs: + - test-job: + timeout: 20 + """) + + file_dict = {'.zuul.yaml': in_repo_conf} + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B', + files=file_dict) + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + + # Wait until we are waiting for the playbook + for _ in iterate_timeout(60, 'job2 waiting for semaphore'): + found = False + if len(self.builds) == 2: + build2 = self.builds[1] + for job_worker in self.executor_server.job_workers.values(): + if job_worker.build_request.uuid == build2.uuid: + if job_worker.waiting_for_semaphores: + found = True + if found: + break + + # Wait for the second build to 
timeout waiting for the semaphore + for _ in iterate_timeout(60, 'build timed out'): + if len(self.builds) == 1: + break + + # Wait for build1 to finish + with open(os.path.join(self.jobdir_root, build1.uuid, 'test_wait'), + "w") as of: + of.write("continue") + + self.waitUntilSettled() + + self.assertHistory([ + dict(name='test-job', result='TIMED_OUT', changes='2,1'), + dict(name='test-job', result='SUCCESS', changes='1,1'), + ]) + + def test_playbook_semaphore_abort(self): + self.wait_timeout = 300 + self.executor_server.verbose = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + + for _ in iterate_timeout(60, 'job started'): + if len(self.builds) == 1: + break + build1 = self.builds[0] + + # Wait for the first job to be running the mutexed playbook + run1_start = os.path.join(self.jobdir_root, build1.uuid + + '.run_start.flag') + for _ in iterate_timeout(60, 'job1 running'): + if os.path.exists(run1_start): + break + + # Start a second build which should wait for the playbook + B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + + # Wait until we are waiting for the playbook + for _ in iterate_timeout(60, 'job2 waiting for semaphore'): + found = False + if len(self.builds) == 2: + build2 = self.builds[1] + for job_worker in self.executor_server.job_workers.values(): + if job_worker.build_request.uuid == build2.uuid: + if job_worker.waiting_for_semaphores: + found = True + if found: + break + + in_repo_conf = textwrap.dedent( + """ + - project: + check: + jobs: [] + """) + + file_dict = {'.zuul.yaml': in_repo_conf} + B.addPatchset(files=file_dict) + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2)) + + for _ in iterate_timeout(60, 'build aborted'): + if len(self.builds) == 1: + break + + # Wait for build1 to finish + with open(os.path.join(self.jobdir_root, build1.uuid, 'test_wait'), + "w") as of: + of.write("continue") + + self.waitUntilSettled() + + self.assertHistory([ + dict(name='test-job', result='ABORTED', changes='2,1'), + dict(name='test-job', result='SUCCESS', changes='1,1'), + ]) + + class TestBrokenTrustedConfig(ZuulTestCase): # Test we can deal with a broken config only with trusted projects. 
This
    # is different then TestBrokenConfig, as it does not have a missing
diff --git a/tests/unit/test_web.py b/tests/unit/test_web.py
index ad99a975c1..3145d4a0a3 100644
--- a/tests/unit/test_web.py
+++ b/tests/unit/test_web.py
@@ -441,6 +441,7 @@ class TestWeb(BaseTestWeb):
                 'type': 'zuul'
             }],
             'secrets': [],
+            'semaphores': [],
             'source_context': source_ctx,
         }]
 
@@ -611,6 +612,7 @@
                 'type': 'zuul'
             }],
             'secrets': [],
+            'semaphores': [],
             'source_context': {
                 'branch': 'master',
                 'path': 'zuul.yaml',
@@ -1188,6 +1190,7 @@
                 'project': 'common-config',
             }],
             'secrets': {},
+            'semaphores': [],
             'path': 'playbooks/project-test1.yaml',
         }],
         'pre_playbooks': [],
diff --git a/zuul/configloader.py b/zuul/configloader.py
index 03953eb319..67d4494c49 100644
--- a/zuul/configloader.py
+++ b/zuul/configloader.py
@@ -633,6 +633,11 @@ class JobParser(object):
     semaphore = {vs.Required('name'): str,
                  'resources-first': bool}
 
+    complex_playbook_def = {vs.Required('name'): str,
+                            'semaphores': to_list(str)}
+
+    playbook_def = to_list(vs.Any(str, complex_playbook_def))
+
     # Attributes of a job that can also be used in Project and ProjectTemplate
     job_attributes = {'parent': vs.Any(str, None),
                       'final': bool,
@@ -661,10 +666,10 @@ class JobParser(object):
                       'timeout': int,
                       'post-timeout': int,
                       'attempts': int,
-                      'pre-run': to_list(str),
-                      'post-run': to_list(str),
-                      'run': to_list(str),
-                      'cleanup-run': to_list(str),
+                      'pre-run': playbook_def,
+                      'post-run': playbook_def,
+                      'run': playbook_def,
+                      'cleanup-run': playbook_def,
                       'ansible-version': vs.Any(str, float, int),
                       '_source_context': model.SourceContext,
                       '_start_mark': model.ZuulMark,
@@ -827,29 +832,57 @@ class JobParser(object):
                 roles.insert(0, r)
         job.addRoles(roles)
 
-        for pre_run_name in as_list(conf.get('pre-run')):
+        seen_playbook_semaphores = set()
+
+        def get_playbook_attrs(playbook_defs):
+            # Helper method to extract information from a playbook
+            # definition.
+            for pb_def in playbook_defs:
+                pb_semaphores = []
+                if isinstance(pb_def, dict):
+                    pb_name = pb_def['name']
+                    for pb_sem_name in as_list(pb_def.get('semaphores')):
+                        pb_semaphores.append(model.JobSemaphore(pb_sem_name))
+                        seen_playbook_semaphores.add(pb_sem_name)
+                else:
+                    # The playbook definition is a simple string path
+                    pb_name = pb_def
+                # Sort the list of semaphores to avoid issues with
+                # contention (where two jobs try to start at the same time
+                # and fail due to acquiring the same semaphores but in
+                # reverse order).
+                pb_semaphores = tuple(sorted(pb_semaphores,
+                                             key=lambda x: x.name))
+                yield (pb_name, pb_semaphores)
+
+        for pre_run_name, pre_run_semaphores in get_playbook_attrs(
+                as_list(conf.get('pre-run'))):
             pre_run = model.PlaybookContext(job.source_context,
                                             pre_run_name, job.roles,
-                                            secrets)
+                                            secrets, pre_run_semaphores)
             job.pre_run = job.pre_run + (pre_run,)
         # NOTE(pabelanger): Reverse the order of our post-run list. We prepend
         # post-runs for inherits however, we want to execute post-runs in the
        # order they are listed within the job. 
- for post_run_name in reversed(as_list(conf.get('post-run'))): + for post_run_name, post_run_semaphores in get_playbook_attrs( + reversed(as_list(conf.get('post-run')))): post_run = model.PlaybookContext(job.source_context, post_run_name, job.roles, - secrets) + secrets, post_run_semaphores) job.post_run = (post_run,) + job.post_run - for cleanup_run_name in reversed(as_list(conf.get('cleanup-run'))): - cleanup_run = model.PlaybookContext(job.source_context, - cleanup_run_name, job.roles, - secrets) + for cleanup_run_name, cleanup_run_semaphores in get_playbook_attrs( + reversed(as_list(conf.get('cleanup-run')))): + cleanup_run = model.PlaybookContext( + job.source_context, + cleanup_run_name, job.roles, + secrets, cleanup_run_semaphores) job.cleanup_run = (cleanup_run,) + job.cleanup_run if 'run' in conf: - for run_name in as_list(conf.get('run')): + for run_name, run_semaphores in get_playbook_attrs( + as_list(conf.get('run'))): run = model.PlaybookContext(job.source_context, run_name, - job.roles, secrets) + job.roles, secrets, run_semaphores) job.run = job.run + (run,) if conf.get('intermediate', False) and not conf.get('abstract', False): @@ -926,6 +959,7 @@ class JobParser(object): job_semaphores.append(model.JobSemaphore( semaphore.get('name'), semaphore.get('resources-first', False))) + if job_semaphores: # Sort the list of semaphores to avoid issues with # contention (where two jobs try to start at the same time @@ -933,6 +967,12 @@ class JobParser(object): # reverse order. job.semaphores = tuple(sorted(job_semaphores, key=lambda x: x.name)) + common = (set([x.name for x in job_semaphores]) & + seen_playbook_semaphores) + if common: + raise Exception(f"Semaphores {common} specified as both " + "job and playbook semaphores but may only " + "be used for one") for k in ('tags', 'requires', 'provides'): v = frozenset(as_list(conf.get(k))) diff --git a/zuul/executor/client.py b/zuul/executor/client.py index 00d47ba380..9aa38cbcab 100644 --- a/zuul/executor/client.py +++ b/zuul/executor/client.py @@ -67,6 +67,10 @@ class ExecutorClient(object): # TODO: deprecate and remove this variable? 
params["zuul"]["_inheritance_path"] = list(job.inheritance_path) + semaphore_handler = item.pipeline.tenant.semaphore_handler + params['semaphore_handle'] = semaphore_handler.getSemaphoreHandle( + item, job) + parent_span = tracing.restoreSpan(item.current_build_set.span_info) execute_time = time.time() with trace.use_span(parent_span): diff --git a/zuul/executor/server.py b/zuul/executor/server.py index 9c7ed158c9..c2aa4fbb58 100644 --- a/zuul/executor/server.py +++ b/zuul/executor/server.py @@ -82,6 +82,8 @@ from zuul.zk.executor import ExecutorApi from zuul.zk.job_request_queue import JobRequestEvent from zuul.zk.system import ZuulSystem from zuul.zk.zkobject import ZKContext +from zuul.zk.semaphore import SemaphoreHandler + BUFFER_LINES_FOR_SYNTAX = 200 DEFAULT_FINGER_PORT = 7900 @@ -497,6 +499,7 @@ class JobDirPlaybook(object): self.secrets = os.path.join(self.secrets_root, 'all.yaml') self.secrets_content = None self.secrets_keys = set() + self.semaphores = [] def addRole(self): count = len(self.roles) @@ -963,6 +966,8 @@ class AnsibleJob(object): RESULT_DISK_FULL: 'RESULT_DISK_FULL', } + semaphore_sleep_time = 30 + def __init__(self, executor_server, build_request, arguments): logger = logging.getLogger("zuul.AnsibleJob") self.arguments = arguments @@ -1042,6 +1047,7 @@ class AnsibleJob(object): self.frozen_hostvars = {} # The zuul.* vars self.zuul_vars = {} + self.waiting_for_semaphores = False def run(self): self.running = True @@ -2034,6 +2040,7 @@ class AnsibleJob(object): jobdir_playbook.project_canonical_name = project.canonical_name jobdir_playbook.canonical_name_and_path = os.path.join( project.canonical_name, playbook['path']) + jobdir_playbook.semaphores = playbook['semaphores'] path = None if not jobdir_playbook.trusted: @@ -3086,10 +3093,45 @@ class AnsibleJob(object): self.emitPlaybookBanner(playbook, 'START', phase) - result, code = self.runAnsible(cmd, timeout, playbook, ansible_version, - cleanup=phase == 'cleanup') + semaphore_handle = self.arguments['semaphore_handle'] + semaphore_wait_start = time.time() + acquired_semaphores = True + while not self.executor_server.semaphore_handler.acquireFromInfo( + self.log, playbook.semaphores, semaphore_handle): + self.log.debug("Unable to acquire playbook semaphores, waiting") + if not self.waiting_for_semaphores: + self.waiting_for_semaphores = True + remain = self.getAnsibleTimeout(semaphore_wait_start, timeout) + if remain is not None and remain <= 0: + self.log.info("Timed out waiting for semaphore") + acquired_semaphores = False + result = self.RESULT_TIMED_OUT + code = 0 + break + if self.aborted: + acquired_semaphores = False + result = self.RESULT_ABORTED + code = 0 + break + time.sleep(self.semaphore_sleep_time) + if self.waiting_for_semaphores: + self.waiting_for_semaphores = False + timeout = self.getAnsibleTimeout(semaphore_wait_start, timeout) + + if acquired_semaphores: + result, code = self.runAnsible( + cmd, timeout, playbook, ansible_version, + cleanup=phase == 'cleanup') self.log.debug("Ansible complete, result %s code %s" % ( self.RESULT_MAP[result], code)) + + if acquired_semaphores: + event_queue = self.executor_server.result_events[ + self.build_request.tenant_name][ + self.build_request.pipeline_name] + self.executor_server.semaphore_handler.releaseFromInfo( + self.log, event_queue, playbook.semaphores, semaphore_handle) + if self.executor_server.statsd: base_key = "zuul.executor.{hostname}.phase.{phase}" self.executor_server.statsd.incr( @@ -3308,6 +3350,9 @@ class ExecutorServer(BaseMergeServer): # 
Used to offload expensive operations to different processes self.process_worker = None + self.semaphore_handler = SemaphoreHandler( + self.zk_client, self.statsd, None, None, None) + def _get_key_store_password(self): try: return self.config["keystore"]["password"] diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 480a3f9ca5..c7a89f7c75 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -959,7 +959,10 @@ class PipelineManager(metaclass=ABCMeta): # current item so a potentially aquired semaphore must be # released as it won't be released on dequeue of the item. tenant = item.pipeline.tenant - tenant.semaphore_handler.release(self.sched, item, job) + pipeline = build_set.item.pipeline + event_queue = self.sched.pipeline_result_events[ + tenant.name][pipeline.name] + tenant.semaphore_handler.release(event_queue, item, job) except Exception: log.exception("Exception while releasing semaphore") @@ -1796,8 +1799,11 @@ class PipelineManager(metaclass=ABCMeta): item = build.build_set.item log.debug("Build %s of %s completed" % (build, item.change)) + + event_queue = self.sched.pipeline_result_events[ + item.pipeline.tenant.name][item.pipeline.name] item.pipeline.tenant.semaphore_handler.release( - self.sched, item, build.job) + event_queue, item, build.job) if item.getJob(build.job.name) is None: log.info("Build %s no longer in job graph for item %s", @@ -1970,9 +1976,12 @@ class PipelineManager(metaclass=ABCMeta): build_set.item.setNodeRequestFailure( job, f'Node request {request.id} failed') self._resumeBuilds(build_set) - tenant = build_set.item.pipeline.tenant + pipeline = build_set.item.pipeline + tenant = pipeline.tenant + event_queue = self.sched.pipeline_result_events[ + tenant.name][pipeline.name] tenant.semaphore_handler.release( - self.sched, build_set.item, job) + event_queue, build_set.item, job) log.info("Completed node request %s for job %s of item %s " "with nodes %s", diff --git a/zuul/model.py b/zuul/model.py index 9d219dcfee..bd1dca00e4 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -1890,7 +1890,8 @@ class PlaybookContext(ConfigObject): """ - def __init__(self, source_context, path, roles, secrets): + def __init__(self, source_context, path, roles, secrets, + semaphores): super(PlaybookContext, self).__init__() self.source_context = source_context self.path = path @@ -1901,6 +1902,10 @@ class PlaybookContext(ConfigObject): # FrozenSecret objects which contain only the info the # executor needs self.frozen_secrets = () + # The original JobSemaphore objects + self.semaphores = semaphores + # the result of getSemaphoreInfo from semaphore handler + self.frozen_semaphores = () def __repr__(self): return '' % (self.source_context, @@ -1915,13 +1920,15 @@ class PlaybookContext(ConfigObject): return (self.source_context == other.source_context and self.path == other.path and self.roles == other.roles and - self.secrets == other.secrets) + self.secrets == other.secrets and + self.semaphores == other.semaphores) def copy(self): r = PlaybookContext(self.source_context, self.path, self.roles, - self.secrets) + self.secrets, + self.semaphores) return r def validateReferences(self, layout): @@ -1944,6 +1951,15 @@ class PlaybookContext(ConfigObject): self.source_context.project_canonical_name)[1] # Decrypt a copy of the secret to verify it can be done secret.decrypt(project.private_secrets_key) + # TODO: if we remove the implicit max=1 semaphore, validate + # references here. 
+ + def freezeSemaphores(self, layout, semaphore_handler): + semaphores = [] + for job_semaphore in self.semaphores: + info = semaphore_handler.getSemaphoreInfo(job_semaphore) + semaphores.append(info) + self.frozen_semaphores = tuple(semaphores) def freezeSecrets(self, layout): secrets = [] @@ -1981,6 +1997,7 @@ class PlaybookContext(ConfigObject): trusted=self.source_context.trusted, roles=[r.toDict() for r in self.roles], secrets=secrets, + semaphores=self.frozen_semaphores, path=self.path) def toSchemaDict(self): @@ -1990,6 +2007,7 @@ class PlaybookContext(ConfigObject): 'roles': list(map(lambda x: x.toDict(), self.roles)), 'secrets': [{'name': secret.name, 'alias': secret.alias} for secret in self.secrets], + 'semaphores': [{'name': sem.name} for sem in self.semaphores], } if self.source_context: d['source_context'] = self.source_context.toDict() @@ -2847,16 +2865,20 @@ class Job(ConfigObject): def _get(self, name): return self.__dict__.get(name) - def setBase(self, layout): + def setBase(self, layout, semaphore_handler): self.inheritance_path = self.inheritance_path + (repr(self),) if self._get('run') is not None: - self.run = self.freezePlaybooks(self.run, layout) + self.run = self.freezePlaybooks( + self.run, layout, semaphore_handler) if self._get('pre_run') is not None: - self.pre_run = self.freezePlaybooks(self.pre_run, layout) + self.pre_run = self.freezePlaybooks( + self.pre_run, layout, semaphore_handler) if self._get('post_run') is not None: - self.post_run = self.freezePlaybooks(self.post_run, layout) + self.post_run = self.freezePlaybooks( + self.post_run, layout, semaphore_handler) if self._get('cleanup_run') is not None: - self.cleanup_run = self.freezePlaybooks(self.cleanup_run, layout) + self.cleanup_run = self.freezePlaybooks( + self.cleanup_run, layout, semaphore_handler) def getNodeset(self, layout): if isinstance(self.nodeset, str): @@ -2993,7 +3015,7 @@ class Job(ConfigObject): setattr(job, k, v) return job - def freezePlaybooks(self, pblist, layout): + def freezePlaybooks(self, pblist, layout, semaphore_handler): """Take a list of playbooks, and return a copy of it updated with this job's roles. 
@@ -3004,10 +3026,11 @@ class Job(ConfigObject): pb = old_pb.copy() pb.roles = self.roles pb.freezeSecrets(layout) + pb.freezeSemaphores(layout, semaphore_handler) ret.append(pb) return tuple(ret) - def applyVariant(self, other, layout): + def applyVariant(self, other, layout, semaphore_handler): """Copy the attributes which have been set on the other job to this job.""" if not isinstance(other, Job): @@ -3113,16 +3136,20 @@ class Job(ConfigObject): self.post_review = True if other._get('run') is not None: - other_run = self.freezePlaybooks(other.run, layout) + other_run = self.freezePlaybooks( + other.run, layout, semaphore_handler) self.run = other_run if other._get('pre_run') is not None: - other_pre_run = self.freezePlaybooks(other.pre_run, layout) + other_pre_run = self.freezePlaybooks( + other.pre_run, layout, semaphore_handler) self.pre_run = self.pre_run + other_pre_run if other._get('post_run') is not None: - other_post_run = self.freezePlaybooks(other.post_run, layout) + other_post_run = self.freezePlaybooks( + other.post_run, layout, semaphore_handler) self.post_run = other_post_run + self.post_run if other._get('cleanup_run') is not None: - other_cleanup_run = self.freezePlaybooks(other.cleanup_run, layout) + other_cleanup_run = self.freezePlaybooks( + other.cleanup_run, layout, semaphore_handler) self.cleanup_run = other_cleanup_run + self.cleanup_run self.updateVariables(other.variables, other.extra_variables, other.host_variables, other.group_variables) @@ -3144,6 +3171,16 @@ class Job(ConfigObject): sorted(other.semaphores + self.semaphores, key=lambda x: x.name)) + pb_semaphores = set() + for pb in self.run + self.pre_run + self.post_run + self.cleanup_run: + pb_semaphores.update([x['name'] for x in pb.frozen_semaphores]) + common = (set([x.name for x in self.semaphores]) & + pb_semaphores) + if common: + raise Exception(f"Semaphores {common} specified as both " + "job and playbook semaphores but may only " + "be used for one") + for k in self.context_attributes: if (other._get(k) is not None and k not in set(['tags', 'requires', 'provides'])): @@ -6380,6 +6417,33 @@ class ResultEvent(AbstractEvent): pass +class SemaphoreReleaseEvent(ResultEvent): + """Enqueued after a semaphore has been released in order + to trigger a processing run. + + This is emitted when a playbook semaphore is released to instruct + the scheduler to emit fan-out PipelineSemaphoreReleaseEvents for + every potentially affected tenant-pipeline. + """ + + def __init__(self, semaphore_name): + self.semaphore_name = semaphore_name + + def toDict(self): + return { + "semaphore_name": self.semaphore_name, + } + + @classmethod + def fromDict(cls, data): + return cls(data.get("semaphore_name")) + + def __repr__(self): + return ( + f"<{self.__class__.__name__} semaphore_name={self.semaphore_name}>" + ) + + class BuildResultEvent(ResultEvent): """Base class for all build result events. @@ -7400,7 +7464,7 @@ class Layout(object): noop.description = 'A job that will always succeed, no operation.' 
noop.parent = noop.BASE_JOB_MARKER noop.deduplicate = False - noop.run = (PlaybookContext(None, 'noop.yaml', [], []),) + noop.run = (PlaybookContext(None, 'noop.yaml', [], [], []),) self.jobs = {'noop': [noop]} self.nodesets = {} self.secrets = {} @@ -7771,6 +7835,7 @@ class Layout(object): skip_file_matcher, redact_secrets_and_keys, debug_messages): log = item.annotateLogger(self.log) + semaphore_handler = item.pipeline.tenant.semaphore_handler job_list = ppc.job_list change = item.change pipeline = item.pipeline @@ -7808,9 +7873,9 @@ class Layout(object): for variant in variants: if frozen_job is None: frozen_job = variant.copy() - frozen_job.setBase(self) + frozen_job.setBase(self, semaphore_handler) else: - frozen_job.applyVariant(variant, self) + frozen_job.applyVariant(variant, self, semaphore_handler) frozen_job.name = variant.name frozen_job.name = jobname @@ -7830,7 +7895,7 @@ class Layout(object): matched = False for variant in job_list.jobs[jobname]: if variant.changeMatchesBranch(change): - frozen_job.applyVariant(variant, self) + frozen_job.applyVariant(variant, self, semaphore_handler) matched = True log.debug("Pipeline variant %s matched %s", repr(variant), change) diff --git a/zuul/scheduler.py b/zuul/scheduler.py index dc136586f7..7d248aab85 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -59,6 +59,7 @@ from zuul.model import ( Change, ChangeManagementEvent, PipelinePostConfigEvent, + SemaphoreReleaseEvent, PipelineSemaphoreReleaseEvent, DequeueEvent, EnqueueEvent, @@ -1924,6 +1925,27 @@ class Scheduler(threading.Thread): pipeline.manager.removeItem(item) return + def _doSemaphoreReleaseEvent(self, event, pipeline): + tenant = pipeline.tenant + semaphore = tenant.layout.getSemaphore( + self.abide, event.semaphore_name) + if semaphore.global_scope: + tenants = [t for t in self.abide.tenants.values() + if event.semaphore_name in t.global_semaphores] + else: + tenants = [tenant] + for tenant in tenants: + for pipeline_name in tenant.layout.pipelines.keys(): + if (tenant.name == pipeline.tenant.name and + pipeline_name == pipeline.name): + # This pipeline is already awake because it is + # where this event originated. 
+ continue + event = PipelineSemaphoreReleaseEvent() + self.pipeline_management_events[ + tenant.name][pipeline_name].put( + event, needs_result=False) + def _areAllBuildsComplete(self): self.log.debug("Checking if all builds are complete") waiting = False @@ -2546,6 +2568,8 @@ class Scheduler(threading.Thread): self._doFilesChangesCompletedEvent(event, pipeline) elif isinstance(event, NodesProvisionedEvent): self._doNodesProvisionedEvent(event, pipeline) + elif isinstance(event, SemaphoreReleaseEvent): + self._doSemaphoreReleaseEvent(event, pipeline) else: self.log.error("Unable to handle event %s", event) @@ -2920,8 +2944,11 @@ class Scheduler(threading.Thread): buildset.addBuild(fakebuild) finally: # Release the semaphore in any case - tenant = buildset.item.pipeline.tenant - tenant.semaphore_handler.release(self, item, job) + pipeline = buildset.item.pipeline + tenant = pipeline.tenant + event_queue = self.pipeline_result_events[ + tenant.name][pipeline.name] + tenant.semaphore_handler.release(event_queue, item, job) def createZKContext(self, lock, log): return ZKContext(self.zk_client, lock, self.stop_event, log) diff --git a/zuul/zk/event_queues.py b/zuul/zk/event_queues.py index ad75297910..ebb33ec88d 100644 --- a/zuul/zk/event_queues.py +++ b/zuul/zk/event_queues.py @@ -40,6 +40,7 @@ RESULT_EVENT_TYPE_MAP = { "FilesChangesCompletedEvent": model.FilesChangesCompletedEvent, "MergeCompletedEvent": model.MergeCompletedEvent, "NodesProvisionedEvent": model.NodesProvisionedEvent, + "SemaphoreReleaseEvent": model.SemaphoreReleaseEvent, } MANAGEMENT_EVENT_TYPE_MAP = { diff --git a/zuul/zk/semaphore.py b/zuul/zk/semaphore.py index 61ff835389..386fcb2df5 100644 --- a/zuul/zk/semaphore.py +++ b/zuul/zk/semaphore.py @@ -20,7 +20,7 @@ from urllib.parse import quote_plus, unquote from kazoo.exceptions import BadVersionError, NoNodeError from zuul.lib.logutil import get_annotated_logger -from zuul.model import PipelineSemaphoreReleaseEvent +from zuul.model import SemaphoreReleaseEvent from zuul.zk import ZooKeeperSimpleBase @@ -73,29 +73,60 @@ class SemaphoreHandler(ZooKeeperSimpleBase): except Exception: self.log.exception("Unable to send semaphore stats:") + def getSemaphoreInfo(self, job_semaphore): + semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name) + return { + 'name': job_semaphore.name, + 'path': self._makePath(semaphore), + 'resources_first': job_semaphore.resources_first, + 'max': 1 if semaphore is None else semaphore.max, + } + + def getSemaphoreHandle(self, item, job): + return { + "buildset_path": item.current_build_set.getPath(), + "job_name": job.name, + } + def acquire(self, item, job, request_resources): + # This is the typical method for acquiring semaphores. It + # runs on the scheduler and acquires all semaphores for a job. if self.read_only: raise RuntimeError("Read-only semaphore handler") if not job.semaphores: return True log = get_annotated_logger(self.log, item.event) + handle = self.getSemaphoreHandle(item, job) + infos = [self.getSemaphoreInfo(job_semaphore) + for job_semaphore in job.semaphores] + + return self.acquireFromInfo(log, infos, handle, request_resources) + + def acquireFromInfo(self, log, infos, handle, request_resources=False): + # This method is used by the executor to acquire a playbook + # semaphore; it is similar to the acquire method but the + # semaphore info is frozen (this operates without an abide). 
+        if self.read_only:
+            raise RuntimeError("Read-only semaphore handler")
+        if not infos:
+            return True
+
         all_acquired = True
-        for semaphore in job.semaphores:
-            if not self._acquire_one(log, item, job, request_resources,
-                                     semaphore):
+        for info in infos:
+            if not self._acquire_one(log, info, handle, request_resources):
                 all_acquired = False
                 break
         if not all_acquired:
             # Since we know we have less than all the required
             # semaphores, set quiet=True so we don't log an inability
             # to release them.
-            self.release(None, item, job, quiet=True)
+            self.releaseFromInfo(log, None, infos, handle, quiet=True)
             return False
         return True
 
-    def _acquire_one(self, log, item, job, request_resources, job_semaphore):
-        if job_semaphore.resources_first and request_resources:
+    def _acquire_one(self, log, info, handle, request_resources):
+        if info['resources_first'] and request_resources:
             # We're currently in the resource request phase and want to get the
             # resources before locking. So we don't need to do anything here.
             return True
@@ -106,42 +137,86 @@
             # the resources phase.
             pass
 
-        semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name)
-        semaphore_path = self._makePath(semaphore)
-        semaphore_handle = {
-            "buildset_path": item.current_build_set.getPath(),
-            "job_name": job.name,
-        }
+        self.kazoo_client.ensure_path(info['path'])
+        semaphore_holders, zstat = self.getHolders(info['path'])
 
-        self.kazoo_client.ensure_path(semaphore_path)
-        semaphore_holders, zstat = self.getHolders(semaphore_path)
-
-        if semaphore_handle in semaphore_holders:
+        if handle in semaphore_holders:
             return True
 
         # semaphore is there, check max
-        while len(semaphore_holders) < self._max_count(semaphore.name):
-            semaphore_holders.append(semaphore_handle)
+        while len(semaphore_holders) < info['max']:
+            semaphore_holders.append(handle)
             try:
-                self.kazoo_client.set(semaphore_path,
+                self.kazoo_client.set(info['path'],
                                       holdersToData(semaphore_holders),
                                       version=zstat.version)
             except BadVersionError:
                 log.debug(
                     "Retrying semaphore %s acquire due to concurrent update",
-                    semaphore.name)
-                semaphore_holders, zstat = self.getHolders(semaphore_path)
+                    info['name'])
+                semaphore_holders, zstat = self.getHolders(info['path'])
                 continue
 
-            log.info("Semaphore %s acquired: job %s, item %s",
-                     semaphore.name, job.name, item)
-
-            self._emitStats(semaphore_path, len(semaphore_holders))
+            log.info("Semaphore %s acquired: handle %s",
+                     info['name'], handle)
+            self._emitStats(info['path'], len(semaphore_holders))
             return True
 
         return False
 
+    def release(self, event_queue, item, job, quiet=False):
+        if self.read_only:
+            raise RuntimeError("Read-only semaphore handler")
+        if not job.semaphores:
+            return
+
+        log = get_annotated_logger(self.log, item.event)
+
+        handle = self.getSemaphoreHandle(item, job)
+        infos = [self.getSemaphoreInfo(job_semaphore)
+                 for job_semaphore in job.semaphores]
+
+        return self.releaseFromInfo(log, event_queue, infos, handle,
+                                    quiet=quiet)
+
+    def releaseFromInfo(self, log, event_queue, infos, handle, quiet=False):
+        for info in infos:
+            self._release_one(log, info, handle, quiet)
+            if event_queue:
+                # If an event queue has been provided (which it is
+                # except in the case of a rollback from acquire in this
+                # class), broadcast an event to trigger pipeline runs. 
+ event = SemaphoreReleaseEvent(info['name']) + event_queue.put(event) + + def _release_one(self, log, info, handle, quiet=False): + while True: + try: + semaphore_holders, zstat = self.getHolders(info['path']) + semaphore_holders.remove(handle) + except (ValueError, NoNodeError): + if not quiet: + log.error("Semaphore %s can not be released for %s " + "because the semaphore is not held", + info['path'], handle) + break + + try: + self.kazoo_client.set(info['path'], + holdersToData(semaphore_holders), + zstat.version) + except BadVersionError: + log.debug( + "Retrying semaphore %s release due to concurrent update", + info['path']) + continue + + log.info("Semaphore %s released for %s", + info['path'], handle) + self._emitStats(info['path'], len(semaphore_holders)) + break + def getHolders(self, semaphore_path): data, zstat = self.kazoo_client.get(semaphore_path) return holdersFromData(data), zstat @@ -155,72 +230,6 @@ class SemaphoreHandler(ZooKeeperSimpleBase): pass return ret - def _release(self, log, semaphore_path, semaphore_handle, quiet): - while True: - try: - semaphore_holders, zstat = self.getHolders(semaphore_path) - semaphore_holders.remove(semaphore_handle) - except (ValueError, NoNodeError): - if not quiet: - log.error("Semaphore %s can not be released for %s " - "because the semaphore is not held", - semaphore_path, semaphore_handle) - break - - try: - self.kazoo_client.set(semaphore_path, - holdersToData(semaphore_holders), - zstat.version) - except BadVersionError: - log.debug( - "Retrying semaphore %s release due to concurrent update", - semaphore_path) - continue - - log.info("Semaphore %s released for %s", - semaphore_path, semaphore_handle) - self._emitStats(semaphore_path, len(semaphore_holders)) - break - - def release(self, sched, item, job, quiet=False): - if self.read_only: - raise RuntimeError("Read-only semaphore handler") - if not job.semaphores: - return - - log = get_annotated_logger(self.log, item.event) - - for job_semaphore in job.semaphores: - self._release_one(log, item, job, job_semaphore, quiet) - - # If a scheduler has been provided (which it is except in the - # case of a rollback from acquire in this class), broadcast an - # event to trigger pipeline runs. 
- if sched is None: - return - - semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name) - if semaphore.global_scope: - tenants = [t for t in self.abide.tenants.values() - if job_semaphore.name in t.global_semaphores] - else: - tenants = [self.abide.tenants[self.tenant_name]] - for tenant in tenants: - for pipeline_name in tenant.layout.pipelines.keys(): - event = PipelineSemaphoreReleaseEvent() - sched.pipeline_management_events[ - tenant.name][pipeline_name].put( - event, needs_result=False) - - def _release_one(self, log, item, job, job_semaphore, quiet): - semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name) - semaphore_path = self._makePath(semaphore) - semaphore_handle = { - "buildset_path": item.current_build_set.getPath(), - "job_name": job.name, - } - self._release(log, semaphore_path, semaphore_handle, quiet) - def semaphoreHolders(self, semaphore_name): semaphore = self.layout.getSemaphore(self.abide, semaphore_name) semaphore_path = self._makePath(semaphore) @@ -230,10 +239,6 @@ class SemaphoreHandler(ZooKeeperSimpleBase): holders = [] return holders - def _max_count(self, semaphore_name): - semaphore = self.layout.getSemaphore(self.abide, semaphore_name) - return 1 if semaphore is None else semaphore.max - def cleanupLeaks(self): if self.read_only: raise RuntimeError("Read-only semaphore handler") @@ -246,7 +251,10 @@ class SemaphoreHandler(ZooKeeperSimpleBase): semaphore = self.layout.getSemaphore( self.abide, semaphore_name) - semaphore_path = self._makePath(semaphore) + info = { + 'name': semaphore.name, + 'path': self._makePath(semaphore), + } self.log.error("Releasing leaked semaphore %s held by %s", - semaphore_path, holder) - self._release(self.log, semaphore_path, holder, quiet=False) + info['path'], holder) + self._release_one(self.log, info, holder, quiet=False)
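Finally, the release note's allowance for sharing one semaphore across purposes, as long as the uses are in different jobs, can be sketched as follows; all names are illustrative:

.. code-block:: yaml

   - semaphore:
       name: test-db
       max: 1

   # This job holds test-db for its entire duration.
   - job:
       name: integration-tests
       semaphore: test-db

   # This job holds test-db only while its run playbook executes;
   # its pre-run and post-run phases do not contend for it.
   - job:
       name: quick-tests
       run:
         - name: playbooks/quick.yaml
           semaphores: test-db

Only combining both uses of ``test-db`` within a single job definition is rejected.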