Support multiple semaphores

This allows jobs to request multiple semaphores.  Zuul will wait until
all are available before acquiring them* and will not start a job unless
all have been acquired.

This is useful for jobs which require access to mulitple limited resources
(especially if other jobs require access to a subset or superset of those
same resources).

* Implementation note: for efficiency, we actually do acquire them
  one-by-one but then release any which have been acquired if they are
  not all available.  This all happens very quickly within a single
  attempt to start a job.  We don't hold semaphores while we wait for
  others as that could cause deadlocks.

To be consistent with other job attributes which accept lists, this
deprecates job.semaphore and replaces it with job.semaphores.

Change-Id: I295a891a2d02b904820d8f60afe8ef862693b75d
This commit is contained in:
James E. Blair 2021-05-06 16:40:00 -07:00
parent 29ccaa278d
commit 153f8a90cc
10 changed files with 190 additions and 46 deletions

View File

@ -220,12 +220,26 @@ Here is an example of two job definitions:
.. attr:: semaphore
The name of a :ref:`semaphore` which should be acquired and
released when the job begins and ends. If the semaphore is at
maximum capacity, then Zuul will wait until it can be acquired
before starting the job. The format is either a string or a
dictionary. If it's a string it references a semaphore using the
default value for :attr:`job.semaphore.resources-first`.
A deprecated alias of :attr:`job.semaphores`.
.. attr:: semaphores
The name of a :ref:`semaphore` (or list of them) which should be
acquired and released when the job begins and ends. If the
semaphore is at maximum capacity, then Zuul will wait until it
can be acquired before starting the job. The format is either a
string, a dictionary, or a list of either of those in the case
of multiple semaphores. If it's a string it references a
semaphore using the default value for
:attr:`job.semaphores.resources-first`.
If multiple semaphores are requested, the job will not start
until all have been acquired, and Zuul will wait until all are
available before acquiring any.
When inheriting jobs or applying variants, the list of
semaphores is extended (semaphores specified in a job definition
are added to any supplied by their parents).
.. attr:: name
:required:
@ -236,12 +250,12 @@ Here is an example of two job definitions:
:default: False
By default a semaphore is acquired before the resources are
requested. However in some cases the user wants to run cheap
jobs as quickly as possible in a consecutive manner. In this
case :attr:`job.semaphore.resources-first` can be enabled to
request the resources before locking the semaphore. This can
lead to some amount of blocked resources while waiting for the
semaphore so this should be used with caution.
requested. However in some cases the user may want to run
cheap jobs as quickly as possible in a consecutive manner. In
this case `resources-first` can be enabled to request the
resources before locking the semaphore. This can lead to some
amount of blocked resources while waiting for the semaphore
so this should be used with caution.
.. attr:: tags

View File

@ -10,5 +10,5 @@ upgrade:
to run. However this could lead to a high amount of resource waste. Instead
jobs now acquire the semaphore before requesting the resources by default.
This behavior can be overridden by jobs using
:attr:`job.semaphore.resources-first` if some waste of resources is
job.semaphores.resources-first if some waste of resources is
acceptable.

View File

@ -0,0 +1,23 @@
---
features:
- |
Jobs may now request multiple semaphores and they will not start until
all semaphores are acquired.
Use the new :attr:`job.semaphores` (plural) attribute to specify them.
Note that the new attribute is additive when considering
inheritance and job variants. That is to say that a job
definition with a `semaphores` attribute will extend the list of
semaphores supplied by its parent rather than overriding it (which
is the behavior for the deprecated attribute).
deprecations:
- |
The job attribute :attr:`job.semaphore` (note the singular rather
than plural form) job attribute is now deprecated. Use the plural
form :attr:`job.semaphores` instead. As with most list items in
Zuul configuration, it also accepts a single item without the
wrapping list, so to convert existing jobs, simply change the
spelling of the attribute, no change to the value is required.
The singular form will be removed in Zuul 5.0.

View File

@ -0,0 +1,40 @@
- pipeline:
name: check
manager: independent
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
Verified: 1
failure:
gerrit:
Verified: -1
- job:
name: base
parent: null
run: playbooks/base.yaml
- job:
name: job1
semaphores:
- sem1
- job:
name: job2
semaphores:
- sem1
- sem2
- project:
name: org/project1
check:
jobs:
- job1
- project:
name: org/project2
check:
jobs:
- job2

View File

@ -7269,7 +7269,7 @@ class TestSemaphore(ZuulTestCase):
self.assertIsNone(jobs[0]["waiting_status"])
self.assertIsNone(jobs[1]["waiting_status"])
self.assertEqual(jobs[2]["waiting_status"],
'semaphore: test-semaphore')
'semaphores: test-semaphore')
# By default we first lock the semaphore and then get the nodes
# so at this point the semaphore needs to be aquired.
@ -7833,6 +7833,43 @@ class TestSemaphore(ZuulTestCase):
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
@simple_layout('layouts/multiple-semaphores.yaml')
def test_multiple_semaphores(self):
# Test a job with multiple semaphores
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# One job should be running, and hold sem1
self.assertBuilds([dict(name='job1')])
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# Job2 requires sem1 and sem2; it hasn't started because job
# is still holding sem1.
self.assertBuilds([dict(name='job1')])
self.executor_server.release('job1')
self.waitUntilSettled()
# Job1 is finished, so job2 can acquire both semaphores.
self.assertBuilds([dict(name='job2')])
self.executor_server.release()
self.waitUntilSettled()
self.assertHistory([
dict(name='job1', result='SUCCESS', changes='1,1'),
dict(name='job2', result='SUCCESS', changes='2,1'),
])
# TODO(corvus): Consider a version of this test which launches
# 2 jobs with the same multiple-semaphore requirements
# simultaneously to test the behavior with contention (at
# least one should be able to start on each pass through the
# loop).
class TestSemaphoreMultiTenant(ZuulTestCase):
tenant_config_file = 'config/multi-tenant-semaphore/main.yaml'

View File

@ -365,7 +365,7 @@ class TestWeb(BaseTestWeb):
'pre_run': [],
'post_run': [],
'cleanup_run': [],
'semaphore': None,
'semaphores': [],
'source_context': source_ctx,
'tags': [],
'timeout': None,
@ -410,7 +410,7 @@ class TestWeb(BaseTestWeb):
'pre_run': [],
'post_run': [],
'cleanup_run': [],
'semaphore': None,
'semaphores': [],
'source_context': source_ctx,
'tags': [],
'timeout': None,
@ -453,7 +453,7 @@ class TestWeb(BaseTestWeb):
'pre_run': [],
'post_run': [],
'cleanup_run': [],
'semaphore': None,
'semaphores': [],
'source_context': source_ctx,
'tags': [],
'timeout': None,
@ -572,7 +572,7 @@ class TestWeb(BaseTestWeb):
'pre_run': [],
'post_run': [],
'cleanup_run': [],
'semaphore': None,
'semaphores': [],
'source_context': {
'branch': 'master',
'path': 'zuul.yaml',
@ -610,7 +610,7 @@ class TestWeb(BaseTestWeb):
'pre_run': [],
'post_run': [],
'cleanup_run': [],
'semaphore': None,
'semaphores': [],
'source_context': {
'branch': 'master',
'path': 'zuul.yaml',
@ -648,7 +648,7 @@ class TestWeb(BaseTestWeb):
'pre_run': [],
'post_run': [],
'cleanup_run': [],
'semaphore': None,
'semaphores': [],
'source_context': {
'branch': 'master',
'path': 'zuul.yaml',
@ -686,7 +686,7 @@ class TestWeb(BaseTestWeb):
'pre_run': [],
'post_run': [],
'cleanup_run': [],
'semaphore': None,
'semaphores': [],
'source_context': {
'branch': 'master',
'path': 'zuul.yaml',
@ -745,7 +745,7 @@ class TestWeb(BaseTestWeb):
'requires': [],
'roles': [],
'run': [],
'semaphore': None,
'semaphores': [],
'source_context': {'branch': 'master',
'path': 'zuul.yaml',
'project': 'common-config'},

View File

@ -84,7 +84,7 @@ class JobVariant extends React.Component {
const jobInfos = [
'description', 'context', 'builds', 'status',
'parent', 'attempts', 'timeout', 'semaphore',
'parent', 'attempts', 'timeout', 'semaphores',
'nodeset', 'variables', 'override_checkout',
]
jobInfos.forEach(key => {

View File

@ -619,6 +619,7 @@ class JobParser(object):
'hold-following-changes': bool,
'voting': bool,
'semaphore': vs.Any(semaphore, str),
'semaphores': to_list(vs.Any(semaphore, str)),
'tags': to_list(str),
'branches': to_list(str),
'files': to_list(str),
@ -883,14 +884,22 @@ class JobParser(object):
new_dependencies.append(job_dependency)
job.dependencies = new_dependencies
if 'semaphore' in conf:
semaphore = conf.get('semaphore')
semaphores = as_list(conf.get('semaphores', conf.get('semaphore', [])))
job_semaphores = []
for semaphore in semaphores:
if isinstance(semaphore, str):
job.semaphore = model.JobSemaphore(semaphore)
job_semaphores.append(model.JobSemaphore(semaphore))
else:
job.semaphore = model.JobSemaphore(
job_semaphores.append(model.JobSemaphore(
semaphore.get('name'),
semaphore.get('resources-first', False))
semaphore.get('resources-first', False)))
if job_semaphores:
# Sort the list of semaphores to avoid issues with
# contention (where two jobs try to start at the same time
# and fail due to acquiring the same semaphores but in
# reverse order.
job.semaphores = tuple(sorted(job_semaphores,
key=lambda x: x.name))
for k in ('tags', 'requires', 'provides'):
v = frozenset(as_list(conf.get(k)))

View File

@ -1230,7 +1230,7 @@ class Job(ConfigObject):
cleanup_run=(),
run=(),
ansible_version=None,
semaphore=None,
semaphores=(),
attempts=3,
final=False,
abstract=False,
@ -1295,12 +1295,7 @@ class Job(ConfigObject):
d['required_projects'] = []
for project in self.required_projects.values():
d['required_projects'].append(project.toDict())
if self.semaphore:
# For now just leave the semaphore name here until we really need
# more information in zuul-web about this
d['semaphore'] = self.semaphore.name
else:
d['semaphore'] = None
d['semaphores'] = [s.toDict() for s in self.semaphores]
d['variables'] = self.variables
d['extra_variables'] = self.extra_variables
d['host_variables'] = self.host_variables
@ -1621,7 +1616,8 @@ class Job(ConfigObject):
if k not in set(['pre_run', 'run', 'post_run', 'cleanup_run',
'roles', 'variables', 'extra_variables',
'host_variables', 'group_variables',
'required_projects', 'allowed_projects']):
'required_projects', 'allowed_projects',
'semaphores']):
setattr(self, k, other._get(k))
# Don't set final above so that we don't trip an error halfway
@ -1715,6 +1711,14 @@ class Job(ConfigObject):
other.allowed_projects))
elif other._get('allowed_projects') is not None:
self.allowed_projects = other.allowed_projects
if other._get('semaphores') is not None:
# Sort the list of semaphores to avoid issues with
# contention (where two jobs try to start at the same time
# and fail due to acquiring the same semaphores but in
# reverse order.
self.semaphores = tuple(
sorted(other.semaphores + self.semaphores,
key=lambda x: x.name))
for k in self.context_attributes:
if (other._get(k) is not None and
@ -2814,8 +2818,8 @@ class QueueItem(object):
toreq.append(job)
job.queued = True
else:
job.waiting_status = 'semaphore: {}'.format(
job.semaphore.name)
sem_names = ','.join([s.name for s in job.semaphores])
job.waiting_status = 'semaphores: {}'.format(sem_names)
return toreq
def setResult(self, build):

View File

@ -60,11 +60,23 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
self.log.exception("Unable to send semaphore stats:")
def acquire(self, item, job, request_resources):
if not job.semaphore:
if not job.semaphores:
return True
log = get_annotated_logger(self.log, item.event)
if job.semaphore.resources_first and request_resources:
all_acquired = True
for semaphore in job.semaphores:
if not self._acquire_one(log, item, job, request_resources,
semaphore):
all_acquired = False
break
if not all_acquired:
self.release(item, job)
return False
return True
def _acquire_one(self, log, item, job, request_resources, semaphore):
if semaphore.resources_first and request_resources:
# We're currently in the resource request phase and want to get the
# resources before locking. So we don't need to do anything here.
return True
@ -75,7 +87,7 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
# the resources phase.
pass
semaphore_key = quote_plus(job.semaphore.name)
semaphore_key = quote_plus(semaphore.name)
semaphore_path = f"{self.tenant_root}/{semaphore_key}"
semaphore_handle = f"{item.uuid}-{job.name}"
@ -86,7 +98,7 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
return True
# semaphore is there, check max
while len(semaphore_holders) < self._max_count(job.semaphore.name):
while len(semaphore_holders) < self._max_count(semaphore.name):
semaphore_holders.append(semaphore_handle)
try:
@ -96,12 +108,12 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
except BadVersionError:
log.debug(
"Retrying semaphore %s acquire due to concurrent update",
job.semaphore.name)
semaphore.name)
semaphore_holders, zstat = self.getHolders(semaphore_path)
continue
log.info("Semaphore %s acquired: job %s, item %s",
job.semaphore.name, job.name, item)
semaphore.name, job.name, item)
self._emitStats(semaphore_path, len(semaphore_holders))
return True
@ -145,11 +157,16 @@ class SemaphoreHandler(ZooKeeperSimpleBase):
break
def release(self, item, job):
if not job.semaphore:
if not job.semaphores:
return
log = get_annotated_logger(self.log, item.event)
semaphore_key = quote_plus(job.semaphore.name)
for semaphore in job.semaphores:
self._release_one(log, item, job, semaphore)
def _release_one(self, log, item, job, semaphore):
semaphore_key = quote_plus(semaphore.name)
semaphore_path = f"{self.tenant_root}/{semaphore_key}"
semaphore_handle = f"{item.uuid}-{job.name}"