Store semaphore state in Zookeeper

In order for semaphores to work in a multi-scheduler environment we need
to store their state in Zookeeper.

The state for acquired semaphores is stored in
`/zuul/semaphores/<tenant>/<semaphore>`. The semaphore node will hold a
list of handles in the form `<queue-item-uuid>-<job-name>`.

The semaphore handler will try to acquire/release a semaphore
optimistically and retry in case of concurrent updates.

Change-Id: Iddc71ae26008cea081838b1001d33a98ed05c0d8
This commit is contained in:
Simon Westphahl 2021-01-26 09:58:11 +01:00
parent 0101820328
commit 59ac5f6880
5 changed files with 330 additions and 267 deletions

View File

@ -7471,8 +7471,9 @@ class TestSemaphore(ZuulTestCase):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
@ -7487,8 +7488,9 @@ class TestSemaphore(ZuulTestCase):
# By default we first lock the semaphore and then get the nodes
# so at this point the semaphore needs to be acquired.
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
self.fake_nodepool.paused = False
self.waitUntilSettled()
@ -7504,8 +7506,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'semaphore-one-test2')
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
self.executor_server.release('semaphore-one-test2')
self.waitUntilSettled()
@ -7514,8 +7517,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'semaphore-one-test1')
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
self.executor_server.release('semaphore-one-test1')
self.waitUntilSettled()
@ -7524,8 +7528,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'semaphore-one-test2')
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
self.executor_server.release('semaphore-one-test2')
self.waitUntilSettled()
@ -7533,8 +7538,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(len(self.builds), 2)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -7544,8 +7550,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 1)
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
def test_semaphore_two(self):
"Test semaphores with max>1"
@ -7554,8 +7561,9 @@ class TestSemaphore(ZuulTestCase):
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
self.assertFalse('test-semaphore-two' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders(
"test-semaphore-two")), 0)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
@ -7566,10 +7574,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(self.builds[1].name, 'semaphore-two-test1')
self.assertEqual(self.builds[2].name, 'semaphore-two-test2')
self.assertEqual(self.builds[3].name, 'project-test1')
self.assertTrue('test-semaphore-two' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
'test-semaphore-two', [])), 2)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders(
"test-semaphore-two")), 2)
self.executor_server.release('semaphore-two-test1')
self.waitUntilSettled()
@ -7579,10 +7586,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(self.builds[1].name, 'semaphore-two-test2')
self.assertEqual(self.builds[2].name, 'project-test1')
self.assertEqual(self.builds[3].name, 'semaphore-two-test1')
self.assertTrue('test-semaphore-two' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
'test-semaphore-two', [])), 2)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders(
"test-semaphore-two")), 2)
self.executor_server.release('semaphore-two-test2')
self.waitUntilSettled()
@ -7592,10 +7598,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'semaphore-two-test1')
self.assertEqual(self.builds[3].name, 'semaphore-two-test2')
self.assertTrue('test-semaphore-two' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
'test-semaphore-two', [])), 2)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders(
"test-semaphore-two")), 2)
self.executor_server.release('semaphore-two-test1')
self.waitUntilSettled()
@ -7604,10 +7609,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name, 'semaphore-two-test2')
self.assertTrue('test-semaphore-two' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
'test-semaphore-two', [])), 1)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders(
"test-semaphore-two")), 1)
self.executor_server.release('semaphore-two-test2')
self.waitUntilSettled()
@ -7615,8 +7619,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(len(self.builds), 2)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertFalse('test-semaphore-two' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders(
"test-semaphore-two")), 0)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -7635,15 +7640,17 @@ class TestSemaphore(ZuulTestCase):
self.fake_nodepool.pause()
A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# By default we first lock the semaphore and then get the nodes
# so at this point the semaphore needs to be acquired.
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
# Fail the node request and unpause
req = self.fake_nodepool.getNodeRequests()[0]
@ -7653,8 +7660,10 @@ class TestSemaphore(ZuulTestCase):
# At this point the job that holds the semaphore failed with
# node_failure and the semaphore must be released.
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.assertEquals(1, A.reported)
self.assertIn('semaphore-one-test3 semaphore-one-test3 : NODE_FAILURE',
A.messages[0])
@ -7671,8 +7680,9 @@ class TestSemaphore(ZuulTestCase):
A = self.fake_gerrit.addFakeChange('org/project3', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project3', 'master', 'B')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
@ -7680,8 +7690,9 @@ class TestSemaphore(ZuulTestCase):
# Here we first get the resources and then lock the semaphore
# so at this point the semaphore should not be acquired.
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.fake_nodepool.paused = False
self.waitUntilSettled()
@ -7693,14 +7704,14 @@ class TestSemaphore(ZuulTestCase):
self.executor_server.release('semaphore-one-test1')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 3)
self.assertEqual(self.builds[0].name, 'project-test1')
self.assertEqual(self.builds[1].name, 'project-test1')
self.assertEqual(self.builds[2].name,
'semaphore-one-test2-resources-first')
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -7714,15 +7725,16 @@ class TestSemaphore(ZuulTestCase):
self.fake_nodepool.pause()
A = self.fake_gerrit.addFakeChange('org/project4', 'master', 'A')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# With resources first we first get the nodes so at this point the
# semaphore must not be acquired.
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphoreHolders(
"test-semaphore")), 0)
# Fail the node request and unpause
req = self.fake_nodepool.getNodeRequests()[0]
@ -7732,8 +7744,8 @@ class TestSemaphore(ZuulTestCase):
# At this point the job should never have acquired a semaphore so check
# that it still has not locked a semaphore.
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphoreHolders(
"test-semaphore")), 0)
self.assertEquals(1, A.reported)
self.assertIn('semaphore-one-test1-resources-first : NODE_FAILURE',
A.messages[0])
@ -7743,8 +7755,9 @@ class TestSemaphore(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
# Simulate a single zk error in useNodeSet
orig_useNodeSet = self.scheds.first.sched.nodepool.useNodeSet
@ -7760,8 +7773,9 @@ class TestSemaphore(ZuulTestCase):
self.waitUntilSettled()
# The semaphore should be released
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
# cleanup the queue
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
@ -7774,14 +7788,16 @@ class TestSemaphore(ZuulTestCase):
check_pipeline = tenant.layout.pipelines['check']
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
self.waitUntilSettled()
@ -7791,8 +7807,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(len(items), 0)
# The semaphore should be released
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -7810,14 +7827,16 @@ class TestSemaphore(ZuulTestCase):
check_pipeline = tenant.layout.pipelines['check']
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
self.waitUntilSettled()
@ -7827,8 +7846,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(len(items), 0)
# The semaphore should be released
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.executor_server.hold_jobs_in_build = False
self.fake_nodepool.paused = False
@ -7852,8 +7872,9 @@ class TestSemaphore(ZuulTestCase):
check_pipeline = tenant.layout.pipelines['check']
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
@ -7866,8 +7887,9 @@ class TestSemaphore(ZuulTestCase):
if len(self.scheds.first.sched.nodepool.requests) == 0:
break
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
self.waitUntilSettled()
@ -7877,8 +7899,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(len(items), 0)
# The semaphore should be released
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.executor_server.release()
self.waitUntilSettled()
@ -7890,25 +7913,24 @@ class TestSemaphore(ZuulTestCase):
check_pipeline = tenant.layout.pipelines['check']
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
semaphore = tenant.semaphore_handler.semaphores['test-semaphore']
self.assertEqual(len(semaphore), 1)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
A.addPatchset()
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
self.waitUntilSettled()
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
semaphore = tenant.semaphore_handler.semaphores['test-semaphore']
self.assertEqual(len(semaphore), 1)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
items = check_pipeline.getAllItems()
self.assertEqual(items[0].change.number, '1')
@ -7920,22 +7942,25 @@ class TestSemaphore(ZuulTestCase):
self.waitUntilSettled()
# The semaphore should be released
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
def test_semaphore_reconfigure(self):
"Test reconfigure with job semaphores"
self.executor_server.hold_jobs_in_build = True
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
# reconfigure without layout change
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
@ -7943,8 +7968,9 @@ class TestSemaphore(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
# semaphore still must be held
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
self.commitConfigUpdate(
'common-config',
@ -7960,8 +7986,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(len(self.builds), 0)
# The semaphore should be released
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
def test_semaphore_reconfigure_job_removal(self):
"Test job removal during reconfiguration with semaphores"
@ -7969,14 +7996,16 @@ class TestSemaphore(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
self.commitConfigUpdate(
'common-config',
@ -7995,8 +8024,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(len(items), 0)
# The semaphore should be released
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -8016,14 +8046,16 @@ class TestSemaphore(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
self.commitConfigUpdate(
'common-config',
@ -8046,8 +8078,9 @@ class TestSemaphore(ZuulTestCase):
self.assertEqual(len(items), 0)
# The semaphore should be released
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -8070,10 +8103,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase):
C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
D = self.fake_gerrit.addFakeChange('org/project2', 'master', 'D')
E = self.fake_gerrit.addFakeChange('org/project2', 'master', 'E')
self.assertFalse('test-semaphore' in
tenant_one.semaphore_handler.semaphores)
self.assertFalse('test-semaphore' in
tenant_two.semaphore_handler.semaphores)
self.assertEqual(
len(tenant_one.semaphore_handler.semaphoreHolders(
"test-semaphore")), 0)
self.assertEqual(
len(tenant_two.semaphore_handler.semaphoreHolders(
"test-semaphore")), 0)
# add patches to project1 of tenant-one
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
@ -8085,12 +8120,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase):
# semaphore of tenant-two must not be acquired
self.assertEqual(len(self.builds), 1)
self.assertEqual(self.builds[0].name, 'project1-test1')
self.assertTrue('test-semaphore' in
tenant_one.semaphore_handler.semaphores)
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
'test-semaphore', [])), 1)
self.assertFalse('test-semaphore' in
tenant_two.semaphore_handler.semaphores)
self.assertEqual(
len(tenant_one.semaphore_handler.semaphoreHolders(
"test-semaphore")), 1)
self.assertEqual(
len(tenant_two.semaphore_handler.semaphoreHolders(
"test-semaphore")), 0)
# add patches to project2 of tenant-two
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
@ -8106,14 +8141,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase):
self.assertEqual(self.builds[0].name, 'project1-test1')
self.assertEqual(self.builds[1].name, 'project2-test1')
self.assertEqual(self.builds[2].name, 'project2-test1')
self.assertTrue('test-semaphore' in
tenant_one.semaphore_handler.semaphores)
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
'test-semaphore', [])), 1)
self.assertTrue('test-semaphore' in
tenant_two.semaphore_handler.semaphores)
self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get(
'test-semaphore', [])), 2)
self.assertEqual(
len(tenant_one.semaphore_handler.semaphoreHolders(
"test-semaphore")), 1)
self.assertEqual(
len(tenant_two.semaphore_handler.semaphoreHolders(
"test-semaphore")), 2)
self.executor_server.release('project1-test1')
self.waitUntilSettled()
@ -8126,14 +8159,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase):
self.assertEqual(self.builds[0].name, 'project2-test1')
self.assertEqual(self.builds[1].name, 'project2-test1')
self.assertEqual(self.builds[2].name, 'project1-test1')
self.assertTrue('test-semaphore' in
tenant_one.semaphore_handler.semaphores)
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
'test-semaphore', [])), 1)
self.assertTrue('test-semaphore' in
tenant_two.semaphore_handler.semaphores)
self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get(
'test-semaphore', [])), 2)
self.assertEqual(
len(tenant_one.semaphore_handler.semaphoreHolders(
"test-semaphore")), 1)
self.assertEqual(
len(tenant_two.semaphore_handler.semaphoreHolders(
"test-semaphore")), 2)
self.executor_server.release('project2-test1')
self.waitUntilSettled()
@ -8143,14 +8174,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase):
# semaphore of tenant-one must be acquired once
# semaphore of tenant-two must be acquired once
self.assertEqual(len(self.builds), 2)
self.assertTrue('test-semaphore' in
tenant_one.semaphore_handler.semaphores)
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
'test-semaphore', [])), 1)
self.assertTrue('test-semaphore' in
tenant_two.semaphore_handler.semaphores)
self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get(
'test-semaphore', [])), 1)
self.assertEqual(
len(tenant_one.semaphore_handler.semaphoreHolders(
"test-semaphore")), 1)
self.assertEqual(
len(tenant_two.semaphore_handler.semaphoreHolders(
"test-semaphore")), 1)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@ -8161,10 +8190,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase):
# semaphore of tenant-one must not be acquired
# semaphore of tenant-two must not be acquired
self.assertEqual(len(self.builds), 0)
self.assertFalse('test-semaphore' in
tenant_one.semaphore_handler.semaphores)
self.assertFalse('test-semaphore' in
tenant_two.semaphore_handler.semaphores)
self.assertEqual(
len(tenant_one.semaphore_handler.semaphoreHolders(
"test-semaphore")), 0)
self.assertEqual(
len(tenant_two.semaphore_handler.semaphoreHolders(
"test-semaphore")), 0)
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 1)
@ -8319,10 +8350,9 @@ class TestSemaphoreInRepo(ZuulTestCase):
# one build must be in queue, one semaphores acquired
self.assertEqual(len(self.builds), 1)
self.assertEqual(self.builds[0].name, 'project-test2')
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
'test-semaphore', [])), 1)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
self.executor_server.release('project-test2')
self.waitUntilSettled()
@ -8343,17 +8373,18 @@ class TestSemaphoreInRepo(ZuulTestCase):
self.assertEqual(len(self.builds), 2)
self.assertEqual(self.builds[0].name, 'project-test2')
self.assertEqual(self.builds[1].name, 'project-test2')
self.assertTrue('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
'test-semaphore', [])), 2)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
2)
self.executor_server.release('project-test2')
self.waitUntilSettled()
self.assertEqual(len(self.builds), 0)
self.assertFalse('test-semaphore' in
tenant.semaphore_handler.semaphores)
self.assertEqual(
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
0
)
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()

View File

@ -35,6 +35,7 @@ from zuul.lib import encryption
from zuul.lib.keystorage import KeyStorage
from zuul.lib.logutil import get_annotated_logger
from zuul.lib.re2util import filter_allowed_disallowed
from zuul.zk.semaphore import SemaphoreHandler
# Several forms accept either a single item or a list, this makes
@ -1661,6 +1662,9 @@ class TenantParser(object):
tenant.layout = self._parseLayout(
tenant, parsed_config, loading_errors)
tenant.semaphore_handler = SemaphoreHandler(
self.scheduler.zk_client, tenant.name, tenant.layout
)
return tenant

View File

@ -4883,113 +4883,6 @@ class Queue(ConfigObject):
)
class SemaphoreHandler(object):
    """In-memory bookkeeping of which (item, job name) pairs hold semaphores.

    ``self.semaphores`` maps a semaphore name to the list of
    ``(item, job_name)`` tuples currently holding it. A name is only present
    in the mapping while it has at least one holder.
    """

    log = logging.getLogger("zuul.SemaphoreHandler")

    def __init__(self):
        self.semaphores = {}

    def acquire(self, item, job, request_resources):
        """
        Try to acquire the job's semaphore for an item/job combination.

        This is called twice during the lifecycle of a job: once before the
        build resources are requested and once before the job is run. The job
        decides in which of the two calls the semaphore is really acquired.

        :param item: The item
        :param job: The job
        :param request_resources: True if we want to acquire for the request
                                  resources phase, False if we want to acquire
                                  for the run phase.
        :returns: True if the job has no semaphore, if acquiring is deferred
                  to the run phase, or if the semaphore is (now) held by this
                  item/job; False if the semaphore is full.
        """
        sem = job.semaphore
        if not sem:
            return True

        log = get_annotated_logger(self.log, item.event)

        # With resources_first the lock is only taken in the run phase, so
        # the resource request phase has nothing to do. In every other case
        # we fall through: acquiring again in the run phase is harmless when
        # the semaphore was already taken in the resources phase.
        if sem.resources_first and request_resources:
            return True

        name = sem.name
        holders = self.semaphores.get(name)
        if holders:
            if (item, job.name) in holders:
                # This item already holds the semaphore
                return True
            if len(holders) >= self._max_count(item, name):
                # All slots are taken
                return False

        # Either nobody holds the semaphore yet or there is a free slot
        self._acquire(name, item, job.name, log)
        return True

    def release(self, item, job):
        """
        Release the job's semaphore held by an item/job combination.

        Logs an error (and changes nothing) when the semaphore is not held at
        all or not held by this item/job.

        :param item: The item
        :param job: The job
        """
        if not job.semaphore:
            return

        log = get_annotated_logger(self.log, item.event)
        name = job.semaphore.name
        holders = self.semaphores.get(name)

        if not holders:
            # The semaphore is not held, nothing to do
            log.error("Semaphore can not be released for %s "
                      "because the semaphore is not held", item)
        elif (item, job.name) in holders:
            # This item is a holder of the semaphore
            self._release(name, item, job.name, log)
        else:
            log.error("Semaphore can not be released for %s "
                      "which does not hold it", item)

    def _acquire(self, semaphore_key, item, job_name, log):
        """Record (item, job_name) as a holder of ``semaphore_key``."""
        log.debug("Semaphore acquire {semaphore}: job {job}, item {item}"
                  .format(semaphore=semaphore_key,
                          job=job_name,
                          item=item))
        self.semaphores.setdefault(semaphore_key, []).append((item, job_name))

    def _release(self, semaphore_key, item, job_name, log):
        """Drop (item, job_name) from the holders of ``semaphore_key``."""
        log.debug("Semaphore release {semaphore}: job {job}, item {item}"
                  .format(semaphore=semaphore_key,
                          job=job_name,
                          item=item))
        holders = self.semaphores[semaphore_key]
        entry = (item, job_name)
        if entry in holders:
            holders.remove(entry)

        # Forget the semaphore entirely once nobody uses it anymore
        if not holders:
            del self.semaphores[semaphore_key]

    @staticmethod
    def _max_count(item, semaphore_name):
        """Return the ``max`` of the named semaphore in the item's layout.

        A semaphore that is not defined in the layout counts as having a
        maximum of one holder.

        :raises Exception: if the item has no layout
        """
        if not item.layout:
            # This should not occur as the layout of the item must already be
            # built when acquiring or releasing a semaphore for a job.
            raise Exception("Item {} has no layout".format(item))

        # Fall back to a single-slot semaphore when the name is unknown
        fallback = Semaphore(semaphore_name, 1)
        return item.layout.semaphores.get(semaphore_name, fallback).max
class Tenant(object):
def __init__(self, name):
self.name = name
@ -5012,7 +4905,7 @@ class Tenant(object):
self.untrusted_projects = []
# The parsed config from those projects.
self.untrusted_projects_config = None
self.semaphore_handler = SemaphoreHandler()
self.semaphore_handler = None
# Metadata about projects for this tenant
# canonical project name -> TenantProjectConfig
self.project_configs = {}

View File

@ -897,10 +897,6 @@ class Scheduler(threading.Thread):
old_tenant = self.abide.tenants.get(tenant.name)
if old_tenant:
# Copy over semaphore handler so we don't lose the currently
# held semaphores.
tenant.semaphore_handler = old_tenant.semaphore_handler
self._reenqueueTenant(old_tenant, tenant)
# TODOv3(jeblair): update for tenants

139
zuul/zk/semaphore.py Normal file
View File

@ -0,0 +1,139 @@
# Copyright 2021 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
from urllib.parse import quote_plus
from kazoo.exceptions import BadVersionError, NoNodeError
from zuul.lib.logutil import get_annotated_logger
from zuul.zk import ZooKeeperBase
def holdersFromData(data):
    """Decode the raw content of a semaphore node into a list of holders.

    :param data: UTF-8 encoded JSON bytes, or a falsy value (``None`` or
                 ``b""``) for a node without content.
    :returns: The decoded JSON value, or an empty list for falsy ``data``.
    """
    return json.loads(data.decode("utf8")) if data else []
def holdersToData(holders):
    """Serialize a list of holders to UTF-8 encoded JSON bytes."""
    serialized = json.dumps(holders)
    return serialized.encode("utf8")
class SemaphoreHandler(ZooKeeperBase):
    """Per-tenant semaphore bookkeeping kept in ZooKeeper.

    Every semaphore is a node below ``<semaphore_root>/<tenant_name>`` whose
    data is a JSON list of holder handles of the form
    ``<item.uuid>-<job.name>``. Writes pass the version of the data that was
    read and are retried when that version turns out to be stale.
    """

    log = logging.getLogger("zuul.zk.SemaphoreHandler")

    semaphore_root = "/zuul/semaphores"

    def __init__(self, client, tenant_name, layout):
        """
        :param client: Passed through to the ``ZooKeeperBase`` constructor
        :param tenant_name: Name of the tenant; used to build the tenant's
                            root path below ``semaphore_root``
        :param layout: Layout whose ``semaphores`` mapping is consulted for
                       the maximum number of holders
        """
        super().__init__(client)
        self.tenant_root = f"{self.semaphore_root}/{tenant_name}"
        self.layout = layout

    def _semaphorePath(self, semaphore_name):
        """Return the node path of the named semaphore for this tenant."""
        semaphore_key = quote_plus(semaphore_name)
        return f"{self.tenant_root}/{semaphore_key}"

    def acquire(self, item, job, request_resources):
        """
        Try to acquire the job's semaphore for an item/job combination.

        :param item: The item
        :param job: The job
        :param request_resources: True if we want to acquire for the request
                                  resources phase, False if we want to acquire
                                  for the run phase.
        :returns: True if the job has no semaphore, if acquiring is deferred
                  because of ``resources_first``, or if the handle is (now)
                  in the list of holders; False if the semaphore is full.
        """
        semaphore = job.semaphore
        if not semaphore:
            return True

        log = get_annotated_logger(self.log, item.event)

        # With resources_first the resources are fetched before locking, so
        # there is nothing to do in the resource request phase. Every other
        # case falls through: acquiring again in the run phase is harmless
        # when the semaphore was already taken in the resources phase.
        if semaphore.resources_first and request_resources:
            return True

        semaphore_path = self._semaphorePath(semaphore.name)
        handle = f"{item.uuid}-{job.name}"

        self.kazoo_client.ensure_path(semaphore_path)
        holders, zstat = self.getHolders(semaphore_path)
        if handle in holders:
            # Already a holder, nothing to write
            return True

        while True:
            # Re-check the limit on every attempt as the holders may have
            # been re-read after a concurrent update.
            if len(holders) >= self._max_count(semaphore.name):
                return False

            holders.append(handle)
            try:
                self.kazoo_client.set(semaphore_path,
                                      holdersToData(holders),
                                      version=zstat.version)
            except BadVersionError:
                log.debug(
                    "Retrying semaphore %s acquire due to concurrent update",
                    semaphore.name)
                holders, zstat = self.getHolders(semaphore_path)
            else:
                log.debug("Semaphore %s acquired: job %s, item %s",
                          semaphore.name, job.name, item)
                return True

    def getHolders(self, semaphore_path):
        """Read a semaphore node.

        :returns: Tuple of the decoded list of holders and the second value
                  returned by ``kazoo_client.get`` (used by callers for its
                  ``version``).
        """
        raw, zstat = self.kazoo_client.get(semaphore_path)
        return holdersFromData(raw), zstat

    def release(self, item, job):
        """
        Remove the item/job handle from the holders of the job's semaphore.

        Logs an error and gives up when the node does not exist or the handle
        is not among the holders; retries when the node changed between
        reading and writing.

        :param item: The item
        :param job: The job
        """
        if not job.semaphore:
            return

        log = get_annotated_logger(self.log, item.event)
        semaphore_path = self._semaphorePath(job.semaphore.name)
        handle = f"{item.uuid}-{job.name}"

        while True:
            try:
                holders, zstat = self.getHolders(semaphore_path)
                holders.remove(handle)
            except (ValueError, NoNodeError):
                log.error("Semaphore can not be released for %s "
                          "because the semaphore is not held", item)
                return

            try:
                self.kazoo_client.set(semaphore_path,
                                      holdersToData(holders),
                                      zstat.version)
            except BadVersionError:
                log.debug(
                    "Retrying semaphore %s release due to concurrent update",
                    job.semaphore.name)
            else:
                log.debug("Semaphore %s released: job %s, item %s",
                          job.semaphore.name, job.name, item)
                return

    def semaphoreHolders(self, semaphore_name):
        """Return the list of holders of the named semaphore.

        A semaphore whose node does not exist has no holders.
        """
        try:
            return self.getHolders(self._semaphorePath(semaphore_name))[0]
        except NoNodeError:
            return []

    def _max_count(self, semaphore_name: str) -> int:
        """Return the ``max`` of the named semaphore from the layout.

        A semaphore that is not defined in the layout allows one holder.
        """
        semaphore = self.layout.semaphores.get(semaphore_name)
        if semaphore is None:
            return 1
        return semaphore.max