Merge "Improve resource usage with semaphores"
This commit is contained in:
commit
ccbe7b10df
|
@ -629,7 +629,25 @@ Here is an example of two job definitions:
|
||||||
The name of a :ref:`semaphore` which should be acquired and
|
The name of a :ref:`semaphore` which should be acquired and
|
||||||
released when the job begins and ends. If the semaphore is at
|
released when the job begins and ends. If the semaphore is at
|
||||||
maximum capacity, then Zuul will wait until it can be acquired
|
maximum capacity, then Zuul will wait until it can be acquired
|
||||||
before starting the job.
|
before starting the job. The format is either a string or a
|
||||||
|
dictionary. If it's a string it references a semaphore using the
|
||||||
|
default value for :attr:`job.semaphore.resources-first`.
|
||||||
|
|
||||||
|
.. attr:: name
|
||||||
|
:required:
|
||||||
|
|
||||||
|
The name of the referenced semaphore
|
||||||
|
|
||||||
|
.. attr:: resources-first
|
||||||
|
:default: False
|
||||||
|
|
||||||
|
By default a semaphore is acquired before the resources are
|
||||||
|
requested. However in some cases the user wants to run cheap
|
||||||
|
jobs as quickly as possible in a consecutive manner. In this
|
||||||
|
case :attr:`job.semaphore.resources-first` can be enabled to
|
||||||
|
request the resources before locking the semaphore. This can
|
||||||
|
lead to some amount of blocked resources while waiting for the
|
||||||
|
semaphore so this should be used with caution.
|
||||||
|
|
||||||
.. attr:: tags
|
.. attr:: tags
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,14 @@
|
||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
A job using a semaphore now can configure if it should acquire the it
|
||||||
|
before requesting resources or just before running.
|
||||||
|
upgrade:
|
||||||
|
- |
|
||||||
|
The acquiring behavior of jobs with semaphores has been changed. Up to now
|
||||||
|
a job requested resources and aquired the semaphore just before it started
|
||||||
|
to run. However this could lead to a high amount of resource waste. Instead
|
||||||
|
jobs now acquire the semaphore before requesting the resources by default.
|
||||||
|
This behavior can be overridden by jobs using
|
||||||
|
:attr:`job.semaphore.resources-first` if some waste of resources is
|
||||||
|
acceptable.
|
|
@ -22,6 +22,10 @@
|
||||||
- job:
|
- job:
|
||||||
name: base
|
name: base
|
||||||
parent: null
|
parent: null
|
||||||
|
nodeset:
|
||||||
|
nodes:
|
||||||
|
- name: controller
|
||||||
|
label: label1
|
||||||
|
|
||||||
- job:
|
- job:
|
||||||
name: project-test1
|
name: project-test1
|
||||||
|
@ -56,6 +60,20 @@
|
||||||
- name: controller
|
- name: controller
|
||||||
label: label1
|
label: label1
|
||||||
|
|
||||||
|
- job:
|
||||||
|
name: semaphore-one-test1-resources-first
|
||||||
|
semaphore:
|
||||||
|
name: test-semaphore
|
||||||
|
resources-first: True
|
||||||
|
run: playbooks/semaphore-one-test1.yaml
|
||||||
|
|
||||||
|
- job:
|
||||||
|
name: semaphore-one-test2-resources-first
|
||||||
|
semaphore:
|
||||||
|
name: test-semaphore
|
||||||
|
resources-first: True
|
||||||
|
run: playbooks/semaphore-one-test1.yaml
|
||||||
|
|
||||||
- project:
|
- project:
|
||||||
name: org/project
|
name: org/project
|
||||||
check:
|
check:
|
||||||
|
@ -77,3 +95,11 @@
|
||||||
check:
|
check:
|
||||||
jobs:
|
jobs:
|
||||||
- semaphore-one-test3
|
- semaphore-one-test3
|
||||||
|
|
||||||
|
- project:
|
||||||
|
name: org/project3
|
||||||
|
check:
|
||||||
|
jobs:
|
||||||
|
- project-test1
|
||||||
|
- semaphore-one-test1-resources-first
|
||||||
|
- semaphore-one-test2-resources-first
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
test
|
|
@ -8,3 +8,4 @@
|
||||||
- org/project
|
- org/project
|
||||||
- org/project1
|
- org/project1
|
||||||
- org/project2
|
- org/project2
|
||||||
|
- org/project3
|
||||||
|
|
|
@ -5849,6 +5849,10 @@ class TestSemaphore(ZuulTestCase):
|
||||||
|
|
||||||
self.executor_server.hold_jobs_in_build = True
|
self.executor_server.hold_jobs_in_build = True
|
||||||
|
|
||||||
|
# Pause nodepool so we can check the ordering of getting the nodes
|
||||||
|
# and aquiring the semaphore.
|
||||||
|
self.fake_nodepool.paused = True
|
||||||
|
|
||||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertFalse('test-semaphore' in
|
||||||
|
@ -5858,6 +5862,13 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
|
# By default we first lock the semaphore and then get the nodes
|
||||||
|
# so at this point the semaphore needs to be aquired.
|
||||||
|
self.assertTrue('test-semaphore' in
|
||||||
|
tenant.semaphore_handler.semaphores)
|
||||||
|
self.fake_nodepool.paused = False
|
||||||
|
self.waitUntilSettled()
|
||||||
|
|
||||||
self.assertEqual(len(self.builds), 3)
|
self.assertEqual(len(self.builds), 3)
|
||||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[1].name, 'semaphore-one-test1')
|
self.assertEqual(self.builds[1].name, 'semaphore-one-test1')
|
||||||
|
@ -5993,6 +6004,53 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(A.reported, 1)
|
self.assertEqual(A.reported, 1)
|
||||||
self.assertEqual(B.reported, 1)
|
self.assertEqual(B.reported, 1)
|
||||||
|
|
||||||
|
def test_semaphore_resources_first(self):
|
||||||
|
"Test semaphores with max=1 (mutex) and get resources first"
|
||||||
|
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||||
|
|
||||||
|
self.executor_server.hold_jobs_in_build = True
|
||||||
|
|
||||||
|
# Pause nodepool so we can check the ordering of getting the nodes
|
||||||
|
# and aquiring the semaphore.
|
||||||
|
self.fake_nodepool.paused = True
|
||||||
|
|
||||||
|
A = self.fake_gerrit.addFakeChange('org/project3', 'master', 'A')
|
||||||
|
B = self.fake_gerrit.addFakeChange('org/project3', 'master', 'B')
|
||||||
|
self.assertFalse('test-semaphore' in
|
||||||
|
tenant.semaphore_handler.semaphores)
|
||||||
|
|
||||||
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
|
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
||||||
|
self.waitUntilSettled()
|
||||||
|
|
||||||
|
# Here we first get the resources and then lock the semaphore
|
||||||
|
# so at this point the semaphore should not be aquired.
|
||||||
|
self.assertFalse('test-semaphore' in
|
||||||
|
tenant.semaphore_handler.semaphores)
|
||||||
|
self.fake_nodepool.paused = False
|
||||||
|
self.waitUntilSettled()
|
||||||
|
|
||||||
|
self.assertEqual(len(self.builds), 3)
|
||||||
|
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||||
|
self.assertEqual(self.builds[1].name,
|
||||||
|
'semaphore-one-test1-resources-first')
|
||||||
|
self.assertEqual(self.builds[2].name, 'project-test1')
|
||||||
|
|
||||||
|
self.executor_server.release('semaphore-one-test1')
|
||||||
|
self.waitUntilSettled()
|
||||||
|
|
||||||
|
self.assertEqual(len(self.builds), 3)
|
||||||
|
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||||
|
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||||
|
self.assertEqual(self.builds[2].name,
|
||||||
|
'semaphore-one-test2-resources-first')
|
||||||
|
self.assertTrue('test-semaphore' in
|
||||||
|
tenant.semaphore_handler.semaphores)
|
||||||
|
|
||||||
|
self.executor_server.hold_jobs_in_build = False
|
||||||
|
self.executor_server.release()
|
||||||
|
self.waitUntilSettled()
|
||||||
|
|
||||||
def test_semaphore_zk_error(self):
|
def test_semaphore_zk_error(self):
|
||||||
"Test semaphore release with zk error"
|
"Test semaphore release with zk error"
|
||||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||||
|
|
|
@ -517,6 +517,9 @@ class JobParser(object):
|
||||||
secret = {vs.Required('name'): str,
|
secret = {vs.Required('name'): str,
|
||||||
vs.Required('secret'): str}
|
vs.Required('secret'): str}
|
||||||
|
|
||||||
|
semaphore = {vs.Required('name'): str,
|
||||||
|
'resources-first': bool}
|
||||||
|
|
||||||
# Attributes of a job that can also be used in Project and ProjectTemplate
|
# Attributes of a job that can also be used in Project and ProjectTemplate
|
||||||
job_attributes = {'parent': vs.Any(str, None),
|
job_attributes = {'parent': vs.Any(str, None),
|
||||||
'final': bool,
|
'final': bool,
|
||||||
|
@ -528,7 +531,7 @@ class JobParser(object):
|
||||||
'success-url': str,
|
'success-url': str,
|
||||||
'hold-following-changes': bool,
|
'hold-following-changes': bool,
|
||||||
'voting': bool,
|
'voting': bool,
|
||||||
'semaphore': str,
|
'semaphore': vs.Any(semaphore, str),
|
||||||
'tags': to_list(str),
|
'tags': to_list(str),
|
||||||
'branches': to_list(str),
|
'branches': to_list(str),
|
||||||
'files': to_list(str),
|
'files': to_list(str),
|
||||||
|
@ -573,7 +576,6 @@ class JobParser(object):
|
||||||
'workspace',
|
'workspace',
|
||||||
'voting',
|
'voting',
|
||||||
'hold-following-changes',
|
'hold-following-changes',
|
||||||
'semaphore',
|
|
||||||
'attempts',
|
'attempts',
|
||||||
'failure-message',
|
'failure-message',
|
||||||
'success-message',
|
'success-message',
|
||||||
|
@ -728,6 +730,15 @@ class JobParser(object):
|
||||||
new_projects[project.canonical_name] = job_project
|
new_projects[project.canonical_name] = job_project
|
||||||
job.required_projects = new_projects
|
job.required_projects = new_projects
|
||||||
|
|
||||||
|
if 'semaphore' in conf:
|
||||||
|
semaphore = conf.get('semaphore')
|
||||||
|
if isinstance(semaphore, str):
|
||||||
|
job.semaphore = model.JobSemaphore(semaphore)
|
||||||
|
else:
|
||||||
|
job.semaphore = model.JobSemaphore(
|
||||||
|
semaphore.get('name'),
|
||||||
|
semaphore.get('resources-first', False))
|
||||||
|
|
||||||
tags = conf.get('tags')
|
tags = conf.get('tags')
|
||||||
if tags:
|
if tags:
|
||||||
job.tags = set(tags)
|
job.tags = set(tags)
|
||||||
|
|
|
@ -322,7 +322,7 @@ class PipelineManager(object):
|
||||||
change.commit_needs_changes = dependencies
|
change.commit_needs_changes = dependencies
|
||||||
|
|
||||||
def provisionNodes(self, item):
|
def provisionNodes(self, item):
|
||||||
jobs = item.findJobsToRequest()
|
jobs = item.findJobsToRequest(item.pipeline.tenant.semaphore_handler)
|
||||||
if not jobs:
|
if not jobs:
|
||||||
return False
|
return False
|
||||||
build_set = item.current_build_set
|
build_set = item.current_build_set
|
||||||
|
|
|
@ -1112,7 +1112,12 @@ class Job(ConfigObject):
|
||||||
d['required_projects'] = []
|
d['required_projects'] = []
|
||||||
for project in self.required_projects.values():
|
for project in self.required_projects.values():
|
||||||
d['required_projects'].append(project.toDict())
|
d['required_projects'].append(project.toDict())
|
||||||
d['semaphore'] = self.semaphore
|
if self.semaphore:
|
||||||
|
# For now just leave the semaphore name here until we really need
|
||||||
|
# more information in zuul-web about this
|
||||||
|
d['semaphore'] = self.semaphore.name
|
||||||
|
else:
|
||||||
|
d['semaphore'] = None
|
||||||
d['variables'] = self.variables
|
d['variables'] = self.variables
|
||||||
d['final'] = self.final
|
d['final'] = self.final
|
||||||
d['abstract'] = self.abstract
|
d['abstract'] = self.abstract
|
||||||
|
@ -1511,6 +1516,21 @@ class JobProject(ConfigObject):
|
||||||
return d
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
class JobSemaphore(ConfigObject):
|
||||||
|
""" A reference to a semaphore from a job. """
|
||||||
|
|
||||||
|
def __init__(self, semaphore_name, resources_first=False):
|
||||||
|
super().__init__()
|
||||||
|
self.name = semaphore_name
|
||||||
|
self.resources_first = resources_first
|
||||||
|
|
||||||
|
def toDict(self):
|
||||||
|
d = dict()
|
||||||
|
d['name'] = self.name
|
||||||
|
d['resources_first'] = self.resources_first
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
class JobList(ConfigObject):
|
class JobList(ConfigObject):
|
||||||
""" A list of jobs in a project's pipeline. """
|
""" A list of jobs in a project's pipeline. """
|
||||||
|
|
||||||
|
@ -2135,13 +2155,13 @@ class QueueItem(object):
|
||||||
# The nodes for this job are not ready, skip
|
# The nodes for this job are not ready, skip
|
||||||
# it for now.
|
# it for now.
|
||||||
continue
|
continue
|
||||||
if semaphore_handler.acquire(self, job):
|
if semaphore_handler.acquire(self, job, False):
|
||||||
# If this job needs a semaphore, either acquire it or
|
# If this job needs a semaphore, either acquire it or
|
||||||
# make sure that we have it before running the job.
|
# make sure that we have it before running the job.
|
||||||
torun.append(job)
|
torun.append(job)
|
||||||
return torun
|
return torun
|
||||||
|
|
||||||
def findJobsToRequest(self):
|
def findJobsToRequest(self, semaphore_handler):
|
||||||
build_set = self.current_build_set
|
build_set = self.current_build_set
|
||||||
toreq = []
|
toreq = []
|
||||||
if not self.live:
|
if not self.live:
|
||||||
|
@ -2177,7 +2197,10 @@ class QueueItem(object):
|
||||||
all_parent_jobs_successful = False
|
all_parent_jobs_successful = False
|
||||||
break
|
break
|
||||||
if all_parent_jobs_successful:
|
if all_parent_jobs_successful:
|
||||||
toreq.append(job)
|
if semaphore_handler.acquire(self, job, True):
|
||||||
|
# If this job needs a semaphore, either acquire it or
|
||||||
|
# make sure that we have it before requesting the nodes.
|
||||||
|
toreq.append(job)
|
||||||
return toreq
|
return toreq
|
||||||
|
|
||||||
def setResult(self, build):
|
def setResult(self, build):
|
||||||
|
@ -3596,11 +3619,34 @@ class SemaphoreHandler(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.semaphores = {}
|
self.semaphores = {}
|
||||||
|
|
||||||
def acquire(self, item, job):
|
def acquire(self, item, job, request_resources):
|
||||||
|
"""
|
||||||
|
Aquires a semaphore for an item job combination. This gets called twice
|
||||||
|
during the lifecycle of a job. The first call is before requesting
|
||||||
|
build resources. The second call is before running the job. In which
|
||||||
|
call we really acquire the semaphore is defined by the job.
|
||||||
|
|
||||||
|
:param item: The item
|
||||||
|
:param job: The job
|
||||||
|
:param request_resources: True if we want to acquire for the request
|
||||||
|
resources phase, False if we want to acquire
|
||||||
|
for the run phase.
|
||||||
|
"""
|
||||||
if not job.semaphore:
|
if not job.semaphore:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
semaphore_key = job.semaphore
|
if job.semaphore.resources_first and request_resources:
|
||||||
|
# We're currently in the resource request phase and want to get the
|
||||||
|
# resources before locking. So we don't need to do anything here.
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
# As a safety net we want to acuire the semaphore at least in the
|
||||||
|
# run phase so don't filter this here as re-acuiring the semaphore
|
||||||
|
# is not a problem here if it has been already acquired before in
|
||||||
|
# the resources phase.
|
||||||
|
pass
|
||||||
|
|
||||||
|
semaphore_key = job.semaphore.name
|
||||||
|
|
||||||
m = self.semaphores.get(semaphore_key)
|
m = self.semaphores.get(semaphore_key)
|
||||||
if not m:
|
if not m:
|
||||||
|
@ -3612,7 +3658,7 @@ class SemaphoreHandler(object):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# semaphore is there, check max
|
# semaphore is there, check max
|
||||||
if len(m) < self._max_count(item, job.semaphore):
|
if len(m) < self._max_count(item, job.semaphore.name):
|
||||||
self._acquire(semaphore_key, item, job.name)
|
self._acquire(semaphore_key, item, job.name)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@ -3622,7 +3668,7 @@ class SemaphoreHandler(object):
|
||||||
if not job.semaphore:
|
if not job.semaphore:
|
||||||
return
|
return
|
||||||
|
|
||||||
semaphore_key = job.semaphore
|
semaphore_key = job.semaphore.name
|
||||||
|
|
||||||
m = self.semaphores.get(semaphore_key)
|
m = self.semaphores.get(semaphore_key)
|
||||||
if not m:
|
if not m:
|
||||||
|
|
Loading…
Reference in New Issue