diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 577c323948..9ff19b9b46 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -7471,8 +7471,9 @@ class TestSemaphore(ZuulTestCase): A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) @@ -7487,8 +7488,9 @@ class TestSemaphore(ZuulTestCase): # By default we first lock the semaphore and then get the nodes # so at this point the semaphore needs to be aquired. - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) self.fake_nodepool.paused = False self.waitUntilSettled() @@ -7504,8 +7506,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(self.builds[0].name, 'project-test1') self.assertEqual(self.builds[1].name, 'project-test1') self.assertEqual(self.builds[2].name, 'semaphore-one-test2') - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) self.executor_server.release('semaphore-one-test2') self.waitUntilSettled() @@ -7514,8 +7517,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(self.builds[0].name, 'project-test1') self.assertEqual(self.builds[1].name, 'project-test1') self.assertEqual(self.builds[2].name, 'semaphore-one-test1') - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) self.executor_server.release('semaphore-one-test1') self.waitUntilSettled() @@ -7524,8 +7528,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(self.builds[0].name, 'project-test1') self.assertEqual(self.builds[1].name, 'project-test1') self.assertEqual(self.builds[2].name, 'semaphore-one-test2') - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) self.executor_server.release('semaphore-one-test2') self.waitUntilSettled() @@ -7533,8 +7538,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(len(self.builds), 2) self.assertEqual(self.builds[0].name, 'project-test1') self.assertEqual(self.builds[1].name, 'project-test1') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.executor_server.hold_jobs_in_build = False self.executor_server.release() @@ -7544,8 +7550,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(A.reported, 1) self.assertEqual(B.reported, 1) - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) def test_semaphore_two(self): "Test semaphores with max>1" @@ -7554,8 +7561,9 @@ class TestSemaphore(ZuulTestCase): self.executor_server.hold_jobs_in_build = True A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B') - self.assertFalse('test-semaphore-two' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders( + "test-semaphore-two")), 0) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) @@ -7566,10 +7574,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(self.builds[1].name, 'semaphore-two-test1') self.assertEqual(self.builds[2].name, 'semaphore-two-test2') self.assertEqual(self.builds[3].name, 'project-test1') - self.assertTrue('test-semaphore-two' in - tenant.semaphore_handler.semaphores) - self.assertEqual(len(tenant.semaphore_handler.semaphores.get( - 'test-semaphore-two', [])), 2) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders( + "test-semaphore-two")), 2) self.executor_server.release('semaphore-two-test1') self.waitUntilSettled() @@ -7579,10 +7586,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(self.builds[1].name, 'semaphore-two-test2') self.assertEqual(self.builds[2].name, 'project-test1') self.assertEqual(self.builds[3].name, 'semaphore-two-test1') - self.assertTrue('test-semaphore-two' in - tenant.semaphore_handler.semaphores) - self.assertEqual(len(tenant.semaphore_handler.semaphores.get( - 'test-semaphore-two', [])), 2) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders( + "test-semaphore-two")), 2) self.executor_server.release('semaphore-two-test2') self.waitUntilSettled() @@ -7592,10 +7598,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(self.builds[1].name, 'project-test1') self.assertEqual(self.builds[2].name, 'semaphore-two-test1') self.assertEqual(self.builds[3].name, 'semaphore-two-test2') - self.assertTrue('test-semaphore-two' in - tenant.semaphore_handler.semaphores) - self.assertEqual(len(tenant.semaphore_handler.semaphores.get( - 'test-semaphore-two', [])), 2) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders( + "test-semaphore-two")), 2) self.executor_server.release('semaphore-two-test1') self.waitUntilSettled() @@ -7604,10 +7609,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(self.builds[0].name, 'project-test1') self.assertEqual(self.builds[1].name, 'project-test1') self.assertEqual(self.builds[2].name, 'semaphore-two-test2') - self.assertTrue('test-semaphore-two' in - tenant.semaphore_handler.semaphores) - self.assertEqual(len(tenant.semaphore_handler.semaphores.get( - 'test-semaphore-two', [])), 1) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders( + "test-semaphore-two")), 1) self.executor_server.release('semaphore-two-test2') self.waitUntilSettled() @@ -7615,8 +7619,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(len(self.builds), 2) self.assertEqual(self.builds[0].name, 'project-test1') self.assertEqual(self.builds[1].name, 'project-test1') - self.assertFalse('test-semaphore-two' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders( + "test-semaphore-two")), 0) self.executor_server.hold_jobs_in_build = False self.executor_server.release() @@ -7635,15 +7640,17 @@ class TestSemaphore(ZuulTestCase): self.fake_nodepool.pause() A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() # By default we first lock the semaphore and then get the nodes # so at this point the semaphore needs to be aquired. - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) # Fail the node request and unpause req = self.fake_nodepool.getNodeRequests()[0] @@ -7653,8 +7660,10 @@ class TestSemaphore(ZuulTestCase): # At this point the job that holds the semaphore failed with # node_failure and the semaphore must be released. - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) + self.assertEquals(1, A.reported) self.assertIn('semaphore-one-test3 semaphore-one-test3 : NODE_FAILURE', A.messages[0]) @@ -7671,8 +7680,9 @@ class TestSemaphore(ZuulTestCase): A = self.fake_gerrit.addFakeChange('org/project3', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project3', 'master', 'B') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) @@ -7680,8 +7690,9 @@ class TestSemaphore(ZuulTestCase): # Here we first get the resources and then lock the semaphore # so at this point the semaphore should not be aquired. - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.fake_nodepool.paused = False self.waitUntilSettled() @@ -7693,14 +7704,14 @@ class TestSemaphore(ZuulTestCase): self.executor_server.release('semaphore-one-test1') self.waitUntilSettled() - self.assertEqual(len(self.builds), 3) self.assertEqual(self.builds[0].name, 'project-test1') self.assertEqual(self.builds[1].name, 'project-test1') self.assertEqual(self.builds[2].name, 'semaphore-one-test2-resources-first') - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) self.executor_server.hold_jobs_in_build = False self.executor_server.release() @@ -7714,15 +7725,16 @@ class TestSemaphore(ZuulTestCase): self.fake_nodepool.pause() A = self.fake_gerrit.addFakeChange('org/project4', 'master', 'A') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() # With resources first we first get the nodes so at this point the # semaphore must not be aquired. - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual(len(tenant.semaphore_handler.semaphoreHolders( + "test-semaphore")), 0) # Fail the node request and unpause req = self.fake_nodepool.getNodeRequests()[0] @@ -7732,8 +7744,8 @@ class TestSemaphore(ZuulTestCase): # At this point the job should never have acuired a semaphore so check # that it still has not locked a semaphore. - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual(len(tenant.semaphore_handler.semaphoreHolders( + "test-semaphore")), 0) self.assertEquals(1, A.reported) self.assertIn('semaphore-one-test1-resources-first : NODE_FAILURE', A.messages[0]) @@ -7743,8 +7755,9 @@ class TestSemaphore(ZuulTestCase): tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) # Simulate a single zk error in useNodeSet orig_useNodeSet = self.scheds.first.sched.nodepool.useNodeSet @@ -7760,8 +7773,9 @@ class TestSemaphore(ZuulTestCase): self.waitUntilSettled() # The semaphore should be released - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) # cleanup the queue self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) @@ -7774,14 +7788,16 @@ class TestSemaphore(ZuulTestCase): check_pipeline = tenant.layout.pipelines['check'] A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) self.fake_gerrit.addEvent(A.getChangeAbandonedEvent()) self.waitUntilSettled() @@ -7791,8 +7807,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(len(items), 0) # The semaphore should be released - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.executor_server.hold_jobs_in_build = False self.executor_server.release() @@ -7810,14 +7827,16 @@ class TestSemaphore(ZuulTestCase): check_pipeline = tenant.layout.pipelines['check'] A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) self.fake_gerrit.addEvent(A.getChangeAbandonedEvent()) self.waitUntilSettled() @@ -7827,8 +7846,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(len(items), 0) # The semaphore should be released - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.executor_server.hold_jobs_in_build = False self.fake_nodepool.paused = False @@ -7852,8 +7872,9 @@ class TestSemaphore(ZuulTestCase): check_pipeline = tenant.layout.pipelines['check'] A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() @@ -7866,8 +7887,9 @@ class TestSemaphore(ZuulTestCase): if len(self.scheds.first.sched.nodepool.requests) == 0: break - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) self.fake_gerrit.addEvent(A.getChangeAbandonedEvent()) self.waitUntilSettled() @@ -7877,8 +7899,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(len(items), 0) # The semaphore should be released - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.executor_server.release() self.waitUntilSettled() @@ -7890,25 +7913,24 @@ class TestSemaphore(ZuulTestCase): check_pipeline = tenant.layout.pipelines['check'] A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) - semaphore = tenant.semaphore_handler.semaphores['test-semaphore'] - self.assertEqual(len(semaphore), 1) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) A.addPatchset() self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(2)) self.waitUntilSettled() - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) - semaphore = tenant.semaphore_handler.semaphores['test-semaphore'] - self.assertEqual(len(semaphore), 1) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) items = check_pipeline.getAllItems() self.assertEqual(items[0].change.number, '1') @@ -7920,22 +7942,25 @@ class TestSemaphore(ZuulTestCase): self.waitUntilSettled() # The semaphore should be released - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) def test_semaphore_reconfigure(self): "Test reconfigure with job semaphores" self.executor_server.hold_jobs_in_build = True tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) # reconfigure without layout change self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) @@ -7943,8 +7968,9 @@ class TestSemaphore(ZuulTestCase): tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') # semaphore still must be held - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) self.commitConfigUpdate( 'common-config', @@ -7960,8 +7986,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(len(self.builds), 0) # The semaphore should be released - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) def test_semaphore_reconfigure_job_removal(self): "Test job removal during reconfiguration with semaphores" @@ -7969,14 +7996,16 @@ class TestSemaphore(ZuulTestCase): tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) self.commitConfigUpdate( 'common-config', @@ -7995,8 +8024,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(len(items), 0) # The semaphore should be released - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.executor_server.hold_jobs_in_build = False self.executor_server.release() @@ -8016,14 +8046,16 @@ class TestSemaphore(ZuulTestCase): tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) self.commitConfigUpdate( 'common-config', @@ -8046,8 +8078,9 @@ class TestSemaphore(ZuulTestCase): self.assertEqual(len(items), 0) # The semaphore should be released - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0) self.executor_server.hold_jobs_in_build = False self.executor_server.release() @@ -8070,10 +8103,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase): C = self.fake_gerrit.addFakeChange('org/project2', 'master', 'C') D = self.fake_gerrit.addFakeChange('org/project2', 'master', 'D') E = self.fake_gerrit.addFakeChange('org/project2', 'master', 'E') - self.assertFalse('test-semaphore' in - tenant_one.semaphore_handler.semaphores) - self.assertFalse('test-semaphore' in - tenant_two.semaphore_handler.semaphores) + self.assertEqual( + len(tenant_one.semaphore_handler.semaphoreHolders( + "test-semaphore")), 0) + self.assertEqual( + len(tenant_two.semaphore_handler.semaphoreHolders( + "test-semaphore")), 0) # add patches to project1 of tenant-one self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) @@ -8085,12 +8120,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase): # semaphore of tenant-two must not be acquired self.assertEqual(len(self.builds), 1) self.assertEqual(self.builds[0].name, 'project1-test1') - self.assertTrue('test-semaphore' in - tenant_one.semaphore_handler.semaphores) - self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get( - 'test-semaphore', [])), 1) - self.assertFalse('test-semaphore' in - tenant_two.semaphore_handler.semaphores) + self.assertEqual( + len(tenant_one.semaphore_handler.semaphoreHolders( + "test-semaphore")), 1) + self.assertEqual( + len(tenant_two.semaphore_handler.semaphoreHolders( + "test-semaphore")), 0) # add patches to project2 of tenant-two self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1)) @@ -8106,14 +8141,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase): self.assertEqual(self.builds[0].name, 'project1-test1') self.assertEqual(self.builds[1].name, 'project2-test1') self.assertEqual(self.builds[2].name, 'project2-test1') - self.assertTrue('test-semaphore' in - tenant_one.semaphore_handler.semaphores) - self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get( - 'test-semaphore', [])), 1) - self.assertTrue('test-semaphore' in - tenant_two.semaphore_handler.semaphores) - self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get( - 'test-semaphore', [])), 2) + self.assertEqual( + len(tenant_one.semaphore_handler.semaphoreHolders( + "test-semaphore")), 1) + self.assertEqual( + len(tenant_two.semaphore_handler.semaphoreHolders( + "test-semaphore")), 2) self.executor_server.release('project1-test1') self.waitUntilSettled() @@ -8126,14 +8159,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase): self.assertEqual(self.builds[0].name, 'project2-test1') self.assertEqual(self.builds[1].name, 'project2-test1') self.assertEqual(self.builds[2].name, 'project1-test1') - self.assertTrue('test-semaphore' in - tenant_one.semaphore_handler.semaphores) - self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get( - 'test-semaphore', [])), 1) - self.assertTrue('test-semaphore' in - tenant_two.semaphore_handler.semaphores) - self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get( - 'test-semaphore', [])), 2) + self.assertEqual( + len(tenant_one.semaphore_handler.semaphoreHolders( + "test-semaphore")), 1) + self.assertEqual( + len(tenant_two.semaphore_handler.semaphoreHolders( + "test-semaphore")), 2) self.executor_server.release('project2-test1') self.waitUntilSettled() @@ -8143,14 +8174,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase): # semaphore of tenant-one must be acquired once # semaphore of tenant-two must be acquired once self.assertEqual(len(self.builds), 2) - self.assertTrue('test-semaphore' in - tenant_one.semaphore_handler.semaphores) - self.assertEqual(len(tenant_one.semaphore_handler.semaphores.get( - 'test-semaphore', [])), 1) - self.assertTrue('test-semaphore' in - tenant_two.semaphore_handler.semaphores) - self.assertEqual(len(tenant_two.semaphore_handler.semaphores.get( - 'test-semaphore', [])), 1) + self.assertEqual( + len(tenant_one.semaphore_handler.semaphoreHolders( + "test-semaphore")), 1) + self.assertEqual( + len(tenant_two.semaphore_handler.semaphoreHolders( + "test-semaphore")), 1) self.executor_server.hold_jobs_in_build = False self.executor_server.release() @@ -8161,10 +8190,12 @@ class TestSemaphoreMultiTenant(ZuulTestCase): # semaphore of tenant-one must not be acquired # semaphore of tenant-two must not be acquired self.assertEqual(len(self.builds), 0) - self.assertFalse('test-semaphore' in - tenant_one.semaphore_handler.semaphores) - self.assertFalse('test-semaphore' in - tenant_two.semaphore_handler.semaphores) + self.assertEqual( + len(tenant_one.semaphore_handler.semaphoreHolders( + "test-semaphore")), 0) + self.assertEqual( + len(tenant_two.semaphore_handler.semaphoreHolders( + "test-semaphore")), 0) self.assertEqual(A.reported, 1) self.assertEqual(B.reported, 1) @@ -8319,10 +8350,9 @@ class TestSemaphoreInRepo(ZuulTestCase): # one build must be in queue, one semaphores acquired self.assertEqual(len(self.builds), 1) self.assertEqual(self.builds[0].name, 'project-test2') - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) - self.assertEqual(len(tenant.semaphore_handler.semaphores.get( - 'test-semaphore', [])), 1) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 1) self.executor_server.release('project-test2') self.waitUntilSettled() @@ -8343,17 +8373,18 @@ class TestSemaphoreInRepo(ZuulTestCase): self.assertEqual(len(self.builds), 2) self.assertEqual(self.builds[0].name, 'project-test2') self.assertEqual(self.builds[1].name, 'project-test2') - self.assertTrue('test-semaphore' in - tenant.semaphore_handler.semaphores) - self.assertEqual(len(tenant.semaphore_handler.semaphores.get( - 'test-semaphore', [])), 2) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 2) self.executor_server.release('project-test2') self.waitUntilSettled() self.assertEqual(len(self.builds), 0) - self.assertFalse('test-semaphore' in - tenant.semaphore_handler.semaphores) + self.assertEqual( + len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), + 0 + ) self.executor_server.hold_jobs_in_build = False self.executor_server.release() diff --git a/zuul/configloader.py b/zuul/configloader.py index 5ce090000a..4217e3e2b4 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -35,6 +35,7 @@ from zuul.lib import encryption from zuul.lib.keystorage import KeyStorage from zuul.lib.logutil import get_annotated_logger from zuul.lib.re2util import filter_allowed_disallowed +from zuul.zk.semaphore import SemaphoreHandler # Several forms accept either a single item or a list, this makes @@ -1661,6 +1662,9 @@ class TenantParser(object): tenant.layout = self._parseLayout( tenant, parsed_config, loading_errors) + tenant.semaphore_handler = SemaphoreHandler( + self.scheduler.zk_client, tenant.name, tenant.layout + ) return tenant diff --git a/zuul/model.py b/zuul/model.py index 684266c9e4..e00d08cde5 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -4883,113 +4883,6 @@ class Queue(ConfigObject): ) -class SemaphoreHandler(object): - log = logging.getLogger("zuul.SemaphoreHandler") - - def __init__(self): - self.semaphores = {} - - def acquire(self, item, job, request_resources): - """ - Aquires a semaphore for an item job combination. This gets called twice - during the lifecycle of a job. The first call is before requesting - build resources. The second call is before running the job. In which - call we really acquire the semaphore is defined by the job. - - :param item: The item - :param job: The job - :param request_resources: True if we want to acquire for the request - resources phase, False if we want to acquire - for the run phase. - """ - if not job.semaphore: - return True - - log = get_annotated_logger(self.log, item.event) - if job.semaphore.resources_first and request_resources: - # We're currently in the resource request phase and want to get the - # resources before locking. So we don't need to do anything here. - return True - else: - # As a safety net we want to acuire the semaphore at least in the - # run phase so don't filter this here as re-acuiring the semaphore - # is not a problem here if it has been already acquired before in - # the resources phase. - pass - - semaphore_key = job.semaphore.name - - m = self.semaphores.get(semaphore_key) - if not m: - # The semaphore is not held, acquire it - self._acquire(semaphore_key, item, job.name, log) - return True - if (item, job.name) in m: - # This item already holds the semaphore - return True - - # semaphore is there, check max - if len(m) < self._max_count(item, job.semaphore.name): - self._acquire(semaphore_key, item, job.name, log) - return True - - return False - - def release(self, item, job): - if not job.semaphore: - return - - log = get_annotated_logger(self.log, item.event) - semaphore_key = job.semaphore.name - - m = self.semaphores.get(semaphore_key) - if not m: - # The semaphore is not held, nothing to do - log.error("Semaphore can not be released for %s " - "because the semaphore is not held", item) - return - if (item, job.name) in m: - # This item is a holder of the semaphore - self._release(semaphore_key, item, job.name, log) - return - log.error("Semaphore can not be released for %s " - "which does not hold it", item) - - def _acquire(self, semaphore_key, item, job_name, log): - log.debug("Semaphore acquire {semaphore}: job {job}, item {item}" - .format(semaphore=semaphore_key, - job=job_name, - item=item)) - if semaphore_key not in self.semaphores: - self.semaphores[semaphore_key] = [] - self.semaphores[semaphore_key].append((item, job_name)) - - def _release(self, semaphore_key, item, job_name, log): - log.debug("Semaphore release {semaphore}: job {job}, item {item}" - .format(semaphore=semaphore_key, - job=job_name, - item=item)) - sem_item = (item, job_name) - if sem_item in self.semaphores[semaphore_key]: - self.semaphores[semaphore_key].remove(sem_item) - - # cleanup if there is no user of the semaphore anymore - if len(self.semaphores[semaphore_key]) == 0: - del self.semaphores[semaphore_key] - - @staticmethod - def _max_count(item, semaphore_name): - if not item.layout: - # This should not occur as the layout of the item must already be - # built when acquiring or releasing a semaphore for a job. - raise Exception("Item {} has no layout".format(item)) - - # find the right semaphore - default_semaphore = Semaphore(semaphore_name, 1) - semaphores = item.layout.semaphores - return semaphores.get(semaphore_name, default_semaphore).max - - class Tenant(object): def __init__(self, name): self.name = name @@ -5012,7 +4905,7 @@ class Tenant(object): self.untrusted_projects = [] # The parsed config from those projects. self.untrusted_projects_config = None - self.semaphore_handler = SemaphoreHandler() + self.semaphore_handler = None # Metadata about projects for this tenant # canonical project name -> TenantProjectConfig self.project_configs = {} diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 9efd47d787..01c1125df4 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -897,10 +897,6 @@ class Scheduler(threading.Thread): old_tenant = self.abide.tenants.get(tenant.name) if old_tenant: - # Copy over semaphore handler so we don't loose the currently - # held semaphores. - tenant.semaphore_handler = old_tenant.semaphore_handler - self._reenqueueTenant(old_tenant, tenant) # TODOv3(jeblair): update for tenants diff --git a/zuul/zk/semaphore.py b/zuul/zk/semaphore.py new file mode 100644 index 0000000000..e710c199ef --- /dev/null +++ b/zuul/zk/semaphore.py @@ -0,0 +1,139 @@ +# Copyright 2021 BMW Group +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import json +import logging +from urllib.parse import quote_plus + +from kazoo.exceptions import BadVersionError, NoNodeError + +from zuul.lib.logutil import get_annotated_logger +from zuul.zk import ZooKeeperBase + + +def holdersFromData(data): + if not data: + return [] + return json.loads(data.decode("utf8")) + + +def holdersToData(holders): + return json.dumps(holders).encode("utf8") + + +class SemaphoreHandler(ZooKeeperBase): + log = logging.getLogger("zuul.zk.SemaphoreHandler") + + semaphore_root = "/zuul/semaphores" + + def __init__(self, client, tenant_name, layout): + super().__init__(client) + self.layout = layout + self.tenant_root = f"{self.semaphore_root}/{tenant_name}" + + def acquire(self, item, job, request_resources): + if not job.semaphore: + return True + + log = get_annotated_logger(self.log, item.event) + if job.semaphore.resources_first and request_resources: + # We're currently in the resource request phase and want to get the + # resources before locking. So we don't need to do anything here. + return True + else: + # As a safety net we want to acuire the semaphore at least in the + # run phase so don't filter this here as re-acuiring the semaphore + # is not a problem here if it has been already acquired before in + # the resources phase. + pass + + semaphore_key = quote_plus(job.semaphore.name) + semaphore_path = f"{self.tenant_root}/{semaphore_key}" + semaphore_handle = f"{item.uuid}-{job.name}" + + self.kazoo_client.ensure_path(semaphore_path) + semaphore_holders, zstat = self.getHolders(semaphore_path) + + if semaphore_handle in semaphore_holders: + return True + + # semaphore is there, check max + while len(semaphore_holders) < self._max_count(job.semaphore.name): + semaphore_holders.append(semaphore_handle) + + try: + self.kazoo_client.set(semaphore_path, + holdersToData(semaphore_holders), + version=zstat.version) + except BadVersionError: + log.debug( + "Retrying semaphore %s acquire due to concurrent update", + job.semaphore.name) + semaphore_holders, zstat = self.getHolders(semaphore_path) + continue + + log.debug("Semaphore %s acquired: job %s, item %s", + job.semaphore.name, job.name, item) + return True + + return False + + def getHolders(self, semaphore_path): + data, zstat = self.kazoo_client.get(semaphore_path) + return holdersFromData(data), zstat + + def release(self, item, job): + if not job.semaphore: + return + + log = get_annotated_logger(self.log, item.event) + semaphore_key = quote_plus(job.semaphore.name) + semaphore_path = f"{self.tenant_root}/{semaphore_key}" + semaphore_handle = f"{item.uuid}-{job.name}" + + while True: + try: + semaphore_holders, zstat = self.getHolders(semaphore_path) + semaphore_holders.remove(semaphore_handle) + except (ValueError, NoNodeError): + log.error("Semaphore can not be released for %s " + "because the semaphore is not held", item) + break + + try: + self.kazoo_client.set(semaphore_path, + holdersToData(semaphore_holders), + zstat.version) + except BadVersionError: + log.debug( + "Retrying semaphore %s release due to concurrent update", + job.semaphore.name) + continue + + log.debug("Semaphore %s released: job %s, item %s", + job.semaphore.name, job.name, item) + break + + def semaphoreHolders(self, semaphore_name): + semaphore_key = quote_plus(semaphore_name) + semaphore_path = f"{self.tenant_root}/{semaphore_key}" + try: + holders, _ = self.getHolders(semaphore_path) + except NoNodeError: + holders = [] + return holders + + def _max_count(self, semaphore_name: str) -> int: + semaphore = self.layout.semaphores.get(semaphore_name) + return 1 if semaphore is None else semaphore.max