From ae887dab58d0aa9a4ed375fbbf4b9a69d8043f8a Mon Sep 17 00:00:00 2001 From: Tobias Henkel Date: Tue, 20 Nov 2018 15:14:16 +0100 Subject: [PATCH] Improve resource usage with semaphores Currently when jobs use semaphores they first get and lock the build nodes and then aquire the semaphore. If there are many jobs waiting for the semaphore this can block a substantial part of the available resources. In order to make this safe default to acquire the semaphore before requesting the nodes. However in some cases when jobs with a semaphore shall run as fast as possible in a consecutive manner then it might be preferrable to accept some waste of resources. In order to support this use case the job using a semaphore can override this behavior and still acquire the semaphore after getting the nodes. Change-Id: Id6f582ec29219d280d05319d1b822c7934437b7a --- doc/source/user/config.rst | 20 +++++- .../semaphore-resources-295dceaf7ddbab0d.yaml | 14 +++++ .../semaphore/git/common-config/zuul.yaml | 26 ++++++++ .../config/semaphore/git/org_project3/README | 1 + tests/fixtures/config/semaphore/main.yaml | 1 + tests/unit/test_scheduler.py | 58 +++++++++++++++++ zuul/configloader.py | 15 ++++- zuul/manager/__init__.py | 2 +- zuul/model.py | 62 ++++++++++++++++--- 9 files changed, 187 insertions(+), 12 deletions(-) create mode 100644 releasenotes/notes/semaphore-resources-295dceaf7ddbab0d.yaml create mode 100644 tests/fixtures/config/semaphore/git/org_project3/README diff --git a/doc/source/user/config.rst b/doc/source/user/config.rst index e0235b8ec4..61830a06a4 100644 --- a/doc/source/user/config.rst +++ b/doc/source/user/config.rst @@ -629,7 +629,25 @@ Here is an example of two job definitions: The name of a :ref:`semaphore` which should be acquired and released when the job begins and ends. If the semaphore is at maximum capacity, then Zuul will wait until it can be acquired - before starting the job. + before starting the job. The format is either a string or a + dictionary. If it's a string it references a semaphore using the + default value for :attr:`job.semaphore.resources-first`. + + .. attr:: name + :required: + + The name of the referenced semaphore + + .. attr:: resources-first + :default: False + + By default a semaphore is acquired before the resources are + requested. However in some cases the user wants to run cheap + jobs as quickly as possible in a consecutive manner. In this + case :attr:`job.semaphore.resources-first` can be enabled to + request the resources before locking the semaphore. This can + lead to some amount of blocked resources while waiting for the + semaphore so this should be used with caution. .. attr:: tags diff --git a/releasenotes/notes/semaphore-resources-295dceaf7ddbab0d.yaml b/releasenotes/notes/semaphore-resources-295dceaf7ddbab0d.yaml new file mode 100644 index 0000000000..9d3b7d417f --- /dev/null +++ b/releasenotes/notes/semaphore-resources-295dceaf7ddbab0d.yaml @@ -0,0 +1,14 @@ +--- +features: + - | + A job using a semaphore now can configure if it should acquire the it + before requesting resources or just before running. +upgrade: + - | + The acquiring behavior of jobs with semaphores has been changed. Up to now + a job requested resources and aquired the semaphore just before it started + to run. However this could lead to a high amount of resource waste. Instead + jobs now acquire the semaphore before requesting the resources by default. + This behavior can be overridden by jobs using + :attr:`job.semaphore.resources-first` if some waste of resources is + acceptable. diff --git a/tests/fixtures/config/semaphore/git/common-config/zuul.yaml b/tests/fixtures/config/semaphore/git/common-config/zuul.yaml index 600543cdb4..dcb0cbd034 100644 --- a/tests/fixtures/config/semaphore/git/common-config/zuul.yaml +++ b/tests/fixtures/config/semaphore/git/common-config/zuul.yaml @@ -22,6 +22,10 @@ - job: name: base parent: null + nodeset: + nodes: + - name: controller + label: label1 - job: name: project-test1 @@ -56,6 +60,20 @@ - name: controller label: label1 +- job: + name: semaphore-one-test1-resources-first + semaphore: + name: test-semaphore + resources-first: True + run: playbooks/semaphore-one-test1.yaml + +- job: + name: semaphore-one-test2-resources-first + semaphore: + name: test-semaphore + resources-first: True + run: playbooks/semaphore-one-test1.yaml + - project: name: org/project check: @@ -77,3 +95,11 @@ check: jobs: - semaphore-one-test3 + +- project: + name: org/project3 + check: + jobs: + - project-test1 + - semaphore-one-test1-resources-first + - semaphore-one-test2-resources-first diff --git a/tests/fixtures/config/semaphore/git/org_project3/README b/tests/fixtures/config/semaphore/git/org_project3/README new file mode 100644 index 0000000000..9daeafb986 --- /dev/null +++ b/tests/fixtures/config/semaphore/git/org_project3/README @@ -0,0 +1 @@ +test diff --git a/tests/fixtures/config/semaphore/main.yaml b/tests/fixtures/config/semaphore/main.yaml index 83ed0925a9..919921e916 100644 --- a/tests/fixtures/config/semaphore/main.yaml +++ b/tests/fixtures/config/semaphore/main.yaml @@ -8,3 +8,4 @@ - org/project - org/project1 - org/project2 + - org/project3 diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index c4bd99d3fe..a07c25953b 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -5849,6 +5849,10 @@ class TestSemaphore(ZuulTestCase): self.executor_server.hold_jobs_in_build = True + # Pause nodepool so we can check the ordering of getting the nodes + # and aquiring the semaphore. + self.fake_nodepool.paused = True + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') self.assertFalse('test-semaphore' in @@ -5858,6 +5862,13 @@ class TestSemaphore(ZuulTestCase): self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) self.waitUntilSettled() + # By default we first lock the semaphore and then get the nodes + # so at this point the semaphore needs to be aquired. + self.assertTrue('test-semaphore' in + tenant.semaphore_handler.semaphores) + self.fake_nodepool.paused = False + self.waitUntilSettled() + self.assertEqual(len(self.builds), 3) self.assertEqual(self.builds[0].name, 'project-test1') self.assertEqual(self.builds[1].name, 'semaphore-one-test1') @@ -5993,6 +6004,53 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(A.reported, 1) self.assertEqual(B.reported, 1) + def test_semaphore_resources_first(self): + "Test semaphores with max=1 (mutex) and get resources first" + tenant = self.sched.abide.tenants.get('tenant-one') + + self.executor_server.hold_jobs_in_build = True + + # Pause nodepool so we can check the ordering of getting the nodes + # and aquiring the semaphore. + self.fake_nodepool.paused = True + + A = self.fake_gerrit.addFakeChange('org/project3', 'master', 'A') + B = self.fake_gerrit.addFakeChange('org/project3', 'master', 'B') + self.assertFalse('test-semaphore' in + tenant.semaphore_handler.semaphores) + + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) + self.waitUntilSettled() + + # Here we first get the resources and then lock the semaphore + # so at this point the semaphore should not be aquired. + self.assertFalse('test-semaphore' in + tenant.semaphore_handler.semaphores) + self.fake_nodepool.paused = False + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 3) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, + 'semaphore-one-test1-resources-first') + self.assertEqual(self.builds[2].name, 'project-test1') + + self.executor_server.release('semaphore-one-test1') + self.waitUntilSettled() + + self.assertEqual(len(self.builds), 3) + self.assertEqual(self.builds[0].name, 'project-test1') + self.assertEqual(self.builds[1].name, 'project-test1') + self.assertEqual(self.builds[2].name, + 'semaphore-one-test2-resources-first') + self.assertTrue('test-semaphore' in + tenant.semaphore_handler.semaphores) + + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled() + def test_semaphore_zk_error(self): "Test semaphore release with zk error" tenant = self.sched.abide.tenants.get('tenant-one') diff --git a/zuul/configloader.py b/zuul/configloader.py index 87e76c9eaf..d1272002bd 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -509,6 +509,9 @@ class JobParser(object): secret = {vs.Required('name'): str, vs.Required('secret'): str} + semaphore = {vs.Required('name'): str, + 'resources-first': bool} + # Attributes of a job that can also be used in Project and ProjectTemplate job_attributes = {'parent': vs.Any(str, None), 'final': bool, @@ -520,7 +523,7 @@ class JobParser(object): 'success-url': str, 'hold-following-changes': bool, 'voting': bool, - 'semaphore': str, + 'semaphore': vs.Any(semaphore, str), 'tags': to_list(str), 'branches': to_list(str), 'files': to_list(str), @@ -565,7 +568,6 @@ class JobParser(object): 'workspace', 'voting', 'hold-following-changes', - 'semaphore', 'attempts', 'failure-message', 'success-message', @@ -720,6 +722,15 @@ class JobParser(object): new_projects[project.canonical_name] = job_project job.required_projects = new_projects + if 'semaphore' in conf: + semaphore = conf.get('semaphore') + if isinstance(semaphore, str): + job.semaphore = model.JobSemaphore(semaphore) + else: + job.semaphore = model.JobSemaphore( + semaphore.get('name'), + semaphore.get('resources-first', False)) + tags = conf.get('tags') if tags: job.tags = set(tags) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 7c0ff66e1b..da38818c8c 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -322,7 +322,7 @@ class PipelineManager(object): change.commit_needs_changes = dependencies def provisionNodes(self, item): - jobs = item.findJobsToRequest() + jobs = item.findJobsToRequest(item.pipeline.tenant.semaphore_handler) if not jobs: return False build_set = item.current_build_set diff --git a/zuul/model.py b/zuul/model.py index 5de9cd6005..d938b4aa97 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -1112,7 +1112,12 @@ class Job(ConfigObject): d['required_projects'] = [] for project in self.required_projects.values(): d['required_projects'].append(project.toDict()) - d['semaphore'] = self.semaphore + if self.semaphore: + # For now just leave the semaphore name here until we really need + # more information in zuul-web about this + d['semaphore'] = self.semaphore.name + else: + d['semaphore'] = None d['variables'] = self.variables d['final'] = self.final d['abstract'] = self.abstract @@ -1511,6 +1516,21 @@ class JobProject(ConfigObject): return d +class JobSemaphore(ConfigObject): + """ A reference to a semaphore from a job. """ + + def __init__(self, semaphore_name, resources_first=False): + super().__init__() + self.name = semaphore_name + self.resources_first = resources_first + + def toDict(self): + d = dict() + d['name'] = self.name + d['resources_first'] = self.resources_first + return d + + class JobList(ConfigObject): """ A list of jobs in a project's pipeline. """ @@ -2135,13 +2155,13 @@ class QueueItem(object): # The nodes for this job are not ready, skip # it for now. continue - if semaphore_handler.acquire(self, job): + if semaphore_handler.acquire(self, job, False): # If this job needs a semaphore, either acquire it or # make sure that we have it before running the job. torun.append(job) return torun - def findJobsToRequest(self): + def findJobsToRequest(self, semaphore_handler): build_set = self.current_build_set toreq = [] if not self.live: @@ -2177,7 +2197,10 @@ class QueueItem(object): all_parent_jobs_successful = False break if all_parent_jobs_successful: - toreq.append(job) + if semaphore_handler.acquire(self, job, True): + # If this job needs a semaphore, either acquire it or + # make sure that we have it before requesting the nodes. + toreq.append(job) return toreq def setResult(self, build): @@ -3596,11 +3619,34 @@ class SemaphoreHandler(object): def __init__(self): self.semaphores = {} - def acquire(self, item, job): + def acquire(self, item, job, request_resources): + """ + Aquires a semaphore for an item job combination. This gets called twice + during the lifecycle of a job. The first call is before requesting + build resources. The second call is before running the job. In which + call we really acquire the semaphore is defined by the job. + + :param item: The item + :param job: The job + :param request_resources: True if we want to acquire for the request + resources phase, False if we want to acquire + for the run phase. + """ if not job.semaphore: return True - semaphore_key = job.semaphore + if job.semaphore.resources_first and request_resources: + # We're currently in the resource request phase and want to get the + # resources before locking. So we don't need to do anything here. + return True + else: + # As a safety net we want to acuire the semaphore at least in the + # run phase so don't filter this here as re-acuiring the semaphore + # is not a problem here if it has been already acquired before in + # the resources phase. + pass + + semaphore_key = job.semaphore.name m = self.semaphores.get(semaphore_key) if not m: @@ -3612,7 +3658,7 @@ class SemaphoreHandler(object): return True # semaphore is there, check max - if len(m) < self._max_count(item, job.semaphore): + if len(m) < self._max_count(item, job.semaphore.name): self._acquire(semaphore_key, item, job.name) return True @@ -3622,7 +3668,7 @@ class SemaphoreHandler(object): if not job.semaphore: return - semaphore_key = job.semaphore + semaphore_key = job.semaphore.name m = self.semaphores.get(semaphore_key) if not m: