Merge "Change mutex to counting semaphore" into feature/zuulv3
This commit is contained in:
commit
1376e05c32
|
@ -637,10 +637,12 @@ each job as it builds a list from the project specification.
|
|||
would largely defeat the parallelization of dependent change testing
|
||||
that is the main feature of Zuul. Default: ``false``.
|
||||
|
||||
**mutex (optional)**
|
||||
This is a string that names a mutex that should be observed by this
|
||||
job. Only one build of any job that references the same named mutex
|
||||
will be enqueued at a time. This applies across all pipelines.
|
||||
**semaphore (optional)**
|
||||
This is a string that names a semaphore that should be observed by this
|
||||
job. The semaphore defines how many jobs which reference that semaphore
|
||||
can be enqueued at a time. This applies across all pipelines in the same
|
||||
tenant. The max value of the semaphore can be specified in the config
|
||||
repositories and defaults to 1.
|
||||
|
||||
**branch (optional)**
|
||||
This job should only be run on matching branches. This field is
|
||||
|
@ -843,6 +845,21 @@ template specifies a job that is also specified in another template,
|
|||
or specified in the project itself, the configuration defined by
|
||||
either the last template or the project itself will take priority.
|
||||
|
||||
|
||||
Semaphores
|
||||
""""""""""
|
||||
|
||||
When using semaphores the maximum value of each one can be specified in their
|
||||
respective config repositories. Unspecified semaphores default to 1::
|
||||
|
||||
- semaphore:
|
||||
name: semaphore-foo
|
||||
max: 5
|
||||
- semaphore:
|
||||
name: semaphore-bar
|
||||
max: 3
|
||||
|
||||
|
||||
logging.conf
|
||||
~~~~~~~~~~~~
|
||||
This file is optional. If provided, it should be a standard
|
||||
|
|
|
@ -6,3 +6,7 @@
|
|||
tenant-one-gate:
|
||||
jobs:
|
||||
- project-test1
|
||||
|
||||
- semaphore:
|
||||
name: test-semaphore
|
||||
max: 1
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
- pipeline:
|
||||
name: check
|
||||
manager: independent
|
||||
source: gerrit
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: patchset-created
|
||||
success:
|
||||
gerrit:
|
||||
verified: 1
|
||||
failure:
|
||||
gerrit:
|
||||
verified: -1
|
|
@ -0,0 +1 @@
|
|||
test
|
|
@ -0,0 +1 @@
|
|||
test
|
13
tests/fixtures/config/multi-tenant-semaphore/git/tenant-one-config/zuul.yaml
vendored
Normal file
13
tests/fixtures/config/multi-tenant-semaphore/git/tenant-one-config/zuul.yaml
vendored
Normal file
|
@ -0,0 +1,13 @@
|
|||
- job:
|
||||
name: project1-test1
|
||||
semaphore: test-semaphore
|
||||
|
||||
- project:
|
||||
name: org/project1
|
||||
check:
|
||||
jobs:
|
||||
- project1-test1
|
||||
|
||||
- semaphore:
|
||||
name: test-semaphore
|
||||
max: 1
|
13
tests/fixtures/config/multi-tenant-semaphore/git/tenant-two-config/zuul.yaml
vendored
Normal file
13
tests/fixtures/config/multi-tenant-semaphore/git/tenant-two-config/zuul.yaml
vendored
Normal file
|
@ -0,0 +1,13 @@
|
|||
- job:
|
||||
name: project2-test1
|
||||
semaphore: test-semaphore
|
||||
|
||||
- project:
|
||||
name: org/project2
|
||||
check:
|
||||
jobs:
|
||||
- project2-test1
|
||||
|
||||
- semaphore:
|
||||
name: test-semaphore
|
||||
max: 2
|
|
@ -0,0 +1,15 @@
|
|||
- tenant:
|
||||
name: tenant-one
|
||||
source:
|
||||
gerrit:
|
||||
config-repos:
|
||||
- common-config
|
||||
- tenant-one-config
|
||||
|
||||
- tenant:
|
||||
name: tenant-two
|
||||
source:
|
||||
gerrit:
|
||||
config-repos:
|
||||
- common-config
|
||||
- tenant-two-config
|
|
@ -1,32 +0,0 @@
|
|||
- pipeline:
|
||||
name: check
|
||||
manager: independent
|
||||
source: gerrit
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: patchset-created
|
||||
success:
|
||||
gerrit:
|
||||
verified: 1
|
||||
failure:
|
||||
gerrit:
|
||||
verified: -1
|
||||
|
||||
- job:
|
||||
name: project-test1
|
||||
|
||||
- job:
|
||||
name: mutex-one
|
||||
mutex: test-mutex
|
||||
|
||||
- job:
|
||||
name: mutex-two
|
||||
mutex: test-mutex
|
||||
|
||||
- project:
|
||||
name: org/project
|
||||
check:
|
||||
jobs:
|
||||
- project-test1
|
||||
- mutex-one
|
||||
- mutex-two
|
2
tests/fixtures/config/single-tenant/git/layout-semaphore/playbooks/semaphore-one-test1.yaml
vendored
Normal file
2
tests/fixtures/config/single-tenant/git/layout-semaphore/playbooks/semaphore-one-test1.yaml
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
- hosts: all
|
||||
tasks: []
|
2
tests/fixtures/config/single-tenant/git/layout-semaphore/playbooks/semaphore-one-test2.yaml
vendored
Normal file
2
tests/fixtures/config/single-tenant/git/layout-semaphore/playbooks/semaphore-one-test2.yaml
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
- hosts: all
|
||||
tasks: []
|
2
tests/fixtures/config/single-tenant/git/layout-semaphore/playbooks/semaphore-two-test1.yaml
vendored
Normal file
2
tests/fixtures/config/single-tenant/git/layout-semaphore/playbooks/semaphore-two-test1.yaml
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
- hosts: all
|
||||
tasks: []
|
2
tests/fixtures/config/single-tenant/git/layout-semaphore/playbooks/semaphore-two-test2.yaml
vendored
Normal file
2
tests/fixtures/config/single-tenant/git/layout-semaphore/playbooks/semaphore-two-test2.yaml
vendored
Normal file
|
@ -0,0 +1,2 @@
|
|||
- hosts: all
|
||||
tasks: []
|
|
@ -0,0 +1,52 @@
|
|||
- pipeline:
|
||||
name: check
|
||||
manager: independent
|
||||
source: gerrit
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: patchset-created
|
||||
success:
|
||||
gerrit:
|
||||
verified: 1
|
||||
failure:
|
||||
gerrit:
|
||||
verified: -1
|
||||
|
||||
- job:
|
||||
name: project-test1
|
||||
|
||||
- job:
|
||||
name: semaphore-one-test1
|
||||
semaphore: test-semaphore
|
||||
|
||||
- job:
|
||||
name: semaphore-one-test2
|
||||
semaphore: test-semaphore
|
||||
|
||||
- job:
|
||||
name: semaphore-two-test1
|
||||
semaphore: test-semaphore-two
|
||||
|
||||
- job:
|
||||
name: semaphore-two-test2
|
||||
semaphore: test-semaphore-two
|
||||
|
||||
- project:
|
||||
name: org/project
|
||||
check:
|
||||
jobs:
|
||||
- project-test1
|
||||
- semaphore-one-test1
|
||||
- semaphore-one-test2
|
||||
|
||||
- project:
|
||||
name: org/project1
|
||||
check:
|
||||
jobs:
|
||||
- project-test1
|
||||
- semaphore-two-test1
|
||||
- semaphore-two-test2
|
||||
|
||||
- semaphore:
|
||||
name: test-semaphore-two
|
||||
max: 2
|
|
@ -1,23 +0,0 @@
|
|||
pipelines:
|
||||
- name: check
|
||||
manager: IndependentPipelineManager
|
||||
trigger:
|
||||
gerrit:
|
||||
- event: patchset-created
|
||||
success:
|
||||
gerrit:
|
||||
verified: 1
|
||||
failure:
|
||||
gerrit:
|
||||
verified: -1
|
||||
|
||||
jobs:
|
||||
- name: mutex-one
|
||||
mutex: test-mutex
|
||||
- name: mutex-two
|
||||
mutex: test-mutex
|
||||
|
||||
projects:
|
||||
- name: org/project
|
||||
check:
|
||||
- project-test1
|
|
@ -15,6 +15,8 @@
|
|||
# under the License.
|
||||
|
||||
import json
|
||||
import textwrap
|
||||
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
|
@ -2177,58 +2179,68 @@ class TestScheduler(ZuulTestCase):
|
|||
self.assertEqual('https://server/job/project-test2/0/',
|
||||
status_jobs[2]['report_url'])
|
||||
|
||||
def test_mutex(self):
|
||||
"Test job mutexes"
|
||||
self.updateConfigLayout('layout-mutex')
|
||||
def test_semaphore_one(self):
|
||||
"Test semaphores with max=1 (mutex)"
|
||||
self.updateConfigLayout('layout-semaphore')
|
||||
self.sched.reconfigure(self.config)
|
||||
|
||||
self.waitUntilSettled()
|
||||
tenant = self.sched.abide.tenants.get('openstack')
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 3)
|
||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||
self.assertEqual(self.builds[1].name, 'mutex-one')
|
||||
self.assertEqual(self.builds[1].name, 'semaphore-one-test1')
|
||||
self.assertEqual(self.builds[2].name, 'project-test1')
|
||||
|
||||
self.executor_server.release('mutex-one')
|
||||
self.executor_server.release('semaphore-one-test1')
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 3)
|
||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||
self.assertEqual(self.builds[2].name, 'mutex-two')
|
||||
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
|
||||
self.assertEqual(self.builds[2].name, 'semaphore-one-test2')
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.executor_server.release('mutex-two')
|
||||
self.executor_server.release('semaphore-one-test2')
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 3)
|
||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||
self.assertEqual(self.builds[2].name, 'mutex-one')
|
||||
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
|
||||
self.assertEqual(self.builds[2].name, 'semaphore-one-test1')
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.executor_server.release('mutex-one')
|
||||
self.executor_server.release('semaphore-one-test1')
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 3)
|
||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||
self.assertEqual(self.builds[2].name, 'mutex-two')
|
||||
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
|
||||
self.assertEqual(self.builds[2].name, 'semaphore-one-test2')
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.executor_server.release('mutex-two')
|
||||
self.executor_server.release('semaphore-one-test2')
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 2)
|
||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
|
@ -2238,25 +2250,115 @@ class TestScheduler(ZuulTestCase):
|
|||
|
||||
self.assertEqual(A.reported, 1)
|
||||
self.assertEqual(B.reported, 1)
|
||||
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
def test_mutex_abandon(self):
|
||||
"Test abandon with job mutexes"
|
||||
self.updateConfigLayout('layout-mutex')
|
||||
def test_semaphore_two(self):
|
||||
"Test semaphores with max>1"
|
||||
self.updateConfigLayout('layout-semaphore')
|
||||
self.sched.reconfigure(self.config)
|
||||
|
||||
self.waitUntilSettled()
|
||||
tenant = self.sched.abide.tenants.get('openstack')
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
|
||||
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
|
||||
self.assertFalse('test-semaphore-two' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 4)
|
||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||
self.assertEqual(self.builds[1].name, 'semaphore-two-test1')
|
||||
self.assertEqual(self.builds[2].name, 'semaphore-two-test2')
|
||||
self.assertEqual(self.builds[3].name, 'project-test1')
|
||||
self.assertTrue('test-semaphore-two' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
|
||||
'test-semaphore-two', [])), 2)
|
||||
|
||||
self.executor_server.release('semaphore-two-test1')
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 4)
|
||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||
self.assertEqual(self.builds[1].name, 'semaphore-two-test2')
|
||||
self.assertEqual(self.builds[2].name, 'project-test1')
|
||||
self.assertEqual(self.builds[3].name, 'semaphore-two-test1')
|
||||
self.assertTrue('test-semaphore-two' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
|
||||
'test-semaphore-two', [])), 2)
|
||||
|
||||
self.executor_server.release('semaphore-two-test2')
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 4)
|
||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||
self.assertEqual(self.builds[2].name, 'semaphore-two-test1')
|
||||
self.assertEqual(self.builds[3].name, 'semaphore-two-test2')
|
||||
self.assertTrue('test-semaphore-two' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
|
||||
'test-semaphore-two', [])), 2)
|
||||
|
||||
self.executor_server.release('semaphore-two-test1')
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 3)
|
||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||
self.assertEqual(self.builds[2].name, 'semaphore-two-test2')
|
||||
self.assertTrue('test-semaphore-two' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
|
||||
'test-semaphore-two', [])), 1)
|
||||
|
||||
self.executor_server.release('semaphore-two-test2')
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 2)
|
||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||
self.assertFalse('test-semaphore-two' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
|
||||
self.waitUntilSettled()
|
||||
self.assertEqual(len(self.builds), 0)
|
||||
|
||||
self.assertEqual(A.reported, 1)
|
||||
self.assertEqual(B.reported, 1)
|
||||
|
||||
def test_semaphore_abandon(self):
|
||||
"Test abandon with job semaphores"
|
||||
self.updateConfigLayout('layout-semaphore')
|
||||
self.sched.reconfigure(self.config)
|
||||
|
||||
self.waitUntilSettled()
|
||||
tenant = self.sched.abide.tenants.get('openstack')
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
tenant = self.sched.abide.tenants.get('openstack')
|
||||
check_pipeline = tenant.layout.pipelines['check']
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
|
||||
self.waitUntilSettled()
|
||||
|
@ -2265,31 +2367,47 @@ class TestScheduler(ZuulTestCase):
|
|||
items = check_pipeline.getAllItems()
|
||||
self.assertEqual(len(items), 0)
|
||||
|
||||
# The mutex should be released
|
||||
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
|
||||
# The semaphore should be released
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
def test_mutex_reconfigure(self):
|
||||
"Test reconfigure with job mutexes"
|
||||
self.updateConfigLayout('layout-mutex')
|
||||
def test_semaphore_reconfigure(self):
|
||||
"Test reconfigure with job semaphores"
|
||||
self.updateConfigLayout('layout-semaphore')
|
||||
self.sched.reconfigure(self.config)
|
||||
|
||||
self.waitUntilSettled()
|
||||
tenant = self.sched.abide.tenants.get('openstack')
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertTrue('test-mutex' in self.sched.mutex.mutexes)
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.updateConfigLayout('layout-mutex-reconfiguration')
|
||||
# reconfigure without layout change
|
||||
self.sched.reconfigure(self.config)
|
||||
self.waitUntilSettled()
|
||||
tenant = self.sched.abide.tenants.get('openstack')
|
||||
|
||||
# semaphore still must be held
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.updateConfigLayout('layout-semaphore-reconfiguration')
|
||||
self.sched.reconfigure(self.config)
|
||||
self.waitUntilSettled()
|
||||
tenant = self.sched.abide.tenants.get('openstack')
|
||||
|
||||
self.executor_server.release('project-test1')
|
||||
self.waitUntilSettled()
|
||||
|
@ -2297,8 +2415,9 @@ class TestScheduler(ZuulTestCase):
|
|||
# There should be no builds anymore
|
||||
self.assertEqual(len(self.builds), 0)
|
||||
|
||||
# The mutex should be released
|
||||
self.assertFalse('test-mutex' in self.sched.mutex.mutexes)
|
||||
# The semaphore should be released
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
def test_live_reconfiguration(self):
|
||||
"Test that live reconfiguration works"
|
||||
|
@ -4903,3 +5022,239 @@ class TestSchedulerMerges(ZuulTestCase):
|
|||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled()
|
||||
|
||||
|
||||
class TestSemaphoreMultiTenant(ZuulTestCase):
|
||||
tenant_config_file = 'config/multi-tenant-semaphore/main.yaml'
|
||||
|
||||
def test_semaphore_tenant_isolation(self):
|
||||
"Test semaphores in multiple tenants"
|
||||
|
||||
self.waitUntilSettled()
|
||||
tenant_one = self.sched.abide.tenants.get('tenant-one')
|
||||
tenant_two = self.sched.abide.tenants.get('tenant-two')
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
|
||||
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
|
||||
C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
|
||||
D = self.fake_gerrit.addFakeChange('org/project2', 'master', 'D')
|
||||
E = self.fake_gerrit.addFakeChange('org/project2', 'master', 'E')
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant_one.semaphore_handler.semaphores)
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant_two.semaphore_handler.semaphores)
|
||||
|
||||
# add patches to project1 of tenant-one
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
# one build of project1-test1 must run
|
||||
# semaphore of tenant-one must be acquired once
|
||||
# semaphore of tenant-two must not be acquired
|
||||
self.assertEqual(len(self.builds), 1)
|
||||
self.assertEqual(self.builds[0].name, 'project1-test1')
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant_one.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
|
||||
'test-semaphore', [])), 1)
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant_two.semaphore_handler.semaphores)
|
||||
|
||||
# add patches to project2 of tenant-two
|
||||
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
|
||||
self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1))
|
||||
self.fake_gerrit.addEvent(E.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
# one build of project1-test1 must run
|
||||
# two builds of project2-test1 must run
|
||||
# semaphore of tenant-one must be acquired once
|
||||
# semaphore of tenant-two must be acquired twice
|
||||
self.assertEqual(len(self.builds), 3)
|
||||
self.assertEqual(self.builds[0].name, 'project1-test1')
|
||||
self.assertEqual(self.builds[1].name, 'project2-test1')
|
||||
self.assertEqual(self.builds[2].name, 'project2-test1')
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant_one.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
|
||||
'test-semaphore', [])), 1)
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant_two.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get(
|
||||
'test-semaphore', [])), 2)
|
||||
|
||||
self.executor_server.release('project1-test1')
|
||||
self.waitUntilSettled()
|
||||
|
||||
# one build of project1-test1 must run
|
||||
# two builds of project2-test1 must run
|
||||
# semaphore of tenant-one must be acquired once
|
||||
# semaphore of tenant-two must be acquired twice
|
||||
self.assertEqual(len(self.builds), 3)
|
||||
self.assertEqual(self.builds[0].name, 'project2-test1')
|
||||
self.assertEqual(self.builds[1].name, 'project2-test1')
|
||||
self.assertEqual(self.builds[2].name, 'project1-test1')
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant_one.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
|
||||
'test-semaphore', [])), 1)
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant_two.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get(
|
||||
'test-semaphore', [])), 2)
|
||||
|
||||
self.executor_server.release('project2-test1')
|
||||
self.waitUntilSettled()
|
||||
|
||||
# one build of project1-test1 must run
|
||||
# one build of project2-test1 must run
|
||||
# semaphore of tenant-one must be acquired once
|
||||
# semaphore of tenant-two must be acquired once
|
||||
self.assertEqual(len(self.builds), 2)
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant_one.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
|
||||
'test-semaphore', [])), 1)
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant_two.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get(
|
||||
'test-semaphore', [])), 1)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
|
||||
self.waitUntilSettled()
|
||||
|
||||
# no build must run
|
||||
# semaphore of tenant-one must not be acquired
|
||||
# semaphore of tenant-two must not be acquired
|
||||
self.assertEqual(len(self.builds), 0)
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant_one.semaphore_handler.semaphores)
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant_two.semaphore_handler.semaphores)
|
||||
|
||||
self.assertEqual(A.reported, 1)
|
||||
self.assertEqual(B.reported, 1)
|
||||
|
||||
|
||||
class TestSemaphoreInRepo(ZuulTestCase):
|
||||
tenant_config_file = 'config/in-repo/main.yaml'
|
||||
|
||||
def test_semaphore_in_repo(self):
|
||||
"Test semaphores in repo config"
|
||||
|
||||
# This tests dynamic semaphore handling in project repos. The semaphore
|
||||
# max value should not be evaluated dynamically but must be updated
|
||||
# after the change lands.
|
||||
|
||||
self.waitUntilSettled()
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
|
||||
in_repo_conf = textwrap.dedent(
|
||||
"""
|
||||
- job:
|
||||
name: project-test2
|
||||
semaphore: test-semaphore
|
||||
|
||||
- project:
|
||||
name: org/project
|
||||
tenant-one-gate:
|
||||
jobs:
|
||||
- project-test2
|
||||
|
||||
# the max value in dynamic layout must be ignored
|
||||
- semaphore:
|
||||
name: test-semaphore
|
||||
max: 2
|
||||
""")
|
||||
|
||||
in_repo_playbook = textwrap.dedent(
|
||||
"""
|
||||
- hosts: all
|
||||
tasks: []
|
||||
""")
|
||||
|
||||
file_dict = {'.zuul.yaml': in_repo_conf,
|
||||
'playbooks/project-test2.yaml': in_repo_playbook}
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
|
||||
files=file_dict)
|
||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||
C = self.fake_gerrit.addFakeChange('org/project', 'master', 'C')
|
||||
B.setDependsOn(A, 1)
|
||||
C.setDependsOn(A, 1)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
|
||||
A.addApproval('code-review', 2)
|
||||
B.addApproval('code-review', 2)
|
||||
C.addApproval('code-review', 2)
|
||||
self.fake_gerrit.addEvent(A.addApproval('approved', 1))
|
||||
self.fake_gerrit.addEvent(B.addApproval('approved', 1))
|
||||
self.fake_gerrit.addEvent(C.addApproval('approved', 1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
# check that the layout in a queue item still has max value of 1
|
||||
# for test-semaphore
|
||||
pipeline = tenant.layout.pipelines.get('tenant-one-gate')
|
||||
queue = None
|
||||
for queue_candidate in pipeline.queues:
|
||||
if queue_candidate.name == 'org/project':
|
||||
queue = queue_candidate
|
||||
break
|
||||
queue_item = queue.queue[0]
|
||||
item_dynamic_layout = queue_item.current_build_set.layout
|
||||
dynamic_test_semaphore = \
|
||||
item_dynamic_layout.semaphores.get('test-semaphore')
|
||||
self.assertEqual(dynamic_test_semaphore.max, 1)
|
||||
|
||||
# one build must be in queue, one semaphores acquired
|
||||
self.assertEqual(len(self.builds), 1)
|
||||
self.assertEqual(self.builds[0].name, 'project-test2')
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
|
||||
'test-semaphore', [])), 1)
|
||||
|
||||
self.executor_server.release('project-test2')
|
||||
self.waitUntilSettled()
|
||||
|
||||
# change A must be merged
|
||||
self.assertEqual(A.data['status'], 'MERGED')
|
||||
self.assertEqual(A.reported, 2)
|
||||
|
||||
# send change-merged event as the gerrit mock doesn't send it
|
||||
self.fake_gerrit.addEvent(A.getChangeMergedEvent())
|
||||
self.waitUntilSettled()
|
||||
|
||||
# now that change A was merged, the new semaphore max must be effective
|
||||
tenant = self.sched.abide.tenants.get('tenant-one')
|
||||
self.assertEqual(tenant.layout.semaphores.get('test-semaphore').max, 2)
|
||||
|
||||
# two builds must be in queue, two semaphores acquired
|
||||
self.assertEqual(len(self.builds), 2)
|
||||
self.assertEqual(self.builds[0].name, 'project-test2')
|
||||
self.assertEqual(self.builds[1].name, 'project-test2')
|
||||
self.assertTrue('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
|
||||
'test-semaphore', [])), 2)
|
||||
|
||||
self.executor_server.release('project-test2')
|
||||
self.waitUntilSettled()
|
||||
|
||||
self.assertEqual(len(self.builds), 0)
|
||||
self.assertFalse('test-semaphore' in
|
||||
tenant.semaphore_handler.semaphores)
|
||||
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
|
||||
self.waitUntilSettled()
|
||||
self.assertEqual(len(self.builds), 0)
|
||||
|
||||
self.assertEqual(A.reported, 2)
|
||||
self.assertEqual(B.reported, 2)
|
||||
self.assertEqual(C.reported, 2)
|
||||
|
|
|
@ -86,7 +86,8 @@ def configuration_exceptions(stanza, conf):
|
|||
|
||||
class ZuulSafeLoader(yaml.SafeLoader):
|
||||
zuul_node_types = frozenset(('job', 'nodeset', 'secret', 'pipeline',
|
||||
'project', 'project-template'))
|
||||
'project', 'project-template',
|
||||
'semaphore'))
|
||||
|
||||
def __init__(self, stream, context):
|
||||
super(ZuulSafeLoader, self).__init__(stream)
|
||||
|
@ -222,7 +223,7 @@ class JobParser(object):
|
|||
'success-url': str,
|
||||
'hold-following-changes': bool,
|
||||
'voting': bool,
|
||||
'mutex': str,
|
||||
'semaphore': str,
|
||||
'tags': to_list(str),
|
||||
'branches': to_list(str),
|
||||
'files': to_list(str),
|
||||
|
@ -250,7 +251,7 @@ class JobParser(object):
|
|||
'workspace',
|
||||
'voting',
|
||||
'hold-following-changes',
|
||||
'mutex',
|
||||
'semaphore',
|
||||
'attempts',
|
||||
'failure-message',
|
||||
'success-message',
|
||||
|
@ -720,6 +721,25 @@ class PipelineParser(object):
|
|||
return pipeline
|
||||
|
||||
|
||||
class SemaphoreParser(object):
|
||||
@staticmethod
|
||||
def getSchema():
|
||||
semaphore = {vs.Required('name'): str,
|
||||
'max': int,
|
||||
'_source_context': model.SourceContext,
|
||||
'_start_mark': yaml.Mark,
|
||||
}
|
||||
|
||||
return vs.Schema(semaphore)
|
||||
|
||||
@staticmethod
|
||||
def fromYaml(conf):
|
||||
SemaphoreParser.getSchema()(conf)
|
||||
semaphore = model.Semaphore(conf['name'], conf.get('max', 1))
|
||||
semaphore.source_context = conf.get('_source_context')
|
||||
return semaphore
|
||||
|
||||
|
||||
class TenantParser(object):
|
||||
log = logging.getLogger("zuul.TenantParser")
|
||||
|
||||
|
@ -966,6 +986,9 @@ class TenantParser(object):
|
|||
for config_job in data.jobs:
|
||||
layout.addJob(JobParser.fromYaml(tenant, layout, config_job))
|
||||
|
||||
for config_semaphore in data.semaphores:
|
||||
layout.addSemaphore(SemaphoreParser.fromYaml(config_semaphore))
|
||||
|
||||
for config_template in data.project_templates:
|
||||
layout.addProjectTemplate(ProjectTemplateParser.fromYaml(
|
||||
tenant, layout, config_template))
|
||||
|
@ -1072,6 +1095,12 @@ class ConfigLoader(object):
|
|||
# or deleting pipelines in dynamic layout changes.
|
||||
layout.pipelines = tenant.layout.pipelines
|
||||
|
||||
# NOTE: the semaphore definitions are copied from the static layout
|
||||
# here. For semaphores there should be no per patch max value but
|
||||
# exactly one value at any time. So we do not support dynamic semaphore
|
||||
# configuration changes.
|
||||
layout.semaphores = tenant.layout.semaphores
|
||||
|
||||
for config_job in config.jobs:
|
||||
layout.addJob(JobParser.fromYaml(tenant, layout, config_job))
|
||||
|
||||
|
|
|
@ -81,8 +81,8 @@ class PipelineManager(object):
|
|||
tags.append('[hold]')
|
||||
if not variant.voting:
|
||||
tags.append('[nonvoting]')
|
||||
if variant.mutex:
|
||||
tags.append('[mutex: %s]' % variant.mutex)
|
||||
if variant.semaphore:
|
||||
tags.append('[semaphore: %s]' % variant.semaphore)
|
||||
tags = ' '.join(tags)
|
||||
self.log.info(" %s%s %s" % (repr(variant),
|
||||
efilters, tags))
|
||||
|
@ -386,7 +386,8 @@ class PipelineManager(object):
|
|||
if not item.current_build_set.layout:
|
||||
return False
|
||||
|
||||
jobs = item.findJobsToRun(self.sched.mutex)
|
||||
jobs = item.findJobsToRun(
|
||||
item.pipeline.layout.tenant.semaphore_handler)
|
||||
if jobs:
|
||||
self._executeJobs(item, jobs)
|
||||
|
||||
|
@ -411,7 +412,8 @@ class PipelineManager(object):
|
|||
self.log.exception("Exception while canceling build %s "
|
||||
"for change %s" % (build, item.change))
|
||||
finally:
|
||||
self.sched.mutex.release(build.build_set.item, build.job)
|
||||
old_build_set.layout.tenant.semaphore_handler.release(
|
||||
old_build_set.item, build.job)
|
||||
|
||||
if not was_running:
|
||||
try:
|
||||
|
@ -663,7 +665,7 @@ class PipelineManager(object):
|
|||
item = build.build_set.item
|
||||
|
||||
item.setResult(build)
|
||||
self.sched.mutex.release(item, build.job)
|
||||
item.pipeline.layout.tenant.semaphore_handler.release(item, build.job)
|
||||
self.log.debug("Item %s status is now:\n %s" %
|
||||
(item, item.formatStatus()))
|
||||
|
||||
|
|
114
zuul/model.py
114
zuul/model.py
|
@ -14,6 +14,8 @@
|
|||
|
||||
import abc
|
||||
import copy
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import struct
|
||||
|
@ -770,7 +772,7 @@ class Job(object):
|
|||
post_run=(),
|
||||
run=(),
|
||||
implied_run=(),
|
||||
mutex=None,
|
||||
semaphore=None,
|
||||
attempts=3,
|
||||
final=False,
|
||||
roles=frozenset(),
|
||||
|
@ -1385,7 +1387,7 @@ class QueueItem(object):
|
|||
return False
|
||||
return self.item_ahead.isHoldingFollowingChanges()
|
||||
|
||||
def findJobsToRun(self, mutex):
|
||||
def findJobsToRun(self, semaphore_handler):
|
||||
torun = []
|
||||
if not self.live:
|
||||
return []
|
||||
|
@ -1424,9 +1426,9 @@ class QueueItem(object):
|
|||
# The nodes for this job are not ready, skip
|
||||
# it for now.
|
||||
continue
|
||||
if mutex.acquire(self, job):
|
||||
# If this job needs a mutex, either acquire it or make
|
||||
# sure that we have it before running the job.
|
||||
if semaphore_handler.acquire(self, job):
|
||||
# If this job needs a semaphore, either acquire it or
|
||||
# make sure that we have it before running the job.
|
||||
torun.append(job)
|
||||
return torun
|
||||
|
||||
|
@ -2209,6 +2211,7 @@ class UnparsedTenantConfig(object):
|
|||
self.projects = {}
|
||||
self.nodesets = []
|
||||
self.secrets = []
|
||||
self.semaphores = []
|
||||
|
||||
def copy(self):
|
||||
r = UnparsedTenantConfig()
|
||||
|
@ -2218,6 +2221,7 @@ class UnparsedTenantConfig(object):
|
|||
r.projects = copy.deepcopy(self.projects)
|
||||
r.nodesets = copy.deepcopy(self.nodesets)
|
||||
r.secrets = copy.deepcopy(self.secrets)
|
||||
r.semaphores = copy.deepcopy(self.semaphores)
|
||||
return r
|
||||
|
||||
def extend(self, conf):
|
||||
|
@ -2229,6 +2233,7 @@ class UnparsedTenantConfig(object):
|
|||
self.projects.setdefault(k, []).extend(v)
|
||||
self.nodesets.extend(conf.nodesets)
|
||||
self.secrets.extend(conf.secrets)
|
||||
self.semaphores.extend(conf.semaphores)
|
||||
return
|
||||
|
||||
if not isinstance(conf, list):
|
||||
|
@ -2259,6 +2264,8 @@ class UnparsedTenantConfig(object):
|
|||
self.nodesets.append(value)
|
||||
elif key == 'secret':
|
||||
self.secrets.append(value)
|
||||
elif key == 'semaphore':
|
||||
self.semaphores.append(value)
|
||||
else:
|
||||
raise Exception("Configuration item `%s` not recognized "
|
||||
"(when parsing %s)" %
|
||||
|
@ -2282,6 +2289,7 @@ class Layout(object):
|
|||
self.jobs = {'noop': [Job('noop')]}
|
||||
self.nodesets = {}
|
||||
self.secrets = {}
|
||||
self.semaphores = {}
|
||||
|
||||
def getJob(self, name):
|
||||
if name in self.jobs:
|
||||
|
@ -2320,6 +2328,11 @@ class Layout(object):
|
|||
raise Exception("Secret %s already defined" % (secret.name,))
|
||||
self.secrets[secret.name] = secret
|
||||
|
||||
def addSemaphore(self, semaphore):
|
||||
if semaphore.name in self.semaphores:
|
||||
raise Exception("Semaphore %s already defined" % (semaphore.name,))
|
||||
self.semaphores[semaphore.name] = semaphore
|
||||
|
||||
def addPipeline(self, pipeline):
|
||||
self.pipelines[pipeline.name] = pipeline
|
||||
|
||||
|
@ -2390,6 +2403,95 @@ class Layout(object):
|
|||
return ret
|
||||
|
||||
|
||||
class Semaphore(object):
|
||||
def __init__(self, name, max=1):
|
||||
self.name = name
|
||||
self.max = int(max)
|
||||
|
||||
|
||||
class SemaphoreHandler(object):
|
||||
log = logging.getLogger("zuul.SemaphoreHandler")
|
||||
|
||||
def __init__(self):
|
||||
self.semaphores = {}
|
||||
|
||||
def acquire(self, item, job):
|
||||
if not job.semaphore:
|
||||
return True
|
||||
|
||||
semaphore_key = job.semaphore
|
||||
|
||||
m = self.semaphores.get(semaphore_key)
|
||||
if not m:
|
||||
# The semaphore is not held, acquire it
|
||||
self._acquire(semaphore_key, item, job.name)
|
||||
return True
|
||||
if (item, job.name) in m:
|
||||
# This item already holds the semaphore
|
||||
return True
|
||||
|
||||
# semaphore is there, check max
|
||||
if len(m) < self._max_count(item, job.semaphore):
|
||||
self._acquire(semaphore_key, item, job.name)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def release(self, item, job):
|
||||
if not job.semaphore:
|
||||
return
|
||||
|
||||
semaphore_key = job.semaphore
|
||||
|
||||
m = self.semaphores.get(semaphore_key)
|
||||
if not m:
|
||||
# The semaphore is not held, nothing to do
|
||||
self.log.error("Semaphore can not be released for %s "
|
||||
"because the semaphore is not held" %
|
||||
item)
|
||||
return
|
||||
if (item, job.name) in m:
|
||||
# This item is a holder of the semaphore
|
||||
self._release(semaphore_key, item, job.name)
|
||||
return
|
||||
self.log.error("Semaphore can not be released for %s "
|
||||
"which does not hold it" % item)
|
||||
|
||||
def _acquire(self, semaphore_key, item, job_name):
|
||||
self.log.debug("Semaphore acquire {semaphore}: job {job}, item {item}"
|
||||
.format(semaphore=semaphore_key,
|
||||
job=job_name,
|
||||
item=item))
|
||||
if semaphore_key not in self.semaphores:
|
||||
self.semaphores[semaphore_key] = []
|
||||
self.semaphores[semaphore_key].append((item, job_name))
|
||||
|
||||
def _release(self, semaphore_key, item, job_name):
|
||||
self.log.debug("Semaphore release {semaphore}: job {job}, item {item}"
|
||||
.format(semaphore=semaphore_key,
|
||||
job=job_name,
|
||||
item=item))
|
||||
sem_item = (item, job_name)
|
||||
if sem_item in self.semaphores[semaphore_key]:
|
||||
self.semaphores[semaphore_key].remove(sem_item)
|
||||
|
||||
# cleanup if there is no user of the semaphore anymore
|
||||
if len(self.semaphores[semaphore_key]) == 0:
|
||||
del self.semaphores[semaphore_key]
|
||||
|
||||
@staticmethod
|
||||
def _max_count(item, semaphore_name):
|
||||
if not item.current_build_set.layout:
|
||||
# This should not occur as the layout of the item must already be
|
||||
# built when acquiring or releasing a semaphore for a job.
|
||||
raise Exception("Item {} has no layout".format(item))
|
||||
|
||||
# find the right semaphore
|
||||
default_semaphore = Semaphore(semaphore_name, 1)
|
||||
semaphores = item.current_build_set.layout.semaphores
|
||||
return semaphores.get(semaphore_name, default_semaphore).max
|
||||
|
||||
|
||||
class Tenant(object):
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
|
@ -2410,6 +2512,8 @@ class Tenant(object):
|
|||
# A mapping of source -> {config_repos: {}, project_repos: {}}
|
||||
self.sources = {}
|
||||
|
||||
self.semaphore_handler = SemaphoreHandler()
|
||||
|
||||
def addConfigRepo(self, source, project):
|
||||
sd = self.sources.setdefault(source.name,
|
||||
{'config_repos': {},
|
||||
|
|
|
@ -33,68 +33,6 @@ from zuul import exceptions
|
|||
from zuul import version as zuul_version
|
||||
|
||||
|
||||
class MutexHandler(object):
|
||||
log = logging.getLogger("zuul.MutexHandler")
|
||||
|
||||
def __init__(self):
|
||||
self.mutexes = {}
|
||||
|
||||
def acquire(self, item, job):
|
||||
if not job.mutex:
|
||||
return True
|
||||
mutex_name = job.mutex
|
||||
m = self.mutexes.get(mutex_name)
|
||||
if not m:
|
||||
# The mutex is not held, acquire it
|
||||
self._acquire(mutex_name, item, job.name)
|
||||
return True
|
||||
held_item, held_job_name = m
|
||||
if held_item is item and held_job_name == job.name:
|
||||
# This item already holds the mutex
|
||||
return True
|
||||
held_build = held_item.current_build_set.getBuild(held_job_name)
|
||||
if held_build and held_build.result:
|
||||
# The build that held the mutex is complete, release it
|
||||
# and let the new item have it.
|
||||
self.log.error("Held mutex %s being released because "
|
||||
"the build that holds it is complete" %
|
||||
(mutex_name,))
|
||||
self._release(mutex_name, item, job.name)
|
||||
self._acquire(mutex_name, item, job.name)
|
||||
return True
|
||||
return False
|
||||
|
||||
def release(self, item, job):
|
||||
if not job.mutex:
|
||||
return
|
||||
mutex_name = job.mutex
|
||||
m = self.mutexes.get(mutex_name)
|
||||
if not m:
|
||||
# The mutex is not held, nothing to do
|
||||
self.log.error("Mutex can not be released for %s "
|
||||
"because the mutex is not held" %
|
||||
(item,))
|
||||
return
|
||||
held_item, held_job_name = m
|
||||
if held_item is item and held_job_name == job.name:
|
||||
# This item holds the mutex
|
||||
self._release(mutex_name, item, job.name)
|
||||
return
|
||||
self.log.error("Mutex can not be released for %s "
|
||||
"which does not hold it" %
|
||||
(item,))
|
||||
|
||||
def _acquire(self, mutex_name, item, job_name):
|
||||
self.log.debug("Job %s of item %s acquiring mutex %s" %
|
||||
(job_name, item, mutex_name))
|
||||
self.mutexes[mutex_name] = (item, job_name)
|
||||
|
||||
def _release(self, mutex_name, item, job_name):
|
||||
self.log.debug("Job %s of item %s releasing mutex %s" %
|
||||
(job_name, item, mutex_name))
|
||||
del self.mutexes[mutex_name]
|
||||
|
||||
|
||||
class ManagementEvent(object):
|
||||
"""An event that should be processed within the main queue run loop"""
|
||||
def __init__(self):
|
||||
|
@ -269,7 +207,6 @@ class Scheduler(threading.Thread):
|
|||
self.connections = None
|
||||
self.statsd = extras.try_import('statsd.statsd')
|
||||
# TODO(jeblair): fix this
|
||||
self.mutex = MutexHandler()
|
||||
# Despite triggers being part of the pipeline, there is one trigger set
|
||||
# per scheduler. The pipeline handles the trigger filters but since
|
||||
# the events are handled by the scheduler itself it needs to handle
|
||||
|
@ -593,19 +530,27 @@ class Scheduler(threading.Thread):
|
|||
except Exception:
|
||||
self.log.exception(
|
||||
"Exception while canceling build %s "
|
||||
"for change %s" % (build, item.change))
|
||||
"for change %s" % (build, build.build_set.item.change))
|
||||
finally:
|
||||
self.mutex.release(build.build_set.item, build.job)
|
||||
tenant.semaphore_handler.release(
|
||||
build.build_set.item, build.job)
|
||||
|
||||
def _reconfigureTenant(self, tenant):
|
||||
# This is called from _doReconfigureEvent while holding the
|
||||
# layout lock
|
||||
old_tenant = self.abide.tenants.get(tenant.name)
|
||||
|
||||
if old_tenant:
|
||||
# Copy over semaphore handler so we don't loose the currently
|
||||
# held semaphores.
|
||||
tenant.semaphore_handler = old_tenant.semaphore_handler
|
||||
|
||||
self._reenqueueTenant(old_tenant, tenant)
|
||||
|
||||
# TODOv3(jeblair): update for tenants
|
||||
# self.maintainConnectionCache()
|
||||
self.connections.reconfigureDrivers(tenant)
|
||||
|
||||
# TODOv3(jeblair): remove postconfig calls?
|
||||
for pipeline in tenant.layout.pipelines.values():
|
||||
pipeline.source.postConfig()
|
||||
|
|
Loading…
Reference in New Issue