Change mutex to counting semaphore

The mutex in zuul is great but is limited to run one job at the same
time. Some use cases like using a limited number floating licenses in
jobs cannot be handled with this. Thus this changes the mutex
functionality to a counting semaphore (which defaults to 1).

This is a port of Ida589e49bc6694f4ccc4c586e0d43b391b8c3ae4 to zuulv3
branch.

Change-Id: Icf4013a6215e2b10ca8e6309928b9e5881dda02c
This commit is contained in:
Tobias Henkel 2017-03-20 16:16:02 +01:00
parent 77e5e8077d
commit 9a0e194afa
25 changed files with 686 additions and 169 deletions

View File

@ -644,10 +644,12 @@ each job as it builds a list from the project specification.
would largely defeat the parallelization of dependent change testing
that is the main feature of Zuul. Default: ``false``.
**mutex (optional)**
This is a string that names a mutex that should be observed by this
job. Only one build of any job that references the same named mutex
will be enqueued at a time. This applies across all pipelines.
**semaphore (optional)**
This is a string that names a semaphore that should be observed by this
job. The semaphore defines how many jobs which reference that semaphore
can be enqueued at a time. This applies across all pipelines in the same
tenant. The max value of the semaphore can be specified in the config
repositories and defaults to 1.
**branch (optional)**
This job should only be run on matching branches. This field is
@ -850,6 +852,21 @@ template specifies a job that is also specified in another template,
or specified in the project itself, the configuration defined by
either the last template or the project itself will take priority.
Semaphores
""""""""""
When using semaphores the maximum value of each one can be specified in their
respective config repositories. Unspecified semaphores default to 1::
- semaphore:
name: semaphore-foo
max: 5
- semaphore:
name: semaphore-bar
max: 3
logging.conf
~~~~~~~~~~~~
This file is optional. If provided, it should be a standard

View File

@ -6,3 +6,7 @@
tenant-one-gate:
jobs:
- project-test1
- semaphore:
name: test-semaphore
max: 1

View File

@ -0,0 +1,13 @@
- pipeline:
name: check
manager: independent
source: gerrit
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
verified: 1
failure:
gerrit:
verified: -1

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1,13 @@
- job:
name: project1-test1
semaphore: test-semaphore
- project:
name: org/project1
check:
jobs:
- project1-test1
- semaphore:
name: test-semaphore
max: 1

View File

@ -0,0 +1,13 @@
- job:
name: project2-test1
semaphore: test-semaphore
- project:
name: org/project2
check:
jobs:
- project2-test1
- semaphore:
name: test-semaphore
max: 2

View File

@ -0,0 +1,15 @@
- tenant:
name: tenant-one
source:
gerrit:
config-repos:
- common-config
- tenant-one-config
- tenant:
name: tenant-two
source:
gerrit:
config-repos:
- common-config
- tenant-two-config

View File

@ -1,32 +0,0 @@
- pipeline:
name: check
manager: independent
source: gerrit
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
verified: 1
failure:
gerrit:
verified: -1
- job:
name: project-test1
- job:
name: mutex-one
mutex: test-mutex
- job:
name: mutex-two
mutex: test-mutex
- project:
name: org/project
check:
jobs:
- project-test1
- mutex-one
- mutex-two

View File

@ -0,0 +1,2 @@
- hosts: all
tasks: []

View File

@ -0,0 +1,2 @@
- hosts: all
tasks: []

View File

@ -0,0 +1,2 @@
- hosts: all
tasks: []

View File

@ -0,0 +1,2 @@
- hosts: all
tasks: []

View File

@ -0,0 +1,52 @@
- pipeline:
name: check
manager: independent
source: gerrit
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
verified: 1
failure:
gerrit:
verified: -1
- job:
name: project-test1
- job:
name: semaphore-one-test1
semaphore: test-semaphore
- job:
name: semaphore-one-test2
semaphore: test-semaphore
- job:
name: semaphore-two-test1
semaphore: test-semaphore-two
- job:
name: semaphore-two-test2
semaphore: test-semaphore-two
- project:
name: org/project
check:
jobs:
- project-test1
- semaphore-one-test1
- semaphore-one-test2
- project:
name: org/project1
check:
jobs:
- project-test1
- semaphore-two-test1
- semaphore-two-test2
- semaphore:
name: test-semaphore-two
max: 2

View File

@ -1,23 +0,0 @@
pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
verified: 1
failure:
gerrit:
verified: -1
jobs:
- name: mutex-one
mutex: test-mutex
- name: mutex-two
mutex: test-mutex
projects:
- name: org/project
check:
- project-test1

View File

@ -15,6 +15,8 @@
# under the License.
import json
import textwrap
import os
import re
import shutil
@ -2177,58 +2179,68 @@ class TestScheduler(ZuulTestCase):
self.assertEqual('https://server/job/project-test2/0/',
status_jobs[2]['report_url'])
def test_mutex(self):
"Test job mutexes"
self.updateConfigLayout('layout-mutex')
def test_semaphore_one(self):
"Test semaphores with max=1 (mutex)"
self.updateConfigLayout('layout-semaphore')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
tenant = self.sched.abide.tenants.get('openstack')
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'mutex-one')
self.assertEqual(self.builds[1].name, 'semaphore-one-test1')
self.assertEqual(self.builds[2].name, 'project-test1')
self.executor_server.release('mutex-one')
self.executor_server.release('semaphore-one-test1')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'mutex-two')
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
self.assertEqual(self.builds[2].name, 'semaphore-one-test2')
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.executor_server.release('mutex-two')
self.executor_server.release('semaphore-one-test2')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'mutex-one')
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
self.assertEqual(self.builds[2].name, 'semaphore-one-test1')
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.executor_server.release('mutex-one')
self.executor_server.release('semaphore-one-test1')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'mutex-two')
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
self.assertEqual(self.builds[2].name, 'semaphore-one-test2')
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.executor_server.release('mutex-two')
self.executor_server.release('semaphore-one-test2')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 2)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -2238,25 +2250,115 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 1)
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
def test_mutex_abandon(self):
"Test abandon with job mutexes"
self.updateConfigLayout('layout-mutex')
def test_semaphore_two(self):
"Test semaphores with max>1"
self.updateConfigLayout('layout-semaphore')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
tenant = self.sched.abide.tenants.get('openstack')
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
self.assertFalse('test-semaphore-two' in
tenant.semaphore_handler.semaphores)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(self.builds), 4)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'semaphore-two-test1')
self.assertEqual(self.builds[2].name, 'semaphore-two-test2')
self.assertEqual(self.builds[3].name, 'project-test1')
self.assertTrue('test-semaphore-two' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
'test-semaphore-two', [])), 2)
self.executor_server.release('semaphore-two-test1')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 4)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'semaphore-two-test2')
self.assertEqual(self.builds[2].name, 'project-test1')
self.assertEqual(self.builds[3].name, 'semaphore-two-test1')
self.assertTrue('test-semaphore-two' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
'test-semaphore-two', [])), 2)
self.executor_server.release('semaphore-two-test2')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 4)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'semaphore-two-test1')
self.assertEqual(self.builds[3].name, 'semaphore-two-test2')
self.assertTrue('test-semaphore-two' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
'test-semaphore-two', [])), 2)
self.executor_server.release('semaphore-two-test1')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'semaphore-two-test2')
self.assertTrue('test-semaphore-two' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
'test-semaphore-two', [])), 1)
self.executor_server.release('semaphore-two-test2')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 2)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertFalse('test-semaphore-two' in
tenant.semaphore_handler.semaphores)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(len(self.builds), 0)
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 1)
def test_semaphore_abandon(self):
"Test abandon with job semaphores"
self.updateConfigLayout('layout-semaphore')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
tenant = self.sched.abide.tenants.get('openstack')
self.executor_server.hold_jobs_in_build = True
tenant = self.sched.abide.tenants.get('openstack')
check_pipeline = tenant.layout.pipelines['check']
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
self.waitUntilSettled()
@ -2265,31 +2367,47 @@ class TestScheduler(ZuulTestCase):
items = check_pipeline.getAllItems()
self.assertEqual(len(items), 0)
# The mutex should be released
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
# The semaphore should be released
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
def test_mutex_reconfigure(self):
"Test reconfigure with job mutexes"
self.updateConfigLayout('layout-mutex')
def test_semaphore_reconfigure(self):
"Test reconfigure with job semaphores"
self.updateConfigLayout('layout-semaphore')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
tenant = self.sched.abide.tenants.get('openstack')
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.updateConfigLayout('layout-mutex-reconfiguration')
# reconfigure without layout change
self.sched.reconfigure(self.config)
self.waitUntilSettled()
tenant = self.sched.abide.tenants.get('openstack')
# semaphore still must be held
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.updateConfigLayout('layout-semaphore-reconfiguration')
self.sched.reconfigure(self.config)
self.waitUntilSettled()
tenant = self.sched.abide.tenants.get('openstack')
self.executor_server.release('project-test1')
self.waitUntilSettled()
@ -2297,8 +2415,9 @@ class TestScheduler(ZuulTestCase):
# There should be no builds anymore
self.assertEqual(len(self.builds), 0)
# The mutex should be released
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
# The semaphore should be released
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
def test_live_reconfiguration(self):
"Test that live reconfiguration works"
@ -4903,3 +5022,239 @@ class TestSchedulerMerges(ZuulTestCase):
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
class TestSemaphoreMultiTenant(ZuulTestCase):
tenant_config_file = 'config/multi-tenant-semaphore/main.yaml'
def test_semaphore_tenant_isolation(self):
"Test semaphores in multiple tenants"
self.waitUntilSettled()
tenant_one = self.sched.abide.tenants.get('tenant-one')
tenant_two = self.sched.abide.tenants.get('tenant-two')
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
D = self.fake_gerrit.addFakeChange('org/project2', 'master', 'D')
E = self.fake_gerrit.addFakeChange('org/project2', 'master', 'E')
self.assertFalse('test-semaphore' in
tenant_one.semaphore_handler.semaphores)
self.assertFalse('test-semaphore' in
tenant_two.semaphore_handler.semaphores)
# add patches to project1 of tenant-one
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# one build of project1-test1 must run
# semaphore of tenant-one must be acquired once
# semaphore of tenant-two must not be acquired
self.assertEqual(len(self.builds), 1)
self.assertEqual(self.builds[0].name, 'project1-test1')
self.assertTrue('test-semaphore' in
tenant_one.semaphore_handler.semaphores)
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
'test-semaphore', [])), 1)
self.assertFalse('test-semaphore' in
tenant_two.semaphore_handler.semaphores)
# add patches to project2 of tenant-two
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(E.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# one build of project1-test1 must run
# two builds of project2-test1 must run
# semaphore of tenant-one must be acquired once
# semaphore of tenant-two must be acquired twice
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project1-test1')
self.assertEqual(self.builds[1].name, 'project2-test1')
self.assertEqual(self.builds[2].name, 'project2-test1')
self.assertTrue('test-semaphore' in
tenant_one.semaphore_handler.semaphores)
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
'test-semaphore', [])), 1)
self.assertTrue('test-semaphore' in
tenant_two.semaphore_handler.semaphores)
self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get(
'test-semaphore', [])), 2)
self.executor_server.release('project1-test1')
self.waitUntilSettled()
# one build of project1-test1 must run
# two builds of project2-test1 must run
# semaphore of tenant-one must be acquired once
# semaphore of tenant-two must be acquired twice
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project2-test1')
self.assertEqual(self.builds[1].name, 'project2-test1')
self.assertEqual(self.builds[2].name, 'project1-test1')
self.assertTrue('test-semaphore' in
tenant_one.semaphore_handler.semaphores)
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
'test-semaphore', [])), 1)
self.assertTrue('test-semaphore' in
tenant_two.semaphore_handler.semaphores)
self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get(
'test-semaphore', [])), 2)
self.executor_server.release('project2-test1')
self.waitUntilSettled()
# one build of project1-test1 must run
# one build of project2-test1 must run
# semaphore of tenant-one must be acquired once
# semaphore of tenant-two must be acquired once
self.assertEqual(len(self.builds), 2)
self.assertTrue('test-semaphore' in
tenant_one.semaphore_handler.semaphores)
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
'test-semaphore', [])), 1)
self.assertTrue('test-semaphore' in
tenant_two.semaphore_handler.semaphores)
self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get(
'test-semaphore', [])), 1)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
# no build must run
# semaphore of tenant-one must not be acquired
# semaphore of tenant-two must not be acquired
self.assertEqual(len(self.builds), 0)
self.assertFalse('test-semaphore' in
tenant_one.semaphore_handler.semaphores)
self.assertFalse('test-semaphore' in
tenant_two.semaphore_handler.semaphores)
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 1)
class TestSemaphoreInRepo(ZuulTestCase):
tenant_config_file = 'config/in-repo/main.yaml'
def test_semaphore_in_repo(self):
"Test semaphores in repo config"
# This tests dynamic semaphore handling in project repos. The semaphore
# max value should not be evaluated dynamically but must be updated
# after the change lands.
self.waitUntilSettled()
tenant = self.sched.abide.tenants.get('tenant-one')
in_repo_conf = textwrap.dedent(
"""
- job:
name: project-test2
semaphore: test-semaphore
- project:
name: org/project
tenant-one-gate:
jobs:
- project-test2
# the max value in dynamic layout must be ignored
- semaphore:
name: test-semaphore
max: 2
""")
in_repo_playbook = textwrap.dedent(
"""
- hosts: all
tasks: []
""")
file_dict = {'.zuul.yaml': in_repo_conf,
'playbooks/project-test2.yaml': in_repo_playbook}
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
files=file_dict)
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
B.setDependsOn(A, 1)
C.setDependsOn(A, 1)
self.executor_server.hold_jobs_in_build = True
A.addApproval('code-review', 2)
B.addApproval('code-review', 2)
C.addApproval('code-review', 2)
self.fake_gerrit.addEvent(A.addApproval('approved', 1))
self.fake_gerrit.addEvent(B.addApproval('approved', 1))
self.fake_gerrit.addEvent(C.addApproval('approved', 1))
self.waitUntilSettled()
# check that the layout in a queue item still has max value of 1
# for test-semaphore
pipeline = tenant.layout.pipelines.get('tenant-one-gate')
queue = None
for queue_candidate in pipeline.queues:
if queue_candidate.name == 'org/project':
queue = queue_candidate
break
queue_item = queue.queue[0]
item_dynamic_layout = queue_item.current_build_set.layout
dynamic_test_semaphore = \
item_dynamic_layout.semaphores.get('test-semaphore')
self.assertEqual(dynamic_test_semaphore.max, 1)
# one build must be in queue, one semaphores acquired
self.assertEqual(len(self.builds), 1)
self.assertEqual(self.builds[0].name, 'project-test2')
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
'test-semaphore', [])), 1)
self.executor_server.release('project-test2')
self.waitUntilSettled()
# change A must be merged
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2)
# send change-merged event as the gerrit mock doesn't send it
self.fake_gerrit.addEvent(A.getChangeMergedEvent())
self.waitUntilSettled()
# now that change A was merged, the new semaphore max must be effective
tenant = self.sched.abide.tenants.get('tenant-one')
self.assertEqual(tenant.layout.semaphores.get('test-semaphore').max, 2)
# two builds must be in queue, two semaphores acquired
self.assertEqual(len(self.builds), 2)
self.assertEqual(self.builds[0].name, 'project-test2')
self.assertEqual(self.builds[1].name, 'project-test2')
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
'test-semaphore', [])), 2)
self.executor_server.release('project-test2')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 0)
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
self.waitUntilSettled()
self.assertEqual(len(self.builds), 0)
self.assertEqual(A.reported, 2)
self.assertEqual(B.reported, 2)
self.assertEqual(C.reported, 2)

View File

@ -86,7 +86,8 @@ def configuration_exceptions(stanza, conf):
class ZuulSafeLoader(yaml.SafeLoader):
zuul_node_types = frozenset(('job', 'nodeset', 'secret', 'pipeline',
'project', 'project-template'))
'project', 'project-template',
'semaphore'))
def __init__(self, stream, context):
super(ZuulSafeLoader, self).__init__(stream)
@ -222,7 +223,7 @@ class JobParser(object):
'success-url': str,
'hold-following-changes': bool,
'voting': bool,
'mutex': str,
'semaphore': str,
'tags': to_list(str),
'branches': to_list(str),
'files': to_list(str),
@ -250,7 +251,7 @@ class JobParser(object):
'workspace',
'voting',
'hold-following-changes',
'mutex',
'semaphore',
'attempts',
'failure-message',
'success-message',
@ -720,6 +721,25 @@ class PipelineParser(object):
return pipeline
class SemaphoreParser(object):
@staticmethod
def getSchema():
semaphore = {vs.Required('name'): str,
'max': int,
'_source_context': model.SourceContext,
'_start_mark': yaml.Mark,
}
return vs.Schema(semaphore)
@staticmethod
def fromYaml(conf):
SemaphoreParser.getSchema()(conf)
semaphore = model.Semaphore(conf['name'], conf.get('max', 1))
semaphore.source_context = conf.get('_source_context')
return semaphore
class TenantParser(object):
log = logging.getLogger("zuul.TenantParser")
@ -966,6 +986,9 @@ class TenantParser(object):
for config_job in data.jobs:
layout.addJob(JobParser.fromYaml(tenant, layout, config_job))
for config_semaphore in data.semaphores:
layout.addSemaphore(SemaphoreParser.fromYaml(config_semaphore))
for config_template in data.project_templates:
layout.addProjectTemplate(ProjectTemplateParser.fromYaml(
tenant, layout, config_template))
@ -1072,6 +1095,12 @@ class ConfigLoader(object):
# or deleting pipelines in dynamic layout changes.
layout.pipelines = tenant.layout.pipelines
# NOTE: the semaphore definitions are copied from the static layout
# here. For semaphores there should be no per patch max value but
# exactly one value at any time. So we do not support dynamic semaphore
# configuration changes.
layout.semaphores = tenant.layout.semaphores
for config_job in config.jobs:
layout.addJob(JobParser.fromYaml(tenant, layout, config_job))

View File

@ -81,8 +81,8 @@ class PipelineManager(object):
tags.append('[hold]')
if not variant.voting:
tags.append('[nonvoting]')
if variant.mutex:
tags.append('[mutex: %s]' % variant.mutex)
if variant.semaphore:
tags.append('[semaphore: %s]' % variant.semaphore)
tags = ' '.join(tags)
self.log.info(" %s%s %s" % (repr(variant),
efilters, tags))
@ -386,7 +386,8 @@ class PipelineManager(object):
if not item.current_build_set.layout:
return False
jobs = item.findJobsToRun(self.sched.mutex)
jobs = item.findJobsToRun(
item.pipeline.layout.tenant.semaphore_handler)
if jobs:
self._executeJobs(item, jobs)
@ -411,7 +412,8 @@ class PipelineManager(object):
self.log.exception("Exception while canceling build %s "
"for change %s" % (build, item.change))
finally:
self.sched.mutex.release(build.build_set.item, build.job)
old_build_set.layout.tenant.semaphore_handler.release(
old_build_set.item, build.job)
if not was_running:
try:
@ -663,7 +665,7 @@ class PipelineManager(object):
item = build.build_set.item
item.setResult(build)
self.sched.mutex.release(item, build.job)
item.pipeline.layout.tenant.semaphore_handler.release(item, build.job)
self.log.debug("Item %s status is now:\n %s" %
(item, item.formatStatus()))

View File

@ -14,6 +14,8 @@
import abc
import copy
import logging
import os
import re
import struct
@ -760,7 +762,7 @@ class Job(object):
post_run=(),
run=(),
implied_run=(),
mutex=None,
semaphore=None,
attempts=3,
final=False,
roles=frozenset(),
@ -1369,7 +1371,7 @@ class QueueItem(object):
return False
return self.item_ahead.isHoldingFollowingChanges()
def findJobsToRun(self, mutex):
def findJobsToRun(self, semaphore_handler):
torun = []
if not self.live:
return []
@ -1408,9 +1410,9 @@ class QueueItem(object):
# The nodes for this job are not ready, skip
# it for now.
continue
if mutex.acquire(self, job):
# If this job needs a mutex, either acquire it or make
# sure that we have it before running the job.
if semaphore_handler.acquire(self, job):
# If this job needs a semaphore, either acquire it or
# make sure that we have it before running the job.
torun.append(job)
return torun
@ -2174,6 +2176,7 @@ class UnparsedTenantConfig(object):
self.projects = {}
self.nodesets = []
self.secrets = []
self.semaphores = []
def copy(self):
r = UnparsedTenantConfig()
@ -2183,6 +2186,7 @@ class UnparsedTenantConfig(object):
r.projects = copy.deepcopy(self.projects)
r.nodesets = copy.deepcopy(self.nodesets)
r.secrets = copy.deepcopy(self.secrets)
r.semaphores = copy.deepcopy(self.semaphores)
return r
def extend(self, conf):
@ -2194,6 +2198,7 @@ class UnparsedTenantConfig(object):
self.projects.setdefault(k, []).extend(v)
self.nodesets.extend(conf.nodesets)
self.secrets.extend(conf.secrets)
self.semaphores.extend(conf.semaphores)
return
if not isinstance(conf, list):
@ -2224,6 +2229,8 @@ class UnparsedTenantConfig(object):
self.nodesets.append(value)
elif key == 'secret':
self.secrets.append(value)
elif key == 'semaphore':
self.semaphores.append(value)
else:
raise Exception("Configuration item `%s` not recognized "
"(when parsing %s)" %
@ -2247,6 +2254,7 @@ class Layout(object):
self.jobs = {'noop': [Job('noop')]}
self.nodesets = {}
self.secrets = {}
self.semaphores = {}
def getJob(self, name):
if name in self.jobs:
@ -2285,6 +2293,11 @@ class Layout(object):
raise Exception("Secret %s already defined" % (secret.name,))
self.secrets[secret.name] = secret
def addSemaphore(self, semaphore):
if semaphore.name in self.semaphores:
raise Exception("Semaphore %s already defined" % (semaphore.name,))
self.semaphores[semaphore.name] = semaphore
def addPipeline(self, pipeline):
self.pipelines[pipeline.name] = pipeline
@ -2355,6 +2368,95 @@ class Layout(object):
return ret
class Semaphore(object):
def __init__(self, name, max=1):
self.name = name
self.max = int(max)
class SemaphoreHandler(object):
log = logging.getLogger("zuul.SemaphoreHandler")
def __init__(self):
self.semaphores = {}
def acquire(self, item, job):
if not job.semaphore:
return True
semaphore_key = job.semaphore
m = self.semaphores.get(semaphore_key)
if not m:
# The semaphore is not held, acquire it
self._acquire(semaphore_key, item, job.name)
return True
if (item, job.name) in m:
# This item already holds the semaphore
return True
# semaphore is there, check max
if len(m) < self._max_count(item, job.semaphore):
self._acquire(semaphore_key, item, job.name)
return True
return False
def release(self, item, job):
if not job.semaphore:
return
semaphore_key = job.semaphore
m = self.semaphores.get(semaphore_key)
if not m:
# The semaphore is not held, nothing to do
self.log.error("Semaphore can not be released for %s "
"because the semaphore is not held" %
item)
return
if (item, job.name) in m:
# This item is a holder of the semaphore
self._release(semaphore_key, item, job.name)
return
self.log.error("Semaphore can not be released for %s "
"which does not hold it" % item)
def _acquire(self, semaphore_key, item, job_name):
self.log.debug("Semaphore acquire {semaphore}: job {job}, item {item}"
.format(semaphore=semaphore_key,
job=job_name,
item=item))
if semaphore_key not in self.semaphores:
self.semaphores[semaphore_key] = []
self.semaphores[semaphore_key].append((item, job_name))
def _release(self, semaphore_key, item, job_name):
self.log.debug("Semaphore release {semaphore}: job {job}, item {item}"
.format(semaphore=semaphore_key,
job=job_name,
item=item))
sem_item = (item, job_name)
if sem_item in self.semaphores[semaphore_key]:
self.semaphores[semaphore_key].remove(sem_item)
# cleanup if there is no user of the semaphore anymore
if len(self.semaphores[semaphore_key]) == 0:
del self.semaphores[semaphore_key]
@staticmethod
def _max_count(item, semaphore_name):
if not item.current_build_set.layout:
# This should not occur as the layout of the item must already be
# built when acquiring or releasing a semaphore for a job.
raise Exception("Item {} has no layout".format(item))
# find the right semaphore
default_semaphore = Semaphore(semaphore_name, 1)
semaphores = item.current_build_set.layout.semaphores
return semaphores.get(semaphore_name, default_semaphore).max
class Tenant(object):
def __init__(self, name):
self.name = name
@ -2375,6 +2477,8 @@ class Tenant(object):
# A mapping of source -> {config_repos: {}, project_repos: {}}
self.sources = {}
self.semaphore_handler = SemaphoreHandler()
def addConfigRepo(self, source, project):
sd = self.sources.setdefault(source.name,
{'config_repos': {},

View File

@ -33,68 +33,6 @@ from zuul import exceptions
from zuul import version as zuul_version
class MutexHandler(object):
log = logging.getLogger("zuul.MutexHandler")
def __init__(self):
self.mutexes = {}
def acquire(self, item, job):
if not job.mutex:
return True
mutex_name = job.mutex
m = self.mutexes.get(mutex_name)
if not m:
# The mutex is not held, acquire it
self._acquire(mutex_name, item, job.name)
return True
held_item, held_job_name = m
if held_item is item and held_job_name == job.name:
# This item already holds the mutex
return True
held_build = held_item.current_build_set.getBuild(held_job_name)
if held_build and held_build.result:
# The build that held the mutex is complete, release it
# and let the new item have it.
self.log.error("Held mutex %s being released because "
"the build that holds it is complete" %
(mutex_name,))
self._release(mutex_name, item, job.name)
self._acquire(mutex_name, item, job.name)
return True
return False
def release(self, item, job):
if not job.mutex:
return
mutex_name = job.mutex
m = self.mutexes.get(mutex_name)
if not m:
# The mutex is not held, nothing to do
self.log.error("Mutex can not be released for %s "
"because the mutex is not held" %
(item,))
return
held_item, held_job_name = m
if held_item is item and held_job_name == job.name:
# This item holds the mutex
self._release(mutex_name, item, job.name)
return
self.log.error("Mutex can not be released for %s "
"which does not hold it" %
(item,))
def _acquire(self, mutex_name, item, job_name):
self.log.debug("Job %s of item %s acquiring mutex %s" %
(job_name, item, mutex_name))
self.mutexes[mutex_name] = (item, job_name)
def _release(self, mutex_name, item, job_name):
self.log.debug("Job %s of item %s releasing mutex %s" %
(job_name, item, mutex_name))
del self.mutexes[mutex_name]
class ManagementEvent(object):
"""An event that should be processed within the main queue run loop"""
def __init__(self):
@ -269,7 +207,6 @@ class Scheduler(threading.Thread):
self.connections = None
self.statsd = extras.try_import('statsd.statsd')
# TODO(jeblair): fix this
self.mutex = MutexHandler()
# Despite triggers being part of the pipeline, there is one trigger set
# per scheduler. The pipeline handles the trigger filters but since
# the events are handled by the scheduler itself it needs to handle
@ -593,19 +530,27 @@ class Scheduler(threading.Thread):
except Exception:
self.log.exception(
"Exception while canceling build %s "
"for change %s" % (build, item.change))
"for change %s" % (build, build.build_set.item.change))
finally:
self.mutex.release(build.build_set.item, build.job)
tenant.semaphore_handler.release(
build.build_set.item, build.job)
def _reconfigureTenant(self, tenant):
# This is called from _doReconfigureEvent while holding the
# layout lock
old_tenant = self.abide.tenants.get(tenant.name)
if old_tenant:
# Copy over semaphore handler so we don't loose the currently
# held semaphores.
tenant.semaphore_handler = old_tenant.semaphore_handler
self._reenqueueTenant(old_tenant, tenant)
# TODOv3(jeblair): update for tenants
# self.maintainConnectionCache()
self.connections.reconfigureDrivers(tenant)
# TODOv3(jeblair): remove postconfig calls?
for pipeline in tenant.layout.pipelines.values():
pipeline.source.postConfig()