Move queue-related methods to PipelineState

To reimagine the Pipeline as an abstract config object, move
the queue-related methods to the PipelineState class, which
is where the actual queues are stored.

This adds a layer of abstraction in the unit tests since we
expect further changes in object relationships.

Change-Id: I0ef057e58e8ceb7d9c9e1e70e36a7fe9d46c6bcb
This commit is contained in:
James E. Blair 2025-03-11 14:44:52 -07:00
parent 29aa948725
commit ae639864a0
22 changed files with 220 additions and 262 deletions

View File

@ -2935,6 +2935,17 @@ class ZuulTestCase(BaseTestCase):
if isinstance(pipeline.manager, ipm):
self.assertEqual(len(pipeline.queues), 0)
def getAllItems(self, tenant_name, pipeline_name):
tenant = self.scheds.first.sched.abide.tenants.get(tenant_name)
manager = tenant.layout.pipelines[pipeline_name].manager
items = manager.state.getAllItems()
return items
def getAllQueues(self, tenant_name, pipeline_name):
tenant = self.scheds.first.sched.abide.tenants.get(tenant_name)
manager = tenant.layout.pipelines[pipeline_name].manager
return manager.state.queues
def shutdown(self):
# Note: when making changes to this sequence, check if
# corresponding changes need to happen in
@ -3080,7 +3091,7 @@ class ZuulTestCase(BaseTestCase):
def getCurrentBuilds(self):
for tenant in self.scheds.first.sched.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
for item in pipeline.getAllItems():
for item in pipeline.manager.state.getAllItems():
for build in item.current_build_set.builds.values():
yield build

View File

@ -850,8 +850,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.fake_gerrit.addEvent(B.addApproval("Approved", 1))
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items_before = tenant.layout.pipelines['gate'].getAllItems()
items_before = self.getAllItems('tenant-one', 'gate')
# Trigger a re-enqueue of change B
self.fake_gerrit.addEvent(B.getChangeAbandonedEvent())
@ -859,8 +858,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.fake_gerrit.addEvent(B.addApproval("Approved", 1))
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items_after = tenant.layout.pipelines['gate'].getAllItems()
items_after = self.getAllItems('tenant-one', 'gate')
# Make sure the complete cycle was re-enqueued
for before, after in zip(items_before, items_after):
@ -1229,8 +1227,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.waitUntilSettled()
# We only want to have a merge failure for the first item in the queue
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
with self.createZKContext() as context:
items[0].current_build_set.updateAttributes(context,
unable_to_merge=True)
@ -2012,9 +2009,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.waitUntilSettled()
# Fail the node request and unpause
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
pipeline = tenant.layout.pipelines['gate']
items = pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'gate')
job = items[0].current_build_set.job_graph.getJob(
'common-job', items[0].changes[0].cache_key)
for req in self.fake_nodepool.getNodeRequests():
@ -2881,9 +2876,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.waitUntilSettled()
# Fail the node request and unpause
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
pipeline = tenant.layout.pipelines['check']
items = pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
job = items[0].current_build_set.job_graph.getJob(
'common-job', items[0].changes[0].cache_key)
for req in self.fake_nodepool.getNodeRequests():
@ -4664,8 +4657,6 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
# patchsets. This test exercises such changes.
self.executor_server.hold_jobs_in_build = True
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
pipeline = tenant.layout.pipelines["gate"]
A = self.fake_github.openFakePullRequest("gh/project1", "master", "A")
B = self.fake_github.openFakePullRequest("gh/project2", "master", "B")
@ -4703,8 +4694,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
self.fake_github.emitEvent(C.addLabel("approved"))
self.waitUntilSettled()
expected_cycle = {A.number, B.number, C.number, E.number}
self.assertEqual(len(list(pipeline.getAllItems())), 1)
for item in pipeline.getAllItems():
items = self.getAllItems('tenant-one', 'gate')
self.assertEqual(len(list(items)), 1)
for item in items:
cycle = {c.number for c in item.changes}
self.assertEqual(expected_cycle, cycle)
# Assert we get the same triggering change every time
@ -4718,7 +4710,8 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
)
self.fake_github.emitEvent(A.getPullRequestEditedEvent(A.body))
self.waitUntilSettled()
self.assertEqual(len(list(pipeline.getAllItems())), 2)
items = self.getAllItems('tenant-one', 'gate')
self.assertEqual(len(list(items)), 2)
# Now remove all dependencies for the three remaining changes,
# and also E so that it doesn't get pulled back in as an
@ -4734,8 +4727,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
self.waitUntilSettled()
self.fake_github.emitEvent(C.getPullRequestEditedEvent(C.body))
self.waitUntilSettled()
self.assertEqual(len(list(pipeline.getAllItems())), 3)
for item in pipeline.getAllItems():
items = self.getAllItems('tenant-one', 'gate')
self.assertEqual(len(list(items)), 3)
for item in items:
self.assertEqual(len(item.changes), 1)
# Assert we get the same triggering change every time
self.assertEqual(json.loads(item.event.ref)['stable_id'], '3')
@ -4747,8 +4741,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
)
self.fake_github.emitEvent(A.getPullRequestEditedEvent(A.body))
self.waitUntilSettled()
self.assertEqual(len(list(pipeline.getAllItems())), 2)
for item in pipeline.getAllItems():
items = self.getAllItems('tenant-one', 'gate')
self.assertEqual(len(list(items)), 2)
for item in items:
self.assertEqual(len(item.changes), 1)
# Assert we get the same triggering change every time
self.assertEqual(json.loads(item.event.ref)['stable_id'], '3')
@ -4766,8 +4761,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
self.fake_github.emitEvent(B.getPullRequestEditedEvent(B.body))
self.waitUntilSettled()
expected_cycle = {B.number, C.number}
self.assertEqual(len(list(pipeline.getAllItems())), 1)
for item in pipeline.getAllItems():
items = self.getAllItems('tenant-one', 'gate')
self.assertEqual(len(list(items)), 1)
for item in items:
cycle = {c.number for c in item.changes}
self.assertEqual(expected_cycle, cycle)
# Assert we get the same triggering change every time
@ -4790,9 +4786,6 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
# patchsets. This test exercises such changes.
self.executor_server.hold_jobs_in_build = True
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
pipeline = tenant.layout.pipelines["gate"]
A = self.fake_github.openFakePullRequest("gh/project1", "master", "A")
B = self.fake_github.openFakePullRequest("gh/project2", "master", "B")
@ -4801,8 +4794,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
self.fake_github.emitEvent(A.addLabel("approved"))
self.waitUntilSettled()
self.assertEqual(len(list(pipeline.getAllItems())), 1)
for item in pipeline.getAllItems():
items = self.getAllItems('tenant-one', 'gate')
self.assertEqual(len(list(items)), 1)
for item in items:
self.assertEqual(len(item.changes), 1)
# Assert we get the same triggering change every time
self.assertEqual(json.loads(item.event.ref)['stable_id'], '1')
@ -4818,9 +4812,10 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
)
self.fake_github.emitEvent(A.getPullRequestEditedEvent(A.body))
self.waitUntilSettled()
self.assertEqual(len(list(pipeline.getAllItems())), 1)
items = self.getAllItems('tenant-one', 'gate')
self.assertEqual(len(list(items)), 1)
expected_cycle = {A.number, B.number}
for item in pipeline.getAllItems():
for item in items:
cycle = {c.number for c in item.changes}
self.assertEqual(expected_cycle, cycle)
# Assert we get the same triggering change every time
@ -4837,8 +4832,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
for build in self.history[-3:]:
self.assertEqual(build.result, 'SUCCESS')
def assertQueueCycles(self, pipeline, queue_index, bundles):
queue = pipeline.queues[queue_index]
def assertQueueCycles(self, pipeline_name, queue_index, bundles):
queues = self.getAllQueues('tenant-one', pipeline_name)
queue = queues[queue_index]
self.assertEqual(len(queue.queue), len(bundles))
for x, item in enumerate(queue.queue):
@ -4852,8 +4848,6 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
# patchsets. This test exercises such changes.
self.executor_server.hold_jobs_in_build = True
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
pipeline = tenant.layout.pipelines["check"]
A = self.fake_github.openFakePullRequest("gh/project1", "master", "A")
B = self.fake_github.openFakePullRequest("gh/project2", "master", "B")
@ -4885,9 +4879,10 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
self.fake_github.emitEvent(C.getPullRequestOpenedEvent())
self.waitUntilSettled()
abce = [A, B, C, E]
self.assertEqual(len(pipeline.queues), 1)
self.assertQueueCycles(pipeline, 0, [abce])
for item in pipeline.getAllItems():
self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 1)
self.assertQueueCycles('check', 0, [abce])
items = self.getAllItems('tenant-one', 'check')
for item in items:
# Assert we get the same triggering change every time
self.assertEqual(json.loads(item.event.ref)['stable_id'], '3')
@ -4902,8 +4897,9 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
abc = [A, B, C]
# ABC<nonlive>, E<live>
self.assertQueueCycles(pipeline, 0, [abc, [E]])
for item in pipeline.getAllItems():
self.assertQueueCycles('check', 0, [abc, [E]])
items = self.getAllItems('tenant-one', 'check')
for item in items:
# Assert we get the same triggering change every time
self.assertEqual(json.loads(item.event.ref)['stable_id'], '3')
@ -4920,11 +4916,11 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
self.waitUntilSettled()
# A, B, C individually
self.assertEqual(len(pipeline.queues), 3)
self.assertQueueCycles(pipeline, 0, [[A], [B]])
self.assertQueueCycles(pipeline, 1, [[A], [B], [C]])
self.assertQueueCycles(pipeline, 2, [[A]])
for item in pipeline.getAllItems():
self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 3)
self.assertQueueCycles('check', 0, [[A], [B]])
self.assertQueueCycles('check', 1, [[A], [B], [C]])
self.assertQueueCycles('check', 2, [[A]])
for item in self.getAllItems('tenant-one', 'check'):
# Assert we get the same triggering change every time
self.assertEqual(json.loads(item.event.ref)['stable_id'], '3')
@ -4934,7 +4930,7 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
)
self.fake_github.emitEvent(C.getPullRequestEditedEvent(C.body))
self.waitUntilSettled()
for item in pipeline.getAllItems():
for item in self.getAllItems('tenant-one', 'check'):
# Assert we get the same triggering change every time
self.assertEqual(json.loads(item.event.ref)['stable_id'], '3')
B.body = "{}\n\nDepends-On: {}\n".format(
@ -4942,13 +4938,13 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
)
self.fake_github.emitEvent(B.getPullRequestEditedEvent(B.body))
self.waitUntilSettled()
for item in pipeline.getAllItems():
for item in self.getAllItems('tenant-one', 'check'):
# Assert we get the same triggering change every time
self.assertEqual(json.loads(item.event.ref)['stable_id'], '3')
self.assertEqual(len(pipeline.queues), 2)
self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 2)
bc = [B, C]
self.assertQueueCycles(pipeline, 0, [[A]])
self.assertQueueCycles(pipeline, 1, [bc])
self.assertQueueCycles('check', 0, [[A]])
self.assertQueueCycles('check', 1, [bc])
# All done.
self.executor_server.hold_jobs_in_build = False
@ -4967,17 +4963,15 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
# patchsets. This test exercises such changes.
self.executor_server.hold_jobs_in_build = True
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
pipeline = tenant.layout.pipelines["check"]
A = self.fake_github.openFakePullRequest("gh/project1", "master", "A")
B = self.fake_github.openFakePullRequest("gh/project2", "master", "B")
self.fake_github.emitEvent(A.getPullRequestOpenedEvent())
self.waitUntilSettled()
self.assertEqual(len(list(pipeline.getAllItems())), 1)
for item in pipeline.getAllItems():
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(list(items)), 1)
for item in items:
self.assertEqual(len(item.changes), 1)
# Assert we get the same triggering change every time
self.assertEqual(json.loads(item.event.ref)['stable_id'], '1')
@ -4993,14 +4987,15 @@ class TestGithubAppCircularDependencies(ZuulGithubAppTestCase):
)
self.fake_github.emitEvent(A.getPullRequestEditedEvent(A.body))
self.waitUntilSettled()
self.assertEqual(len(list(pipeline.getAllItems())), 1)
for item in pipeline.getAllItems():
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(list(items)), 1)
for item in items:
self.assertEqual(len(item.changes), 2)
# Assert we get the same triggering change every time
self.assertEqual(json.loads(item.event.ref)['stable_id'], '1')
self.assertEqual(len(pipeline.queues), 1)
self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 1)
ab = [A, B]
self.assertQueueCycles(pipeline, 0, [ab])
self.assertQueueCycles('check', 0, [ab])
# All done.
self.executor_server.hold_jobs_in_build = False

View File

@ -292,23 +292,21 @@ class TestGerritToGithubCRD(ZuulTestCase):
# A Depends-On: B
A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % (
A.subject, B.url)
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
# Add two dependent changes...
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 2)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2)
# ...make sure the live one is not duplicated...
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 2)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2)
# ...but the non-live one is able to be.
self.fake_github.emitEvent(B.getPullRequestEditedEvent())
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 3)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3)
# Release jobs in order to avoid races with change A jobs
# finishing before change B jobs.
@ -331,7 +329,7 @@ class TestGerritToGithubCRD(ZuulTestCase):
'project-merge', 'github/project2').changes
self.assertEqual(changes, '1,%s' %
(B.head_sha,))
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 0)
self.assertIn('Build succeeded', A.messages[0])
@ -755,22 +753,21 @@ class TestGithubToGerritCRD(ZuulTestCase):
# A Depends-On: B
event = A.editBody('Depends-On: %s\n' % (B.data['url'],))
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
# Add two dependent changes...
self.fake_github.emitEvent(event)
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 2)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2)
# ...make sure the live one is not duplicated...
self.fake_github.emitEvent(A.getPullRequestEditedEvent())
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 2)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2)
# ...but the non-live one is able to be.
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 3)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3)
# Release jobs in order to avoid races with change A jobs
# finishing before change B jobs.

View File

@ -1506,9 +1506,7 @@ class TestExecutorFailure(ZuulTestCase):
self.executor_api.release()
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
pipeline = tenant.layout.pipelines['gate']
items = pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'gate')
self.assertEqual(len(items), 1)
self.hold_jobs_in_queue = False

View File

@ -460,24 +460,22 @@ class TestGerritCRD(ZuulTestCase):
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
# Add two git-dependent changes...
B.setDependsOn(A, 1)
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 2)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2)
# ...make sure the live one is not duplicated...
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 2)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2)
# ...but the non-live one is able to be.
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 3)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3)
# Release jobs in order to avoid races with change A jobs
# finishing before change B jobs.
@ -493,7 +491,7 @@ class TestGerritCRD(ZuulTestCase):
self.assertEqual(self.history[0].changes, '1,1 2,1')
self.assertEqual(self.history[1].changes, '1,1')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 0)
self.assertIn('Build succeeded', A.messages[0])
self.assertIn('Build succeeded', B.messages[0])
@ -575,8 +573,8 @@ class TestGerritCRD(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
self.assertEqual(len(check_pipeline.queues), 3)
self.assertEqual(len(check_pipeline.getAllItems()), 3)
for item in check_pipeline.getAllItems():
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3)
for item in self.getAllItems('tenant-one', 'check'):
self.assertTrue(item.live)
self.hold_jobs_in_queue = False

View File

@ -355,8 +355,7 @@ class TestGerritLegacyCRD(ZuulTestCase):
self.assertEqual(B.reported, 0)
self.assertEqual(self.history[0].changes, '2,1 1,1')
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 0)
def test_crd_check_git_depends(self):
"Test single-repo dependencies in independent pipelines"
@ -378,8 +377,7 @@ class TestGerritLegacyCRD(ZuulTestCase):
self.assertEqual(self.history[0].changes, '1,1')
self.assertEqual(self.history[-1].changes, '1,1 2,1')
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 0)
self.assertIn('Build succeeded', A.messages[0])
self.assertIn('Build succeeded', B.messages[0])
@ -389,24 +387,22 @@ class TestGerritLegacyCRD(ZuulTestCase):
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project1', 'master', 'B')
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
# Add two git-dependent changes...
B.setDependsOn(A, 1)
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 2)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2)
# ...make sure the live one is not duplicated...
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 2)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 2)
# ...but the non-live one is able to be.
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(len(check_pipeline.getAllItems()), 3)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3)
# Release jobs in order to avoid races with change A jobs
# finishing before change B jobs.
@ -422,7 +418,7 @@ class TestGerritLegacyCRD(ZuulTestCase):
self.assertEqual(self.history[0].changes, '1,1 2,1')
self.assertEqual(self.history[1].changes, '1,1')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 0)
self.assertIn('Build succeeded', A.messages[0])
self.assertIn('Build succeeded', B.messages[0])
@ -446,9 +442,8 @@ class TestGerritLegacyCRD(ZuulTestCase):
# Make sure the items still share a change queue, and the
# first one is not live.
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 1)
queue = tenant.layout.pipelines['check'].queues[0]
self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 1)
queue = self.getAllQueues('tenant-one', 'check')[0]
first_item = queue.queue[0]
for item in queue.queue:
self.assertEqual(item.queue, first_item.queue)
@ -465,7 +460,7 @@ class TestGerritLegacyCRD(ZuulTestCase):
self.assertEqual(B.reported, 0)
self.assertEqual(self.history[0].changes, '2,1 1,1')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 0)
@skipIfMultiScheduler()
def test_crd_check_reconfiguration(self):
@ -501,11 +496,9 @@ class TestGerritLegacyCRD(ZuulTestCase):
# Make sure none of the items share a change queue, and all
# are live.
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
self.assertEqual(len(check_pipeline.queues), 3)
self.assertEqual(len(check_pipeline.getAllItems()), 3)
for item in check_pipeline.getAllItems():
self.assertEqual(len(self.getAllQueues('tenant-one', 'check')), 3)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3)
for item in self.getAllItems('tenant-one', 'check'):
self.assertTrue(item.live)
self.hold_jobs_in_queue = False

View File

@ -199,8 +199,7 @@ class TestGithubCrossRepoDeps(ZuulTestCase):
self.waitUntilSettled()
self.assertEqual(len(self.builds), 1)
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['check'].getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(items), 3)
# Update B to point at A1 instead of A

View File

@ -1368,9 +1368,8 @@ class TestGithubDriver(ZuulTestCase):
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
self.assertEqual(check_pipeline.getAllItems(), [])
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(items, [])
self.assertEqual(self.countJobResults(self.history, 'ABORTED'), 2)
self.executor_server.hold_jobs_in_build = False
@ -2606,9 +2605,8 @@ class TestGithubAppDriver(ZuulGithubAppTestCase):
self.fake_github.emitEvent(A.getCheckRunAbortEvent(check_run))
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
check_pipeline = tenant.layout.pipelines["check"]
self.assertEqual(0, len(check_pipeline.getAllItems()))
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(0, len(items))
self.assertEqual(1, self.countJobResults(self.history, "ABORTED"))
# The buildset was already dequeued, so there shouldn't be anything to

View File

@ -40,16 +40,13 @@ class TestReporting(ZuulTestCase):
'check'].put(event)
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
# A should have been reported two times: start, cancel
self.assertEqual(2, A.reported)
self.assertEqual(2, len(A.messages))
self.assertIn("Build started (check)", A.messages[0])
self.assertIn("Build canceled (check)", A.messages[1])
# There shouldn't be any successful items
self.assertEqual(len(check_pipeline.getAllItems()), 0)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 0)
# But one canceled
self.assertEqual(self.countJobResults(self.history, "ABORTED"), 1)

View File

@ -188,8 +188,7 @@ class TestSchedulerZone(ZuulTestCase):
self.executor_server.zk_client.client.start()
# Find the build in the scheduler so we can check its status
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
builds = items[0].current_build_set.getBuilds()
build = builds[0]
@ -1058,8 +1057,7 @@ class TestScheduler(ZuulTestCase):
# project-test1 and project-test2 for C
self.assertEqual(len(self.builds), 5)
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
builds = items[0].current_build_set.getBuilds()
self.assertEqual(self.countJobResults(builds, 'SUCCESS'), 1)
self.assertEqual(self.countJobResults(builds, None), 2)
@ -2944,8 +2942,6 @@ class TestScheduler(ZuulTestCase):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
# Add two git-dependent changes
B.setDependsOn(A, 1)
@ -2955,7 +2951,7 @@ class TestScheduler(ZuulTestCase):
self.waitUntilSettled()
# A live item, and a non-live/live pair
items = check_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(items), 3)
self.assertEqual(items[0].changes[0].number, '1')
@ -2977,7 +2973,7 @@ class TestScheduler(ZuulTestCase):
# The live copy of A,1 should be gone, but the non-live and B
# should continue, and we should have a new A,2
items = check_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(items), 3)
self.assertEqual(items[0].changes[0].number, '1')
@ -2999,7 +2995,7 @@ class TestScheduler(ZuulTestCase):
# The live copy of B,1 should be gone, and it's non-live copy of A,1
# but we should have a new B,2 (still based on A,1)
items = check_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(items), 3)
self.assertEqual(items[0].changes[0].number, '1')
@ -3065,8 +3061,6 @@ class TestScheduler(ZuulTestCase):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
# Add two git-dependent changes
B.setDependsOn(A, 1)
@ -3075,7 +3069,7 @@ class TestScheduler(ZuulTestCase):
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# A live item, and a non-live/live pair
items = check_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(items), 3)
self.assertEqual(items[0].changes[0].number, '1')
@ -3093,7 +3087,7 @@ class TestScheduler(ZuulTestCase):
# The live copy of A should be gone, but the non-live and B
# should continue
items = check_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(items), 2)
self.assertEqual(items[0].changes[0].number, '1')
@ -3319,9 +3313,7 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(A.reported, False)
# Check queue is empty afterwards
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
items = check_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(items), 0)
self.assertEqual(len(self.history), 0)
@ -3764,8 +3756,8 @@ class TestScheduler(ZuulTestCase):
gate.state.refresh(ctx)
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
q2 = gate.getQueue(project2.canonical_name, None)
q1 = gate.manager.state.getQueue(project1.canonical_name, None)
q2 = gate.manager.state.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@ -3790,8 +3782,8 @@ class TestScheduler(ZuulTestCase):
gate.state.refresh(ctx)
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
q2 = gate.getQueue(project2.canonical_name, None)
q1 = gate.manager.state.getQueue(project1.canonical_name, None)
q2 = gate.manager.state.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@ -3816,8 +3808,8 @@ class TestScheduler(ZuulTestCase):
gate.state.refresh(ctx)
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
q2 = gate.getQueue(project2.canonical_name, None)
q1 = gate.manager.state.getQueue(project1.canonical_name, None)
q2 = gate.manager.state.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@ -3841,8 +3833,8 @@ class TestScheduler(ZuulTestCase):
gate.state.refresh(ctx)
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
q2 = gate.getQueue(project2.canonical_name, None)
q1 = gate.manager.state.getQueue(project1.canonical_name, None)
q2 = gate.manager.state.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@ -3867,8 +3859,8 @@ class TestScheduler(ZuulTestCase):
gate.state.refresh(ctx)
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.getQueue(project1.canonical_name, None)
q2 = gate.getQueue(project2.canonical_name, None)
q1 = gate.manager.state.getQueue(project1.canonical_name, None)
q2 = gate.manager.state.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@ -3995,13 +3987,13 @@ class TestScheduler(ZuulTestCase):
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
pipeline = tenant.layout.pipelines['gate']
items = pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'gate')
self.assertEqual(len(items), 1)
self.assertIsNone(items[0].layout_uuid)
# Assert that the layout cache is empty after a reconfiguration.
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
pipeline = tenant.layout.pipelines['gate']
self.assertEqual(pipeline.manager._layout_cache, {})
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
@ -4011,7 +4003,7 @@ class TestScheduler(ZuulTestCase):
self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
self.waitUntilSettled()
items = pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'gate')
self.assertEqual(len(items), 2)
for item in items:
# Layout UUID should be set again for all live items. It had to
@ -4071,9 +4063,7 @@ class TestScheduler(ZuulTestCase):
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
pipeline = tenant.layout.pipelines['check']
items = pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(items), 2)
# Assert that the layout UUID of the live item is reset during a
@ -4082,6 +4072,8 @@ class TestScheduler(ZuulTestCase):
self.assertIsNone(items[1].layout_uuid)
# Cache should be empty after a reconfiguration
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
pipeline = tenant.layout.pipelines['check']
self.assertEqual(pipeline.manager._layout_cache, {})
self.executor_server.hold_jobs_in_build = False
@ -5908,8 +5900,7 @@ For CI problems and help debugging, contact ci@example.org"""
self.executor_server.release('.*-test*')
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['check'].getAllItems()
items = self.getAllItems('tenant-one', 'check')
build_set = items[0].current_build_set
job = list(filter(lambda j: j.name == 'project-test1',
items[0].getJobs()))[0]
@ -5950,8 +5941,7 @@ For CI problems and help debugging, contact ci@example.org"""
self.executor_server.zk_client.client.start()
# Find the build in the scheduler so we can check its status
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
builds = items[0].current_build_set.getBuilds()
build = builds[0]
@ -6490,8 +6480,7 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(jobs[0].state, zuul.model.MergeRequest.HOLD)
self.assertEqual(len(jobs), 1)
pipeline = tenant.layout.pipelines['post']
self.assertEqual(len(pipeline.getAllItems()), 1)
self.assertEqual(len(self.getAllItems('tenant-one', 'post')), 1)
self.merger_api.release()
self.waitUntilSettled()
@ -6807,9 +6796,9 @@ class TestChangeQueues(ZuulTestCase):
])
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
_, p = tenant.getProject(project)
q1 = tenant.layout.pipelines['gate'].getQueue(
q1 = tenant.layout.pipelines['gate'].manager.state.getQueue(
p.canonical_name, 'master')
q2 = tenant.layout.pipelines['gate'].getQueue(
q2 = tenant.layout.pipelines['gate'].manager.state.getQueue(
p.canonical_name, 'stable')
self.assertEqual(q1.name, queue_name)
self.assertEqual(q2.name, queue_name)
@ -6866,11 +6855,11 @@ class TestChangeQueues(ZuulTestCase):
])
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
_, p = tenant.getProject(project)
q1 = tenant.layout.pipelines['gate'].getQueue(
q1 = tenant.layout.pipelines['gate'].manager.state.getQueue(
p.canonical_name, 'master')
q2 = tenant.layout.pipelines['gate'].getQueue(
q2 = tenant.layout.pipelines['gate'].manager.state.getQueue(
p.canonical_name, 'stable')
q3 = tenant.layout.pipelines['gate'].getQueue(
q3 = tenant.layout.pipelines['gate'].manager.state.getQueue(
p.canonical_name, None)
# There should be no branch specific queues anymore
@ -8489,7 +8478,6 @@ class TestSemaphore(ZuulTestCase):
"Test abandon with job semaphores"
self.executor_server.hold_jobs_in_build = True
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertEqual(
@ -8507,7 +8495,7 @@ class TestSemaphore(ZuulTestCase):
self.waitUntilSettled()
# The check pipeline should be empty
items = check_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(items), 0)
# The semaphore should be released
@ -8528,7 +8516,6 @@ class TestSemaphore(ZuulTestCase):
self.fake_nodepool.paused = True
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertEqual(
@ -8546,7 +8533,7 @@ class TestSemaphore(ZuulTestCase):
self.waitUntilSettled()
# The check pipeline should be empty
items = check_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(items), 0)
# The semaphore should be released
@ -8573,7 +8560,6 @@ class TestSemaphore(ZuulTestCase):
self.fake_nodepool.paused = True
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertEqual(
@ -8604,7 +8590,7 @@ class TestSemaphore(ZuulTestCase):
self.waitUntilSettled()
# The check pipeline should be empty
items = check_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(len(items), 0)
# The semaphore should be released
@ -8619,7 +8605,6 @@ class TestSemaphore(ZuulTestCase):
"Test new patchset with job semaphores"
self.executor_server.hold_jobs_in_build = True
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
self.assertEqual(
@ -8641,7 +8626,7 @@ class TestSemaphore(ZuulTestCase):
len(tenant.semaphore_handler.semaphoreHolders("test-semaphore")),
1)
items = check_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(items[0].changes[0].number, '1')
self.assertEqual(items[0].changes[0].patchset, '2')
self.assertTrue(items[0].live)
@ -8718,8 +8703,7 @@ class TestSemaphore(ZuulTestCase):
1)
# Save some variables for later use while the job is running
check_pipeline = tenant.layout.pipelines['check']
item = check_pipeline.getAllItems()[0]
item = self.getAllItems('tenant-one', 'check')[0]
job = list(filter(lambda j: j.name == 'semaphore-one-test1',
item.getJobs()))[0]
@ -9243,8 +9227,7 @@ class TestSchedulerFailFast(ZuulTestCase):
self.assertEqual(self.builds[1].name, 'project-test2')
# But both changes should still be in the pipeline
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
self.assertEqual(len(items), 2)
self.assertEqual(A.reported, 1)
self.assertEqual(B.reported, 1)

View File

@ -1720,9 +1720,6 @@ class TestInRepoConfig(ZuulTestCase):
def test_dynamic_config_new_patchset(self):
self.executor_server.hold_jobs_in_build = True
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
in_repo_conf = textwrap.dedent(
"""
- job:
@ -1753,7 +1750,7 @@ class TestInRepoConfig(ZuulTestCase):
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
items = check_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(items[0].changes[0].number, '1')
self.assertEqual(items[0].changes[0].patchset, '1')
self.assertTrue(items[0].live)
@ -1783,7 +1780,7 @@ class TestInRepoConfig(ZuulTestCase):
self.waitUntilSettled()
items = check_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'check')
self.assertEqual(items[0].changes[0].number, '1')
self.assertEqual(items[0].changes[0].patchset, '2')
self.assertTrue(items[0].live)
@ -4079,7 +4076,7 @@ class TestInRepoJoin(ZuulTestCase):
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
self.waitUntilSettled()
items = gate_pipeline.getAllItems()
items = self.getAllItems('tenant-one', 'gate')
self.assertEqual(items[0].changes[0].number, '1')
self.assertEqual(items[0].changes[0].patchset, '1')
self.assertTrue(items[0].live)

View File

@ -3219,8 +3219,7 @@ class TestTenantScopedWebApi(BaseTestWeb):
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
enqueue_times = {}
for item in items:
enqueue_times[str(item.changes[0])] = item.enqueue_time
@ -3247,7 +3246,7 @@ class TestTenantScopedWebApi(BaseTestWeb):
self.assertEqual(True, data)
# ensure that enqueue times are durable
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
for item in items:
self.assertEqual(
enqueue_times[str(item.changes[0])], item.enqueue_time)
@ -3308,8 +3307,7 @@ class TestTenantScopedWebApi(BaseTestWeb):
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
enqueue_times = {}
for item in items:
enqueue_times[str(item.changes[0])] = item.enqueue_time
@ -3336,7 +3334,7 @@ class TestTenantScopedWebApi(BaseTestWeb):
self.assertEqual(True, data)
# ensure that enqueue times are durable
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
for item in items:
self.assertEqual(
enqueue_times[str(item.changes[0])], item.enqueue_time)
@ -3399,9 +3397,8 @@ class TestTenantScopedWebApi(BaseTestWeb):
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = [i for i in tenant.layout.pipelines['check'].getAllItems()
if i.live]
items = self.getAllItems('tenant-one', 'check')
items = [i for i in items if i.live]
enqueue_times = {}
for item in items:
enqueue_times[str(item.changes[0])] = item.enqueue_time
@ -3429,8 +3426,8 @@ class TestTenantScopedWebApi(BaseTestWeb):
self.waitUntilSettled()
# ensure that enqueue times are durable
items = [i for i in tenant.layout.pipelines['check'].getAllItems()
if i.live]
items = self.getAllItems('tenant-one', 'check')
items = [i for i in items if i.live]
for item in items:
self.assertEqual(
enqueue_times[str(item.changes[0])], item.enqueue_time)
@ -3438,8 +3435,8 @@ class TestTenantScopedWebApi(BaseTestWeb):
# We can't reliably test for side effects in the check
# pipeline since the change queues are independent, so we
# directly examine the queues.
queue_items = [(item.changes[0].number, item.live) for item in
tenant.layout.pipelines['check'].getAllItems()]
items = self.getAllItems('tenant-one', 'check')
queue_items = [(item.changes[0].number, item.live) for item in items]
expected = [('1', False),
('2', True),
('1', False),
@ -4102,8 +4099,7 @@ class TestCLIViaWebApi(BaseTestWeb):
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
enqueue_times = {}
for item in items:
enqueue_times[str(item.changes[0])] = item.enqueue_time
@ -4129,7 +4125,7 @@ class TestCLIViaWebApi(BaseTestWeb):
self.waitUntilSettled()
# ensure that enqueue times are durable
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
for item in items:
self.assertEqual(
enqueue_times[str(item.changes[0])], item.enqueue_time)

View File

@ -352,8 +352,7 @@ class TestZuulClientAdmin(BaseTestWeb):
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
enqueue_times = {}
for item in items:
enqueue_times[str(item.changes[0])] = item.enqueue_time
@ -379,7 +378,7 @@ class TestZuulClientAdmin(BaseTestWeb):
self.waitUntilSettled()
# ensure that enqueue times are durable
items = tenant.layout.pipelines['gate'].getAllItems()
items = self.getAllItems('tenant-one', 'gate')
for item in items:
self.assertEqual(
enqueue_times[str(item.changes[0])], item.enqueue_time)

View File

@ -1096,6 +1096,7 @@ class Client(zuul.cmd.ZuulApp):
with ZKContext(zk_client, plock, None, self.log) as context:
pipeline.manager = IndependentPipelineManager(
None, pipeline)
pipeline.manager.tenant = tenant
pipeline.manager.state = PipelineState.new(
context, _path=path, layout_uuid=None)
PipelineChangeList.new(context, pipeline=pipeline)

View File

@ -51,7 +51,8 @@ class DynamicChangeQueueContextManager(object):
if (self.allow_delete and
self.change_queue and
not self.change_queue.queue):
self.change_queue.pipeline.removeQueue(self.change_queue)
self.change_queue.pipeline.manager.state.removeQueue(
self.change_queue)
class StaticChangeQueueContextManager(object):
@ -227,7 +228,7 @@ class PipelineManager(metaclass=ABCMeta):
queue_projects = set(self.getRelativePriorityQueue(
change.project))
items = []
for i in self.pipeline.getAllItems():
for i in self.state.getAllItems():
if not i.live:
continue
item_projects = set([
@ -280,7 +281,7 @@ class PipelineManager(metaclass=ABCMeta):
def _maintainCache(self):
active_layout_uuids = set()
referenced_change_keys = set()
for item in self.pipeline.getAllItems():
for item in self.state.getAllItems():
if item.layout_uuid:
active_layout_uuids.add(item.layout_uuid)
@ -308,7 +309,7 @@ class PipelineManager(metaclass=ABCMeta):
def isChangeAlreadyInPipeline(self, change):
# Checks live items in the pipeline
for item in self.pipeline.getAllItems():
for item in self.state.getAllItems():
if not item.live:
continue
for c in item.changes:
@ -350,7 +351,7 @@ class PipelineManager(metaclass=ABCMeta):
self.sched.query_cache.clearIfOlderThan(event)
to_refresh = set()
for item in self.pipeline.getAllItems():
for item in self.state.getAllItems():
for item_change in item.changes:
if not isinstance(item_change, model.Change):
continue
@ -466,7 +467,7 @@ class PipelineManager(metaclass=ABCMeta):
if change_queue is not None:
items = change_queue.queue
else:
items = self.pipeline.getAllItems()
items = self.state.getAllItems()
for item in items:
for c in item.changes:
@ -476,7 +477,7 @@ class PipelineManager(metaclass=ABCMeta):
def findOldVersionOfChangeAlreadyInQueue(self, change):
# Return the item and the old version of the change
for item in self.pipeline.getAllItems():
for item in self.state.getAllItems():
if not item.live:
continue
for item_change in item.changes:
@ -980,7 +981,7 @@ class PipelineManager(metaclass=ABCMeta):
f"pipeline max of {pipeline_max}")
count = additional
for item in self.pipeline.getAllItems():
for item in self.state.getAllItems():
count += len(item.changes)
if count > pipeline_max:
return FalseWithReason(
@ -2535,7 +2536,7 @@ class PipelineManager(metaclass=ABCMeta):
dt = None
item_changes = 0
changes = sum(len(i.changes)
for i in self.pipeline.getAllItems())
for i in self.state.getAllItems())
# TODO(jeblair): add items keys like changes
tenant = self.pipeline.tenant

View File

@ -278,4 +278,4 @@ class DependentPipelineManager(SharedQueuePipelineManager):
# remove the queue (if empty)
if item.queue.dynamic:
if not item.queue.queue:
self.pipeline.removeQueue(item.queue)
self.state.removeQueue(item.queue)

View File

@ -35,7 +35,7 @@ class IndependentPipelineManager(PipelineManager):
pipeline=self.pipeline,
dynamic=True)
change_queue.addProject(change.project, None)
self.pipeline.addQueue(change_queue)
self.state.addQueue(change_queue)
log.debug("Dynamically created queue %s", id(change_queue))
return DynamicChangeQueueContextManager(
change_queue, allow_delete=True)
@ -125,4 +125,4 @@ class IndependentPipelineManager(PipelineManager):
# An independent pipeline manager dynamically removes empty
# queues
if not item.queue.queue:
self.pipeline.removeQueue(item.queue)
self.state.removeQueue(item.queue)

View File

@ -37,4 +37,4 @@ class SerialPipelineManager(SharedQueuePipelineManager):
# remove the queue (if empty)
if item.queue.dynamic:
if not item.queue.queue:
self.pipeline.removeQueue(item.queue)
self.state.removeQueue(item.queue)

View File

@ -35,10 +35,9 @@ class ChangeQueueManager:
change_queue = self.created_for_branches.get(branch)
if not change_queue:
p = self.pipeline_manager.pipeline
name = self.name or project.name
change_queue = self.pipeline_manager.constructChangeQueue(name)
p.addQueue(change_queue)
self.pipeline_manager.state.addQueue(change_queue)
self.created_for_branches[branch] = change_queue
if not change_queue.matches(project.canonical_name, branch):
@ -111,8 +110,8 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
# Ignore the existing queue, since we can always get the correct queue
# from the pipeline. This avoids enqueuing changes in a wrong queue
# e.g. during re-configuration.
queue = self.pipeline.getQueue(change.project.canonical_name,
change.branch)
queue = self.state.getQueue(change.project.canonical_name,
change.branch)
if queue:
return StaticChangeQueueContextManager(queue)
else:
@ -133,7 +132,7 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
)
# No specific per-branch queue matched so look again with no branch
queue = self.pipeline.getQueue(change.project.canonical_name, None)
queue = self.state.getQueue(change.project.canonical_name, None)
if queue:
return StaticChangeQueueContextManager(queue)
@ -144,7 +143,7 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
pipeline=self.pipeline,
dynamic=True)
change_queue.addProject(change.project, None)
self.pipeline.addQueue(change_queue)
self.state.addQueue(change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(
change_queue, allow_delete=True)

View File

@ -51,7 +51,7 @@ class SupercedentPipelineManager(PipelineManager):
window_decrease_type='none',
dynamic=True)
change_queue.addProject(change.project, None)
self.pipeline.addQueue(change_queue)
self.pipeline.manager.state.addQueue(change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(
change_queue, allow_delete=True)
@ -88,4 +88,4 @@ class SupercedentPipelineManager(PipelineManager):
# A supercedent pipeline manager dynamically removes empty
# queues
if not item.queue.queue:
self.pipeline.removeQueue(item.queue)
self.state.removeQueue(item.queue)

View File

@ -650,39 +650,6 @@ class Pipeline(object):
def setManager(self, manager):
self.manager = manager
def addQueue(self, queue):
with self.state.activeContext(self.manager.current_context):
self.queues.append(queue)
def getQueue(self, project_cname, branch):
# Queues might be branch specific so match with branch
for queue in self.queues:
if queue.matches(project_cname, branch):
return queue
return None
def removeQueue(self, queue):
if queue in self.queues:
with self.state.activeContext(self.manager.current_context):
self.queues.remove(queue)
queue.delete(self.manager.current_context)
def promoteQueue(self, queue):
if queue not in self.queues:
return
with self.state.activeContext(self.manager.current_context):
self.queues.remove(queue)
self.queues.insert(0, queue)
def getAllItems(self, include_old=False):
items = []
for shared_queue in self.queues:
items.extend(shared_queue.queue)
if include_old:
for shared_queue in self.state.old_queues:
items.extend(shared_queue.queue)
return items
def formatStatusJSON(self, websocket_url=None):
j_pipeline = dict(name=self.name,
description=self.description,
@ -827,6 +794,39 @@ class PipelineState(zkobject.ZKObject):
with self.activeContext(context):
self.old_queues.remove(queue)
def addQueue(self, queue):
with self.activeContext(self.pipeline.manager.current_context):
self.queues.append(queue)
def getQueue(self, project_cname, branch):
# Queues might be branch specific so match with branch
for queue in self.queues:
if queue.matches(project_cname, branch):
return queue
return None
def removeQueue(self, queue):
if queue in self.queues:
with self.activeContext(self.pipeline.manager.current_context):
self.queues.remove(queue)
queue.delete(self.pipeline.manager.current_context)
def promoteQueue(self, queue):
if queue not in self.queues:
return
with self.activeContext(self.pipeline.manager.current_context):
self.queues.remove(queue)
self.queues.insert(0, queue)
def getAllItems(self, include_old=False):
items = []
for shared_queue in self.queues:
items.extend(shared_queue.queue)
if include_old:
for shared_queue in self.old_queues:
items.extend(shared_queue.queue)
return items
def serialize(self, context):
if self._read_only:
raise RuntimeError("Attempt to serialize read-only pipeline state")
@ -962,12 +962,6 @@ class PipelineState(zkobject.ZKObject):
})
return data
def _getKnownItems(self):
items = []
for queue in (*self.old_queues, *self.queues):
items.extend(queue.queue)
return items
def cleanup(self, context):
pipeline_path = self.getPath()
try:
@ -976,7 +970,7 @@ class PipelineState(zkobject.ZKObject):
except NoNodeError:
all_items = set()
known_item_objs = self._getKnownItems()
known_item_objs = self.getAllItems(include_old=True)
known_items = {i.uuid for i in known_item_objs}
items_referenced_by_builds = set()
for i in known_item_objs:
@ -6824,7 +6818,7 @@ class QueueItem(zkobject.ZKObject):
# Look for this item in other queues in the pipeline.
item = None
found = False
for item in self.pipeline.getAllItems():
for item in self.pipeline.state.getAllItems():
if item.live and set(item.changes) == set(self.changes):
found = True
break

View File

@ -718,7 +718,8 @@ class Scheduler(threading.Thread):
pipeline.state.refresh(ctx, read_only=True)
# In case we're in the middle of a reconfig,
# include the old queue items.
for item in pipeline.getAllItems(include_old=True):
for item in pipeline.manager.state.getAllItems(
include_old=True):
nrs = item.current_build_set.getNodeRequests()
for _, req_id in nrs:
outstanding_requests.add(req_id)
@ -846,7 +847,8 @@ class Scheduler(threading.Thread):
self.createZKContext(lock, self.log) as ctx):
pipeline.state.refresh(ctx, read_only=True)
# add any blobstore references
for item in pipeline.getAllItems(include_old=True):
for item in pipeline.manager.state.getAllItems(
include_old=True):
live_blobs.update(item.getBlobKeys())
with self.createZKContext(None, self.log) as ctx:
blobstore = BlobStore(ctx)
@ -1715,12 +1717,12 @@ class Scheduler(threading.Thread):
# Attempt to keep window sizes from shrinking where possible
project, branch = shared_queue.project_branches[0]
new_queue = new_pipeline.getQueue(project, branch)
new_queue = new_pipeline.manager.state.getQueue(project, branch)
if new_queue and shared_queue.window and (not static_window):
new_queue.updateAttributes(
context, window=max(shared_queue.window,
new_queue.window_floor))
new_pipeline.state.removeOldQueue(context, shared_queue)
new_pipeline.manager.state.removeOldQueue(context, shared_queue)
for item in items_to_remove:
log = get_annotated_logger(self.log, item.event)
log.info("Removing item %s during reconfiguration", item)
@ -1868,7 +1870,7 @@ class Scheduler(threading.Thread):
builds_to_cancel = []
requests_to_cancel = []
for item in pipeline.getAllItems():
for item in pipeline.manager.state.getAllItems():
with item.activeContext(pipeline.manager.current_context):
item.item_ahead = None
item.items_behind = []
@ -2004,7 +2006,7 @@ class Scheduler(threading.Thread):
quiet=True,
ignore_requirements=True)
# Regardless, move this shared change queue to the head.
pipeline.promoteQueue(change_queue)
pipeline.manager.state.promoteQueue(change_queue)
def _doDequeueEvent(self, event):
tenant = self.abide.tenants.get(event.tenant_name)
@ -2116,7 +2118,7 @@ class Scheduler(threading.Thread):
waiting = False
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
for item in pipeline.getAllItems():
for item in pipeline.manager.state.getAllItems():
for build in item.current_build_set.getBuilds():
if build.result is None:
self.log.debug("%s waiting on %s" %
@ -2829,7 +2831,7 @@ class Scheduler(threading.Thread):
)
return
for item in pipeline.getAllItems():
for item in pipeline.manager.state.getAllItems():
# If the provided buildset UUID doesn't match any current one,
# we assume that it's not current anymore.
if item.current_build_set.uuid == event.build_set_uuid: