From ae639864a073f674c9a8911e0e4b482f6615f9ad Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 11 Mar 2025 14:44:52 -0700 Subject: [PATCH] Move queue-related methods to PipelineState To reimagine the Pipeline as an abstract config object, move the queue-related methods to the PipelineState class, which is where the actual queues are stored. This adds a layer of abstraction in the unit tests since we expect further changes in object relationships. Change-Id: I0ef057e58e8ceb7d9c9e1e70e36a7fe9d46c6bcb --- tests/base.py | 13 ++- tests/unit/test_circular_dependencies.py | 111 +++++++++++------------ tests/unit/test_cross_crd.py | 17 ++-- tests/unit/test_executor.py | 4 +- tests/unit/test_gerrit_crd.py | 14 ++- tests/unit/test_gerrit_legacy_crd.py | 31 +++---- tests/unit/test_github_crd.py | 3 +- tests/unit/test_github_driver.py | 10 +- tests/unit/test_reporting.py | 5 +- tests/unit/test_scheduler.py | 95 ++++++++----------- tests/unit/test_v3.py | 9 +- tests/unit/test_web.py | 28 +++--- tests/zuul_client/test_zuulclient.py | 5 +- zuul/cmd/client.py | 1 + zuul/manager/__init__.py | 19 ++-- zuul/manager/dependent.py | 2 +- zuul/manager/independent.py | 4 +- zuul/manager/serial.py | 2 +- zuul/manager/shared.py | 11 +-- zuul/manager/supercedent.py | 4 +- zuul/model.py | 76 +++++++--------- zuul/scheduler.py | 18 ++-- 22 files changed, 220 insertions(+), 262 deletions(-) diff --git a/tests/base.py b/tests/base.py index 615d725680..f52233da1b 100644 --- a/tests/base.py +++ b/tests/base.py @@ -2935,6 +2935,17 @@ class ZuulTestCase(BaseTestCase): if isinstance(pipeline.manager, ipm): self.assertEqual(len(pipeline.queues), 0) + def getAllItems(self, tenant_name, pipeline_name): + tenant = self.scheds.first.sched.abide.tenants.get(tenant_name) + manager = tenant.layout.pipelines[pipeline_name].manager + items = manager.state.getAllItems() + return items + + def getAllQueues(self, tenant_name, pipeline_name): + tenant = self.scheds.first.sched.abide.tenants.get(tenant_name) + manager = tenant.layout.pipelines[pipeline_name].manager + return manager.state.queues + def shutdown(self): # Note: when making changes to this sequence, check if # corresponding changes need to happen in @@ -3080,7 +3091,7 @@ class ZuulTestCase(BaseTestCase): def getCurrentBuilds(self): for tenant in self.scheds.first.sched.abide.tenants.values(): for pipeline in tenant.layout.pipelines.values(): - for item in pipeline.getAllItems(): + for item in pipeline.manager.state.getAllItems(): for build in item.current_build_set.builds.values(): yield build diff --git a/tests/unit/test_circular_dependencies.py b/tests/unit/test_circular_dependencies.py index bbeeb36554..a6e2dc84d5 100644 --- a/tests/unit/test_circular_dependencies.py +++ b/tests/unit/test_circular_dependencies.py @@ -850,8 +850,7 @@ class TestGerritCircularDependencies(ZuulTestCase): self.fake_gerrit.addEvent(B.addApproval("Approved", 1)) self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items_before = tenant.layout.pipelines['gate'].getAllItems() + items_before = self.getAllItems('tenant-one', 'gate') # Trigger a re-enqueue of change B self.fake_gerrit.addEvent(B.getChangeAbandonedEvent()) @@ -859,8 +858,7 @@ class TestGerritCircularDependencies(ZuulTestCase): self.fake_gerrit.addEvent(B.addApproval("Approved", 1)) self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items_after = tenant.layout.pipelines['gate'].getAllItems() + items_after = self.getAllItems('tenant-one', 'gate') # Make sure the complete cycle was re-enqueued for before, after in zip(items_before, items_after): @@ -1229,8 +1227,7 @@ class TestGerritCircularDependencies(ZuulTestCase): self.waitUntilSettled() # We only want to have a merge failure for the first item in the queue - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') with self.createZKContext() as context: items[0].current_build_set.updateAttributes(context, unable_to_merge=True) @@ -2012,9 +2009,7 @@ class TestGerritCircularDependencies(ZuulTestCase): self.waitUntilSettled() # Fail the node request and unpause - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - pipeline = tenant.layout.pipelines['gate'] - items = pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'gate') job = items[0].current_build_set.job_graph.getJob( 'common-job', items[0].changes[0].cache_key) for req in self.fake_nodepool.getNodeRequests(): @@ -2881,9 +2876,7 @@ class TestGerritCircularDependencies(ZuulTestCase): self.waitUntilSettled() # Fail the node request and unpause - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - pipeline = tenant.layout.pipelines['check'] - items = pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') job = items[0].current_build_set.job_graph.getJob( 'common-job', items[0].changes[0].cache_key) for req in self.fake_nodepool.getNodeRequests(): @@ -4664,8 +4657,6 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): # patchsets. This test exercises such changes. self.executor_server.hold_jobs_in_build = True - tenant = self.scheds.first.sched.abide.tenants.get("tenant-one") - pipeline = tenant.layout.pipelines["gate"] A = self.fake_github.openFakePullRequest("gh/project1", "master", "A") B = self.fake_github.openFakePullRequest("gh/project2", "master", "B") @@ -4703,8 +4694,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): self.fake_github.emitEvent(C.addLabel("approved")) self.waitUntilSettled() expected_cycle = {A.number, B.number, C.number, E.number} - self.assertEqual(len(list(pipeline.getAllItems())), 1) - for item in pipeline.getAllItems(): + items = self.getAllItems('tenant-one', 'gate') + self.assertEqual(len(list(items)), 1) + for item in items: cycle = {c.number for c in item.changes} self.assertEqual(expected_cycle, cycle) # Assert we get the same triggering change every time @@ -4718,7 +4710,8 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): ) self.fake_github.emitEvent(A.getPullRequestEditedEvent(A.body)) self.waitUntilSettled() - self.assertEqual(len(list(pipeline.getAllItems())), 2) + items = self.getAllItems('tenant-one', 'gate') + self.assertEqual(len(list(items)), 2) # Now remove all dependencies for the three remaining changes, # and also E so that it doesn't get pulled back in as an @@ -4734,8 +4727,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): self.waitUntilSettled() self.fake_github.emitEvent(C.getPullRequestEditedEvent(C.body)) self.waitUntilSettled() - self.assertEqual(len(list(pipeline.getAllItems())), 3) - for item in pipeline.getAllItems(): + items = self.getAllItems('tenant-one', 'gate') + self.assertEqual(len(list(items)), 3) + for item in items: self.assertEqual(len(item.changes), 1) # Assert we get the same triggering change every time self.assertEqual(json.loads(item.event.ref)['stable_id'], '3') @@ -4747,8 +4741,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): ) self.fake_github.emitEvent(A.getPullRequestEditedEvent(A.body)) self.waitUntilSettled() - self.assertEqual(len(list(pipeline.getAllItems())), 2) - for item in pipeline.getAllItems(): + items = self.getAllItems('tenant-one', 'gate') + self.assertEqual(len(list(items)), 2) + for item in items: self.assertEqual(len(item.changes), 1) # Assert we get the same triggering change every time self.assertEqual(json.loads(item.event.ref)['stable_id'], '3') @@ -4766,8 +4761,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): self.fake_github.emitEvent(B.getPullRequestEditedEvent(B.body)) self.waitUntilSettled() expected_cycle = {B.number, C.number} - self.assertEqual(len(list(pipeline.getAllItems())), 1) - for item in pipeline.getAllItems(): + items = self.getAllItems('tenant-one', 'gate') + self.assertEqual(len(list(items)), 1) + for item in items: cycle = {c.number for c in item.changes} self.assertEqual(expected_cycle, cycle) # Assert we get the same triggering change every time @@ -4790,9 +4786,6 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): # patchsets. This test exercises such changes. self.executor_server.hold_jobs_in_build = True - tenant = self.scheds.first.sched.abide.tenants.get("tenant-one") - pipeline = tenant.layout.pipelines["gate"] - A = self.fake_github.openFakePullRequest("gh/project1", "master", "A") B = self.fake_github.openFakePullRequest("gh/project2", "master", "B") @@ -4801,8 +4794,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): self.fake_github.emitEvent(A.addLabel("approved")) self.waitUntilSettled() - self.assertEqual(len(list(pipeline.getAllItems())), 1) - for item in pipeline.getAllItems(): + items = self.getAllItems('tenant-one', 'gate') + self.assertEqual(len(list(items)), 1) + for item in items: self.assertEqual(len(item.changes), 1) # Assert we get the same triggering change every time self.assertEqual(json.loads(item.event.ref)['stable_id'], '1') @@ -4818,9 +4812,10 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): ) self.fake_github.emitEvent(A.getPullRequestEditedEvent(A.body)) self.waitUntilSettled() - self.assertEqual(len(list(pipeline.getAllItems())), 1) + items = self.getAllItems('tenant-one', 'gate') + self.assertEqual(len(list(items)), 1) expected_cycle = {A.number, B.number} - for item in pipeline.getAllItems(): + for item in items: cycle = {c.number for c in item.changes} self.assertEqual(expected_cycle, cycle) # Assert we get the same triggering change every time @@ -4837,8 +4832,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): for build in self.history[-3:]: self.assertEqual(build.result, 'SUCCESS') - def assertQueueCycles(self, pipeline, queue_index, bundles): - queue = pipeline.queues[queue_index] + def assertQueueCycles(self, pipeline_name, queue_index, bundles): + queues = self.getAllQueues('tenant-one', pipeline_name) + queue = queues[queue_index] self.assertEqual(len(queue.queue), len(bundles)) for x, item in enumerate(queue.queue): @@ -4852,8 +4848,6 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): # patchsets. This test exercises such changes. self.executor_server.hold_jobs_in_build = True - tenant = self.scheds.first.sched.abide.tenants.get("tenant-one") - pipeline = tenant.layout.pipelines["check"] A = self.fake_github.openFakePullRequest("gh/project1", "master", "A") B = self.fake_github.openFakePullRequest("gh/project2", "master", "B") @@ -4885,9 +4879,10 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): self.fake_github.emitEvent(C.getPullRequestOpenedEvent()) self.waitUntilSettled() abce = [A, B, C, E] - self.assertEqual(len(pipeline.queues), 1) - self.assertQueueCycles(pipeline, 0, [abce]) - for item in pipeline.getAllItems(): + self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 1) + self.assertQueueCycles('check', 0, [abce]) + items = self.getAllItems('tenant-one', 'check') + for item in items: # Assert we get the same triggering change every time self.assertEqual(json.loads(item.event.ref)['stable_id'], '3') @@ -4902,8 +4897,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): abc = [A, B, C] # ABC, E - self.assertQueueCycles(pipeline, 0, [abc, [E]]) - for item in pipeline.getAllItems(): + self.assertQueueCycles('check', 0, [abc, [E]]) + items = self.getAllItems('tenant-one', 'check') + for item in items: # Assert we get the same triggering change every time self.assertEqual(json.loads(item.event.ref)['stable_id'], '3') @@ -4920,11 +4916,11 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): self.waitUntilSettled() # A, B, C individually - self.assertEqual(len(pipeline.queues), 3) - self.assertQueueCycles(pipeline, 0, [[A], [B]]) - self.assertQueueCycles(pipeline, 1, [[A], [B], [C]]) - self.assertQueueCycles(pipeline, 2, [[A]]) - for item in pipeline.getAllItems(): + self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 3) + self.assertQueueCycles('check', 0, [[A], [B]]) + self.assertQueueCycles('check', 1, [[A], [B], [C]]) + self.assertQueueCycles('check', 2, [[A]]) + for item in self.getAllItems('tenant-one', 'check'): # Assert we get the same triggering change every time self.assertEqual(json.loads(item.event.ref)['stable_id'], '3') @@ -4934,7 +4930,7 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): ) self.fake_github.emitEvent(C.getPullRequestEditedEvent(C.body)) self.waitUntilSettled() - for item in pipeline.getAllItems(): + for item in self.getAllItems('tenant-one', 'check'): # Assert we get the same triggering change every time self.assertEqual(json.loads(item.event.ref)['stable_id'], '3') B.body = "{}\n\nDepends-On: {}\n".format( @@ -4942,13 +4938,13 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): ) self.fake_github.emitEvent(B.getPullRequestEditedEvent(B.body)) self.waitUntilSettled() - for item in pipeline.getAllItems(): + for item in self.getAllItems('tenant-one', 'check'): # Assert we get the same triggering change every time self.assertEqual(json.loads(item.event.ref)['stable_id'], '3') - self.assertEqual(len(pipeline.queues), 2) + self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 2) bc = [B, C] - self.assertQueueCycles(pipeline, 0, [[A]]) - self.assertQueueCycles(pipeline, 1, [bc]) + self.assertQueueCycles('check', 0, [[A]]) + self.assertQueueCycles('check', 1, [bc]) # All done. self.executor_server.hold_jobs_in_build = False @@ -4967,17 +4963,15 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): # patchsets. This test exercises such changes. self.executor_server.hold_jobs_in_build = True - tenant = self.scheds.first.sched.abide.tenants.get("tenant-one") - pipeline = tenant.layout.pipelines["check"] - A = self.fake_github.openFakePullRequest("gh/project1", "master", "A") B = self.fake_github.openFakePullRequest("gh/project2", "master", "B") self.fake_github.emitEvent(A.getPullRequestOpenedEvent()) self.waitUntilSettled() - self.assertEqual(len(list(pipeline.getAllItems())), 1) - for item in pipeline.getAllItems(): + items = self.getAllItems('tenant-one', 'check') + self.assertEqual(len(list(items)), 1) + for item in items: self.assertEqual(len(item.changes), 1) # Assert we get the same triggering change every time self.assertEqual(json.loads(item.event.ref)['stable_id'], '1') @@ -4993,14 +4987,15 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase): ) self.fake_github.emitEvent(A.getPullRequestEditedEvent(A.body)) self.waitUntilSettled() - self.assertEqual(len(list(pipeline.getAllItems())), 1) - for item in pipeline.getAllItems(): + items = self.getAllItems('tenant-one', 'check') + self.assertEqual(len(list(items)), 1) + for item in items: self.assertEqual(len(item.changes), 2) # Assert we get the same triggering change every time self.assertEqual(json.loads(item.event.ref)['stable_id'], '1') - self.assertEqual(len(pipeline.queues), 1) + self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 1) ab = [A, B] - self.assertQueueCycles(pipeline, 0, [ab]) + self.assertQueueCycles('check', 0, [ab]) # All done. self.executor_server.hold_jobs_in_build = False diff --git a/tests/unit/test_cross_crd.py b/tests/unit/test_cross_crd.py index 9766f94c1b..6ac86e20e1 100644 --- a/tests/unit/test_cross_crd.py +++ b/tests/unit/test_cross_crd.py @@ -292,23 +292,21 @@ class TestGerritToGithubCRD(ZuulTestCase): # A Depends-On: B A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( A.subject, B.url) - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] # Add two dependent changes... self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertEqual(len(check_pipeline.getAllItems()), 2) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2) # ...make sure the live one is not duplicated... self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertEqual(len(check_pipeline.getAllItems()), 2) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2) # ...but the non-live one is able to be. self.fake_github.emitEvent(B.getPullRequestEditedEvent()) self.waitUntilSettled() - self.assertEqual(len(check_pipeline.getAllItems()), 3) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3) # Release jobs in order to avoid races with change A jobs # finishing before change B jobs. @@ -331,7 +329,7 @@ class TestGerritToGithubCRD(ZuulTestCase): 'project-merge', 'github/project2').changes self.assertEqual(changes, '1,%s' % (B.head_sha,)) - self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0) + self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 0) self.assertIn('Build succeeded', A.messages[0]) @@ -755,22 +753,21 @@ class TestGithubToGerritCRD(ZuulTestCase): # A Depends-On: B event = A.editBody('Depends-On: %s\n' % (B.data['url'],)) tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] # Add two dependent changes... self.fake_github.emitEvent(event) self.waitUntilSettled() - self.assertEqual(len(check_pipeline.getAllItems()), 2) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2) # ...make sure the live one is not duplicated... self.fake_github.emitEvent(A.getPullRequestEditedEvent()) self.waitUntilSettled() - self.assertEqual(len(check_pipeline.getAllItems()), 2) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2) # ...but the non-live one is able to be. self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertEqual(len(check_pipeline.getAllItems()), 3) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3) # Release jobs in order to avoid races with change A jobs # finishing before change B jobs. diff --git a/tests/unit/test_executor.py b/tests/unit/test_executor.py index fbea75d1c1..b282ea1d11 100644 --- a/tests/unit/test_executor.py +++ b/tests/unit/test_executor.py @@ -1506,9 +1506,7 @@ class TestExecutorFailure(ZuulTestCase): self.executor_api.release() self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - pipeline = tenant.layout.pipelines['gate'] - items = pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'gate') self.assertEqual(len(items), 1) self.hold_jobs_in_queue = False diff --git a/tests/unit/test_gerrit_crd.py b/tests/unit/test_gerrit_crd.py index 3abc575124..bb488d6d78 100644 --- a/tests/unit/test_gerrit_crd.py +++ b/tests/unit/test_gerrit_crd.py @@ -460,24 +460,22 @@ class TestGerritCRD(ZuulTestCase): self.executor_server.hold_jobs_in_build = True A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B') - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] # Add two git-dependent changes... B.setDependsOn(A, 1) self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertEqual(len(check_pipeline.getAllItems()), 2) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2) # ...make sure the live one is not duplicated... self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertEqual(len(check_pipeline.getAllItems()), 2) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2) # ...but the non-live one is able to be. self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertEqual(len(check_pipeline.getAllItems()), 3) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3) # Release jobs in order to avoid races with change A jobs # finishing before change B jobs. @@ -493,7 +491,7 @@ class TestGerritCRD(ZuulTestCase): self.assertEqual(self.history[0].changes, '1,1 2,1') self.assertEqual(self.history[1].changes, '1,1') - self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0) + self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 0) self.assertIn('Build succeeded', A.messages[0]) self.assertIn('Build succeeded', B.messages[0]) @@ -575,8 +573,8 @@ class TestGerritCRD(ZuulTestCase): tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') check_pipeline = tenant.layout.pipelines['check'] self.assertEqual(len(check_pipeline.queues), 3) - self.assertEqual(len(check_pipeline.getAllItems()), 3) - for item in check_pipeline.getAllItems(): + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3) + for item in self.getAllItems('tenant-one', 'check'): self.assertTrue(item.live) self.hold_jobs_in_queue = False diff --git a/tests/unit/test_gerrit_legacy_crd.py b/tests/unit/test_gerrit_legacy_crd.py index 60616b6cff..141491cbc1 100644 --- a/tests/unit/test_gerrit_legacy_crd.py +++ b/tests/unit/test_gerrit_legacy_crd.py @@ -355,8 +355,7 @@ class TestGerritLegacyCRD(ZuulTestCase): self.assertEqual(B.reported, 0) self.assertEqual(self.history[0].changes, '2,1 1,1') - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0) + self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 0) def test_crd_check_git_depends(self): "Test single-repo dependencies in independent pipelines" @@ -378,8 +377,7 @@ class TestGerritLegacyCRD(ZuulTestCase): self.assertEqual(self.history[0].changes, '1,1') self.assertEqual(self.history[-1].changes, '1,1 2,1') - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0) + self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 0) self.assertIn('Build succeeded', A.messages[0]) self.assertIn('Build succeeded', B.messages[0]) @@ -389,24 +387,22 @@ class TestGerritLegacyCRD(ZuulTestCase): self.executor_server.hold_jobs_in_build = True A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B') - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] # Add two git-dependent changes... B.setDependsOn(A, 1) self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertEqual(len(check_pipeline.getAllItems()), 2) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2) # ...make sure the live one is not duplicated... self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertEqual(len(check_pipeline.getAllItems()), 2) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2) # ...but the non-live one is able to be. self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - self.assertEqual(len(check_pipeline.getAllItems()), 3) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3) # Release jobs in order to avoid races with change A jobs # finishing before change B jobs. @@ -422,7 +418,7 @@ class TestGerritLegacyCRD(ZuulTestCase): self.assertEqual(self.history[0].changes, '1,1 2,1') self.assertEqual(self.history[1].changes, '1,1') - self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0) + self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 0) self.assertIn('Build succeeded', A.messages[0]) self.assertIn('Build succeeded', B.messages[0]) @@ -446,9 +442,8 @@ class TestGerritLegacyCRD(ZuulTestCase): # Make sure the items still share a change queue, and the # first one is not live. - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - self.assertEqual(len(tenant.layout.pipelines['check'].queues), 1) - queue = tenant.layout.pipelines['check'].queues[0] + self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 1) + queue = self.getAllQueues('tenant-one', 'check')[0] first_item = queue.queue[0] for item in queue.queue: self.assertEqual(item.queue, first_item.queue) @@ -465,7 +460,7 @@ class TestGerritLegacyCRD(ZuulTestCase): self.assertEqual(B.reported, 0) self.assertEqual(self.history[0].changes, '2,1 1,1') - self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0) + self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 0) @skipIfMultiScheduler() def test_crd_check_reconfiguration(self): @@ -501,11 +496,9 @@ class TestGerritLegacyCRD(ZuulTestCase): # Make sure none of the items share a change queue, and all # are live. - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] - self.assertEqual(len(check_pipeline.queues), 3) - self.assertEqual(len(check_pipeline.getAllItems()), 3) - for item in check_pipeline.getAllItems(): + self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 3) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3) + for item in self.getAllItems('tenant-one', 'check'): self.assertTrue(item.live) self.hold_jobs_in_queue = False diff --git a/tests/unit/test_github_crd.py b/tests/unit/test_github_crd.py index e5dd2f7ae4..cf9e95fd98 100644 --- a/tests/unit/test_github_crd.py +++ b/tests/unit/test_github_crd.py @@ -199,8 +199,7 @@ class TestGithubCrossRepoDeps(ZuulTestCase): self.waitUntilSettled() self.assertEqual(len(self.builds), 1) - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items = tenant.layout.pipelines['check'].getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(len(items), 3) # Update B to point at A1 instead of A diff --git a/tests/unit/test_github_driver.py b/tests/unit/test_github_driver.py index a1bb837c25..3e28d870e2 100644 --- a/tests/unit/test_github_driver.py +++ b/tests/unit/test_github_driver.py @@ -1368,9 +1368,8 @@ class TestGithubDriver(ZuulTestCase): self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] - self.assertEqual(check_pipeline.getAllItems(), []) + items = self.getAllItems('tenant-one', 'check') + self.assertEqual(items, []) self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 2) self.executor_server.hold_jobs_in_build = False @@ -2606,9 +2605,8 @@ class TestGithubAppDriver(ZuulGithubAppTestCase): self.fake_github.emitEvent(A.getCheckRunAbortEvent(check_run)) self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get("tenant-one") - check_pipeline = tenant.layout.pipelines["check"] - self.assertEqual(0, len(check_pipeline.getAllItems())) + items = self.getAllItems('tenant-one', 'check') + self.assertEqual(0, len(items)) self.assertEqual(1, self.countJobResults(self.history, "ABORTED")) # The buildset was already dequeued, so there shouldn't be anything to diff --git a/tests/unit/test_reporting.py b/tests/unit/test_reporting.py index 0c5c5fbc91..196d903b54 100644 --- a/tests/unit/test_reporting.py +++ b/tests/unit/test_reporting.py @@ -40,16 +40,13 @@ class TestReporting(ZuulTestCase): 'check'].put(event) self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] - # A should have been reported two times: start, cancel self.assertEqual(2, A.reported) self.assertEqual(2, len(A.messages)) self.assertIn("Build started (check)", A.messages[0]) self.assertIn("Build canceled (check)", A.messages[1]) # There shouldn't be any successful items - self.assertEqual(len(check_pipeline.getAllItems()), 0) + self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 0) # But one canceled self.assertEqual(self.countJobResults(self.history, "ABORTED"), 1) diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index 229f9596d3..d5ba84d3b6 100644 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -188,8 +188,7 @@ class TestSchedulerZone(ZuulTestCase): self.executor_server.zk_client.client.start() # Find the build in the scheduler so we can check its status - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') builds = items[0].current_build_set.getBuilds() build = builds[0] @@ -1058,8 +1057,7 @@ class TestScheduler(ZuulTestCase): # project-test1 and project-test2 for C self.assertEqual(len(self.builds), 5) - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') builds = items[0].current_build_set.getBuilds() self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1) self.assertEqual(self.countJobResults(builds, None), 2) @@ -2944,8 +2942,6 @@ class TestScheduler(ZuulTestCase): A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] # Add two git-dependent changes B.setDependsOn(A, 1) @@ -2955,7 +2951,7 @@ class TestScheduler(ZuulTestCase): self.waitUntilSettled() # A live item, and a non-live/live pair - items = check_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(len(items), 3) self.assertEqual(items[0].changes[0].number, '1') @@ -2977,7 +2973,7 @@ class TestScheduler(ZuulTestCase): # The live copy of A,1 should be gone, but the non-live and B # should continue, and we should have a new A,2 - items = check_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(len(items), 3) self.assertEqual(items[0].changes[0].number, '1') @@ -2999,7 +2995,7 @@ class TestScheduler(ZuulTestCase): # The live copy of B,1 should be gone, and it's non-live copy of A,1 # but we should have a new B,2 (still based on A,1) - items = check_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(len(items), 3) self.assertEqual(items[0].changes[0].number, '1') @@ -3065,8 +3061,6 @@ class TestScheduler(ZuulTestCase): A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B') - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] # Add two git-dependent changes B.setDependsOn(A, 1) @@ -3075,7 +3069,7 @@ class TestScheduler(ZuulTestCase): self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() # A live item, and a non-live/live pair - items = check_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(len(items), 3) self.assertEqual(items[0].changes[0].number, '1') @@ -3093,7 +3087,7 @@ class TestScheduler(ZuulTestCase): # The live copy of A should be gone, but the non-live and B # should continue - items = check_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(len(items), 2) self.assertEqual(items[0].changes[0].number, '1') @@ -3319,9 +3313,7 @@ class TestScheduler(ZuulTestCase): self.assertEqual(A.reported, False) # Check queue is empty afterwards - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] - items = check_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(len(items), 0) self.assertEqual(len(self.history), 0) @@ -3764,8 +3756,8 @@ class TestScheduler(ZuulTestCase): gate.state.refresh(ctx) gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) - q1 = gate.getQueue(project1.canonical_name, None) - q2 = gate.getQueue(project2.canonical_name, None) + q1 = gate.manager.state.getQueue(project1.canonical_name, None) + q2 = gate.manager.state.getQueue(project2.canonical_name, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -3790,8 +3782,8 @@ class TestScheduler(ZuulTestCase): gate.state.refresh(ctx) gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) - q1 = gate.getQueue(project1.canonical_name, None) - q2 = gate.getQueue(project2.canonical_name, None) + q1 = gate.manager.state.getQueue(project1.canonical_name, None) + q2 = gate.manager.state.getQueue(project2.canonical_name, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -3816,8 +3808,8 @@ class TestScheduler(ZuulTestCase): gate.state.refresh(ctx) gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) - q1 = gate.getQueue(project1.canonical_name, None) - q2 = gate.getQueue(project2.canonical_name, None) + q1 = gate.manager.state.getQueue(project1.canonical_name, None) + q2 = gate.manager.state.getQueue(project2.canonical_name, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -3841,8 +3833,8 @@ class TestScheduler(ZuulTestCase): gate.state.refresh(ctx) gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) - q1 = gate.getQueue(project1.canonical_name, None) - q2 = gate.getQueue(project2.canonical_name, None) + q1 = gate.manager.state.getQueue(project1.canonical_name, None) + q2 = gate.manager.state.getQueue(project2.canonical_name, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -3867,8 +3859,8 @@ class TestScheduler(ZuulTestCase): gate.state.refresh(ctx) gate.manager.getChangeQueue(fake_a, None) gate.manager.getChangeQueue(fake_b, None) - q1 = gate.getQueue(project1.canonical_name, None) - q2 = gate.getQueue(project2.canonical_name, None) + q1 = gate.manager.state.getQueue(project1.canonical_name, None) + q2 = gate.manager.state.getQueue(project2.canonical_name, None) self.assertEqual(q1.name, 'integrated') self.assertEqual(q2.name, 'integrated') @@ -3995,13 +3987,13 @@ class TestScheduler(ZuulTestCase): self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - pipeline = tenant.layout.pipelines['gate'] - items = pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'gate') self.assertEqual(len(items), 1) self.assertIsNone(items[0].layout_uuid) # Assert that the layout cache is empty after a reconfiguration. + tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') + pipeline = tenant.layout.pipelines['gate'] self.assertEqual(pipeline.manager._layout_cache, {}) B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B', @@ -4011,7 +4003,7 @@ class TestScheduler(ZuulTestCase): self.fake_gerrit.addEvent(B.addApproval('Approved', 1)) self.waitUntilSettled() - items = pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'gate') self.assertEqual(len(items), 2) for item in items: # Layout UUID should be set again for all live items. It had to @@ -4071,9 +4063,7 @@ class TestScheduler(ZuulTestCase): self.scheds.execute(lambda app: app.sched.reconfigure(app.config)) self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - pipeline = tenant.layout.pipelines['check'] - items = pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(len(items), 2) # Assert that the layout UUID of the live item is reset during a @@ -4082,6 +4072,8 @@ class TestScheduler(ZuulTestCase): self.assertIsNone(items[1].layout_uuid) # Cache should be empty after a reconfiguration + tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') + pipeline = tenant.layout.pipelines['check'] self.assertEqual(pipeline.manager._layout_cache, {}) self.executor_server.hold_jobs_in_build = False @@ -5908,8 +5900,7 @@ For CI problems and help debugging, contact ci@example.org""" self.executor_server.release('.*-test*') self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items = tenant.layout.pipelines['check'].getAllItems() + items = self.getAllItems('tenant-one', 'check') build_set = items[0].current_build_set job = list(filter(lambda j: j.name == 'project-test1', items[0].getJobs()))[0] @@ -5950,8 +5941,7 @@ For CI problems and help debugging, contact ci@example.org""" self.executor_server.zk_client.client.start() # Find the build in the scheduler so we can check its status - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') builds = items[0].current_build_set.getBuilds() build = builds[0] @@ -6490,8 +6480,7 @@ For CI problems and help debugging, contact ci@example.org""" self.assertEqual(jobs[0].state, zuul.model.MergeRequest.HOLD) self.assertEqual(len(jobs), 1) - pipeline = tenant.layout.pipelines['post'] - self.assertEqual(len(pipeline.getAllItems()), 1) + self.assertEqual(len(self.getAllItems('tenant-one', 'post')), 1) self.merger_api.release() self.waitUntilSettled() @@ -6807,9 +6796,9 @@ class TestChangeQueues(ZuulTestCase): ]) tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') _, p = tenant.getProject(project) - q1 = tenant.layout.pipelines['gate'].getQueue( + q1 = tenant.layout.pipelines['gate'].manager.state.getQueue( p.canonical_name, 'master') - q2 = tenant.layout.pipelines['gate'].getQueue( + q2 = tenant.layout.pipelines['gate'].manager.state.getQueue( p.canonical_name, 'stable') self.assertEqual(q1.name, queue_name) self.assertEqual(q2.name, queue_name) @@ -6866,11 +6855,11 @@ class TestChangeQueues(ZuulTestCase): ]) tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') _, p = tenant.getProject(project) - q1 = tenant.layout.pipelines['gate'].getQueue( + q1 = tenant.layout.pipelines['gate'].manager.state.getQueue( p.canonical_name, 'master') - q2 = tenant.layout.pipelines['gate'].getQueue( + q2 = tenant.layout.pipelines['gate'].manager.state.getQueue( p.canonical_name, 'stable') - q3 = tenant.layout.pipelines['gate'].getQueue( + q3 = tenant.layout.pipelines['gate'].manager.state.getQueue( p.canonical_name, None) # There should be no branch specific queues anymore @@ -8489,7 +8478,6 @@ class TestSemaphore(ZuulTestCase): "Test abandon with job semaphores" self.executor_server.hold_jobs_in_build = True tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') self.assertEqual( @@ -8507,7 +8495,7 @@ class TestSemaphore(ZuulTestCase): self.waitUntilSettled() # The check pipeline should be empty - items = check_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(len(items), 0) # The semaphore should be released @@ -8528,7 +8516,6 @@ class TestSemaphore(ZuulTestCase): self.fake_nodepool.paused = True tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') self.assertEqual( @@ -8546,7 +8533,7 @@ class TestSemaphore(ZuulTestCase): self.waitUntilSettled() # The check pipeline should be empty - items = check_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(len(items), 0) # The semaphore should be released @@ -8573,7 +8560,6 @@ class TestSemaphore(ZuulTestCase): self.fake_nodepool.paused = True tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') self.assertEqual( @@ -8604,7 +8590,7 @@ class TestSemaphore(ZuulTestCase): self.waitUntilSettled() # The check pipeline should be empty - items = check_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(len(items), 0) # The semaphore should be released @@ -8619,7 +8605,6 @@ class TestSemaphore(ZuulTestCase): "Test new patchset with job semaphores" self.executor_server.hold_jobs_in_build = True tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') self.assertEqual( @@ -8641,7 +8626,7 @@ class TestSemaphore(ZuulTestCase): len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")), 1) - items = check_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(items[0].changes[0].number, '1') self.assertEqual(items[0].changes[0].patchset, '2') self.assertTrue(items[0].live) @@ -8718,8 +8703,7 @@ class TestSemaphore(ZuulTestCase): 1) # Save some variables for later use while the job is running - check_pipeline = tenant.layout.pipelines['check'] - item = check_pipeline.getAllItems()[0] + item = self.getAllItems('tenant-one', 'check')[0] job = list(filter(lambda j: j.name == 'semaphore-one-test1', item.getJobs()))[0] @@ -9243,8 +9227,7 @@ class TestSchedulerFailFast(ZuulTestCase): self.assertEqual(self.builds[1].name, 'project-test2') # But both changes should still be in the pipeline - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') self.assertEqual(len(items), 2) self.assertEqual(A.reported, 1) self.assertEqual(B.reported, 1) diff --git a/tests/unit/test_v3.py b/tests/unit/test_v3.py index e617c2a2d0..aefd577f8a 100644 --- a/tests/unit/test_v3.py +++ b/tests/unit/test_v3.py @@ -1720,9 +1720,6 @@ class TestInRepoConfig(ZuulTestCase): def test_dynamic_config_new_patchset(self): self.executor_server.hold_jobs_in_build = True - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - check_pipeline = tenant.layout.pipelines['check'] - in_repo_conf = textwrap.dedent( """ - job: @@ -1753,7 +1750,7 @@ class TestInRepoConfig(ZuulTestCase): self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) self.waitUntilSettled() - items = check_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(items[0].changes[0].number, '1') self.assertEqual(items[0].changes[0].patchset, '1') self.assertTrue(items[0].live) @@ -1783,7 +1780,7 @@ class TestInRepoConfig(ZuulTestCase): self.waitUntilSettled() - items = check_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'check') self.assertEqual(items[0].changes[0].number, '1') self.assertEqual(items[0].changes[0].patchset, '2') self.assertTrue(items[0].live) @@ -4079,7 +4076,7 @@ class TestInRepoJoin(ZuulTestCase): self.fake_gerrit.addEvent(A.addApproval('Approved', 1)) self.waitUntilSettled() - items = gate_pipeline.getAllItems() + items = self.getAllItems('tenant-one', 'gate') self.assertEqual(items[0].changes[0].number, '1') self.assertEqual(items[0].changes[0].patchset, '1') self.assertTrue(items[0].live) diff --git a/tests/unit/test_web.py b/tests/unit/test_web.py index 5fce8fbb42..cc1e63c3b6 100644 --- a/tests/unit/test_web.py +++ b/tests/unit/test_web.py @@ -3219,8 +3219,7 @@ class TestTenantScopedWebApi(BaseTestWeb): self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') enqueue_times = {} for item in items: enqueue_times[str(item.changes[0])] = item.enqueue_time @@ -3247,7 +3246,7 @@ class TestTenantScopedWebApi(BaseTestWeb): self.assertEqual(True, data) # ensure that enqueue times are durable - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') for item in items: self.assertEqual( enqueue_times[str(item.changes[0])], item.enqueue_time) @@ -3308,8 +3307,7 @@ class TestTenantScopedWebApi(BaseTestWeb): self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') enqueue_times = {} for item in items: enqueue_times[str(item.changes[0])] = item.enqueue_time @@ -3336,7 +3334,7 @@ class TestTenantScopedWebApi(BaseTestWeb): self.assertEqual(True, data) # ensure that enqueue times are durable - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') for item in items: self.assertEqual( enqueue_times[str(item.changes[0])], item.enqueue_time) @@ -3399,9 +3397,8 @@ class TestTenantScopedWebApi(BaseTestWeb): self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items = [i for i in tenant.layout.pipelines['check'].getAllItems() - if i.live] + items = self.getAllItems('tenant-one', 'check') + items = [i for i in items if i.live] enqueue_times = {} for item in items: enqueue_times[str(item.changes[0])] = item.enqueue_time @@ -3429,8 +3426,8 @@ class TestTenantScopedWebApi(BaseTestWeb): self.waitUntilSettled() # ensure that enqueue times are durable - items = [i for i in tenant.layout.pipelines['check'].getAllItems() - if i.live] + items = self.getAllItems('tenant-one', 'check') + items = [i for i in items if i.live] for item in items: self.assertEqual( enqueue_times[str(item.changes[0])], item.enqueue_time) @@ -3438,8 +3435,8 @@ class TestTenantScopedWebApi(BaseTestWeb): # We can't reliably test for side effects in the check # pipeline since the change queues are independent, so we # directly examine the queues. - queue_items = [(item.changes[0].number, item.live) for item in - tenant.layout.pipelines['check'].getAllItems()] + items = self.getAllItems('tenant-one', 'check') + queue_items = [(item.changes[0].number, item.live) for item in items] expected = [('1', False), ('2', True), ('1', False), @@ -4102,8 +4099,7 @@ class TestCLIViaWebApi(BaseTestWeb): self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') enqueue_times = {} for item in items: enqueue_times[str(item.changes[0])] = item.enqueue_time @@ -4129,7 +4125,7 @@ class TestCLIViaWebApi(BaseTestWeb): self.waitUntilSettled() # ensure that enqueue times are durable - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') for item in items: self.assertEqual( enqueue_times[str(item.changes[0])], item.enqueue_time) diff --git a/tests/zuul_client/test_zuulclient.py b/tests/zuul_client/test_zuulclient.py index 4e2ecf7a25..90640e303f 100644 --- a/tests/zuul_client/test_zuulclient.py +++ b/tests/zuul_client/test_zuulclient.py @@ -352,8 +352,7 @@ class TestZuulClientAdmin(BaseTestWeb): self.waitUntilSettled() - tenant = self.scheds.first.sched.abide.tenants.get('tenant-one') - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') enqueue_times = {} for item in items: enqueue_times[str(item.changes[0])] = item.enqueue_time @@ -379,7 +378,7 @@ class TestZuulClientAdmin(BaseTestWeb): self.waitUntilSettled() # ensure that enqueue times are durable - items = tenant.layout.pipelines['gate'].getAllItems() + items = self.getAllItems('tenant-one', 'gate') for item in items: self.assertEqual( enqueue_times[str(item.changes[0])], item.enqueue_time) diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py index f8e3b20960..608deffe87 100755 --- a/zuul/cmd/client.py +++ b/zuul/cmd/client.py @@ -1096,6 +1096,7 @@ class Client(zuul.cmd.ZuulApp): with ZKContext(zk_client, plock, None, self.log) as context: pipeline.manager = IndependentPipelineManager( None, pipeline) + pipeline.manager.tenant = tenant pipeline.manager.state = PipelineState.new( context, _path=path, layout_uuid=None) PipelineChangeList.new(context, pipeline=pipeline) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index 7d51dba43e..5ffff3d470 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -51,7 +51,8 @@ class DynamicChangeQueueContextManager(object): if (self.allow_delete and self.change_queue and not self.change_queue.queue): - self.change_queue.pipeline.removeQueue(self.change_queue) + self.change_queue.pipeline.manager.state.removeQueue( + self.change_queue) class StaticChangeQueueContextManager(object): @@ -227,7 +228,7 @@ class PipelineManager(metaclass=ABCMeta): queue_projects = set(self.getRelativePriorityQueue( change.project)) items = [] - for i in self.pipeline.getAllItems(): + for i in self.state.getAllItems(): if not i.live: continue item_projects = set([ @@ -280,7 +281,7 @@ class PipelineManager(metaclass=ABCMeta): def _maintainCache(self): active_layout_uuids = set() referenced_change_keys = set() - for item in self.pipeline.getAllItems(): + for item in self.state.getAllItems(): if item.layout_uuid: active_layout_uuids.add(item.layout_uuid) @@ -308,7 +309,7 @@ class PipelineManager(metaclass=ABCMeta): def isChangeAlreadyInPipeline(self, change): # Checks live items in the pipeline - for item in self.pipeline.getAllItems(): + for item in self.state.getAllItems(): if not item.live: continue for c in item.changes: @@ -350,7 +351,7 @@ class PipelineManager(metaclass=ABCMeta): self.sched.query_cache.clearIfOlderThan(event) to_refresh = set() - for item in self.pipeline.getAllItems(): + for item in self.state.getAllItems(): for item_change in item.changes: if not isinstance(item_change, model.Change): continue @@ -466,7 +467,7 @@ class PipelineManager(metaclass=ABCMeta): if change_queue is not None: items = change_queue.queue else: - items = self.pipeline.getAllItems() + items = self.state.getAllItems() for item in items: for c in item.changes: @@ -476,7 +477,7 @@ class PipelineManager(metaclass=ABCMeta): def findOldVersionOfChangeAlreadyInQueue(self, change): # Return the item and the old version of the change - for item in self.pipeline.getAllItems(): + for item in self.state.getAllItems(): if not item.live: continue for item_change in item.changes: @@ -980,7 +981,7 @@ class PipelineManager(metaclass=ABCMeta): f"pipeline max of {pipeline_max}") count = additional - for item in self.pipeline.getAllItems(): + for item in self.state.getAllItems(): count += len(item.changes) if count > pipeline_max: return FalseWithReason( @@ -2535,7 +2536,7 @@ class PipelineManager(metaclass=ABCMeta): dt = None item_changes = 0 changes = sum(len(i.changes) - for i in self.pipeline.getAllItems()) + for i in self.state.getAllItems()) # TODO(jeblair): add items keys like changes tenant = self.pipeline.tenant diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py index 6e04d6e2af..abb7c446a9 100644 --- a/zuul/manager/dependent.py +++ b/zuul/manager/dependent.py @@ -278,4 +278,4 @@ class DependentPipelineManager(SharedQueuePipelineManager): # remove the queue (if empty) if item.queue.dynamic: if not item.queue.queue: - self.pipeline.removeQueue(item.queue) + self.state.removeQueue(item.queue) diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py index dad3dea650..2505229363 100644 --- a/zuul/manager/independent.py +++ b/zuul/manager/independent.py @@ -35,7 +35,7 @@ class IndependentPipelineManager(PipelineManager): pipeline=self.pipeline, dynamic=True) change_queue.addProject(change.project, None) - self.pipeline.addQueue(change_queue) + self.state.addQueue(change_queue) log.debug("Dynamically created queue %s", id(change_queue)) return DynamicChangeQueueContextManager( change_queue, allow_delete=True) @@ -125,4 +125,4 @@ class IndependentPipelineManager(PipelineManager): # An independent pipeline manager dynamically removes empty # queues if not item.queue.queue: - self.pipeline.removeQueue(item.queue) + self.state.removeQueue(item.queue) diff --git a/zuul/manager/serial.py b/zuul/manager/serial.py index 2555c63f59..72c59f012c 100644 --- a/zuul/manager/serial.py +++ b/zuul/manager/serial.py @@ -37,4 +37,4 @@ class SerialPipelineManager(SharedQueuePipelineManager): # remove the queue (if empty) if item.queue.dynamic: if not item.queue.queue: - self.pipeline.removeQueue(item.queue) + self.state.removeQueue(item.queue) diff --git a/zuul/manager/shared.py b/zuul/manager/shared.py index e1193e0593..3b66b0102a 100644 --- a/zuul/manager/shared.py +++ b/zuul/manager/shared.py @@ -35,10 +35,9 @@ class ChangeQueueManager: change_queue = self.created_for_branches.get(branch) if not change_queue: - p = self.pipeline_manager.pipeline name = self.name or project.name change_queue = self.pipeline_manager.constructChangeQueue(name) - p.addQueue(change_queue) + self.pipeline_manager.state.addQueue(change_queue) self.created_for_branches[branch] = change_queue if not change_queue.matches(project.canonical_name, branch): @@ -111,8 +110,8 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta): # Ignore the existing queue, since we can always get the correct queue # from the pipeline. This avoids enqueuing changes in a wrong queue # e.g. during re-configuration. - queue = self.pipeline.getQueue(change.project.canonical_name, - change.branch) + queue = self.state.getQueue(change.project.canonical_name, + change.branch) if queue: return StaticChangeQueueContextManager(queue) else: @@ -133,7 +132,7 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta): ) # No specific per-branch queue matched so look again with no branch - queue = self.pipeline.getQueue(change.project.canonical_name, None) + queue = self.state.getQueue(change.project.canonical_name, None) if queue: return StaticChangeQueueContextManager(queue) @@ -144,7 +143,7 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta): pipeline=self.pipeline, dynamic=True) change_queue.addProject(change.project, None) - self.pipeline.addQueue(change_queue) + self.state.addQueue(change_queue) log.debug("Dynamically created queue %s", change_queue) return DynamicChangeQueueContextManager( change_queue, allow_delete=True) diff --git a/zuul/manager/supercedent.py b/zuul/manager/supercedent.py index c659d9365e..bce823f715 100644 --- a/zuul/manager/supercedent.py +++ b/zuul/manager/supercedent.py @@ -51,7 +51,7 @@ class SupercedentPipelineManager(PipelineManager): window_decrease_type='none', dynamic=True) change_queue.addProject(change.project, None) - self.pipeline.addQueue(change_queue) + self.pipeline.manager.state.addQueue(change_queue) log.debug("Dynamically created queue %s", change_queue) return DynamicChangeQueueContextManager( change_queue, allow_delete=True) @@ -88,4 +88,4 @@ class SupercedentPipelineManager(PipelineManager): # A supercedent pipeline manager dynamically removes empty # queues if not item.queue.queue: - self.pipeline.removeQueue(item.queue) + self.state.removeQueue(item.queue) diff --git a/zuul/model.py b/zuul/model.py index 49dadb0c34..9904a8dfd9 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -650,39 +650,6 @@ class Pipeline(object): def setManager(self, manager): self.manager = manager - def addQueue(self, queue): - with self.state.activeContext(self.manager.current_context): - self.queues.append(queue) - - def getQueue(self, project_cname, branch): - # Queues might be branch specific so match with branch - for queue in self.queues: - if queue.matches(project_cname, branch): - return queue - return None - - def removeQueue(self, queue): - if queue in self.queues: - with self.state.activeContext(self.manager.current_context): - self.queues.remove(queue) - queue.delete(self.manager.current_context) - - def promoteQueue(self, queue): - if queue not in self.queues: - return - with self.state.activeContext(self.manager.current_context): - self.queues.remove(queue) - self.queues.insert(0, queue) - - def getAllItems(self, include_old=False): - items = [] - for shared_queue in self.queues: - items.extend(shared_queue.queue) - if include_old: - for shared_queue in self.state.old_queues: - items.extend(shared_queue.queue) - return items - def formatStatusJSON(self, websocket_url=None): j_pipeline = dict(name=self.name, description=self.description, @@ -827,6 +794,39 @@ class PipelineState(zkobject.ZKObject): with self.activeContext(context): self.old_queues.remove(queue) + def addQueue(self, queue): + with self.activeContext(self.pipeline.manager.current_context): + self.queues.append(queue) + + def getQueue(self, project_cname, branch): + # Queues might be branch specific so match with branch + for queue in self.queues: + if queue.matches(project_cname, branch): + return queue + return None + + def removeQueue(self, queue): + if queue in self.queues: + with self.activeContext(self.pipeline.manager.current_context): + self.queues.remove(queue) + queue.delete(self.pipeline.manager.current_context) + + def promoteQueue(self, queue): + if queue not in self.queues: + return + with self.activeContext(self.pipeline.manager.current_context): + self.queues.remove(queue) + self.queues.insert(0, queue) + + def getAllItems(self, include_old=False): + items = [] + for shared_queue in self.queues: + items.extend(shared_queue.queue) + if include_old: + for shared_queue in self.old_queues: + items.extend(shared_queue.queue) + return items + def serialize(self, context): if self._read_only: raise RuntimeError("Attempt to serialize read-only pipeline state") @@ -962,12 +962,6 @@ class PipelineState(zkobject.ZKObject): }) return data - def _getKnownItems(self): - items = [] - for queue in (*self.old_queues, *self.queues): - items.extend(queue.queue) - return items - def cleanup(self, context): pipeline_path = self.getPath() try: @@ -976,7 +970,7 @@ class PipelineState(zkobject.ZKObject): except NoNodeError: all_items = set() - known_item_objs = self._getKnownItems() + known_item_objs = self.getAllItems(include_old=True) known_items = {i.uuid for i in known_item_objs} items_referenced_by_builds = set() for i in known_item_objs: @@ -6824,7 +6818,7 @@ class QueueItem(zkobject.ZKObject): # Look for this item in other queues in the pipeline. item = None found = False - for item in self.pipeline.getAllItems(): + for item in self.pipeline.state.getAllItems(): if item.live and set(item.changes) == set(self.changes): found = True break diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 77131827ab..08f1ecf78d 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -718,7 +718,8 @@ class Scheduler(threading.Thread): pipeline.state.refresh(ctx, read_only=True) # In case we're in the middle of a reconfig, # include the old queue items. - for item in pipeline.getAllItems(include_old=True): + for item in pipeline.manager.state.getAllItems( + include_old=True): nrs = item.current_build_set.getNodeRequests() for _, req_id in nrs: outstanding_requests.add(req_id) @@ -846,7 +847,8 @@ class Scheduler(threading.Thread): self.createZKContext(lock, self.log) as ctx): pipeline.state.refresh(ctx, read_only=True) # add any blobstore references - for item in pipeline.getAllItems(include_old=True): + for item in pipeline.manager.state.getAllItems( + include_old=True): live_blobs.update(item.getBlobKeys()) with self.createZKContext(None, self.log) as ctx: blobstore = BlobStore(ctx) @@ -1715,12 +1717,12 @@ class Scheduler(threading.Thread): # Attempt to keep window sizes from shrinking where possible project, branch = shared_queue.project_branches[0] - new_queue = new_pipeline.getQueue(project, branch) + new_queue = new_pipeline.manager.state.getQueue(project, branch) if new_queue and shared_queue.window and (not static_window): new_queue.updateAttributes( context, window=max(shared_queue.window, new_queue.window_floor)) - new_pipeline.state.removeOldQueue(context, shared_queue) + new_pipeline.manager.state.removeOldQueue(context, shared_queue) for item in items_to_remove: log = get_annotated_logger(self.log, item.event) log.info("Removing item %s during reconfiguration", item) @@ -1868,7 +1870,7 @@ class Scheduler(threading.Thread): builds_to_cancel = [] requests_to_cancel = [] - for item in pipeline.getAllItems(): + for item in pipeline.manager.state.getAllItems(): with item.activeContext(pipeline.manager.current_context): item.item_ahead = None item.items_behind = [] @@ -2004,7 +2006,7 @@ class Scheduler(threading.Thread): quiet=True, ignore_requirements=True) # Regardless, move this shared change queue to the head. - pipeline.promoteQueue(change_queue) + pipeline.manager.state.promoteQueue(change_queue) def _doDequeueEvent(self, event): tenant = self.abide.tenants.get(event.tenant_name) @@ -2116,7 +2118,7 @@ class Scheduler(threading.Thread): waiting = False for tenant in self.abide.tenants.values(): for pipeline in tenant.layout.pipelines.values(): - for item in pipeline.getAllItems(): + for item in pipeline.manager.state.getAllItems(): for build in item.current_build_set.getBuilds(): if build.result is None: self.log.debug("%s waiting on %s" % @@ -2829,7 +2831,7 @@ class Scheduler(threading.Thread): ) return - for item in pipeline.getAllItems(): + for item in pipeline.manager.state.getAllItems(): # If the provided buildset UUID doesn't match any current one, # we assume that it's not current anymore. if item.current_build_set.uuid == event.build_set_uuid: