Merge "Store semaphore state in Zookeeper"
This commit is contained in:
commit
2f522a6417
|
@ -7484,8 +7484,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
|
|
||||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
||||||
|
@ -7500,8 +7501,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
|
|
||||||
# By default we first lock the semaphore and then get the nodes
|
# By default we first lock the semaphore and then get the nodes
|
||||||
# so at this point the semaphore needs to be aquired.
|
# so at this point the semaphore needs to be aquired.
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
self.fake_nodepool.paused = False
|
self.fake_nodepool.paused = False
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
|
@ -7517,8 +7519,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[2].name, 'semaphore-one-test2')
|
self.assertEqual(self.builds[2].name, 'semaphore-one-test2')
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
|
|
||||||
self.executor_server.release('semaphore-one-test2')
|
self.executor_server.release('semaphore-one-test2')
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -7527,8 +7530,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[2].name, 'semaphore-one-test1')
|
self.assertEqual(self.builds[2].name, 'semaphore-one-test1')
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
|
|
||||||
self.executor_server.release('semaphore-one-test1')
|
self.executor_server.release('semaphore-one-test1')
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -7537,8 +7541,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[2].name, 'semaphore-one-test2')
|
self.assertEqual(self.builds[2].name, 'semaphore-one-test2')
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
|
|
||||||
self.executor_server.release('semaphore-one-test2')
|
self.executor_server.release('semaphore-one-test2')
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -7546,8 +7551,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(len(self.builds), 2)
|
self.assertEqual(len(self.builds), 2)
|
||||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.executor_server.hold_jobs_in_build = False
|
self.executor_server.hold_jobs_in_build = False
|
||||||
self.executor_server.release()
|
self.executor_server.release()
|
||||||
|
@ -7557,8 +7563,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
|
|
||||||
self.assertEqual(A.reported, 1)
|
self.assertEqual(A.reported, 1)
|
||||||
self.assertEqual(B.reported, 1)
|
self.assertEqual(B.reported, 1)
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
def test_semaphore_two(self):
|
def test_semaphore_two(self):
|
||||||
"Test semaphores with max>1"
|
"Test semaphores with max>1"
|
||||||
|
@ -7567,8 +7574,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.executor_server.hold_jobs_in_build = True
|
self.executor_server.hold_jobs_in_build = True
|
||||||
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
|
||||||
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
|
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
|
||||||
self.assertFalse('test-semaphore-two' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders(
|
||||||
|
"test-semaphore-two")), 0)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
||||||
|
@ -7579,10 +7587,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(self.builds[1].name, 'semaphore-two-test1')
|
self.assertEqual(self.builds[1].name, 'semaphore-two-test1')
|
||||||
self.assertEqual(self.builds[2].name, 'semaphore-two-test2')
|
self.assertEqual(self.builds[2].name, 'semaphore-two-test2')
|
||||||
self.assertEqual(self.builds[3].name, 'project-test1')
|
self.assertEqual(self.builds[3].name, 'project-test1')
|
||||||
self.assertTrue('test-semaphore-two' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders(
|
||||||
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
|
"test-semaphore-two")), 2)
|
||||||
'test-semaphore-two', [])), 2)
|
|
||||||
|
|
||||||
self.executor_server.release('semaphore-two-test1')
|
self.executor_server.release('semaphore-two-test1')
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -7592,10 +7599,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(self.builds[1].name, 'semaphore-two-test2')
|
self.assertEqual(self.builds[1].name, 'semaphore-two-test2')
|
||||||
self.assertEqual(self.builds[2].name, 'project-test1')
|
self.assertEqual(self.builds[2].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[3].name, 'semaphore-two-test1')
|
self.assertEqual(self.builds[3].name, 'semaphore-two-test1')
|
||||||
self.assertTrue('test-semaphore-two' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders(
|
||||||
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
|
"test-semaphore-two")), 2)
|
||||||
'test-semaphore-two', [])), 2)
|
|
||||||
|
|
||||||
self.executor_server.release('semaphore-two-test2')
|
self.executor_server.release('semaphore-two-test2')
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -7605,10 +7611,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[2].name, 'semaphore-two-test1')
|
self.assertEqual(self.builds[2].name, 'semaphore-two-test1')
|
||||||
self.assertEqual(self.builds[3].name, 'semaphore-two-test2')
|
self.assertEqual(self.builds[3].name, 'semaphore-two-test2')
|
||||||
self.assertTrue('test-semaphore-two' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders(
|
||||||
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
|
"test-semaphore-two")), 2)
|
||||||
'test-semaphore-two', [])), 2)
|
|
||||||
|
|
||||||
self.executor_server.release('semaphore-two-test1')
|
self.executor_server.release('semaphore-two-test1')
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -7617,10 +7622,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[2].name, 'semaphore-two-test2')
|
self.assertEqual(self.builds[2].name, 'semaphore-two-test2')
|
||||||
self.assertTrue('test-semaphore-two' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders(
|
||||||
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
|
"test-semaphore-two")), 1)
|
||||||
'test-semaphore-two', [])), 1)
|
|
||||||
|
|
||||||
self.executor_server.release('semaphore-two-test2')
|
self.executor_server.release('semaphore-two-test2')
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -7628,8 +7632,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(len(self.builds), 2)
|
self.assertEqual(len(self.builds), 2)
|
||||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||||
self.assertFalse('test-semaphore-two' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders(
|
||||||
|
"test-semaphore-two")), 0)
|
||||||
|
|
||||||
self.executor_server.hold_jobs_in_build = False
|
self.executor_server.hold_jobs_in_build = False
|
||||||
self.executor_server.release()
|
self.executor_server.release()
|
||||||
|
@ -7648,15 +7653,17 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.fake_nodepool.pause()
|
self.fake_nodepool.pause()
|
||||||
|
|
||||||
A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
# By default we first lock the semaphore and then get the nodes
|
# By default we first lock the semaphore and then get the nodes
|
||||||
# so at this point the semaphore needs to be aquired.
|
# so at this point the semaphore needs to be aquired.
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
|
|
||||||
# Fail the node request and unpause
|
# Fail the node request and unpause
|
||||||
req = self.fake_nodepool.getNodeRequests()[0]
|
req = self.fake_nodepool.getNodeRequests()[0]
|
||||||
|
@ -7666,8 +7673,10 @@ class TestSemaphore(ZuulTestCase):
|
||||||
|
|
||||||
# At this point the job that holds the semaphore failed with
|
# At this point the job that holds the semaphore failed with
|
||||||
# node_failure and the semaphore must be released.
|
# node_failure and the semaphore must be released.
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.assertEquals(1, A.reported)
|
self.assertEquals(1, A.reported)
|
||||||
self.assertIn('semaphore-one-test3 semaphore-one-test3 : NODE_FAILURE',
|
self.assertIn('semaphore-one-test3 semaphore-one-test3 : NODE_FAILURE',
|
||||||
A.messages[0])
|
A.messages[0])
|
||||||
|
@ -7684,8 +7693,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
|
|
||||||
A = self.fake_gerrit.addFakeChange('org/project3', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project3', 'master', 'A')
|
||||||
B = self.fake_gerrit.addFakeChange('org/project3', 'master', 'B')
|
B = self.fake_gerrit.addFakeChange('org/project3', 'master', 'B')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
||||||
|
@ -7693,8 +7703,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
|
|
||||||
# Here we first get the resources and then lock the semaphore
|
# Here we first get the resources and then lock the semaphore
|
||||||
# so at this point the semaphore should not be aquired.
|
# so at this point the semaphore should not be aquired.
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
self.fake_nodepool.paused = False
|
self.fake_nodepool.paused = False
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
|
@ -7706,14 +7717,14 @@ class TestSemaphore(ZuulTestCase):
|
||||||
|
|
||||||
self.executor_server.release('semaphore-one-test1')
|
self.executor_server.release('semaphore-one-test1')
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
self.assertEqual(len(self.builds), 3)
|
self.assertEqual(len(self.builds), 3)
|
||||||
self.assertEqual(self.builds[0].name, 'project-test1')
|
self.assertEqual(self.builds[0].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[1].name, 'project-test1')
|
self.assertEqual(self.builds[1].name, 'project-test1')
|
||||||
self.assertEqual(self.builds[2].name,
|
self.assertEqual(self.builds[2].name,
|
||||||
'semaphore-one-test2-resources-first')
|
'semaphore-one-test2-resources-first')
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
|
|
||||||
self.executor_server.hold_jobs_in_build = False
|
self.executor_server.hold_jobs_in_build = False
|
||||||
self.executor_server.release()
|
self.executor_server.release()
|
||||||
|
@ -7727,15 +7738,16 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.fake_nodepool.pause()
|
self.fake_nodepool.pause()
|
||||||
|
|
||||||
A = self.fake_gerrit.addFakeChange('org/project4', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project4', 'master', 'A')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
# With resources first we first get the nodes so at this point the
|
# With resources first we first get the nodes so at this point the
|
||||||
# semaphore must not be aquired.
|
# semaphore must not be aquired.
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(len(tenant.semaphore_handler.semaphoreHolders(
|
||||||
tenant.semaphore_handler.semaphores)
|
"test-semaphore")), 0)
|
||||||
|
|
||||||
# Fail the node request and unpause
|
# Fail the node request and unpause
|
||||||
req = self.fake_nodepool.getNodeRequests()[0]
|
req = self.fake_nodepool.getNodeRequests()[0]
|
||||||
|
@ -7745,8 +7757,8 @@ class TestSemaphore(ZuulTestCase):
|
||||||
|
|
||||||
# At this point the job should never have acuired a semaphore so check
|
# At this point the job should never have acuired a semaphore so check
|
||||||
# that it still has not locked a semaphore.
|
# that it still has not locked a semaphore.
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(len(tenant.semaphore_handler.semaphoreHolders(
|
||||||
tenant.semaphore_handler.semaphores)
|
"test-semaphore")), 0)
|
||||||
self.assertEquals(1, A.reported)
|
self.assertEquals(1, A.reported)
|
||||||
self.assertIn('semaphore-one-test1-resources-first : NODE_FAILURE',
|
self.assertIn('semaphore-one-test1-resources-first : NODE_FAILURE',
|
||||||
A.messages[0])
|
A.messages[0])
|
||||||
|
@ -7756,8 +7768,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||||
|
|
||||||
A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
# Simulate a single zk error in useNodeSet
|
# Simulate a single zk error in useNodeSet
|
||||||
orig_useNodeSet = self.scheds.first.sched.nodepool.useNodeSet
|
orig_useNodeSet = self.scheds.first.sched.nodepool.useNodeSet
|
||||||
|
@ -7773,8 +7786,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
# The semaphore should be released
|
# The semaphore should be released
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
# cleanup the queue
|
# cleanup the queue
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
|
@ -7787,14 +7801,16 @@ class TestSemaphore(ZuulTestCase):
|
||||||
check_pipeline = tenant.layout.pipelines['check']
|
check_pipeline = tenant.layout.pipelines['check']
|
||||||
|
|
||||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
|
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -7804,8 +7820,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(len(items), 0)
|
self.assertEqual(len(items), 0)
|
||||||
|
|
||||||
# The semaphore should be released
|
# The semaphore should be released
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.executor_server.hold_jobs_in_build = False
|
self.executor_server.hold_jobs_in_build = False
|
||||||
self.executor_server.release()
|
self.executor_server.release()
|
||||||
|
@ -7823,14 +7840,16 @@ class TestSemaphore(ZuulTestCase):
|
||||||
check_pipeline = tenant.layout.pipelines['check']
|
check_pipeline = tenant.layout.pipelines['check']
|
||||||
|
|
||||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
|
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -7840,8 +7859,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(len(items), 0)
|
self.assertEqual(len(items), 0)
|
||||||
|
|
||||||
# The semaphore should be released
|
# The semaphore should be released
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.executor_server.hold_jobs_in_build = False
|
self.executor_server.hold_jobs_in_build = False
|
||||||
self.fake_nodepool.paused = False
|
self.fake_nodepool.paused = False
|
||||||
|
@ -7865,8 +7885,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
check_pipeline = tenant.layout.pipelines['check']
|
check_pipeline = tenant.layout.pipelines['check']
|
||||||
|
|
||||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -7879,8 +7900,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
if len(self.scheds.first.sched.nodepool.requests) == 0:
|
if len(self.scheds.first.sched.nodepool.requests) == 0:
|
||||||
break
|
break
|
||||||
|
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
|
self.fake_gerrit.addEvent(A.getChangeAbandonedEvent())
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -7890,8 +7912,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(len(items), 0)
|
self.assertEqual(len(items), 0)
|
||||||
|
|
||||||
# The semaphore should be released
|
# The semaphore should be released
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.executor_server.release()
|
self.executor_server.release()
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -7903,25 +7926,24 @@ class TestSemaphore(ZuulTestCase):
|
||||||
check_pipeline = tenant.layout.pipelines['check']
|
check_pipeline = tenant.layout.pipelines['check']
|
||||||
|
|
||||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
semaphore = tenant.semaphore_handler.semaphores['test-semaphore']
|
1)
|
||||||
self.assertEqual(len(semaphore), 1)
|
|
||||||
|
|
||||||
A.addPatchset()
|
A.addPatchset()
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
semaphore = tenant.semaphore_handler.semaphores['test-semaphore']
|
1)
|
||||||
self.assertEqual(len(semaphore), 1)
|
|
||||||
|
|
||||||
items = check_pipeline.getAllItems()
|
items = check_pipeline.getAllItems()
|
||||||
self.assertEqual(items[0].change.number, '1')
|
self.assertEqual(items[0].change.number, '1')
|
||||||
|
@ -7933,22 +7955,25 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
# The semaphore should be released
|
# The semaphore should be released
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
def test_semaphore_reconfigure(self):
|
def test_semaphore_reconfigure(self):
|
||||||
"Test reconfigure with job semaphores"
|
"Test reconfigure with job semaphores"
|
||||||
self.executor_server.hold_jobs_in_build = True
|
self.executor_server.hold_jobs_in_build = True
|
||||||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
|
|
||||||
# reconfigure without layout change
|
# reconfigure without layout change
|
||||||
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
|
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
|
||||||
|
@ -7956,8 +7981,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||||
|
|
||||||
# semaphore still must be held
|
# semaphore still must be held
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
|
|
||||||
self.commitConfigUpdate(
|
self.commitConfigUpdate(
|
||||||
'common-config',
|
'common-config',
|
||||||
|
@ -7973,8 +7999,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(len(self.builds), 0)
|
self.assertEqual(len(self.builds), 0)
|
||||||
|
|
||||||
# The semaphore should be released
|
# The semaphore should be released
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
def test_semaphore_reconfigure_job_removal(self):
|
def test_semaphore_reconfigure_job_removal(self):
|
||||||
"Test job removal during reconfiguration with semaphores"
|
"Test job removal during reconfiguration with semaphores"
|
||||||
|
@ -7982,14 +8009,16 @@ class TestSemaphore(ZuulTestCase):
|
||||||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||||
|
|
||||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
|
|
||||||
self.commitConfigUpdate(
|
self.commitConfigUpdate(
|
||||||
'common-config',
|
'common-config',
|
||||||
|
@ -8008,8 +8037,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(len(items), 0)
|
self.assertEqual(len(items), 0)
|
||||||
|
|
||||||
# The semaphore should be released
|
# The semaphore should be released
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.executor_server.hold_jobs_in_build = False
|
self.executor_server.hold_jobs_in_build = False
|
||||||
self.executor_server.release()
|
self.executor_server.release()
|
||||||
|
@ -8029,14 +8059,16 @@ class TestSemaphore(ZuulTestCase):
|
||||||
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
|
||||||
|
|
||||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
1)
|
||||||
|
|
||||||
self.commitConfigUpdate(
|
self.commitConfigUpdate(
|
||||||
'common-config',
|
'common-config',
|
||||||
|
@ -8059,8 +8091,9 @@ class TestSemaphore(ZuulTestCase):
|
||||||
self.assertEqual(len(items), 0)
|
self.assertEqual(len(items), 0)
|
||||||
|
|
||||||
# The semaphore should be released
|
# The semaphore should be released
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0)
|
||||||
|
|
||||||
self.executor_server.hold_jobs_in_build = False
|
self.executor_server.hold_jobs_in_build = False
|
||||||
self.executor_server.release()
|
self.executor_server.release()
|
||||||
|
@ -8083,10 +8116,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase):
|
||||||
C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
|
C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C')
|
||||||
D = self.fake_gerrit.addFakeChange('org/project2', 'master', 'D')
|
D = self.fake_gerrit.addFakeChange('org/project2', 'master', 'D')
|
||||||
E = self.fake_gerrit.addFakeChange('org/project2', 'master', 'E')
|
E = self.fake_gerrit.addFakeChange('org/project2', 'master', 'E')
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant_one.semaphore_handler.semaphores)
|
len(tenant_one.semaphore_handler.semaphoreHolders(
|
||||||
self.assertFalse('test-semaphore' in
|
"test-semaphore")), 0)
|
||||||
tenant_two.semaphore_handler.semaphores)
|
self.assertEqual(
|
||||||
|
len(tenant_two.semaphore_handler.semaphoreHolders(
|
||||||
|
"test-semaphore")), 0)
|
||||||
|
|
||||||
# add patches to project1 of tenant-one
|
# add patches to project1 of tenant-one
|
||||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||||
|
@ -8098,12 +8133,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase):
|
||||||
# semaphore of tenant-two must not be acquired
|
# semaphore of tenant-two must not be acquired
|
||||||
self.assertEqual(len(self.builds), 1)
|
self.assertEqual(len(self.builds), 1)
|
||||||
self.assertEqual(self.builds[0].name, 'project1-test1')
|
self.assertEqual(self.builds[0].name, 'project1-test1')
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant_one.semaphore_handler.semaphores)
|
len(tenant_one.semaphore_handler.semaphoreHolders(
|
||||||
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
|
"test-semaphore")), 1)
|
||||||
'test-semaphore', [])), 1)
|
self.assertEqual(
|
||||||
self.assertFalse('test-semaphore' in
|
len(tenant_two.semaphore_handler.semaphoreHolders(
|
||||||
tenant_two.semaphore_handler.semaphores)
|
"test-semaphore")), 0)
|
||||||
|
|
||||||
# add patches to project2 of tenant-two
|
# add patches to project2 of tenant-two
|
||||||
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
|
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
|
||||||
|
@ -8119,14 +8154,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase):
|
||||||
self.assertEqual(self.builds[0].name, 'project1-test1')
|
self.assertEqual(self.builds[0].name, 'project1-test1')
|
||||||
self.assertEqual(self.builds[1].name, 'project2-test1')
|
self.assertEqual(self.builds[1].name, 'project2-test1')
|
||||||
self.assertEqual(self.builds[2].name, 'project2-test1')
|
self.assertEqual(self.builds[2].name, 'project2-test1')
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant_one.semaphore_handler.semaphores)
|
len(tenant_one.semaphore_handler.semaphoreHolders(
|
||||||
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
|
"test-semaphore")), 1)
|
||||||
'test-semaphore', [])), 1)
|
self.assertEqual(
|
||||||
self.assertTrue('test-semaphore' in
|
len(tenant_two.semaphore_handler.semaphoreHolders(
|
||||||
tenant_two.semaphore_handler.semaphores)
|
"test-semaphore")), 2)
|
||||||
self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get(
|
|
||||||
'test-semaphore', [])), 2)
|
|
||||||
|
|
||||||
self.executor_server.release('project1-test1')
|
self.executor_server.release('project1-test1')
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -8139,14 +8172,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase):
|
||||||
self.assertEqual(self.builds[0].name, 'project2-test1')
|
self.assertEqual(self.builds[0].name, 'project2-test1')
|
||||||
self.assertEqual(self.builds[1].name, 'project2-test1')
|
self.assertEqual(self.builds[1].name, 'project2-test1')
|
||||||
self.assertEqual(self.builds[2].name, 'project1-test1')
|
self.assertEqual(self.builds[2].name, 'project1-test1')
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant_one.semaphore_handler.semaphores)
|
len(tenant_one.semaphore_handler.semaphoreHolders(
|
||||||
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
|
"test-semaphore")), 1)
|
||||||
'test-semaphore', [])), 1)
|
self.assertEqual(
|
||||||
self.assertTrue('test-semaphore' in
|
len(tenant_two.semaphore_handler.semaphoreHolders(
|
||||||
tenant_two.semaphore_handler.semaphores)
|
"test-semaphore")), 2)
|
||||||
self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get(
|
|
||||||
'test-semaphore', [])), 2)
|
|
||||||
|
|
||||||
self.executor_server.release('project2-test1')
|
self.executor_server.release('project2-test1')
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -8156,14 +8187,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase):
|
||||||
# semaphore of tenant-one must be acquired once
|
# semaphore of tenant-one must be acquired once
|
||||||
# semaphore of tenant-two must be acquired once
|
# semaphore of tenant-two must be acquired once
|
||||||
self.assertEqual(len(self.builds), 2)
|
self.assertEqual(len(self.builds), 2)
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant_one.semaphore_handler.semaphores)
|
len(tenant_one.semaphore_handler.semaphoreHolders(
|
||||||
self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get(
|
"test-semaphore")), 1)
|
||||||
'test-semaphore', [])), 1)
|
self.assertEqual(
|
||||||
self.assertTrue('test-semaphore' in
|
len(tenant_two.semaphore_handler.semaphoreHolders(
|
||||||
tenant_two.semaphore_handler.semaphores)
|
"test-semaphore")), 1)
|
||||||
self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get(
|
|
||||||
'test-semaphore', [])), 1)
|
|
||||||
|
|
||||||
self.executor_server.hold_jobs_in_build = False
|
self.executor_server.hold_jobs_in_build = False
|
||||||
self.executor_server.release()
|
self.executor_server.release()
|
||||||
|
@ -8174,10 +8203,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase):
|
||||||
# semaphore of tenant-one must not be acquired
|
# semaphore of tenant-one must not be acquired
|
||||||
# semaphore of tenant-two must not be acquired
|
# semaphore of tenant-two must not be acquired
|
||||||
self.assertEqual(len(self.builds), 0)
|
self.assertEqual(len(self.builds), 0)
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant_one.semaphore_handler.semaphores)
|
len(tenant_one.semaphore_handler.semaphoreHolders(
|
||||||
self.assertFalse('test-semaphore' in
|
"test-semaphore")), 0)
|
||||||
tenant_two.semaphore_handler.semaphores)
|
self.assertEqual(
|
||||||
|
len(tenant_two.semaphore_handler.semaphoreHolders(
|
||||||
|
"test-semaphore")), 0)
|
||||||
|
|
||||||
self.assertEqual(A.reported, 1)
|
self.assertEqual(A.reported, 1)
|
||||||
self.assertEqual(B.reported, 1)
|
self.assertEqual(B.reported, 1)
|
||||||
|
@ -8332,10 +8363,9 @@ class TestSemaphoreInRepo(ZuulTestCase):
|
||||||
# one build must be in queue, one semaphores acquired
|
# one build must be in queue, one semaphores acquired
|
||||||
self.assertEqual(len(self.builds), 1)
|
self.assertEqual(len(self.builds), 1)
|
||||||
self.assertEqual(self.builds[0].name, 'project-test2')
|
self.assertEqual(self.builds[0].name, 'project-test2')
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
|
1)
|
||||||
'test-semaphore', [])), 1)
|
|
||||||
|
|
||||||
self.executor_server.release('project-test2')
|
self.executor_server.release('project-test2')
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
@ -8356,17 +8386,18 @@ class TestSemaphoreInRepo(ZuulTestCase):
|
||||||
self.assertEqual(len(self.builds), 2)
|
self.assertEqual(len(self.builds), 2)
|
||||||
self.assertEqual(self.builds[0].name, 'project-test2')
|
self.assertEqual(self.builds[0].name, 'project-test2')
|
||||||
self.assertEqual(self.builds[1].name, 'project-test2')
|
self.assertEqual(self.builds[1].name, 'project-test2')
|
||||||
self.assertTrue('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
self.assertEqual(len(tenant.semaphore_handler.semaphores.get(
|
2)
|
||||||
'test-semaphore', [])), 2)
|
|
||||||
|
|
||||||
self.executor_server.release('project-test2')
|
self.executor_server.release('project-test2')
|
||||||
self.waitUntilSettled()
|
self.waitUntilSettled()
|
||||||
|
|
||||||
self.assertEqual(len(self.builds), 0)
|
self.assertEqual(len(self.builds), 0)
|
||||||
self.assertFalse('test-semaphore' in
|
self.assertEqual(
|
||||||
tenant.semaphore_handler.semaphores)
|
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
|
||||||
|
0
|
||||||
|
)
|
||||||
|
|
||||||
self.executor_server.hold_jobs_in_build = False
|
self.executor_server.hold_jobs_in_build = False
|
||||||
self.executor_server.release()
|
self.executor_server.release()
|
||||||
|
|
|
@ -35,6 +35,7 @@ from zuul.lib import encryption
|
||||||
from zuul.lib.keystorage import KeyStorage
|
from zuul.lib.keystorage import KeyStorage
|
||||||
from zuul.lib.logutil import get_annotated_logger
|
from zuul.lib.logutil import get_annotated_logger
|
||||||
from zuul.lib.re2util import filter_allowed_disallowed
|
from zuul.lib.re2util import filter_allowed_disallowed
|
||||||
|
from zuul.zk.semaphore import SemaphoreHandler
|
||||||
|
|
||||||
|
|
||||||
# Several forms accept either a single item or a list, this makes
|
# Several forms accept either a single item or a list, this makes
|
||||||
|
@ -1661,6 +1662,9 @@ class TenantParser(object):
|
||||||
|
|
||||||
tenant.layout = self._parseLayout(
|
tenant.layout = self._parseLayout(
|
||||||
tenant, parsed_config, loading_errors)
|
tenant, parsed_config, loading_errors)
|
||||||
|
tenant.semaphore_handler = SemaphoreHandler(
|
||||||
|
self.scheduler.zk_client, tenant.name, tenant.layout
|
||||||
|
)
|
||||||
|
|
||||||
return tenant
|
return tenant
|
||||||
|
|
||||||
|
|
109
zuul/model.py
109
zuul/model.py
|
@ -4876,113 +4876,6 @@ class Queue(ConfigObject):
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class SemaphoreHandler(object):
|
|
||||||
log = logging.getLogger("zuul.SemaphoreHandler")
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.semaphores = {}
|
|
||||||
|
|
||||||
def acquire(self, item, job, request_resources):
|
|
||||||
"""
|
|
||||||
Aquires a semaphore for an item job combination. This gets called twice
|
|
||||||
during the lifecycle of a job. The first call is before requesting
|
|
||||||
build resources. The second call is before running the job. In which
|
|
||||||
call we really acquire the semaphore is defined by the job.
|
|
||||||
|
|
||||||
:param item: The item
|
|
||||||
:param job: The job
|
|
||||||
:param request_resources: True if we want to acquire for the request
|
|
||||||
resources phase, False if we want to acquire
|
|
||||||
for the run phase.
|
|
||||||
"""
|
|
||||||
if not job.semaphore:
|
|
||||||
return True
|
|
||||||
|
|
||||||
log = get_annotated_logger(self.log, item.event)
|
|
||||||
if job.semaphore.resources_first and request_resources:
|
|
||||||
# We're currently in the resource request phase and want to get the
|
|
||||||
# resources before locking. So we don't need to do anything here.
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
# As a safety net we want to acuire the semaphore at least in the
|
|
||||||
# run phase so don't filter this here as re-acuiring the semaphore
|
|
||||||
# is not a problem here if it has been already acquired before in
|
|
||||||
# the resources phase.
|
|
||||||
pass
|
|
||||||
|
|
||||||
semaphore_key = job.semaphore.name
|
|
||||||
|
|
||||||
m = self.semaphores.get(semaphore_key)
|
|
||||||
if not m:
|
|
||||||
# The semaphore is not held, acquire it
|
|
||||||
self._acquire(semaphore_key, item, job.name, log)
|
|
||||||
return True
|
|
||||||
if (item, job.name) in m:
|
|
||||||
# This item already holds the semaphore
|
|
||||||
return True
|
|
||||||
|
|
||||||
# semaphore is there, check max
|
|
||||||
if len(m) < self._max_count(item, job.semaphore.name):
|
|
||||||
self._acquire(semaphore_key, item, job.name, log)
|
|
||||||
return True
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
def release(self, item, job):
|
|
||||||
if not job.semaphore:
|
|
||||||
return
|
|
||||||
|
|
||||||
log = get_annotated_logger(self.log, item.event)
|
|
||||||
semaphore_key = job.semaphore.name
|
|
||||||
|
|
||||||
m = self.semaphores.get(semaphore_key)
|
|
||||||
if not m:
|
|
||||||
# The semaphore is not held, nothing to do
|
|
||||||
log.error("Semaphore can not be released for %s "
|
|
||||||
"because the semaphore is not held", item)
|
|
||||||
return
|
|
||||||
if (item, job.name) in m:
|
|
||||||
# This item is a holder of the semaphore
|
|
||||||
self._release(semaphore_key, item, job.name, log)
|
|
||||||
return
|
|
||||||
log.error("Semaphore can not be released for %s "
|
|
||||||
"which does not hold it", item)
|
|
||||||
|
|
||||||
def _acquire(self, semaphore_key, item, job_name, log):
|
|
||||||
log.debug("Semaphore acquire {semaphore}: job {job}, item {item}"
|
|
||||||
.format(semaphore=semaphore_key,
|
|
||||||
job=job_name,
|
|
||||||
item=item))
|
|
||||||
if semaphore_key not in self.semaphores:
|
|
||||||
self.semaphores[semaphore_key] = []
|
|
||||||
self.semaphores[semaphore_key].append((item, job_name))
|
|
||||||
|
|
||||||
def _release(self, semaphore_key, item, job_name, log):
|
|
||||||
log.debug("Semaphore release {semaphore}: job {job}, item {item}"
|
|
||||||
.format(semaphore=semaphore_key,
|
|
||||||
job=job_name,
|
|
||||||
item=item))
|
|
||||||
sem_item = (item, job_name)
|
|
||||||
if sem_item in self.semaphores[semaphore_key]:
|
|
||||||
self.semaphores[semaphore_key].remove(sem_item)
|
|
||||||
|
|
||||||
# cleanup if there is no user of the semaphore anymore
|
|
||||||
if len(self.semaphores[semaphore_key]) == 0:
|
|
||||||
del self.semaphores[semaphore_key]
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _max_count(item, semaphore_name):
|
|
||||||
if not item.layout:
|
|
||||||
# This should not occur as the layout of the item must already be
|
|
||||||
# built when acquiring or releasing a semaphore for a job.
|
|
||||||
raise Exception("Item {} has no layout".format(item))
|
|
||||||
|
|
||||||
# find the right semaphore
|
|
||||||
default_semaphore = Semaphore(semaphore_name, 1)
|
|
||||||
semaphores = item.layout.semaphores
|
|
||||||
return semaphores.get(semaphore_name, default_semaphore).max
|
|
||||||
|
|
||||||
|
|
||||||
class Tenant(object):
|
class Tenant(object):
|
||||||
def __init__(self, name):
|
def __init__(self, name):
|
||||||
self.name = name
|
self.name = name
|
||||||
|
@ -5005,7 +4898,7 @@ class Tenant(object):
|
||||||
self.untrusted_projects = []
|
self.untrusted_projects = []
|
||||||
# The parsed config from those projects.
|
# The parsed config from those projects.
|
||||||
self.untrusted_projects_config = None
|
self.untrusted_projects_config = None
|
||||||
self.semaphore_handler = SemaphoreHandler()
|
self.semaphore_handler = None
|
||||||
# Metadata about projects for this tenant
|
# Metadata about projects for this tenant
|
||||||
# canonical project name -> TenantProjectConfig
|
# canonical project name -> TenantProjectConfig
|
||||||
self.project_configs = {}
|
self.project_configs = {}
|
||||||
|
|
|
@ -901,10 +901,6 @@ class Scheduler(threading.Thread):
|
||||||
old_tenant = self.abide.tenants.get(tenant.name)
|
old_tenant = self.abide.tenants.get(tenant.name)
|
||||||
|
|
||||||
if old_tenant:
|
if old_tenant:
|
||||||
# Copy over semaphore handler so we don't loose the currently
|
|
||||||
# held semaphores.
|
|
||||||
tenant.semaphore_handler = old_tenant.semaphore_handler
|
|
||||||
|
|
||||||
self._reenqueueTenant(old_tenant, tenant)
|
self._reenqueueTenant(old_tenant, tenant)
|
||||||
|
|
||||||
# TODOv3(jeblair): update for tenants
|
# TODOv3(jeblair): update for tenants
|
||||||
|
|
|
@ -0,0 +1,139 @@
|
||||||
|
# Copyright 2021 BMW Group
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
from urllib.parse import quote_plus
|
||||||
|
|
||||||
|
from kazoo.exceptions import BadVersionError, NoNodeError
|
||||||
|
|
||||||
|
from zuul.lib.logutil import get_annotated_logger
|
||||||
|
from zuul.zk import ZooKeeperBase
|
||||||
|
|
||||||
|
|
||||||
|
def holdersFromData(data):
|
||||||
|
if not data:
|
||||||
|
return []
|
||||||
|
return json.loads(data.decode("utf8"))
|
||||||
|
|
||||||
|
|
||||||
|
def holdersToData(holders):
|
||||||
|
return json.dumps(holders).encode("utf8")
|
||||||
|
|
||||||
|
|
||||||
|
class SemaphoreHandler(ZooKeeperBase):
|
||||||
|
log = logging.getLogger("zuul.zk.SemaphoreHandler")
|
||||||
|
|
||||||
|
semaphore_root = "/zuul/semaphores"
|
||||||
|
|
||||||
|
def __init__(self, client, tenant_name, layout):
|
||||||
|
super().__init__(client)
|
||||||
|
self.layout = layout
|
||||||
|
self.tenant_root = f"{self.semaphore_root}/{tenant_name}"
|
||||||
|
|
||||||
|
def acquire(self, item, job, request_resources):
|
||||||
|
if not job.semaphore:
|
||||||
|
return True
|
||||||
|
|
||||||
|
log = get_annotated_logger(self.log, item.event)
|
||||||
|
if job.semaphore.resources_first and request_resources:
|
||||||
|
# We're currently in the resource request phase and want to get the
|
||||||
|
# resources before locking. So we don't need to do anything here.
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
# As a safety net we want to acuire the semaphore at least in the
|
||||||
|
# run phase so don't filter this here as re-acuiring the semaphore
|
||||||
|
# is not a problem here if it has been already acquired before in
|
||||||
|
# the resources phase.
|
||||||
|
pass
|
||||||
|
|
||||||
|
semaphore_key = quote_plus(job.semaphore.name)
|
||||||
|
semaphore_path = f"{self.tenant_root}/{semaphore_key}"
|
||||||
|
semaphore_handle = f"{item.uuid}-{job.name}"
|
||||||
|
|
||||||
|
self.kazoo_client.ensure_path(semaphore_path)
|
||||||
|
semaphore_holders, zstat = self.getHolders(semaphore_path)
|
||||||
|
|
||||||
|
if semaphore_handle in semaphore_holders:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# semaphore is there, check max
|
||||||
|
while len(semaphore_holders) < self._max_count(job.semaphore.name):
|
||||||
|
semaphore_holders.append(semaphore_handle)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.kazoo_client.set(semaphore_path,
|
||||||
|
holdersToData(semaphore_holders),
|
||||||
|
version=zstat.version)
|
||||||
|
except BadVersionError:
|
||||||
|
log.debug(
|
||||||
|
"Retrying semaphore %s acquire due to concurrent update",
|
||||||
|
job.semaphore.name)
|
||||||
|
semaphore_holders, zstat = self.getHolders(semaphore_path)
|
||||||
|
continue
|
||||||
|
|
||||||
|
log.debug("Semaphore %s acquired: job %s, item %s",
|
||||||
|
job.semaphore.name, job.name, item)
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
def getHolders(self, semaphore_path):
|
||||||
|
data, zstat = self.kazoo_client.get(semaphore_path)
|
||||||
|
return holdersFromData(data), zstat
|
||||||
|
|
||||||
|
def release(self, item, job):
|
||||||
|
if not job.semaphore:
|
||||||
|
return
|
||||||
|
|
||||||
|
log = get_annotated_logger(self.log, item.event)
|
||||||
|
semaphore_key = quote_plus(job.semaphore.name)
|
||||||
|
semaphore_path = f"{self.tenant_root}/{semaphore_key}"
|
||||||
|
semaphore_handle = f"{item.uuid}-{job.name}"
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
semaphore_holders, zstat = self.getHolders(semaphore_path)
|
||||||
|
semaphore_holders.remove(semaphore_handle)
|
||||||
|
except (ValueError, NoNodeError):
|
||||||
|
log.error("Semaphore can not be released for %s "
|
||||||
|
"because the semaphore is not held", item)
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.kazoo_client.set(semaphore_path,
|
||||||
|
holdersToData(semaphore_holders),
|
||||||
|
zstat.version)
|
||||||
|
except BadVersionError:
|
||||||
|
log.debug(
|
||||||
|
"Retrying semaphore %s release due to concurrent update",
|
||||||
|
job.semaphore.name)
|
||||||
|
continue
|
||||||
|
|
||||||
|
log.debug("Semaphore %s released: job %s, item %s",
|
||||||
|
job.semaphore.name, job.name, item)
|
||||||
|
break
|
||||||
|
|
||||||
|
def semaphoreHolders(self, semaphore_name):
|
||||||
|
semaphore_key = quote_plus(semaphore_name)
|
||||||
|
semaphore_path = f"{self.tenant_root}/{semaphore_key}"
|
||||||
|
try:
|
||||||
|
holders, _ = self.getHolders(semaphore_path)
|
||||||
|
except NoNodeError:
|
||||||
|
holders = []
|
||||||
|
return holders
|
||||||
|
|
||||||
|
def _max_count(self, semaphore_name: str) -> int:
|
||||||
|
semaphore = self.layout.semaphores.get(semaphore_name)
|
||||||
|
return 1 if semaphore is None else semaphore.max
|
Loading…
Reference in New Issue