Add playbook semaphores

This adds the ability to specify that the Zuul executor should
acquire a semaphore before running an individual playbook.  This
is useful for long running jobs which need exclusive access to
a resource for only a small amount of time.

Change-Id: I90f5e0f570ef6c4b0986b0143318a78ddc27bbde
James E. Blair 2022-10-29 18:05:56 -07:00
parent 1d8117097f
commit c355adf44e
19 changed files with 798 additions and 166 deletions

View File

@ -483,48 +483,164 @@ Here is an example of two job definitions:
.. attr:: pre-run .. attr:: pre-run
The name of a playbook or list of playbooks to run before the The name of a playbook or list of playbooks to run before the
main body of a job. The full path to the playbook in the repo main body of a job. Values are either a string describing the
where the job is defined is expected. full path to the playbook in the repo where the job is defined,
or a dictionary described below.
When a job inherits from a parent, the child's pre-run playbooks When a job inherits from a parent, the child's pre-run playbooks
are run after the parent's. See :ref:`job` for more are run after the parent's. See :ref:`job` for more
information. information.
If the value is a dictionary, the following attributes are
available:
.. attr:: name
The path to the playbook relative to the root of the repo.
.. attr:: semaphores
The name of a :ref:`semaphore` (or list of them) or
:ref:`global_semaphore` which should be acquired and released
when the playbook begins and ends. If the semaphore is at
maximum capacity, then Zuul will wait until it can be
acquired before starting the playbook. The format is either a
string, or a list of strings.
If multiple semaphores are requested, the playbook will not
start until all have been acquired, and Zuul will wait until
all are available before acquiring any. The time spent
waiting for pre-run playbook semaphores is counted against
the :attr:`job.timeout`.
None of the semaphores specified for a playbook may also be
specified in the same job.
.. attr:: post-run .. attr:: post-run
The name of a playbook or list of playbooks to run after the The name of a playbook or list of playbooks to run after the
main body of a job. The full path to the playbook in the repo main body of a job. Values are either a string describing the
where the job is defined is expected. full path to the playbook in the repo where the job is defined,
or a dictionary described below.
When a job inherits from a parent, the child's post-run When a job inherits from a parent, the child's post-run playbooks
playbooks are run before the parent's. See :ref:`job` for more are run before the parent's. See :ref:`job` for more
information. information.
If the value is a dictionary, the following attributes are
available:
.. attr:: name
The path to the playbook relative to the root of the repo.
.. attr:: semaphores
The name of a :ref:`semaphore` (or list of them) or
:ref:`global_semaphore` which should be acquired and released
when the playbook begins and ends. If the semaphore is at
maximum capacity, then Zuul will wait until it can be
acquired before starting the playbook. The format is either a
string, or a list of strings.
If multiple semaphores are requested, the playbook will not
start until all have been acquired, and Zuul will wait until
all are available before acquiring any. The time spent
waiting for post-run playbook semaphores is counted against
the :attr:`job.post-timeout`.
None of the semaphores specified for a playbook may also be
specified in the same job.
.. attr:: cleanup-run .. attr:: cleanup-run
The name of a playbook or list of playbooks to run after a job The name of a playbook or list of playbooks to run after job
execution. The full path to the playbook in the repo execution. Values are either a string describing the full path
where the job is defined is expected. to the playbook in the repo where the job is defined, or a
dictionary described below.
The cleanup phase is performed unconditionally of the job's result, The cleanup phase is performed regardless of the job's result,
even when the job is canceled. Cleanup results are not taken into even when the job is canceled. Cleanup results are not taken
account. into account when reporting the job result.
When a job inherits from a parent, the child's cleanup-run When a job inherits from a parent, the child's cleanup-run playbooks
playbooks are run before the parent's. See :ref:`job` for more are run before the parent's. See :ref:`job` for more
information. information.
There is a hard-coded five minute timeout for cleanup playbooks.
If the value is a dictionary, the following attributes are
available:
.. attr:: name
The path to the playbook relative to the root of the repo.
.. attr:: semaphores
The name of a :ref:`semaphore` (or list of them) or
:ref:`global_semaphore` which should be acquired and released
when the playbook begins and ends. If the semaphore is at
maximum capacity, then Zuul will wait until it can be
acquired before starting the playbook. The format is either a
string, or a list of strings.
If multiple semaphores are requested, the playbook will not
start until all have been acquired, and Zuul will wait until
all are available before acquiring any. The time spent
waiting for cleanup-run playbook semaphores is counted against
the cleanup phase timeout.
None of the semaphores specified for a playbook may also be
specified in the same job.
.. attr:: run .. attr:: run
The name of a playbook or list of playbooks for this job. If it The name of a playbook or list of playbooks for this job. If it
is not supplied, the parent's playbook will be used (and likewise is not supplied, the parent's playbook will be used (and
up the inheritance chain). The full path within the repo is likewise up the inheritance chain). Values are either a string
required. Example: describing the full path to the playbook in the repo where the
job is defined, or a dictionary described below.
If the value is a dictionary, the following attributes are
available:
.. attr:: name
The path to the playbook relative to the root of the repo.
.. attr:: semaphores
The name of a :ref:`semaphore` (or list of them) or
:ref:`global_semaphore` which should be acquired and released
when the playbook begins and ends. If the semaphore is at
maximum capacity, then Zuul will wait until it can be
acquired before starting the playbook. The format is either a
string, or a list of strings.
If multiple semaphores are requested, the playbook will not
start until all have been acquired, and Zuul will wait until
all are available before acquiring any. The time spent
waiting for run playbook semaphores is counted against
the :attr:`job.timeout`.
None of the semaphores specified for a playbook may also be
specified in the same job.
Example:
.. code-block:: yaml .. code-block:: yaml
run: playbooks/job-playbook.yaml run: playbooks/job-playbook.yaml
Or:
.. code-block:: yaml
run:
- name: playbooks/job-playbook.yaml
semaphores: playbook-semaphore
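The all-or-nothing rule described for the playbook semaphores above ("Zuul will wait until all are available before acquiring any") can be illustrated with a minimal sketch. This is not Zuul's implementation, which keeps semaphore state in ZooKeeper; the names `try_acquire_all`, `release_all` and the `free_slots` dictionary are hypothetical and exist only for this example.

```python
from typing import Dict, Iterable


def try_acquire_all(free_slots: Dict[str, int], names: Iterable[str]) -> bool:
    """Acquire every named semaphore, or none of them.

    free_slots maps a semaphore name to its remaining capacity. It is a
    hypothetical in-memory stand-in for the real shared state.
    """
    wanted = sorted(set(names))
    # First pass: check only, so a partial acquisition never happens.
    if any(free_slots.get(name, 0) < 1 for name in wanted):
        return False
    # Second pass: every semaphore has room, so take one slot of each.
    for name in wanted:
        free_slots[name] -= 1
    return True


def release_all(free_slots: Dict[str, int], names: Iterable[str]) -> None:
    """Give back one slot of each named semaphore."""
    for name in sorted(set(names)):
        free_slots[name] += 1
```

With this shape, a playbook that requests two semaphores never holds one while waiting for the other, which is the behavior the attribute description promises.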
.. attr:: ansible-version .. attr:: ansible-version
The ansible version to use for all playbooks of the job. This can be The ansible version to use for all playbooks of the job. This can be

View File

@ -0,0 +1,11 @@
---
features:
- |
Individual playbooks may now be wrapped by a semaphore. Zuul will
start the job and proceed up to the point of a playbook which
requires a semaphore and then wait until it is able to acquire the
semaphore before proceeding. It releases the semaphore after the
end of that individual playbook.
The same semaphore may be used for both jobs and playbooks, but an
individual job may not use the same semaphore for both purposes.
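As a rough illustration of the restriction in this release note, the check amounts to a set intersection between the semaphores a job names for itself and the ones it names for its playbooks. The function name `check_semaphore_overlap` is hypothetical and the exception type is an assumption made for this sketch; the change itself performs the check while parsing and freezing the job.

```python
def check_semaphore_overlap(job_semaphores, playbook_semaphores):
    """Raise if one job uses a semaphore for both purposes.

    Both arguments are iterables of semaphore names (plain strings in
    this sketch).
    """
    common = set(job_semaphores) & set(playbook_semaphores)
    if common:
        # sorted() only makes the message deterministic.
        raise ValueError(
            f"Semaphores {sorted(common)} specified as both "
            "job and playbook semaphores but may only "
            "be used for one")
```

Two different jobs may still share a semaphore, one at the job level and the other at the playbook level; only a single job naming it in both places is rejected.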

View File

@ -3257,6 +3257,7 @@ class FakeBuild(object):
class RecordingAnsibleJob(zuul.executor.server.AnsibleJob): class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
result = None result = None
semaphore_sleep_time = 5
def _execute(self): def _execute(self):
for _ in iterate_timeout(60, 'wait for merge'): for _ in iterate_timeout(60, 'wait for merge'):

View File

@ -0,0 +1,13 @@
- hosts: all
tasks:
- debug: var=waitpath
- file:
path: "{{zuul._test.test_root}}/builds/{{zuul.build}}.run_start.flag"
state: touch
# Do not finish until test creates the flag file
- wait_for:
state: present
path: "{{waitpath}}"
- file:
path: "{{zuul._test.test_root}}/builds/{{zuul.build}}.run_end.flag"
state: touch

View File

@ -0,0 +1,32 @@
- pipeline:
name: check
manager: independent
post-review: true
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
Verified: 1
failure:
gerrit:
Verified: -1
- job:
name: base
parent: null
nodeset:
nodes:
- name: test_node
label: test_label
- semaphore:
name: test-semaphore
- job:
name: test-job
vars:
waitpath: '{{zuul._test.test_root}}/builds/{{zuul.build}}/test_wait'
run:
- name: playbooks/run.yaml
semaphores: test-semaphore

View File

@ -0,0 +1,5 @@
- project:
name: org/project
check:
jobs:
- test-job

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1,8 @@
- tenant:
name: tenant-one
source:
gerrit:
config-projects:
- common-config
untrusted-projects:
- org/project

View File

@ -53,8 +53,10 @@ class TestJob(BaseTestCase):
self.connection = Dummy(connection_name='dummy_connection') self.connection = Dummy(connection_name='dummy_connection')
self.source = Dummy(canonical_hostname='git.example.com', self.source = Dummy(canonical_hostname='git.example.com',
connection=self.connection) connection=self.connection)
self.abide = model.Abide()
self.tenant = model.Tenant('tenant') self.tenant = model.Tenant('tenant')
self.tenant.default_ansible_version = AnsibleManager().default_version self.tenant.default_ansible_version = AnsibleManager().default_version
self.tenant.semaphore_handler = Dummy(abide=self.abide)
self.layout = model.Layout(self.tenant) self.layout = model.Layout(self.tenant)
self.tenant.layout = self.layout self.tenant.layout = self.layout
self.project = model.Project('project', self.source) self.project = model.Project('project', self.source)
@ -129,16 +131,18 @@ class TestJob(BaseTestCase):
# This simulates freezing a job. # This simulates freezing a job.
secrets = ['foo'] secrets = ['foo']
py27_pre = model.PlaybookContext(self.context, 'py27-pre', [], secrets) py27_pre = model.PlaybookContext(
py27_run = model.PlaybookContext(self.context, 'py27-run', [], secrets) self.context, 'py27-pre', [], secrets, [])
py27_post = model.PlaybookContext(self.context, 'py27-post', [], py27_run = model.PlaybookContext(
secrets) self.context, 'py27-run', [], secrets, [])
py27_post = model.PlaybookContext(
self.context, 'py27-post', [], secrets, [])
py27 = model.Job('py27') py27 = model.Job('py27')
py27.timeout = 30 py27.timeout = 30
py27.pre_run = [py27_pre] py27.pre_run = (py27_pre,)
py27.run = [py27_run] py27.run = (py27_run,)
py27.post_run = [py27_post] py27.post_run = (py27_post,)
job = py27.copy() job = py27.copy()
self.assertEqual(30, job.timeout) self.assertEqual(30, job.timeout)
@ -146,7 +150,7 @@ class TestJob(BaseTestCase):
# Apply the diablo variant # Apply the diablo variant
diablo = model.Job('py27') diablo = model.Job('py27')
diablo.timeout = 40 diablo.timeout = 40
job.applyVariant(diablo, self.layout) job.applyVariant(diablo, self.layout, None)
self.assertEqual(40, job.timeout) self.assertEqual(40, job.timeout)
self.assertEqual(['py27-pre'], self.assertEqual(['py27-pre'],
@ -165,7 +169,7 @@ class TestJob(BaseTestCase):
good_final = model.Job('py27') good_final = model.Job('py27')
good_final.voting = False good_final.voting = False
job.applyVariant(good_final, self.layout) job.applyVariant(good_final, self.layout, None)
self.assertFalse(job.voting) self.assertFalse(job.voting)
bad_final = model.Job('py27') bad_final = model.Job('py27')
@ -173,7 +177,7 @@ class TestJob(BaseTestCase):
with testtools.ExpectedException( with testtools.ExpectedException(
Exception, Exception,
"Unable to modify final job"): "Unable to modify final job"):
job.applyVariant(bad_final, self.layout) job.applyVariant(bad_final, self.layout, None)
@mock.patch("zuul.model.zkobject.ZKObject._save") @mock.patch("zuul.model.zkobject.ZKObject._save")
def test_job_inheritance_job_tree(self, save_mock): def test_job_inheritance_job_tree(self, save_mock):

View File

@ -4325,6 +4325,245 @@ class TestCleanupPlaybooks(AnsibleZuulTestCase):
self.assertFalse(os.path.exists(post_end)) self.assertFalse(os.path.exists(post_end))
class TestPlaybookSemaphore(AnsibleZuulTestCase):
tenant_config_file = 'config/playbook-semaphore/main.yaml'
def test_playbook_semaphore(self):
self.executor_server.verbose = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
for _ in iterate_timeout(60, 'job started'):
if len(self.builds) == 1:
break
build1 = self.builds[0]
# Wait for the first job to be running the mutexed playbook
run1_start = os.path.join(self.jobdir_root, build1.uuid +
'.run_start.flag')
for _ in iterate_timeout(60, 'job1 running'):
if os.path.exists(run1_start):
break
# Start a second build which should wait for the playbook
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
# Wait until we are waiting for the playbook
for _ in iterate_timeout(60, 'job2 waiting for semaphore'):
found = False
if len(self.builds) == 2:
build2 = self.builds[1]
for job_worker in self.executor_server.job_workers.values():
if job_worker.build_request.uuid == build2.uuid:
if job_worker.waiting_for_semaphores:
found = True
if found:
break
# Wait for build1 to finish
with open(os.path.join(self.jobdir_root, build1.uuid, 'test_wait'),
"w") as of:
of.write("continue")
# Wait for the second job to be running the mutexed playbook
run2_start = os.path.join(self.jobdir_root, build2.uuid +
'.run_start.flag')
for _ in iterate_timeout(60, 'job2 running'):
if os.path.exists(run2_start):
break
# Release build2 and wait to finish
with open(os.path.join(self.jobdir_root, build2.uuid, 'test_wait'),
"w") as of:
of.write("continue")
self.waitUntilSettled()
self.assertHistory([
dict(name='test-job', result='SUCCESS', changes='1,1'),
dict(name='test-job', result='SUCCESS', changes='2,1'),
])
def test_playbook_and_job_semaphore_runtime(self):
# Test that a playbook does not specify the same semaphore as
# the job. Test via inheritance which is a runtime check.
in_repo_conf = textwrap.dedent(
"""
- job:
name: test-job2
parent: test-job
semaphore: test-semaphore
- project:
check:
jobs:
- test-job2
""")
file_dict = {'.zuul.yaml': in_repo_conf}
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
files=file_dict)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([])
self.assertEqual(A.reported, 1)
self.assertEqual(A.patchsets[0]['approvals'][0]['value'], "-1")
self.assertIn('both job and playbook', A.messages[0])
def test_playbook_and_job_semaphore_def(self):
# Test that a playbook does not specify the same semaphore as
# the job. Static configuration test.
in_repo_conf = textwrap.dedent(
"""
- job:
name: test-job2
semaphore: test-semaphore
run:
- name: playbooks/run.yaml
semaphores: test-semaphore
- project:
check:
jobs:
- test-job2
""")
file_dict = {'.zuul.yaml': in_repo_conf}
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
files=file_dict)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([])
self.assertEqual(A.reported, 1)
self.assertEqual(A.patchsets[0]['approvals'][0]['value'], "-1")
self.assertIn('both job and playbook', A.messages[0])
def test_playbook_semaphore_timeout(self):
self.wait_timeout = 300
self.executor_server.verbose = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
for _ in iterate_timeout(60, 'job started'):
if len(self.builds) == 1:
break
build1 = self.builds[0]
# Wait for the first job to be running the mutexed playbook
run1_start = os.path.join(self.jobdir_root, build1.uuid +
'.run_start.flag')
for _ in iterate_timeout(60, 'job1 running'):
if os.path.exists(run1_start):
break
# Start a second build which should wait for the playbook
in_repo_conf = textwrap.dedent(
"""
- project:
check:
jobs:
- test-job:
timeout: 20
""")
file_dict = {'.zuul.yaml': in_repo_conf}
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
files=file_dict)
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
# Wait until we are waiting for the playbook
for _ in iterate_timeout(60, 'job2 waiting for semaphore'):
found = False
if len(self.builds) == 2:
build2 = self.builds[1]
for job_worker in self.executor_server.job_workers.values():
if job_worker.build_request.uuid == build2.uuid:
if job_worker.waiting_for_semaphores:
found = True
if found:
break
# Wait for the second build to timeout waiting for the semaphore
for _ in iterate_timeout(60, 'build timed out'):
if len(self.builds) == 1:
break
# Wait for build1 to finish
with open(os.path.join(self.jobdir_root, build1.uuid, 'test_wait'),
"w") as of:
of.write("continue")
self.waitUntilSettled()
self.assertHistory([
dict(name='test-job', result='TIMED_OUT', changes='2,1'),
dict(name='test-job', result='SUCCESS', changes='1,1'),
])
def test_playbook_semaphore_abort(self):
self.wait_timeout = 300
self.executor_server.verbose = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
for _ in iterate_timeout(60, 'job started'):
if len(self.builds) == 1:
break
build1 = self.builds[0]
# Wait for the first job to be running the mutexed playbook
run1_start = os.path.join(self.jobdir_root, build1.uuid +
'.run_start.flag')
for _ in iterate_timeout(60, 'job1 running'):
if os.path.exists(run1_start):
break
# Start a second build which should wait for the playbook
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
# Wait until we are waiting for the playbook
for _ in iterate_timeout(60, 'job2 waiting for semaphore'):
found = False
if len(self.builds) == 2:
build2 = self.builds[1]
for job_worker in self.executor_server.job_workers.values():
if job_worker.build_request.uuid == build2.uuid:
if job_worker.waiting_for_semaphores:
found = True
if found:
break
in_repo_conf = textwrap.dedent(
"""
- project:
check:
jobs: []
""")
file_dict = {'.zuul.yaml': in_repo_conf}
B.addPatchset(files=file_dict)
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(2))
for _ in iterate_timeout(60, 'build aborted'):
if len(self.builds) == 1:
break
# Wait for build1 to finish
with open(os.path.join(self.jobdir_root, build1.uuid, 'test_wait'),
"w") as of:
of.write("continue")
self.waitUntilSettled()
self.assertHistory([
dict(name='test-job', result='ABORTED', changes='2,1'),
dict(name='test-job', result='SUCCESS', changes='1,1'),
])
class TestBrokenTrustedConfig(ZuulTestCase): class TestBrokenTrustedConfig(ZuulTestCase):
# Test we can deal with a broken config only with trusted projects. This # Test we can deal with a broken config only with trusted projects. This
# is different than TestBrokenConfig, as it does not have a missing # is different than TestBrokenConfig, as it does not have a missing

View File

@ -441,6 +441,7 @@ class TestWeb(BaseTestWeb):
'type': 'zuul' 'type': 'zuul'
}], }],
'secrets': [], 'secrets': [],
'semaphores': [],
'source_context': source_ctx, 'source_context': source_ctx,
}] }]
@ -611,6 +612,7 @@ class TestWeb(BaseTestWeb):
'type': 'zuul' 'type': 'zuul'
}], }],
'secrets': [], 'secrets': [],
'semaphores': [],
'source_context': { 'source_context': {
'branch': 'master', 'branch': 'master',
'path': 'zuul.yaml', 'path': 'zuul.yaml',
@ -1188,6 +1190,7 @@ class TestWeb(BaseTestWeb):
'project': 'common-config', 'project': 'common-config',
}], }],
'secrets': {}, 'secrets': {},
'semaphores': [],
'path': 'playbooks/project-test1.yaml', 'path': 'playbooks/project-test1.yaml',
}], }],
'pre_playbooks': [], 'pre_playbooks': [],

View File

@ -633,6 +633,11 @@ class JobParser(object):
semaphore = {vs.Required('name'): str, semaphore = {vs.Required('name'): str,
'resources-first': bool} 'resources-first': bool}
complex_playbook_def = {vs.Required('name'): str,
'semaphores': to_list(str)}
playbook_def = to_list(vs.Any(str, complex_playbook_def))
# Attributes of a job that can also be used in Project and ProjectTemplate # Attributes of a job that can also be used in Project and ProjectTemplate
job_attributes = {'parent': vs.Any(str, None), job_attributes = {'parent': vs.Any(str, None),
'final': bool, 'final': bool,
@ -661,10 +666,10 @@ class JobParser(object):
'timeout': int, 'timeout': int,
'post-timeout': int, 'post-timeout': int,
'attempts': int, 'attempts': int,
'pre-run': to_list(str), 'pre-run': playbook_def,
'post-run': to_list(str), 'post-run': playbook_def,
'run': to_list(str), 'run': playbook_def,
'cleanup-run': to_list(str), 'cleanup-run': playbook_def,
'ansible-version': vs.Any(str, float, int), 'ansible-version': vs.Any(str, float, int),
'_source_context': model.SourceContext, '_source_context': model.SourceContext,
'_start_mark': model.ZuulMark, '_start_mark': model.ZuulMark,
@ -827,29 +832,57 @@ class JobParser(object):
roles.insert(0, r) roles.insert(0, r)
job.addRoles(roles) job.addRoles(roles)
for pre_run_name in as_list(conf.get('pre-run')): seen_playbook_semaphores = set()
def get_playbook_attrs(playbook_defs):
# Helper method to extract information from a playbook
# definition.
for pb_def in playbook_defs:
pb_semaphores = []
if isinstance(pb_def, dict):
pb_name = pb_def['name']
for pb_sem_name in as_list(pb_def.get('semaphores')):
pb_semaphores.append(model.JobSemaphore(pb_sem_name))
seen_playbook_semaphores.add(pb_sem_name)
else:
# The playbook definition is a simple string path
pb_name = pb_def
# Sort the list of semaphores to avoid issues with
# contention (where two jobs try to start at the same time
# and fail due to acquiring the same semaphores but in
# reverse order).
pb_semaphores = tuple(sorted(pb_semaphores,
key=lambda x: x.name))
yield (pb_name, pb_semaphores)
for pre_run_name, pre_run_semaphores in get_playbook_attrs(
as_list(conf.get('pre-run'))):
pre_run = model.PlaybookContext(job.source_context, pre_run = model.PlaybookContext(job.source_context,
pre_run_name, job.roles, pre_run_name, job.roles,
secrets) secrets, pre_run_semaphores)
job.pre_run = job.pre_run + (pre_run,) job.pre_run = job.pre_run + (pre_run,)
# NOTE(pabelanger): Reverse the order of our post-run list. We prepend # NOTE(pabelanger): Reverse the order of our post-run list. We prepend
# post-runs for inherits however, we want to execute post-runs in the # post-runs for inherits however, we want to execute post-runs in the
# order they are listed within the job. # order they are listed within the job.
for post_run_name in reversed(as_list(conf.get('post-run'))): for post_run_name, post_run_semaphores in get_playbook_attrs(
reversed(as_list(conf.get('post-run')))):
post_run = model.PlaybookContext(job.source_context, post_run = model.PlaybookContext(job.source_context,
post_run_name, job.roles, post_run_name, job.roles,
secrets) secrets, post_run_semaphores)
job.post_run = (post_run,) + job.post_run job.post_run = (post_run,) + job.post_run
for cleanup_run_name in reversed(as_list(conf.get('cleanup-run'))): for cleanup_run_name, cleanup_run_semaphores in get_playbook_attrs(
cleanup_run = model.PlaybookContext(job.source_context, reversed(as_list(conf.get('cleanup-run')))):
cleanup_run = model.PlaybookContext(
job.source_context,
cleanup_run_name, job.roles, cleanup_run_name, job.roles,
secrets) secrets, cleanup_run_semaphores)
job.cleanup_run = (cleanup_run,) + job.cleanup_run job.cleanup_run = (cleanup_run,) + job.cleanup_run
if 'run' in conf: if 'run' in conf:
for run_name in as_list(conf.get('run')): for run_name, run_semaphores in get_playbook_attrs(
as_list(conf.get('run'))):
run = model.PlaybookContext(job.source_context, run_name, run = model.PlaybookContext(job.source_context, run_name,
job.roles, secrets) job.roles, secrets, run_semaphores)
job.run = job.run + (run,) job.run = job.run + (run,)
if conf.get('intermediate', False) and not conf.get('abstract', False): if conf.get('intermediate', False) and not conf.get('abstract', False):
@ -926,6 +959,7 @@ class JobParser(object):
job_semaphores.append(model.JobSemaphore( job_semaphores.append(model.JobSemaphore(
semaphore.get('name'), semaphore.get('name'),
semaphore.get('resources-first', False))) semaphore.get('resources-first', False)))
if job_semaphores: if job_semaphores:
# Sort the list of semaphores to avoid issues with # Sort the list of semaphores to avoid issues with
# contention (where two jobs try to start at the same time # contention (where two jobs try to start at the same time
@ -933,6 +967,12 @@ class JobParser(object):
# reverse order. # reverse order.
job.semaphores = tuple(sorted(job_semaphores, job.semaphores = tuple(sorted(job_semaphores,
key=lambda x: x.name)) key=lambda x: x.name))
common = (set([x.name for x in job_semaphores]) &
seen_playbook_semaphores)
if common:
raise Exception(f"Semaphores {common} specified as both "
"job and playbook semaphores but may only "
"be used for one")
for k in ('tags', 'requires', 'provides'): for k in ('tags', 'requires', 'provides'):
v = frozenset(as_list(conf.get(k))) v = frozenset(as_list(conf.get(k)))
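The playbook parsing in this hunk accepts either a plain path or a dictionary with `name` and `semaphores`, and sorts the semaphore names so that every job requests them in the same order. A standalone sketch of that normalization, using plain tuples instead of `model.JobSemaphore` objects, might look like the following. The function name `normalize_playbooks` is hypothetical and is not part of the change.

```python
def normalize_playbooks(playbook_defs):
    """Return a list of (path, semaphore_names) pairs.

    Accepts None, a single string, a single dict, or a list mixing
    strings and dicts, mirroring the shapes the schema allows.
    """
    if playbook_defs is None:
        return []
    if not isinstance(playbook_defs, list):
        playbook_defs = [playbook_defs]
    result = []
    for pb_def in playbook_defs:
        if isinstance(pb_def, dict):
            name = pb_def["name"]
            semaphores = pb_def.get("semaphores", [])
            if isinstance(semaphores, str):
                semaphores = [semaphores]
        else:
            # The playbook definition is a simple string path.
            name = pb_def
            semaphores = []
        # Sorting gives every job the same acquisition order.
        result.append((name, tuple(sorted(semaphores))))
    return result
```

For example, `normalize_playbooks("playbooks/run.yaml")` yields a single pair with an empty semaphore tuple, so the string form and the dictionary form can be handled by the same downstream code.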

View File

@ -67,6 +67,10 @@ class ExecutorClient(object):
# TODO: deprecate and remove this variable? # TODO: deprecate and remove this variable?
params["zuul"]["_inheritance_path"] = list(job.inheritance_path) params["zuul"]["_inheritance_path"] = list(job.inheritance_path)
semaphore_handler = item.pipeline.tenant.semaphore_handler
params['semaphore_handle'] = semaphore_handler.getSemaphoreHandle(
item, job)
parent_span = tracing.restoreSpan(item.current_build_set.span_info) parent_span = tracing.restoreSpan(item.current_build_set.span_info)
execute_time = time.time() execute_time = time.time()
with trace.use_span(parent_span): with trace.use_span(parent_span):

View File

@ -82,6 +82,8 @@ from zuul.zk.executor import ExecutorApi
from zuul.zk.job_request_queue import JobRequestEvent from zuul.zk.job_request_queue import JobRequestEvent
from zuul.zk.system import ZuulSystem from zuul.zk.system import ZuulSystem
from zuul.zk.zkobject import ZKContext from zuul.zk.zkobject import ZKContext
from zuul.zk.semaphore import SemaphoreHandler
BUFFER_LINES_FOR_SYNTAX = 200 BUFFER_LINES_FOR_SYNTAX = 200
DEFAULT_FINGER_PORT = 7900 DEFAULT_FINGER_PORT = 7900
@ -497,6 +499,7 @@ class JobDirPlaybook(object):
self.secrets = os.path.join(self.secrets_root, 'all.yaml') self.secrets = os.path.join(self.secrets_root, 'all.yaml')
self.secrets_content = None self.secrets_content = None
self.secrets_keys = set() self.secrets_keys = set()
self.semaphores = []
def addRole(self): def addRole(self):
count = len(self.roles) count = len(self.roles)
@ -963,6 +966,8 @@ class AnsibleJob(object):
RESULT_DISK_FULL: 'RESULT_DISK_FULL', RESULT_DISK_FULL: 'RESULT_DISK_FULL',
} }
semaphore_sleep_time = 30
def __init__(self, executor_server, build_request, arguments): def __init__(self, executor_server, build_request, arguments):
logger = logging.getLogger("zuul.AnsibleJob") logger = logging.getLogger("zuul.AnsibleJob")
self.arguments = arguments self.arguments = arguments
@ -1042,6 +1047,7 @@ class AnsibleJob(object):
self.frozen_hostvars = {} self.frozen_hostvars = {}
# The zuul.* vars # The zuul.* vars
self.zuul_vars = {} self.zuul_vars = {}
self.waiting_for_semaphores = False
def run(self): def run(self):
self.running = True self.running = True
@ -2034,6 +2040,7 @@ class AnsibleJob(object):
jobdir_playbook.project_canonical_name = project.canonical_name jobdir_playbook.project_canonical_name = project.canonical_name
jobdir_playbook.canonical_name_and_path = os.path.join( jobdir_playbook.canonical_name_and_path = os.path.join(
project.canonical_name, playbook['path']) project.canonical_name, playbook['path'])
jobdir_playbook.semaphores = playbook['semaphores']
path = None path = None
if not jobdir_playbook.trusted: if not jobdir_playbook.trusted:
@ -3086,10 +3093,45 @@ class AnsibleJob(object):
self.emitPlaybookBanner(playbook, 'START', phase) self.emitPlaybookBanner(playbook, 'START', phase)
result, code = self.runAnsible(cmd, timeout, playbook, ansible_version, semaphore_handle = self.arguments['semaphore_handle']
semaphore_wait_start = time.time()
acquired_semaphores = True
while not self.executor_server.semaphore_handler.acquireFromInfo(
self.log, playbook.semaphores, semaphore_handle):
self.log.debug("Unable to acquire playbook semaphores, waiting")
if not self.waiting_for_semaphores:
self.waiting_for_semaphores = True
remain = self.getAnsibleTimeout(semaphore_wait_start, timeout)
if remain is not None and remain <= 0:
self.log.info("Timed out waiting for semaphore")
acquired_semaphores = False
result = self.RESULT_TIMED_OUT
code = 0
break
if self.aborted:
acquired_semaphores = False
result = self.RESULT_ABORTED
code = 0
break
time.sleep(self.semaphore_sleep_time)
if self.waiting_for_semaphores:
self.waiting_for_semaphores = False
timeout = self.getAnsibleTimeout(semaphore_wait_start, timeout)
if acquired_semaphores:
result, code = self.runAnsible(
cmd, timeout, playbook, ansible_version,
cleanup=phase == 'cleanup') cleanup=phase == 'cleanup')
self.log.debug("Ansible complete, result %s code %s" % ( self.log.debug("Ansible complete, result %s code %s" % (
self.RESULT_MAP[result], code)) self.RESULT_MAP[result], code))
if acquired_semaphores:
event_queue = self.executor_server.result_events[
self.build_request.tenant_name][
self.build_request.pipeline_name]
self.executor_server.semaphore_handler.releaseFromInfo(
self.log, event_queue, playbook.semaphores, semaphore_handle)
if self.executor_server.statsd: if self.executor_server.statsd:
base_key = "zuul.executor.{hostname}.phase.{phase}" base_key = "zuul.executor.{hostname}.phase.{phase}"
self.executor_server.statsd.incr( self.executor_server.statsd.incr(
@ -3308,6 +3350,9 @@ class ExecutorServer(BaseMergeServer):
# Used to offload expensive operations to different processes # Used to offload expensive operations to different processes
self.process_worker = None self.process_worker = None
self.semaphore_handler = SemaphoreHandler(
self.zk_client, self.statsd, None, None, None)
def _get_key_store_password(self): def _get_key_store_password(self):
try: try:
return self.config["keystore"]["password"] return self.config["keystore"]["password"]
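The wait loop added to the executor in this file polls for the semaphores, gives up on timeout or abort, and otherwise hands the remaining time budget to the playbook. A simplified, self-contained sketch of that control flow follows. The function `wait_for_semaphores`, its string status values and the injectable `clock` and `sleep` parameters are assumptions for illustration; the executor itself uses `getAnsibleTimeout`, `RESULT_TIMED_OUT` and `RESULT_ABORTED`.

```python
import time


def wait_for_semaphores(try_acquire, timeout, is_aborted=lambda: False,
                        sleep_time=30, clock=time.monotonic,
                        sleep=time.sleep):
    """Poll try_acquire() until it succeeds, times out, or is aborted.

    Returns (status, remaining_timeout). timeout may be None, meaning
    no limit. Time spent waiting is deducted from the budget.
    """
    start = clock()
    while not try_acquire():
        if timeout is not None and clock() - start >= timeout:
            return ("TIMED_OUT", 0)
        if is_aborted():
            return ("ABORTED", None)
        sleep(sleep_time)
    if timeout is None:
        return ("ACQUIRED", None)
    # Whatever is left of the budget goes to the playbook run itself.
    return ("ACQUIRED", max(timeout - (clock() - start), 0))
```

Injecting `clock` and `sleep` keeps the sketch testable without real delays, much as the test suite shortens `semaphore_sleep_time` in `RecordingAnsibleJob`.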

View File

@ -959,7 +959,10 @@ class PipelineManager(metaclass=ABCMeta):
# current item so a potentially acquired semaphore must be # current item so a potentially acquired semaphore must be
# released as it won't be released on dequeue of the item. # released as it won't be released on dequeue of the item.
tenant = item.pipeline.tenant tenant = item.pipeline.tenant
tenant.semaphore_handler.release(self.sched, item, job) pipeline = build_set.item.pipeline
event_queue = self.sched.pipeline_result_events[
tenant.name][pipeline.name]
tenant.semaphore_handler.release(event_queue, item, job)
except Exception: except Exception:
log.exception("Exception while releasing semaphore") log.exception("Exception while releasing semaphore")
@ -1796,8 +1799,11 @@ class PipelineManager(metaclass=ABCMeta):
item = build.build_set.item item = build.build_set.item
log.debug("Build %s of %s completed" % (build, item.change)) log.debug("Build %s of %s completed" % (build, item.change))
event_queue = self.sched.pipeline_result_events[
item.pipeline.tenant.name][item.pipeline.name]
item.pipeline.tenant.semaphore_handler.release( item.pipeline.tenant.semaphore_handler.release(
self.sched, item, build.job) event_queue, item, build.job)
if item.getJob(build.job.name) is None: if item.getJob(build.job.name) is None:
log.info("Build %s no longer in job graph for item %s", log.info("Build %s no longer in job graph for item %s",
@ -1970,9 +1976,12 @@ class PipelineManager(metaclass=ABCMeta):
build_set.item.setNodeRequestFailure( build_set.item.setNodeRequestFailure(
job, f'Node request {request.id} failed') job, f'Node request {request.id} failed')
self._resumeBuilds(build_set) self._resumeBuilds(build_set)
tenant = build_set.item.pipeline.tenant pipeline = build_set.item.pipeline
tenant = pipeline.tenant
event_queue = self.sched.pipeline_result_events[
tenant.name][pipeline.name]
tenant.semaphore_handler.release( tenant.semaphore_handler.release(
self.sched, build_set.item, job) event_queue, build_set.item, job)
log.info("Completed node request %s for job %s of item %s " log.info("Completed node request %s for job %s of item %s "
"with nodes %s", "with nodes %s",


@ -1890,7 +1890,8 @@ class PlaybookContext(ConfigObject):
""" """
def __init__(self, source_context, path, roles, secrets): def __init__(self, source_context, path, roles, secrets,
semaphores):
super(PlaybookContext, self).__init__() super(PlaybookContext, self).__init__()
self.source_context = source_context self.source_context = source_context
self.path = path self.path = path
@ -1901,6 +1902,10 @@ class PlaybookContext(ConfigObject):
# FrozenSecret objects which contain only the info the # FrozenSecret objects which contain only the info the
# executor needs # executor needs
self.frozen_secrets = () self.frozen_secrets = ()
# The original JobSemaphore objects
self.semaphores = semaphores
# the result of getSemaphoreInfo from semaphore handler
self.frozen_semaphores = ()
def __repr__(self): def __repr__(self):
return '<PlaybookContext %s %s>' % (self.source_context, return '<PlaybookContext %s %s>' % (self.source_context,
@ -1915,13 +1920,15 @@ class PlaybookContext(ConfigObject):
return (self.source_context == other.source_context and return (self.source_context == other.source_context and
self.path == other.path and self.path == other.path and
self.roles == other.roles and self.roles == other.roles and
self.secrets == other.secrets) self.secrets == other.secrets and
self.semaphores == other.semaphores)
def copy(self): def copy(self):
r = PlaybookContext(self.source_context, r = PlaybookContext(self.source_context,
self.path, self.path,
self.roles, self.roles,
self.secrets) self.secrets,
self.semaphores)
return r return r
def validateReferences(self, layout): def validateReferences(self, layout):
@ -1944,6 +1951,15 @@ class PlaybookContext(ConfigObject):
self.source_context.project_canonical_name)[1] self.source_context.project_canonical_name)[1]
# Decrypt a copy of the secret to verify it can be done # Decrypt a copy of the secret to verify it can be done
secret.decrypt(project.private_secrets_key) secret.decrypt(project.private_secrets_key)
# TODO: if we remove the implicit max=1 semaphore, validate
# references here.
def freezeSemaphores(self, layout, semaphore_handler):
semaphores = []
for job_semaphore in self.semaphores:
info = semaphore_handler.getSemaphoreInfo(job_semaphore)
semaphores.append(info)
self.frozen_semaphores = tuple(semaphores)
def freezeSecrets(self, layout): def freezeSecrets(self, layout):
secrets = [] secrets = []
@ -1981,6 +1997,7 @@ class PlaybookContext(ConfigObject):
trusted=self.source_context.trusted, trusted=self.source_context.trusted,
roles=[r.toDict() for r in self.roles], roles=[r.toDict() for r in self.roles],
secrets=secrets, secrets=secrets,
semaphores=self.frozen_semaphores,
path=self.path) path=self.path)
def toSchemaDict(self): def toSchemaDict(self):
@ -1990,6 +2007,7 @@ class PlaybookContext(ConfigObject):
'roles': list(map(lambda x: x.toDict(), self.roles)), 'roles': list(map(lambda x: x.toDict(), self.roles)),
'secrets': [{'name': secret.name, 'alias': secret.alias} 'secrets': [{'name': secret.name, 'alias': secret.alias}
for secret in self.secrets], for secret in self.secrets],
'semaphores': [{'name': sem.name} for sem in self.semaphores],
} }
if self.source_context: if self.source_context:
d['source_context'] = self.source_context.toDict() d['source_context'] = self.source_context.toDict()
@ -2847,16 +2865,20 @@ class Job(ConfigObject):
     def _get(self, name):
         return self.__dict__.get(name)
 
-    def setBase(self, layout):
+    def setBase(self, layout, semaphore_handler):
         self.inheritance_path = self.inheritance_path + (repr(self),)
         if self._get('run') is not None:
-            self.run = self.freezePlaybooks(self.run, layout)
+            self.run = self.freezePlaybooks(
+                self.run, layout, semaphore_handler)
         if self._get('pre_run') is not None:
-            self.pre_run = self.freezePlaybooks(self.pre_run, layout)
+            self.pre_run = self.freezePlaybooks(
+                self.pre_run, layout, semaphore_handler)
         if self._get('post_run') is not None:
-            self.post_run = self.freezePlaybooks(self.post_run, layout)
+            self.post_run = self.freezePlaybooks(
+                self.post_run, layout, semaphore_handler)
         if self._get('cleanup_run') is not None:
-            self.cleanup_run = self.freezePlaybooks(self.cleanup_run, layout)
+            self.cleanup_run = self.freezePlaybooks(
+                self.cleanup_run, layout, semaphore_handler)
 
     def getNodeset(self, layout):
         if isinstance(self.nodeset, str):
@ -2993,7 +3015,7 @@ class Job(ConfigObject):
setattr(job, k, v) setattr(job, k, v)
return job return job
def freezePlaybooks(self, pblist, layout): def freezePlaybooks(self, pblist, layout, semaphore_handler):
"""Take a list of playbooks, and return a copy of it updated with this """Take a list of playbooks, and return a copy of it updated with this
job's roles. job's roles.
@ -3004,10 +3026,11 @@ class Job(ConfigObject):
pb = old_pb.copy() pb = old_pb.copy()
pb.roles = self.roles pb.roles = self.roles
pb.freezeSecrets(layout) pb.freezeSecrets(layout)
pb.freezeSemaphores(layout, semaphore_handler)
ret.append(pb) ret.append(pb)
return tuple(ret) return tuple(ret)
def applyVariant(self, other, layout): def applyVariant(self, other, layout, semaphore_handler):
"""Copy the attributes which have been set on the other job to this """Copy the attributes which have been set on the other job to this
job.""" job."""
if not isinstance(other, Job): if not isinstance(other, Job):
@ -3113,16 +3136,20 @@ class Job(ConfigObject):
self.post_review = True self.post_review = True
if other._get('run') is not None: if other._get('run') is not None:
other_run = self.freezePlaybooks(other.run, layout) other_run = self.freezePlaybooks(
other.run, layout, semaphore_handler)
self.run = other_run self.run = other_run
if other._get('pre_run') is not None: if other._get('pre_run') is not None:
other_pre_run = self.freezePlaybooks(other.pre_run, layout) other_pre_run = self.freezePlaybooks(
other.pre_run, layout, semaphore_handler)
self.pre_run = self.pre_run + other_pre_run self.pre_run = self.pre_run + other_pre_run
if other._get('post_run') is not None: if other._get('post_run') is not None:
other_post_run = self.freezePlaybooks(other.post_run, layout) other_post_run = self.freezePlaybooks(
other.post_run, layout, semaphore_handler)
self.post_run = other_post_run + self.post_run self.post_run = other_post_run + self.post_run
if other._get('cleanup_run') is not None: if other._get('cleanup_run') is not None:
other_cleanup_run = self.freezePlaybooks(other.cleanup_run, layout) other_cleanup_run = self.freezePlaybooks(
other.cleanup_run, layout, semaphore_handler)
self.cleanup_run = other_cleanup_run + self.cleanup_run self.cleanup_run = other_cleanup_run + self.cleanup_run
self.updateVariables(other.variables, other.extra_variables, self.updateVariables(other.variables, other.extra_variables,
other.host_variables, other.group_variables) other.host_variables, other.group_variables)
@ -3144,6 +3171,16 @@ class Job(ConfigObject):
sorted(other.semaphores + self.semaphores, sorted(other.semaphores + self.semaphores,
key=lambda x: x.name)) key=lambda x: x.name))
pb_semaphores = set()
for pb in self.run + self.pre_run + self.post_run + self.cleanup_run:
pb_semaphores.update([x['name'] for x in pb.frozen_semaphores])
common = (set([x.name for x in self.semaphores]) &
pb_semaphores)
if common:
raise Exception(f"Semaphores {common} specified as both "
"job and playbook semaphores but may only "
"be used for one")
for k in self.context_attributes: for k in self.context_attributes:
if (other._get(k) is not None and if (other._get(k) is not None and
k not in set(['tags', 'requires', 'provides'])): k not in set(['tags', 'requires', 'provides'])):
@ -6380,6 +6417,33 @@ class ResultEvent(AbstractEvent):
     pass
 
 
+class SemaphoreReleaseEvent(ResultEvent):
+    """Enqueued after a semaphore has been released in order
+    to trigger a processing run.
+
+    This is emitted when a playbook semaphore is released to instruct
+    the scheduler to emit fan-out PipelineSemaphoreReleaseEvents for
+    every potentially affected tenant-pipeline.
+    """
+
+    def __init__(self, semaphore_name):
+        self.semaphore_name = semaphore_name
+
+    def toDict(self):
+        return {
+            "semaphore_name": self.semaphore_name,
+        }
+
+    @classmethod
+    def fromDict(cls, data):
+        return cls(data.get("semaphore_name"))
+
+    def __repr__(self):
+        return (
+            f"<{self.__class__.__name__} semaphore_name={self.semaphore_name}>"
+        )
+
+
 class BuildResultEvent(ResultEvent):
     """Base class for all build result events.
@ -7400,7 +7464,7 @@ class Layout(object):
noop.description = 'A job that will always succeed, no operation.' noop.description = 'A job that will always succeed, no operation.'
noop.parent = noop.BASE_JOB_MARKER noop.parent = noop.BASE_JOB_MARKER
noop.deduplicate = False noop.deduplicate = False
noop.run = (PlaybookContext(None, 'noop.yaml', [], []),) noop.run = (PlaybookContext(None, 'noop.yaml', [], [], []),)
self.jobs = {'noop': [noop]} self.jobs = {'noop': [noop]}
self.nodesets = {} self.nodesets = {}
self.secrets = {} self.secrets = {}
@ -7771,6 +7835,7 @@ class Layout(object):
skip_file_matcher, redact_secrets_and_keys, skip_file_matcher, redact_secrets_and_keys,
debug_messages): debug_messages):
log = item.annotateLogger(self.log) log = item.annotateLogger(self.log)
semaphore_handler = item.pipeline.tenant.semaphore_handler
job_list = ppc.job_list job_list = ppc.job_list
change = item.change change = item.change
pipeline = item.pipeline pipeline = item.pipeline
@ -7808,9 +7873,9 @@ class Layout(object):
for variant in variants: for variant in variants:
if frozen_job is None: if frozen_job is None:
frozen_job = variant.copy() frozen_job = variant.copy()
frozen_job.setBase(self) frozen_job.setBase(self, semaphore_handler)
else: else:
frozen_job.applyVariant(variant, self) frozen_job.applyVariant(variant, self, semaphore_handler)
frozen_job.name = variant.name frozen_job.name = variant.name
frozen_job.name = jobname frozen_job.name = jobname
@ -7830,7 +7895,7 @@ class Layout(object):
matched = False matched = False
for variant in job_list.jobs[jobname]: for variant in job_list.jobs[jobname]:
if variant.changeMatchesBranch(change): if variant.changeMatchesBranch(change):
frozen_job.applyVariant(variant, self) frozen_job.applyVariant(variant, self, semaphore_handler)
matched = True matched = True
log.debug("Pipeline variant %s matched %s", log.debug("Pipeline variant %s matched %s",
repr(variant), change) repr(variant), change)
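
The check added to `applyVariant` above rejects a job whose job-level semaphores overlap with the semaphores frozen onto any of its playbooks. A minimal standalone sketch of that set intersection follows. The function names `shared_semaphores` and `check_semaphores`, the plain-dict inputs, and the use of `ValueError` are assumptions made for illustration; the actual change operates on `Job` and `PlaybookContext` objects and raises a bare `Exception`.

```python
def shared_semaphores(job_semaphore_names, playbooks):
    """Return names used both as job and as playbook semaphores.

    `playbooks` is a list of frozen-semaphore lists, one per playbook;
    each entry is a dict with at least a 'name' key (hypothetical shape
    modelled on the dicts returned by getSemaphoreInfo).
    """
    playbook_names = set()
    for frozen_semaphores in playbooks:
        playbook_names.update(info["name"] for info in frozen_semaphores)
    return set(job_semaphore_names) & playbook_names


def check_semaphores(job_semaphore_names, playbooks):
    """Raise if any semaphore is requested at both levels."""
    common = shared_semaphores(job_semaphore_names, playbooks)
    if common:
        raise ValueError(
            f"Semaphores {sorted(common)} specified as both job and "
            "playbook semaphores but may only be used for one")


# Three playbooks: two hold one semaphore each, one holds none.
playbooks = [[{"name": "deploy-lock"}], [], [{"name": "db-lock"}]]
ok = shared_semaphores(["build-lock"], playbooks)
clash = shared_semaphores(["db-lock", "build-lock"], playbooks)
```

Here `ok` is empty because the job and its playbooks use disjoint names, while `clash` contains `db-lock`, which is the case the new check refuses.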


@ -59,6 +59,7 @@ from zuul.model import (
Change, Change,
ChangeManagementEvent, ChangeManagementEvent,
PipelinePostConfigEvent, PipelinePostConfigEvent,
SemaphoreReleaseEvent,
PipelineSemaphoreReleaseEvent, PipelineSemaphoreReleaseEvent,
DequeueEvent, DequeueEvent,
EnqueueEvent, EnqueueEvent,
@ -1924,6 +1925,27 @@ class Scheduler(threading.Thread):
pipeline.manager.removeItem(item) pipeline.manager.removeItem(item)
return return
+    def _doSemaphoreReleaseEvent(self, event, pipeline):
+        tenant = pipeline.tenant
+        semaphore = tenant.layout.getSemaphore(
+            self.abide, event.semaphore_name)
+        if semaphore.global_scope:
+            tenants = [t for t in self.abide.tenants.values()
+                       if event.semaphore_name in t.global_semaphores]
+        else:
+            tenants = [tenant]
+        for tenant in tenants:
+            for pipeline_name in tenant.layout.pipelines.keys():
+                if (tenant.name == pipeline.tenant.name and
+                    pipeline_name == pipeline.name):
+                    # This pipeline is already awake because it is
+                    # where this event originated.
+                    continue
+                event = PipelineSemaphoreReleaseEvent()
+                self.pipeline_management_events[
+                    tenant.name][pipeline_name].put(
+                        event, needs_result=False)
+
     def _areAllBuildsComplete(self):
         self.log.debug("Checking if all builds are complete")
         waiting = False
@ -2546,6 +2568,8 @@ class Scheduler(threading.Thread):
self._doFilesChangesCompletedEvent(event, pipeline) self._doFilesChangesCompletedEvent(event, pipeline)
elif isinstance(event, NodesProvisionedEvent): elif isinstance(event, NodesProvisionedEvent):
self._doNodesProvisionedEvent(event, pipeline) self._doNodesProvisionedEvent(event, pipeline)
elif isinstance(event, SemaphoreReleaseEvent):
self._doSemaphoreReleaseEvent(event, pipeline)
else: else:
self.log.error("Unable to handle event %s", event) self.log.error("Unable to handle event %s", event)
@ -2920,8 +2944,11 @@ class Scheduler(threading.Thread):
buildset.addBuild(fakebuild) buildset.addBuild(fakebuild)
finally: finally:
# Release the semaphore in any case # Release the semaphore in any case
tenant = buildset.item.pipeline.tenant pipeline = buildset.item.pipeline
tenant.semaphore_handler.release(self, item, job) tenant = pipeline.tenant
event_queue = self.pipeline_result_events[
tenant.name][pipeline.name]
tenant.semaphore_handler.release(event_queue, item, job)
def createZKContext(self, lock, log): def createZKContext(self, lock, log):
return ZKContext(self.zk_client, lock, self.stop_event, log) return ZKContext(self.zk_client, lock, self.stop_event, log)
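
The fan-out performed by `_doSemaphoreReleaseEvent` can be read as "wake every pipeline that might be waiting on this semaphore, except the one the release came from". The sketch below reproduces only that selection logic over plain dictionaries. The function `fan_out_targets` and the tenant layout used here are hypothetical stand-ins, not Zuul's scheduler API.

```python
def fan_out_targets(tenants, origin, semaphore_name, global_scope):
    """Return (tenant, pipeline) pairs that should be woken up.

    tenants: mapping of tenant name to a dict with 'pipelines' and
             'global_semaphores' lists (hypothetical shape).
    origin:  (tenant_name, pipeline_name) where the release was seen.
    """
    origin_tenant, _ = origin
    if global_scope:
        # A global semaphore may be shared by several tenants.
        names = [name for name, tenant in tenants.items()
                 if semaphore_name in tenant["global_semaphores"]]
    else:
        # A tenant-scoped semaphore only affects its own tenant.
        names = [origin_tenant]
    targets = []
    for name in names:
        for pipeline_name in tenants[name]["pipelines"]:
            if (name, pipeline_name) == origin:
                # The originating pipeline is already being processed.
                continue
            targets.append((name, pipeline_name))
    return targets


tenants = {
    "one": {"pipelines": ["check", "gate"], "global_semaphores": ["shared"]},
    "two": {"pipelines": ["check"], "global_semaphores": ["shared"]},
    "three": {"pipelines": ["check"], "global_semaphores": []},
}
global_targets = fan_out_targets(tenants, ("one", "check"), "shared", True)
local_targets = fan_out_targets(tenants, ("one", "check"), "local", False)
```

With a global semaphore, the pipelines of every tenant that lists it are selected; with a tenant-scoped one, only the sibling pipelines of the originating tenant are.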


@ -40,6 +40,7 @@ RESULT_EVENT_TYPE_MAP = {
"FilesChangesCompletedEvent": model.FilesChangesCompletedEvent, "FilesChangesCompletedEvent": model.FilesChangesCompletedEvent,
"MergeCompletedEvent": model.MergeCompletedEvent, "MergeCompletedEvent": model.MergeCompletedEvent,
"NodesProvisionedEvent": model.NodesProvisionedEvent, "NodesProvisionedEvent": model.NodesProvisionedEvent,
"SemaphoreReleaseEvent": model.SemaphoreReleaseEvent,
} }
MANAGEMENT_EVENT_TYPE_MAP = { MANAGEMENT_EVENT_TYPE_MAP = {


@ -20,7 +20,7 @@ from urllib.parse import quote_plus, unquote
 from kazoo.exceptions import BadVersionError, NoNodeError
 
 from zuul.lib.logutil import get_annotated_logger
-from zuul.model import PipelineSemaphoreReleaseEvent
+from zuul.model import SemaphoreReleaseEvent
 from zuul.zk import ZooKeeperSimpleBase
@ -73,29 +73,60 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
         except Exception:
             self.log.exception("Unable to send semaphore stats:")
 
+    def getSemaphoreInfo(self, job_semaphore):
+        semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name)
+        return {
+            'name': job_semaphore.name,
+            'path': self._makePath(semaphore),
+            'resources_first': job_semaphore.resources_first,
+            'max': 1 if semaphore is None else semaphore.max,
+        }
+
+    def getSemaphoreHandle(self, item, job):
+        return {
+            "buildset_path": item.current_build_set.getPath(),
+            "job_name": job.name,
+        }
+
     def acquire(self, item, job, request_resources):
+        # This is the typical method for acquiring semaphores. It
+        # runs on the scheduler and acquires all semaphores for a job.
         if self.read_only:
             raise RuntimeError("Read-only semaphore handler")
 
         if not job.semaphores:
             return True
 
         log = get_annotated_logger(self.log, item.event)
+        handle = self.getSemaphoreHandle(item, job)
+        infos = [self.getSemaphoreInfo(job_semaphore)
+                 for job_semaphore in job.semaphores]
+
+        return self.acquireFromInfo(log, infos, handle, request_resources)
+
+    def acquireFromInfo(self, log, infos, handle, request_resources=False):
+        # This method is used by the executor to acquire a playbook
+        # semaphore; it is similar to the acquire method but the
+        # semaphore info is frozen (this operates without an abide).
+        if self.read_only:
+            raise RuntimeError("Read-only semaphore handler")
+
+        if not infos:
+            return True
+
         all_acquired = True
-        for semaphore in job.semaphores:
-            if not self._acquire_one(log, item, job, request_resources,
-                                     semaphore):
+        for info in infos:
+            if not self._acquire_one(log, info, handle, request_resources):
                 all_acquired = False
                 break
         if not all_acquired:
             # Since we know we have less than all the required
             # semaphores, set quiet=True so we don't log an inability
             # to release them.
-            self.release(None, item, job, quiet=True)
+            self.releaseFromInfo(log, None, infos, handle, quiet=True)
             return False
         return True
 
-    def _acquire_one(self, log, item, job, request_resources, job_semaphore):
-        if job_semaphore.resources_first and request_resources:
+    def _acquire_one(self, log, info, handle, request_resources):
+        if info['resources_first'] and request_resources:
             # We're currently in the resource request phase and want to get the
             # resources before locking. So we don't need to do anything here.
             return True
@ -106,42 +137,86 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
             # the resources phase.
             pass
 
-        semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name)
-        semaphore_path = self._makePath(semaphore)
-        semaphore_handle = {
-            "buildset_path": item.current_build_set.getPath(),
-            "job_name": job.name,
-        }
-        self.kazoo_client.ensure_path(semaphore_path)
-        semaphore_holders, zstat = self.getHolders(semaphore_path)
+        self.kazoo_client.ensure_path(info['path'])
+        semaphore_holders, zstat = self.getHolders(info['path'])
 
-        if semaphore_handle in semaphore_holders:
+        if handle in semaphore_holders:
             return True
 
         # semaphore is there, check max
-        while len(semaphore_holders) < self._max_count(semaphore.name):
-            semaphore_holders.append(semaphore_handle)
+        while len(semaphore_holders) < info['max']:
+            semaphore_holders.append(handle)
 
             try:
-                self.kazoo_client.set(semaphore_path,
+                self.kazoo_client.set(info['path'],
                                       holdersToData(semaphore_holders),
                                       version=zstat.version)
             except BadVersionError:
                 log.debug(
                     "Retrying semaphore %s acquire due to concurrent update",
-                    semaphore.name)
-                semaphore_holders, zstat = self.getHolders(semaphore_path)
+                    info['name'])
+                semaphore_holders, zstat = self.getHolders(info['path'])
                 continue
 
-            log.info("Semaphore %s acquired: job %s, item %s",
-                     semaphore.name, job.name, item)
-
-            self._emitStats(semaphore_path, len(semaphore_holders))
+            log.info("Semaphore %s acquired: handle %s",
+                     info['name'], handle)
+
+            self._emitStats(info['path'], len(semaphore_holders))
             return True
 
         return False
 
+    def release(self, event_queue, item, job, quiet=False):
+        if self.read_only:
+            raise RuntimeError("Read-only semaphore handler")
+
+        if not job.semaphores:
+            return
+
+        log = get_annotated_logger(self.log, item.event)
+
+        handle = self.getSemaphoreHandle(item, job)
+        infos = [self.getSemaphoreInfo(job_semaphore)
+                 for job_semaphore in job.semaphores]
+        return self.releaseFromInfo(log, event_queue, infos, handle,
+                                    quiet=quiet)
+
+    def releaseFromInfo(self, log, event_queue, infos, handle, quiet=False):
+        for info in infos:
+            self._release_one(log, info, handle, quiet)
+            if event_queue:
+                # If an event queue has been provided (which it is
+                # except in the case of a rollback from acquire in
+                # this class), broadcast an event to trigger pipeline
+                # runs.
+                event = SemaphoreReleaseEvent(info['name'])
+                event_queue.put(event)
+
+    def _release_one(self, log, info, handle, quiet=False):
+        while True:
+            try:
+                semaphore_holders, zstat = self.getHolders(info['path'])
+                semaphore_holders.remove(handle)
+            except (ValueError, NoNodeError):
+                if not quiet:
+                    log.error("Semaphore %s can not be released for %s "
+                              "because the semaphore is not held",
+                              info['path'], handle)
+                break
+
+            try:
+                self.kazoo_client.set(info['path'],
+                                      holdersToData(semaphore_holders),
+                                      zstat.version)
+            except BadVersionError:
+                log.debug(
+                    "Retrying semaphore %s release due to concurrent update",
+                    info['path'])
+                continue
+
+            log.info("Semaphore %s released for %s",
+                     info['path'], handle)
+            self._emitStats(info['path'], len(semaphore_holders))
+            break
+
     def getHolders(self, semaphore_path):
         data, zstat = self.kazoo_client.get(semaphore_path)
         return holdersFromData(data), zstat
@ -155,72 +230,6 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
pass pass
return ret return ret
def _release(self, log, semaphore_path, semaphore_handle, quiet):
while True:
try:
semaphore_holders, zstat = self.getHolders(semaphore_path)
semaphore_holders.remove(semaphore_handle)
except (ValueError, NoNodeError):
if not quiet:
log.error("Semaphore %s can not be released for %s "
"because the semaphore is not held",
semaphore_path, semaphore_handle)
break
try:
self.kazoo_client.set(semaphore_path,
holdersToData(semaphore_holders),
zstat.version)
except BadVersionError:
log.debug(
"Retrying semaphore %s release due to concurrent update",
semaphore_path)
continue
log.info("Semaphore %s released for %s",
semaphore_path, semaphore_handle)
self._emitStats(semaphore_path, len(semaphore_holders))
break
def release(self, sched, item, job, quiet=False):
if self.read_only:
raise RuntimeError("Read-only semaphore handler")
if not job.semaphores:
return
log = get_annotated_logger(self.log, item.event)
for job_semaphore in job.semaphores:
self._release_one(log, item, job, job_semaphore, quiet)
# If a scheduler has been provided (which it is except in the
# case of a rollback from acquire in this class), broadcast an
# event to trigger pipeline runs.
if sched is None:
return
semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name)
if semaphore.global_scope:
tenants = [t for t in self.abide.tenants.values()
if job_semaphore.name in t.global_semaphores]
else:
tenants = [self.abide.tenants[self.tenant_name]]
for tenant in tenants:
for pipeline_name in tenant.layout.pipelines.keys():
event = PipelineSemaphoreReleaseEvent()
sched.pipeline_management_events[
tenant.name][pipeline_name].put(
event, needs_result=False)
def _release_one(self, log, item, job, job_semaphore, quiet):
semaphore = self.layout.getSemaphore(self.abide, job_semaphore.name)
semaphore_path = self._makePath(semaphore)
semaphore_handle = {
"buildset_path": item.current_build_set.getPath(),
"job_name": job.name,
}
self._release(log, semaphore_path, semaphore_handle, quiet)
def semaphoreHolders(self, semaphore_name): def semaphoreHolders(self, semaphore_name):
semaphore = self.layout.getSemaphore(self.abide, semaphore_name) semaphore = self.layout.getSemaphore(self.abide, semaphore_name)
semaphore_path = self._makePath(semaphore) semaphore_path = self._makePath(semaphore)
@ -230,10 +239,6 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
holders = [] holders = []
return holders return holders
def _max_count(self, semaphore_name):
semaphore = self.layout.getSemaphore(self.abide, semaphore_name)
return 1 if semaphore is None else semaphore.max
def cleanupLeaks(self): def cleanupLeaks(self):
if self.read_only: if self.read_only:
raise RuntimeError("Read-only semaphore handler") raise RuntimeError("Read-only semaphore handler")
@ -246,7 +251,10 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
semaphore = self.layout.getSemaphore( semaphore = self.layout.getSemaphore(
self.abide, semaphore_name) self.abide, semaphore_name)
semaphore_path = self._makePath(semaphore) info = {
'name': semaphore.name,
'path': self._makePath(semaphore),
}
self.log.error("Releasing leaked semaphore %s held by %s", self.log.error("Releasing leaked semaphore %s held by %s",
semaphore_path, holder) info['path'], holder)
self._release(self.log, semaphore_path, holder, quiet=False) self._release_one(self.log, info, holder, quiet=False)
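
Both `_acquire_one` and `_release_one` rely on the same optimistic-concurrency pattern: read the holder list together with its version, modify it locally, and write it back only if the version is unchanged, retrying on a conflict. The sketch below shows that loop against an in-memory stand-in for a single ZooKeeper node. `FakeNode`, the local `BadVersionError` class, and the JSON encoding are assumptions made so the example is self-contained; the real handler uses kazoo and its own `holdersToData`/`holdersFromData` helpers.

```python
import json


class BadVersionError(Exception):
    """Raised when a write carries a stale version (local stand-in)."""


class FakeNode:
    """In-memory stand-in for one ZooKeeper node with a version counter."""

    def __init__(self):
        self.data = b""
        self.version = 0

    def get(self):
        return self.data, self.version

    def set(self, data, version):
        # Reject the write if someone else updated the node first.
        if version != self.version:
            raise BadVersionError()
        self.data = data
        self.version += 1


def holders_from(data):
    return json.loads(data.decode("utf8")) if data else []


def acquire_one(node, info, handle):
    """Add handle to the holder list unless the semaphore is full."""
    data, version = node.get()
    holders = holders_from(data)
    if handle in holders:
        return True
    while len(holders) < info["max"]:
        holders.append(handle)
        try:
            node.set(json.dumps(holders).encode("utf8"), version)
        except BadVersionError:
            # Concurrent update: re-read and re-check the capacity.
            data, version = node.get()
            holders = holders_from(data)
            continue
        return True
    return False


def release_one(node, handle):
    """Remove handle from the holder list; False if it was not held."""
    while True:
        data, version = node.get()
        holders = holders_from(data)
        try:
            holders.remove(handle)
        except ValueError:
            return False
        try:
            node.set(json.dumps(holders).encode("utf8"), version)
        except BadVersionError:
            continue
        return True


node = FakeNode()
info = {"name": "test-semaphore", "max": 1}
first = {"buildset_path": "/a", "job_name": "job1"}
second = {"buildset_path": "/b", "job_name": "job2"}
results = [
    acquire_one(node, info, first),   # free slot, acquired
    acquire_one(node, info, second),  # at max capacity, refused
    release_one(node, first),         # holder removed
    acquire_one(node, info, second),  # slot free again, acquired
]
```

Because the frozen `info` dict already carries the path and the maximum count, this loop needs no layout or abide lookup, which appears to be what lets the executor run it on its own.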