Remove the manager attribute from pipelines

This moves the model.Pipeline class closer to being a tenant- and
layout-independent configuration object by removing its "manager"
attribute.  Instead, we will consider the Pipeline class an attribute
of the manager.  The PipelineManager will be the root class for all
information about a pipeline.  It will hold the PipelineState and
PipelineSummary ZKObjects, as well as the Pipeline object which
describes its configuration.
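
For illustration, a sketch of the access patterns before and after
(drawn from the test changes below; not an exhaustive API summary):

    # Old reference chains (via helpers on the Pipeline object):
    tenant.layout.pipelines['gate'].queues
    tenant.layout.pipelines['gate'].manager.state.queues

    # New reference chain: the PipelineManager is the root object.
    manager = tenant.layout.pipeline_managers['gate']
    manager.pipeline      # the configuration object (model.Pipeline)
    manager.state         # the PipelineState ZKObject
    manager.state.queues  # change queues live on the pipeline state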

This does change a lot of references, probably more than strictly
necessary.  But since, over the years, we built up several helper
methods to get to a pipeline's queues from various objects, it was
becoming unclear how to actually get to the objects we are interested
in after this change.  So to avoid increasing the length of our
object reference chains, this change rips off the bandage and
directly changes many references.

Change-Id: Ied771c56e4fc5a1e032737811a3818168ef36fce
James E. Blair
2025-03-11 11:21:19 -07:00
parent ae639864a0
commit bb53e772d3
39 changed files with 740 additions and 672 deletions


@@ -2931,19 +2931,19 @@ class ZuulTestCase(BaseTestCase):
self.assertCleanZooKeeper()
ipm = zuul.manager.independent.IndependentPipelineManager
for tenant in self.scheds.first.sched.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
if isinstance(pipeline.manager, ipm):
self.assertEqual(len(pipeline.queues), 0)
for manager in tenant.layout.pipeline_managers.values():
if isinstance(manager, ipm):
self.assertEqual(len(manager.state.queues), 0)
def getAllItems(self, tenant_name, pipeline_name):
tenant = self.scheds.first.sched.abide.tenants.get(tenant_name)
manager = tenant.layout.pipelines[pipeline_name].manager
manager = tenant.layout.pipeline_managers[pipeline_name]
items = manager.state.getAllItems()
return items
def getAllQueues(self, tenant_name, pipeline_name):
tenant = self.scheds.first.sched.abide.tenants.get(tenant_name)
manager = tenant.layout.pipelines[pipeline_name].manager
manager = tenant.layout.pipeline_managers[pipeline_name]
return manager.state.queues
def shutdown(self):
@@ -3090,8 +3090,8 @@ class ZuulTestCase(BaseTestCase):
def getCurrentBuilds(self):
for tenant in self.scheds.first.sched.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
for item in pipeline.manager.state.getAllItems():
for manager in tenant.layout.pipeline_managers.values():
for item in manager.state.getAllItems():
for build in item.current_build_set.builds.values():
yield build
@@ -3433,11 +3433,12 @@ class ZuulTestCase(BaseTestCase):
for tenant in sched.abide.tenants.values():
with tenant_read_lock(self.zk_client, tenant.name):
for pipeline in tenant.layout.pipelines.values():
manager = tenant.layout.pipeline_managers[pipeline.name]
with (pipeline_lock(self.zk_client, tenant.name,
pipeline.name) as lock,
self.createZKContext(lock) as ctx):
with pipeline.manager.currentContext(ctx):
pipeline.state.refresh(ctx)
with manager.currentContext(ctx):
manager.state.refresh(ctx)
# return the context in case the caller wants to examine iops
return ctx
@@ -3480,7 +3481,8 @@ class ZuulTestCase(BaseTestCase):
self.log.info("Running build: %s" % build)
for tenant in self.scheds.first.sched.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
for pipeline_queue in pipeline.queues:
manager = tenant.layout.pipeline_managers.get(pipeline.name)
for pipeline_queue in manager.state.queues:
if len(pipeline_queue.queue) != 0:
status = ''
for item in pipeline_queue.queue:
@@ -3526,7 +3528,8 @@ class ZuulTestCase(BaseTestCase):
# Make sure there are no orphaned jobs
for tenant in self.scheds.first.sched.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
for pipeline_queue in pipeline.queues:
manager = tenant.layout.pipeline_managers.get(pipeline.name)
for pipeline_queue in manager.state.queues:
if len(pipeline_queue.queue) != 0:
print('pipeline %s queue %s contents %s' % (
pipeline.name, pipeline_queue.name,


@@ -453,7 +453,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
# Make sure the out-of-cycle change (A) is enqueued after the cycle.
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
queue_change_numbers = []
for queue in tenant.layout.pipelines["gate"].queues:
for queue in tenant.layout.pipeline_managers["gate"].state.queues:
for item in queue.queue:
for change in item.changes:
queue_change_numbers.append(change.number)
@@ -878,7 +878,7 @@ class TestGerritCircularDependencies(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
# Make the gate window smaller than the length of the cycle
for queue in tenant.layout.pipelines["gate"].queues:
for queue in tenant.layout.pipeline_managers["gate"].state.queues:
if any("org/project" in p.name for p in queue.projects):
queue.window = 1
@@ -1006,8 +1006,9 @@ class TestGerritCircularDependencies(ZuulTestCase):
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
self.assertEqual(len(tenant.layout.pipelines["check"].queues), 1)
queue = tenant.layout.pipelines["check"].queues[0].queue
manager = tenant.layout.pipeline_managers["check"]
self.assertEqual(len(manager.state.queues), 1)
queue = manager.state.queues[0].queue
self.assertEqual(len(queue), 1)
self.assertEqual(len(self.builds), 2)
@@ -2601,8 +2602,8 @@ class TestGerritCircularDependencies(ZuulTestCase):
def _assert_job_deduplication_check(self):
# Make sure there are no leaked queue items
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
pipeline = tenant.layout.pipelines["check"]
pipeline_path = pipeline.state.getPath()
manager = tenant.layout.pipeline_managers["check"]
pipeline_path = manager.state.getPath()
all_items = set(self.zk_client.client.get_children(
f"{pipeline_path}/item"))
self.assertEqual(len(all_items), 0)
@@ -2691,8 +2692,8 @@ class TestGerritCircularDependencies(ZuulTestCase):
# dict(name="common-job", result="SUCCESS", changes="2,1 1,1"),
], ordered=False)
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
pipeline = tenant.layout.pipelines["check"]
pipeline_path = pipeline.state.getPath()
manager = tenant.layout.pipeline_managers["check"]
pipeline_path = manager.state.getPath()
all_items = set(self.zk_client.client.get_children(
f"{pipeline_path}/item"))
self.assertEqual(len(all_items), 0)


@@ -516,8 +516,9 @@ class TestOnlineZKOperations(ZuulTestCase):
tenant = sched.abide.tenants[tenant.name]
new_layout_uuid = tenant.layout.uuid
self.assertEqual(old_layout_uuid, new_layout_uuid)
self.assertEqual(tenant.layout.pipelines[pipeline].state.layout_uuid,
old_layout_uuid)
self.assertEqual(
tenant.layout.pipeline_managers[pipeline].state.layout_uuid,
old_layout_uuid)
def test_delete_pipeline_check(self):
self._test_delete_pipeline('check')


@@ -279,7 +279,8 @@ class TestGerritToGithubCRD(ZuulTestCase):
(B.head_sha,))
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
def test_crd_check_duplicate(self):
"Test duplicate check in independent pipelines"
@@ -353,8 +354,9 @@ class TestGerritToGithubCRD(ZuulTestCase):
# Make sure the items still share a change queue, and the
# first one is not live.
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 1)
queue = tenant.layout.pipelines['check'].queues[0]
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 1)
queue = tenant.layout.pipeline_managers['check'].state.queues[0]
first_item = queue.queue[0]
for item in queue.queue:
self.assertEqual(item.queue, first_item.queue)
@@ -374,7 +376,8 @@ class TestGerritToGithubCRD(ZuulTestCase):
'project-merge', project1).changes
self.assertEqual(changes, '1,%s 1,1' %
(B.head_sha,))
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
@skipIfMultiScheduler()
def test_crd_check_reconfiguration(self):
@@ -740,7 +743,8 @@ class TestGithubToGerritCRD(ZuulTestCase):
(A.head_sha,))
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
def test_crd_check_duplicate(self):
"Test duplicate check in independent pipelines"
@@ -789,7 +793,8 @@ class TestGithubToGerritCRD(ZuulTestCase):
changes = self.getJobFromHistory(
'project-merge', 'gerrit/project1').changes
self.assertEqual(changes, '1,1')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
self.assertIn('Build succeeded', A.comments[0])
@@ -811,8 +816,9 @@ class TestGithubToGerritCRD(ZuulTestCase):
# Make sure the items still share a change queue, and the
# first one is not live.
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 1)
queue = tenant.layout.pipelines['check'].queues[0]
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 1)
queue = tenant.layout.pipeline_managers['check'].state.queues[0]
first_item = queue.queue[0]
for item in queue.queue:
self.assertEqual(item.queue, first_item.queue)
@@ -832,7 +838,8 @@ class TestGithubToGerritCRD(ZuulTestCase):
'project-merge', project1).changes
self.assertEqual(changes, '1,1 1,%s' %
(A.head_sha,))
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
@skipIfMultiScheduler()
def test_crd_check_reconfiguration(self):
@@ -868,8 +875,9 @@ class TestGithubToGerritCRD(ZuulTestCase):
# Make sure the items still share a change queue, and the
# first one is not live.
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 1)
queue = tenant.layout.pipelines['check'].queues[0]
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 1)
queue = tenant.layout.pipeline_managers['check'].state.queues[0]
first_item = queue.queue[0]
for item in queue.queue:
self.assertEqual(item.queue, first_item.queue)
@@ -889,7 +897,8 @@ class TestGithubToGerritCRD(ZuulTestCase):
'project-merge', 'github/project2').changes
expected_changes = f'1,1 1,{A.head_sha}'
self.assertEqual(changes, expected_changes)
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
# Re-check the same PR again
self.fake_github.emitEvent(gh_event)


@@ -427,7 +427,8 @@ class TestGerritCRD(ZuulTestCase):
self.assertEqual(self.history[0].changes, '2,1 1,1')
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
def test_crd_check_git_depends(self):
"Test single-repo dependencies in independent pipelines"
@@ -450,7 +451,8 @@ class TestGerritCRD(ZuulTestCase):
self.assertEqual(self.history[0].changes, '1,1')
self.assertEqual(self.history[-1].changes, '1,1 2,1')
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
self.assertIn('Build succeeded', A.messages[0])
self.assertIn('Build succeeded', B.messages[0])
@@ -516,8 +518,9 @@ class TestGerritCRD(ZuulTestCase):
# Make sure the items still share a change queue, and the
# first one is not live.
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 1)
queue = tenant.layout.pipelines['check'].queues[0]
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 1)
queue = tenant.layout.pipeline_managers['check'].state.queues[0]
first_item = queue.queue[0]
for item in queue.queue:
self.assertEqual(item.queue, first_item.queue)
@@ -534,7 +537,8 @@ class TestGerritCRD(ZuulTestCase):
self.assertEqual(B.reported, 0)
self.assertEqual(self.history[0].changes, '2,1 1,1')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
@skipIfMultiScheduler()
def test_crd_check_reconfiguration(self):
@@ -571,8 +575,8 @@ class TestGerritCRD(ZuulTestCase):
# Make sure none of the items share a change queue, and all
# are live.
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check_pipeline = tenant.layout.pipelines['check']
self.assertEqual(len(check_pipeline.queues), 3)
check_pipeline = tenant.layout.pipeline_managers['check']
self.assertEqual(len(check_pipeline.state.queues), 3)
self.assertEqual(len(self.getAllItems('tenant-one', 'check')), 3)
for item in self.getAllItems('tenant-one', 'check'):
self.assertTrue(item.live)
@@ -765,7 +769,8 @@ class TestGerritCRDAltBaseUrl(ZuulTestCase):
self.assertEqual(self.history[0].changes, '2,1 1,1')
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
class TestGerritCRDWeb(TestGerritCRD):


@@ -50,7 +50,8 @@ class TestGithubCrossRepoDeps(ZuulTestCase):
# There should be no more changes in the queue
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
@simple_layout('layouts/crd-github.yaml', driver='github')
def test_crd_dependent(self):


@@ -599,7 +599,8 @@ class TestGitlabDriver(ZuulTestCase):
# There should be no more changes in the queue
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
@simple_layout('layouts/requirements-gitlab.yaml', driver='gitlab')
def test_state_require(self):


@@ -76,16 +76,17 @@ class TestJob(BaseTestCase):
self.tenant.addTPC(self.tpc)
self.pipeline = model.Pipeline('gate', self.tenant)
self.pipeline.source_context = self.context
self.pipeline.manager = mock.Mock()
self.pipeline.tenant = self.tenant
self.manager = mock.Mock()
self.manager.pipeline = self.pipeline
self.manager.tenant = self.tenant
self.zk_context = LocalZKContext(self.log)
self.pipeline.manager.current_context = self.zk_context
self.pipeline.manager.state = model.PipelineState()
self.pipeline.state._set(pipeline=self.pipeline)
self.layout.addPipeline(self.pipeline)
self.manager.current_context = self.zk_context
self.manager.state = model.PipelineState()
self.manager.state._set(manager=self.manager)
self.layout.addPipeline(self.pipeline, self.manager)
with self.zk_context as ctx:
self.queue = model.ChangeQueue.new(
ctx, pipeline=self.pipeline)
ctx, manager=self.manager)
self.pcontext = configloader.ParseContext(
self.connections, None, self.tenant, AnsibleManager())
@@ -312,7 +313,7 @@ class TestJob(BaseTestCase):
self.assertTrue(base.changeMatchesFiles(change))
self.assertFalse(python27.changeMatchesFiles(change))
self.pipeline.manager.getFallbackLayout = mock.Mock(return_value=None)
self.manager.getFallbackLayout = mock.Mock(return_value=None)
with self.zk_context as ctx:
item.freezeJobGraph(self.layout, ctx,
skip_file_matcher=False,


@@ -570,7 +570,8 @@ class TestPagureDriver(ZuulTestCase):
# There should be no more changes in the queue
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(len(
tenant.layout.pipeline_managers['check'].state.queues), 0)
@simple_layout('layouts/crd-pagure.yaml', driver='pagure')
def test_crd_dependent(self):


@@ -1160,7 +1160,10 @@ class TestScheduler(ZuulTestCase):
found_job = None
pipeline = self.scheds.first.sched.abide.tenants[
'tenant-one'].layout.pipelines['gate']
manager = self.scheds.first.sched.abide.tenants[
'tenant-one'].layout.pipeline_managers['gate']
pipeline_status = pipeline.formatStatusJSON(
manager,
self.scheds.first.sched.globals.websocket_url)
for queue in pipeline_status['change_queues']:
for head in queue['heads']:
@@ -1572,7 +1575,7 @@ class TestScheduler(ZuulTestCase):
event.patch_number = '2'
a = source.getChange(source.getChangeKey(event), event=event)
mgr = tenant.layout.pipelines['gate'].manager
mgr = tenant.layout.pipeline_managers['gate']
self.assertFalse(source.canMerge(a, mgr.getSubmitAllowNeeds()))
A.addApproval('Code-Review', 2)
@@ -3745,19 +3748,20 @@ class TestScheduler(ZuulTestCase):
# Change queues are created lazy by the dependent pipeline manager
# so retrieve the queue first without having to really enqueue a
# change first.
gate = tenant.layout.pipelines['gate']
gate = tenant.layout.pipeline_managers['gate']
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
with (pipeline_lock(self.zk_client, tenant.name, gate.name) as lock,
with (pipeline_lock(self.zk_client, tenant.name,
gate.pipeline.name) as lock,
self.createZKContext(lock) as ctx,
gate.manager.currentContext(ctx)):
gate.currentContext(ctx)):
gate.change_list.refresh(ctx)
gate.state.refresh(ctx)
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.manager.state.getQueue(project1.canonical_name, None)
q2 = gate.manager.state.getQueue(project2.canonical_name, None)
gate.getChangeQueue(fake_a, None)
gate.getChangeQueue(fake_b, None)
q1 = gate.state.getQueue(project1.canonical_name, None)
q2 = gate.state.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@@ -3771,19 +3775,20 @@ class TestScheduler(ZuulTestCase):
# Change queues are created lazy by the dependent pipeline manager
# so retrieve the queue first without having to really enqueue a
# change first.
gate = tenant.layout.pipelines['gate']
gate = tenant.layout.pipeline_managers['gate']
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
with (pipeline_lock(self.zk_client, tenant.name, gate.name) as lock,
with (pipeline_lock(self.zk_client, tenant.name,
gate.pipeline.name) as lock,
self.createZKContext(lock) as ctx,
gate.manager.currentContext(ctx)):
gate.currentContext(ctx)):
gate.change_list.refresh(ctx)
gate.state.refresh(ctx)
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.manager.state.getQueue(project1.canonical_name, None)
q2 = gate.manager.state.getQueue(project2.canonical_name, None)
gate.getChangeQueue(fake_a, None)
gate.getChangeQueue(fake_b, None)
q1 = gate.state.getQueue(project1.canonical_name, None)
q2 = gate.state.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@@ -3797,19 +3802,20 @@ class TestScheduler(ZuulTestCase):
# Change queues are created lazy by the dependent pipeline manager
# so retrieve the queue first without having to really enqueue a
# change first.
gate = tenant.layout.pipelines['gate']
gate = tenant.layout.pipeline_managers['gate']
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
with (pipeline_lock(self.zk_client, tenant.name, gate.name) as lock,
with (pipeline_lock(self.zk_client, tenant.name,
gate.pipeline.name) as lock,
self.createZKContext(lock) as ctx,
gate.manager.currentContext(ctx)):
gate.currentContext(ctx)):
gate.change_list.refresh(ctx)
gate.state.refresh(ctx)
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.manager.state.getQueue(project1.canonical_name, None)
q2 = gate.manager.state.getQueue(project2.canonical_name, None)
gate.getChangeQueue(fake_a, None)
gate.getChangeQueue(fake_b, None)
q1 = gate.state.getQueue(project1.canonical_name, None)
q2 = gate.state.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@@ -3822,19 +3828,20 @@ class TestScheduler(ZuulTestCase):
# Change queues are created lazy by the dependent pipeline manager
# so retrieve the queue first without having to really enqueue a
# change first.
gate = tenant.layout.pipelines['gate']
gate = tenant.layout.pipeline_managers['gate']
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
with (pipeline_lock(self.zk_client, tenant.name, gate.name) as lock,
with (pipeline_lock(self.zk_client, tenant.name,
gate.pipeline.name) as lock,
self.createZKContext(lock) as ctx,
gate.manager.currentContext(ctx)):
gate.currentContext(ctx)):
gate.change_list.refresh(ctx)
gate.state.refresh(ctx)
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.manager.state.getQueue(project1.canonical_name, None)
q2 = gate.manager.state.getQueue(project2.canonical_name, None)
gate.getChangeQueue(fake_a, None)
gate.getChangeQueue(fake_b, None)
q1 = gate.state.getQueue(project1.canonical_name, None)
q2 = gate.state.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@@ -3848,19 +3855,20 @@ class TestScheduler(ZuulTestCase):
# Change queues are created lazy by the dependent pipeline manager
# so retrieve the queue first without having to really enqueue a
# change first.
gate = tenant.layout.pipelines['gate']
gate = tenant.layout.pipeline_managers['gate']
FakeChange = namedtuple('FakeChange', ['project', 'branch'])
fake_a = FakeChange(project1, 'master')
fake_b = FakeChange(project2, 'master')
with (pipeline_lock(self.zk_client, tenant.name, gate.name) as lock,
with (pipeline_lock(self.zk_client, tenant.name,
gate.pipeline.name) as lock,
self.createZKContext(lock) as ctx,
gate.manager.currentContext(ctx)):
gate.currentContext(ctx)):
gate.change_list.refresh(ctx)
gate.state.refresh(ctx)
gate.manager.getChangeQueue(fake_a, None)
gate.manager.getChangeQueue(fake_b, None)
q1 = gate.manager.state.getQueue(project1.canonical_name, None)
q2 = gate.manager.state.getQueue(project2.canonical_name, None)
gate.getChangeQueue(fake_a, None)
gate.getChangeQueue(fake_b, None)
q1 = gate.state.getQueue(project1.canonical_name, None)
q2 = gate.state.getQueue(project2.canonical_name, None)
self.assertEqual(q1.name, 'integrated')
self.assertEqual(q2.name, 'integrated')
@@ -3993,8 +4001,8 @@ class TestScheduler(ZuulTestCase):
# Assert that the layout cache is empty after a reconfiguration.
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
pipeline = tenant.layout.pipelines['gate']
self.assertEqual(pipeline.manager._layout_cache, {})
manager = tenant.layout.pipeline_managers['gate']
self.assertEqual(manager._layout_cache, {})
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B',
parent='refs/changes/01/1/1')
@@ -4073,8 +4081,8 @@ class TestScheduler(ZuulTestCase):
# Cache should be empty after a reconfiguration
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
pipeline = tenant.layout.pipelines['check']
self.assertEqual(pipeline.manager._layout_cache, {})
manager = tenant.layout.pipeline_managers['check']
self.assertEqual(manager._layout_cache, {})
self.executor_server.hold_jobs_in_build = False
self.executor_server.release()
@@ -4253,7 +4261,8 @@ class TestScheduler(ZuulTestCase):
], ordered=False)
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines['check'].queues), 0)
self.assertEqual(
len(tenant.layout.pipeline_managers['check'].state.queues), 0)
self.assertIn('Build succeeded', A.messages[0])
def test_live_reconfiguration_del_pipeline(self):
@@ -4444,6 +4453,7 @@ class TestScheduler(ZuulTestCase):
tenant = self.scheds.first.sched.abide.tenants['tenant-one']
for pipeline in tenant.layout.pipelines.values():
pipeline_status = pipeline.formatStatusJSON(
tenant.layout.pipeline_managers[pipeline.name],
self.scheds.first.sched.globals.websocket_url)
for queue in pipeline_status['change_queues']:
for head in queue['heads']:
@@ -4673,9 +4683,11 @@ class TestScheduler(ZuulTestCase):
# Ensure that the status json has the ref so we can render it in the
# web ui.
pipeline = self.scheds.first.sched.abide.tenants[
'tenant-one'].layout.pipelines['periodic']
tenant = self.scheds.first.sched.abide.tenants['tenant-one']
pipeline = tenant.layout.pipelines['periodic']
manager = tenant.layout.pipeline_managers['periodic']
pipeline_status = pipeline.formatStatusJSON(
manager,
self.scheds.first.sched.globals.websocket_url)
first = pipeline_status['change_queues'][0]['heads'][0][0]
@@ -5026,7 +5038,7 @@ class TestScheduler(ZuulTestCase):
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
queue = tenant.layout.pipeline_managers['gate'].state.queues[0]
# A failed so window is reduced by 1 to 1.
self.assertEqual(queue.window, 1)
self.assertEqual(queue.window_floor, 1)
@@ -5117,7 +5129,7 @@ class TestScheduler(ZuulTestCase):
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
queue = tenant.layout.pipeline_managers['gate'].state.queues[0]
# A failed so window is reduced by 1 to 1.
self.assertEqual(queue.window, 1)
self.assertEqual(queue.window_floor, 1)
@@ -5239,7 +5251,7 @@ class TestScheduler(ZuulTestCase):
self.assertEqual(self.builds[2].name, 'project-test2')
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
queue = tenant.layout.pipeline_managers['gate'].state.queues[0]
self.assertEqual(queue.window, 1)
# D dropped out of the window
@@ -5270,7 +5282,7 @@ class TestScheduler(ZuulTestCase):
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
queue = tenant.layout.pipeline_managers['gate'].state.queues[0]
self.assertEqual(queue.window, 20)
self.assertTrue(len(self.builds), 4)
@@ -5281,7 +5293,7 @@ class TestScheduler(ZuulTestCase):
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
queue = tenant.layout.pipeline_managers['gate'].state.queues[0]
# Even though we have configured a smaller window, the value
# on the existing shared queue should be used.
self.assertEqual(queue.window, 20)
@@ -5290,7 +5302,7 @@ class TestScheduler(ZuulTestCase):
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
queue = tenant.layout.pipeline_managers['gate'].state.queues[0]
self.assertEqual(queue.window, 20)
self.assertTrue(len(self.builds), 4)
@@ -5320,7 +5332,7 @@ class TestScheduler(ZuulTestCase):
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
queue = tenant.layout.pipeline_managers['gate'].state.queues[0]
self.assertEqual(queue.window, 2)
self.assertEqual(len(self.builds), 4)
@@ -5330,7 +5342,7 @@ class TestScheduler(ZuulTestCase):
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
queue = tenant.layout.pipeline_managers['gate'].state.queues[0]
# Because we have configured a static window, it should
# be allowed to shrink on reconfiguration.
self.assertEqual(queue.window, 1)
@@ -5341,7 +5353,7 @@ class TestScheduler(ZuulTestCase):
self.scheds.execute(lambda app: app.sched.reconfigure(app.config))
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
queue = tenant.layout.pipeline_managers['gate'].state.queues[0]
self.assertEqual(queue.window, 1)
self.waitUntilSettled()
# B's builds should not be canceled
@@ -5380,7 +5392,7 @@ class TestScheduler(ZuulTestCase):
self.log.debug("B complete")
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
queue = tenant.layout.pipeline_managers['gate'].state.queues[0]
self.assertEqual(queue.window, 2)
self.assertEqual(len(self.builds), 2)
@@ -5392,7 +5404,7 @@ class TestScheduler(ZuulTestCase):
self.log.debug("Reconfiguration complete")
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
queue = tenant.layout.pipeline_managers['gate'].state.queues[0]
# Because we have configured a static window, it should
# be allowed to shrink on reconfiguration.
self.assertEqual(queue.window, 1)
@@ -5416,7 +5428,7 @@ class TestScheduler(ZuulTestCase):
self.log.debug("Executor unpause complete")
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
queue = tenant.layout.pipelines['gate'].queues[0]
queue = tenant.layout.pipeline_managers['gate'].state.queues[0]
self.assertEqual(queue.window, 1)
self.waitUntilSettled()
@@ -5703,8 +5715,11 @@ For CI problems and help debugging, contact ci@example.org"""
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(3, tenant.layout.pipelines['check'].disable_at)
self.assertEqual(
0, tenant.layout.pipelines['check'].state.consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check'].state.disabled)
0,
tenant.layout.pipeline_managers[
'check'].state.consecutive_failures)
self.assertFalse(
tenant.layout.pipeline_managers['check'].state.disabled)
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
@@ -5735,15 +5750,21 @@ For CI problems and help debugging, contact ci@example.org"""
self.waitUntilSettled()
self.assertEqual(
2, tenant.layout.pipelines['check'].state.consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check'].state.disabled)
2,
tenant.layout.pipeline_managers[
'check'].state.consecutive_failures)
self.assertFalse(
tenant.layout.pipeline_managers['check'].state.disabled)
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(
0, tenant.layout.pipelines['check'].state.consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check'].state.disabled)
0,
tenant.layout.pipeline_managers[
'check'].state.consecutive_failures)
self.assertFalse(
tenant.layout.pipeline_managers['check'].state.disabled)
self.fake_gerrit.addEvent(D.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(E.getPatchsetCreatedEvent(1))
@@ -5752,8 +5773,11 @@ For CI problems and help debugging, contact ci@example.org"""
# We should be disabled now
self.assertEqual(
3, tenant.layout.pipelines['check'].state.consecutive_failures)
self.assertTrue(tenant.layout.pipelines['check'].state.disabled)
3,
tenant.layout.pipeline_managers[
'check'].state.consecutive_failures)
self.assertTrue(
tenant.layout.pipeline_managers['check'].state.disabled)
# We need to wait between each of these patches to make sure the
# smtp messages come back in an expected order
@@ -5803,16 +5827,22 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(3, tenant.layout.pipelines['check'].disable_at)
self.assertEqual(
0, tenant.layout.pipelines['check'].state.consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check'].state.disabled)
0,
tenant.layout.pipeline_managers[
'check'].state.consecutive_failures)
self.assertFalse(
tenant.layout.pipeline_managers['check'].state.disabled)
self.fake_gerrit.addEvent(J.getPatchsetCreatedEvent(1))
self.fake_gerrit.addEvent(K.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertEqual(
2, tenant.layout.pipelines['check'].state.consecutive_failures)
self.assertFalse(tenant.layout.pipelines['check'].state.disabled)
2,
tenant.layout.pipeline_managers[
'check'].state.consecutive_failures)
self.assertFalse(
tenant.layout.pipeline_managers['check'].state.disabled)
# J and K went back to gerrit
self.assertEqual(1, len(J.messages))
@@ -6646,7 +6676,8 @@ For CI problems and help debugging, contact ci@example.org"""
self.waitUntilSettled()
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
item = tenant.layout.pipelines['check'].queues[0].queue[0]
item = tenant.layout.pipeline_managers[
'check'].state.queues[0].queue[0]
hold_job = item.getJobs()[1]
# Refresh the pipeline so that we can verify the JobData
@@ -6796,9 +6827,9 @@ class TestChangeQueues(ZuulTestCase):
])
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
_, p = tenant.getProject(project)
q1 = tenant.layout.pipelines['gate'].manager.state.getQueue(
q1 = tenant.layout.pipeline_managers['gate'].state.getQueue(
p.canonical_name, 'master')
q2 = tenant.layout.pipelines['gate'].manager.state.getQueue(
q2 = tenant.layout.pipeline_managers['gate'].state.getQueue(
p.canonical_name, 'stable')
self.assertEqual(q1.name, queue_name)
self.assertEqual(q2.name, queue_name)
@@ -6855,11 +6886,11 @@ class TestChangeQueues(ZuulTestCase):
])
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
_, p = tenant.getProject(project)
q1 = tenant.layout.pipelines['gate'].manager.state.getQueue(
q1 = tenant.layout.pipeline_managers['gate'].state.getQueue(
p.canonical_name, 'master')
q2 = tenant.layout.pipelines['gate'].manager.state.getQueue(
q2 = tenant.layout.pipeline_managers['gate'].state.getQueue(
p.canonical_name, 'stable')
q3 = tenant.layout.pipelines['gate'].manager.state.getQueue(
q3 = tenant.layout.pipeline_managers['gate'].state.getQueue(
p.canonical_name, None)
# There should be no branch specific queues anymore
@@ -8162,7 +8193,8 @@ class TestSemaphore(ZuulTestCase):
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
status = tenant.layout.pipelines["check"].formatStatusJSON()
status = tenant.layout.pipelines["check"].formatStatusJSON(
tenant.layout.pipeline_managers["check"])
jobs = status["change_queues"][0]["heads"][0][0]["jobs"]
self.assertEqual(jobs[0]["waiting_status"],
'node request: 200-0000000000')
@@ -9062,14 +9094,14 @@ class TestSemaphoreInRepo(ZuulTestCase):
# check that the layout in a queue item still has max value of 1
# for test-semaphore
pipeline = tenant.layout.pipelines.get('tenant-one-gate')
manager = tenant.layout.pipeline_managers.get('tenant-one-gate')
queue = None
for queue_candidate in pipeline.queues:
for queue_candidate in manager.state.queues:
if queue_candidate.name == 'org/project':
queue = queue_candidate
break
queue_item = queue.queue[0]
item_dynamic_layout = pipeline.manager._layout_cache.get(
item_dynamic_layout = manager._layout_cache.get(
queue_item.layout_uuid)
self.assertIsNotNone(item_dynamic_layout)
dynamic_test_semaphore = item_dynamic_layout.getSemaphore(


@@ -170,9 +170,9 @@ class TestScaleOutScheduler(ZuulTestCase):
self.assertHistory([])
tenant = first.sched.abide.tenants['tenant-one']
pipeline = tenant.layout.pipelines['check']
manager = tenant.layout.pipeline_managers['check']
summary = zuul.model.PipelineSummary()
summary._set(pipeline=pipeline)
summary._set(manager=manager)
with self.createZKContext() as context:
summary.refresh(context)
self.assertEqual(summary.status['change_queues'], [])
@@ -336,7 +336,7 @@ class TestScaleOutScheduler(ZuulTestCase):
break
pipeline_zk_path = app.sched.abide.tenants[
"tenant-one"].layout.pipelines["check"].state.getPath()
"tenant-one"].layout.pipeline_managers["check"].state.getPath()
self.executor_server.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
@@ -461,12 +461,12 @@ class TestScaleOutScheduler(ZuulTestCase):
# Test that we can deal with a truncated pipeline summary
self.executor_server.hold_jobs_in_build = True
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
pipeline = tenant.layout.pipelines['check']
manager = tenant.layout.pipeline_managers['check']
context = self.createZKContext()
def new_summary():
summary = zuul.model.PipelineSummary()
summary._set(pipeline=pipeline)
summary._set(manager=manager)
with context:
summary.refresh(context)
return summary


@@ -74,10 +74,10 @@ class TestTimerAlwaysDynamicBranches(ZuulTestCase):
# Ensure that the status json has the ref so we can render it in the
# web ui.
pipeline = self.scheds.first.sched.abide.tenants[
'tenant-one'].layout.pipelines['periodic']
self.assertEqual(len(pipeline.queues), 2)
for queue in pipeline.queues:
manager = self.scheds.first.sched.abide.tenants[
'tenant-one'].layout.pipeline_managers['periodic']
self.assertEqual(len(manager.state.queues), 2)
for queue in manager.state.queues:
item = queue.queue[0]
self.assertIn(item.changes[0].branch, ['master', 'stable'])


@@ -4042,8 +4042,8 @@ class TestInRepoJoin(ZuulTestCase):
self.executor_server.hold_jobs_in_build = True
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
gate_pipeline = tenant.layout.pipelines['gate']
self.assertEqual(gate_pipeline.queues, [])
gate_manager = tenant.layout.pipeline_managers['gate']
self.assertEqual(gate_manager.state.queues, [])
in_repo_conf = textwrap.dedent(
"""
@@ -4086,7 +4086,7 @@ class TestInRepoJoin(ZuulTestCase):
self.waitUntilSettled()
# Make sure the dynamic queue got cleaned up
self.assertEqual(gate_pipeline.queues, [])
self.assertEqual(gate_manager.state.queues, [])
def test_dynamic_dependent_pipeline_failure(self):
# Test that a change behind a failing change adding a project
@@ -6354,7 +6354,8 @@ class TestDataReturn(AnsibleZuulTestCase):
# Make sure skipped jobs are not reported as failing
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
status = tenant.layout.pipelines["check"].formatStatusJSON()
status = tenant.layout.pipelines["check"].formatStatusJSON(
tenant.layout.pipeline_managers["check"])
self.assertEqual(
status["change_queues"][0]["heads"][0][0]["failing_reasons"], [])
@@ -9396,7 +9397,8 @@ class TestProvidesRequiresMysql(ZuulTestCase):
# Verify the waiting status for both jobs is "repo state"
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
status = tenant.layout.pipelines["gate"].formatStatusJSON()
status = tenant.layout.pipelines["gate"].formatStatusJSON(
tenant.layout.pipeline_managers["gate"])
jobs = status["change_queues"][0]["heads"][0][0]["jobs"]
self.assertEqual(jobs[0]["waiting_status"], 'repo state')
self.assertEqual(jobs[1]["waiting_status"], 'repo state')
@@ -9408,7 +9410,8 @@ class TestProvidesRequiresMysql(ZuulTestCase):
self.waitUntilSettled()
# Verify the nodepool waiting status
status = tenant.layout.pipelines["gate"].formatStatusJSON()
status = tenant.layout.pipelines["gate"].formatStatusJSON(
tenant.layout.pipeline_managers["gate"])
jobs = status["change_queues"][0]["heads"][0][0]["jobs"]
self.assertEqual(jobs[0]["waiting_status"],
'node request: 100-0000000000')
@@ -9422,7 +9425,8 @@ class TestProvidesRequiresMysql(ZuulTestCase):
self.waitUntilSettled()
# Verify the executor waiting status
status = tenant.layout.pipelines["gate"].formatStatusJSON()
status = tenant.layout.pipelines["gate"].formatStatusJSON(
tenant.layout.pipeline_managers["gate"])
jobs = status["change_queues"][0]["heads"][0][0]["jobs"]
self.assertEqual(jobs[0]["waiting_status"], 'executor')
self.assertEqual(jobs[1]["waiting_status"],
@@ -9434,7 +9438,8 @@ class TestProvidesRequiresMysql(ZuulTestCase):
self.executor_api.release()
self.waitUntilSettled()
status = tenant.layout.pipelines["gate"].formatStatusJSON()
status = tenant.layout.pipelines["gate"].formatStatusJSON(
tenant.layout.pipeline_managers["gate"])
jobs = status["change_queues"][0]["heads"][0][0]["jobs"]
self.assertIsNone(jobs[0]["waiting_status"])
self.assertEqual(jobs[1]["waiting_status"],
@@ -9449,7 +9454,8 @@ class TestProvidesRequiresMysql(ZuulTestCase):
self.assertEqual(len(self.builds), 1)
status = tenant.layout.pipelines["gate"].formatStatusJSON()
status = tenant.layout.pipelines["gate"].formatStatusJSON(
tenant.layout.pipeline_managers["gate"])
# First change
jobs = status["change_queues"][0]["heads"][0][0]["jobs"]


@@ -2500,14 +2500,15 @@ class TestPipelineInit(ZooKeeperBaseTestCase):
pipeline = model.Pipeline('gate', tenant)
layout = model.Layout(tenant)
tenant.layout = layout
pipeline.manager = mock.Mock()
pipeline.manager.state = model.PipelineState.create(
pipeline, None)
manager = mock.Mock()
manager.pipeline = pipeline
manager.tenant = tenant
manager.state = model.PipelineState.create(
manager, None)
context = ZKContext(self.zk_client, None, None, self.log)
pipeline.state.refresh(context)
self.assertTrue(self.zk_client.client.exists(
pipeline.state.getPath()))
self.assertEqual(pipeline.manager.state.layout_uuid, layout.uuid)
manager.state.refresh(context)
self.assertTrue(self.zk_client.client.exists(manager.state.getPath()))
self.assertEqual(manager.state.layout_uuid, layout.uuid)
def test_pipeline_state_existing_object(self):
# Test the initialize-on-refresh code path with a pre-existing object
@@ -2515,21 +2516,23 @@ class TestPipelineInit(ZooKeeperBaseTestCase):
pipeline = model.Pipeline('gate', tenant)
layout = model.Layout(tenant)
tenant.layout = layout
pipeline.manager = mock.Mock()
pipeline.manager.state = model.PipelineState.create(
pipeline, None)
pipeline.manager.change_list = model.PipelineChangeList.create(
pipeline)
manager = mock.Mock()
manager.pipeline = pipeline
manager.tenant = tenant
manager.state = model.PipelineState.create(
manager, None)
manager.change_list = model.PipelineChangeList.create(
manager)
context = ZKContext(self.zk_client, None, None, self.log)
# We refresh the change list here purely for the side effect
# of creating the pipeline state object with no data (the list
# is a subpath of the state object).
pipeline.change_list.refresh(context)
pipeline.state.refresh(context)
manager.change_list.refresh(context)
manager.state.refresh(context)
self.assertTrue(
self.zk_client.client.exists(pipeline.change_list.getPath()))
self.assertTrue(self.zk_client.client.exists(pipeline.state.getPath()))
self.assertEqual(pipeline.manager.state.layout_uuid, layout.uuid)
self.zk_client.client.exists(manager.change_list.getPath()))
self.assertTrue(self.zk_client.client.exists(manager.state.getPath()))
self.assertEqual(manager.state.layout_uuid, layout.uuid)
def test_pipeline_change_list_new_object(self):
# Test the initialize-on-refresh code path with no existing object
@@ -2537,17 +2540,19 @@ class TestPipelineInit(ZooKeeperBaseTestCase):
pipeline = model.Pipeline('gate', tenant)
layout = model.Layout(tenant)
tenant.layout = layout
pipeline.manager = mock.Mock()
pipeline.manager.state = model.PipelineState.create(
pipeline, None)
pipeline.manager.change_list = model.PipelineChangeList.create(
pipeline)
manager = mock.Mock()
manager.pipeline = pipeline
manager.tenant = tenant
manager.state = model.PipelineState.create(
manager, None)
manager.change_list = model.PipelineChangeList.create(
manager)
context = ZKContext(self.zk_client, None, None, self.log)
pipeline.manager.change_list.refresh(context)
manager.change_list.refresh(context)
self.assertTrue(
self.zk_client.client.exists(pipeline.change_list.getPath()))
pipeline.state.refresh(context)
self.assertEqual(pipeline.manager.state.layout_uuid, layout.uuid)
self.zk_client.client.exists(manager.change_list.getPath()))
manager.state.refresh(context)
self.assertEqual(manager.state.layout_uuid, layout.uuid)
def test_pipeline_change_list_new_object_without_lock(self):
# Test the initialize-on-refresh code path if we don't have
@@ -2556,18 +2561,20 @@ class TestPipelineInit(ZooKeeperBaseTestCase):
pipeline = model.Pipeline('gate', tenant)
layout = model.Layout(tenant)
tenant.layout = layout
pipeline.manager = mock.Mock()
pipeline.manager.state = model.PipelineState.create(
pipeline, None)
pipeline.manager.change_list = model.PipelineChangeList.create(
pipeline)
manager = mock.Mock()
manager.pipeline = pipeline
manager.tenant = tenant
manager.state = model.PipelineState.create(
manager, None)
manager.change_list = model.PipelineChangeList.create(
manager)
context = ZKContext(self.zk_client, None, None, self.log)
with testtools.ExpectedException(NoNodeError):
pipeline.change_list.refresh(context, allow_init=False)
manager.change_list.refresh(context, allow_init=False)
self.assertIsNone(
self.zk_client.client.exists(pipeline.change_list.getPath()))
pipeline.state.refresh(context)
self.assertEqual(pipeline.state.layout_uuid, layout.uuid)
self.zk_client.client.exists(manager.change_list.getPath()))
manager.state.refresh(context)
self.assertEqual(manager.state.layout_uuid, layout.uuid)
class TestPolymorphicZKObjectMixin(ZooKeeperBaseTestCase):


@@ -1094,12 +1094,11 @@ class Client(zuul.cmd.ZuulApp):
) as plock:
zk_client.fastRecursiveDelete(path)
with ZKContext(zk_client, plock, None, self.log) as context:
pipeline.manager = IndependentPipelineManager(
None, pipeline)
pipeline.manager.tenant = tenant
pipeline.manager.state = PipelineState.new(
manager = IndependentPipelineManager(
None, pipeline, tenant)
manager.state = PipelineState.new(
context, _path=path, layout_uuid=None)
PipelineChangeList.new(context, pipeline=pipeline)
PipelineChangeList.new(context, manager=manager)
sys.exit(0)


@@ -1566,21 +1566,7 @@ class PipelineParser(object):
pipeline.window_decrease_factor = conf.get(
'window-decrease-factor', 2)
manager_name = conf['manager']
if manager_name == 'dependent':
manager = zuul.manager.dependent.DependentPipelineManager(
self.pcontext.scheduler, pipeline)
elif manager_name == 'independent':
manager = zuul.manager.independent.IndependentPipelineManager(
self.pcontext.scheduler, pipeline)
elif manager_name == 'serial':
manager = zuul.manager.serial.SerialPipelineManager(
self.pcontext.scheduler, pipeline)
elif manager_name == 'supercedent':
manager = zuul.manager.supercedent.SupercedentPipelineManager(
self.pcontext.scheduler, pipeline)
pipeline.setManager(manager)
pipeline.manager_name = conf['manager']
with self.pcontext.errorContext(stanza='pipeline', conf=conf):
with self.pcontext.confAttr(conf, 'require', {}) as require_dict:
@@ -2033,8 +2019,8 @@ class TenantParser(object):
# Only call the postConfig hook if we have a scheduler as this will
# change data in ZooKeeper. In case we are in a zuul-web context,
# we don't want to do that.
for pipeline in tenant.layout.pipelines.values():
pipeline.manager._postConfig()
for manager in tenant.layout.pipeline_managers.values():
manager._postConfig()
return tenant
@@ -2800,7 +2786,9 @@ class TenantParser(object):
with parse_context.errorContext(stanza='pipeline',
conf=pipeline):
with parse_context.accumulator.catchErrors():
layout.addPipeline(pipeline)
manager = self.createManager(parse_context, pipeline,
tenant)
layout.addPipeline(pipeline, manager)
for nodeset in parsed_config.nodesets:
with parse_context.errorContext(stanza='nodeset', conf=nodeset):
@@ -3021,6 +3009,21 @@ class TenantParser(object):
self._addLayoutItems(layout, tenant, data, parse_context)
return layout
def createManager(self, parse_context, pipeline, tenant):
if pipeline.manager_name == 'dependent':
manager = zuul.manager.dependent.DependentPipelineManager(
parse_context.scheduler, pipeline, tenant)
elif pipeline.manager_name == 'independent':
manager = zuul.manager.independent.IndependentPipelineManager(
parse_context.scheduler, pipeline, tenant)
elif pipeline.manager_name == 'serial':
manager = zuul.manager.serial.SerialPipelineManager(
parse_context.scheduler, pipeline, tenant)
elif pipeline.manager_name == 'supercedent':
manager = zuul.manager.supercedent.SupercedentPipelineManager(
parse_context.scheduler, pipeline, tenant)
return manager
class ConfigLoader(object):
log = logging.getLogger("zuul.ConfigLoader")
@@ -3233,7 +3236,7 @@ class ConfigLoader(object):
def _loadDynamicProjectData(self, config, project, files,
additional_project_branches, trusted,
item, pcontext):
tenant = item.pipeline.tenant
tenant = item.manager.tenant
tpc = tenant.project_configs[project.canonical_name]
if trusted:
branches = [tpc.load_branch if tpc.load_branch else 'master']
@@ -3335,7 +3338,7 @@ class ConfigLoader(object):
ansible_manager,
include_config_projects=False,
zuul_event_id=None):
tenant = item.pipeline.tenant
tenant = item.manager.tenant
log = get_annotated_logger(self.log, zuul_event_id)
pcontext = ParseContext(self.connections, self.scheduler,
tenant, ansible_manager)


@@ -36,7 +36,7 @@ class ElasticsearchReporter(BaseReporter):
if not phase1:
return
docs = []
index = '%s.%s-%s' % (self.index, item.pipeline.tenant.name,
index = '%s.%s-%s' % (self.index, item.manager.tenant.name,
time.strftime("%Y.%m.%d"))
changes = [
{
@@ -54,8 +54,8 @@ class ElasticsearchReporter(BaseReporter):
buildset_doc = {
"uuid": item.current_build_set.uuid,
"build_type": "buildset",
"tenant": item.pipeline.tenant.name,
"pipeline": item.pipeline.name,
"tenant": item.manager.tenant.name,
"pipeline": item.manager.pipeline.name,
"changes": changes,
"project": item.changes[0].project.name,
"change": getattr(item.changes[0], 'number', None),


@@ -1453,7 +1453,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
else:
cmd += ' --label %s=%s' % (key, val)
if self.version >= (2, 13, 0):
cmd += ' --tag autogenerated:zuul:%s' % (item.pipeline.name)
cmd += ' --tag autogenerated:zuul:%s' % (
item.manager.pipeline.name)
if phase2 and submit:
cmd += ' --submit'
changeid = '%s,%s' % (change.number, change.patchset)
@@ -1544,7 +1545,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
else:
data['comments'] = file_comments
if self.version >= (2, 13, 0):
data['tag'] = 'autogenerated:zuul:%s' % (item.pipeline.name)
data['tag'] = 'autogenerated:zuul:%s' % (
item.manager.pipeline.name)
if checks_api:
self.report_checks(log, item, change, changeid, checks_api)
if (message or data.get('labels') or data.get('comments')


@@ -190,7 +190,7 @@ class GithubReporter(BaseReporter):
url = item.formatItemUrl()
description = '%s status: %s' % (item.pipeline.name,
description = '%s status: %s' % (item.manager.pipeline.name,
self._commit_status)
if len(description) >= 140:
@@ -318,8 +318,8 @@ class GithubReporter(BaseReporter):
# https://review.opendev.org/#/c/666258/7
external_id = json.dumps(
{
"tenant": item.pipeline.tenant.name,
"pipeline": item.pipeline.name,
"tenant": item.manager.tenant.name,
"pipeline": item.manager.pipeline.name,
"change": change.number,
}
)


@@ -55,9 +55,9 @@ class MQTTReporter(BaseReporter):
message = {
'timestamp': time.time(),
'action': self._action,
'tenant': item.pipeline.tenant.name,
'tenant': item.manager.tenant.name,
'zuul_ref': buildset.ref,
'pipeline': item.pipeline.name,
'pipeline': item.manager.pipeline.name,
'queue': item.queue.name,
'changes': changes,
'project': item.changes[0].project.name,
@@ -155,8 +155,8 @@ class MQTTReporter(BaseReporter):
topic = None
try:
topic = self.config['topic'].format(
tenant=item.pipeline.tenant.name,
pipeline=item.pipeline.name,
tenant=item.manager.tenant.name,
pipeline=item.manager.pipeline.name,
changes=changes,
project=item.changes[0].project.name,
branch=getattr(item.changes[0], 'branch', None),


@@ -114,7 +114,7 @@ class PagureReporter(BaseReporter):
url = item.formatItemUrl()
description = '%s status: %s (%s)' % (
item.pipeline.name, self._commit_status, sha)
item.manager.pipeline.name, self._commit_status, sha)
self.log.debug(
'Reporting change %s, params %s, '


@@ -45,7 +45,7 @@ class SMTPReporter(BaseReporter):
subject = self.config['subject'].format(
change=item.changes[0],
changes=item.changes,
pipeline=item.pipeline.getSafeAttributes())
pipeline=item.manager.pipeline.getSafeAttributes())
else:
subject = "Report for changes {changes} against {ref}".format(
changes=' '.join([str(c) for c in item.changes]),


@@ -64,8 +64,8 @@ class SQLReporter(BaseReporter):
db_buildset = db.createBuildSet(
uuid=buildset.uuid,
tenant=item.pipeline.tenant.name,
pipeline=item.pipeline.name,
tenant=item.manager.tenant.name,
pipeline=item.manager.pipeline.name,
event_id=event_id,
event_timestamp=event_timestamp,
updated=datetime.datetime.utcnow(),
@@ -120,7 +120,7 @@ class SQLReporter(BaseReporter):
try:
with self.connection.getSession() as db:
db_buildset = db.getBuildset(
tenant=buildset.item.pipeline.tenant.name,
tenant=buildset.item.manager.tenant.name,
uuid=buildset.uuid)
if not db_buildset:
db_buildset = self._createBuildset(db, buildset)
@@ -235,7 +235,7 @@ class SQLReporter(BaseReporter):
if not buildset:
return None
db_buildset = db.getBuildset(
tenant=buildset.item.pipeline.tenant.name, uuid=buildset.uuid)
tenant=buildset.item.manager.tenant.name, uuid=buildset.uuid)
if not db_buildset:
self.log.warning("Creating missing buildset %s", buildset.uuid)
db_buildset = self._createBuildset(db, buildset)


@@ -48,15 +48,15 @@ class ZuulDriver(Driver, TriggerInterface, ReporterInterface):
self.sched = scheduler
def reconfigure(self, tenant):
for pipeline in tenant.layout.pipelines.values():
for ef in pipeline.event_filters:
for manager in tenant.layout.pipeline_managers.values():
for ef in manager.pipeline.event_filters:
if not isinstance(ef.trigger, zuultrigger.ZuulTrigger):
continue
if PARENT_CHANGE_ENQUEUED in ef._types:
# parent-change-enqueued events need to be filtered by
# pipeline
for pipeline in ef._pipelines:
key = (tenant.name, pipeline)
for pipeline_name in ef._pipelines:
key = (tenant.name, pipeline_name)
self.parent_change_enqueued_events[key] = True
elif PROJECT_CHANGE_MERGED in ef._types:
self.project_change_merged_events[tenant.name] = True
@@ -78,12 +78,12 @@ class ZuulDriver(Driver, TriggerInterface, ReporterInterface):
"Unable to create project-change-merged events for "
"%s" % (change,))
def onChangeEnqueued(self, tenant, change, pipeline, event):
def onChangeEnqueued(self, tenant, change, manager, event):
log = get_annotated_logger(self.log, event)
# Called each time a change is enqueued in a pipeline
tenant_events = self.parent_change_enqueued_events.get(
(tenant.name, pipeline.name))
(tenant.name, manager.pipeline.name))
log.debug("onChangeEnqueued %s", tenant_events)
if tenant_events:
span = trace.get_current_span()
@@ -95,11 +95,11 @@ class ZuulDriver(Driver, TriggerInterface, ReporterInterface):
"ZuulEvent", links=[link], attributes=attributes):
try:
self._createParentChangeEnqueuedEvents(
change, pipeline, tenant, event)
change, manager, tenant, event)
except Exception:
log.exception(
"Unable to create parent-change-enqueued events for "
"%s in %s" % (change, pipeline))
"%s in %s" % (change, manager.pipeline))
def _createProjectChangeMergedEvents(self, change, source):
changes = source.getProjectOpenChanges(
@@ -123,7 +123,7 @@ class ZuulDriver(Driver, TriggerInterface, ReporterInterface):
event.timestamp = time.time()
self.sched.addTriggerEvent(self.name, event)
def _createParentChangeEnqueuedEvents(self, change, pipeline, tenant,
def _createParentChangeEnqueuedEvents(self, change, manager, tenant,
event):
log = get_annotated_logger(self.log, event)
@@ -136,8 +136,7 @@ class ZuulDriver(Driver, TriggerInterface, ReporterInterface):
# numbers of github installations. This can be improved later
# with persistent storage of dependency information.
needed_by_changes = set(
pipeline.manager.resolveChangeReferences(
change.getNeededByChanges()))
manager.resolveChangeReferences(change.getNeededByChanges()))
for source in self.sched.connections.getSources():
log.debug(" Checking source: %s",
source.connection.connection_name)
@@ -149,14 +148,14 @@ class ZuulDriver(Driver, TriggerInterface, ReporterInterface):
log.debug(" Following changes: %s", needed_by_changes)
for needs in needed_by_changes:
self._createParentChangeEnqueuedEvent(needs, pipeline)
self._createParentChangeEnqueuedEvent(needs, manager)
def _createParentChangeEnqueuedEvent(self, change, pipeline):
def _createParentChangeEnqueuedEvent(self, change, manager):
event = ZuulTriggerEvent()
event.type = PARENT_CHANGE_ENQUEUED
event.connection_name = "zuul"
event.trigger_name = self.name
event.pipeline_name = pipeline.name
event.pipeline_name = manager.pipeline.name
event.project_hostname = change.project.canonical_hostname
event.project_name = change.project.name
event.change_number = change.number

View File

@ -56,6 +56,7 @@ class ExecutorClient(object):
log = get_annotated_logger(self.log, item.event)
tracer = trace.get_tracer("zuul")
uuid = str(uuid4().hex)
manager = item.manager
log.info(
"Execute job %s (uuid: %s) on nodes %s for %s "
"with dependent changes %s",
@ -68,7 +69,7 @@ class ExecutorClient(object):
# TODO: deprecate and remove this variable?
params["zuul"]["_inheritance_path"] = list(job.inheritance_path)
semaphore_handler = item.pipeline.tenant.semaphore_handler
semaphore_handler = manager.tenant.semaphore_handler
params['semaphore_handle'] = semaphore_handler.getSemaphoreHandle(
item, job)
@ -78,7 +79,7 @@ class ExecutorClient(object):
build_span = tracer.start_span("Build", start_time=execute_time)
build_span_info = tracing.getSpanInfo(build_span)
build = Build.new(
pipeline.manager.current_context,
manager.current_context,
job=job,
build_set=item.current_build_set,
uuid=uuid,
@ -96,7 +97,7 @@ class ExecutorClient(object):
started_event = BuildStartedEvent(
build.uuid, build.build_set.uuid, job.uuid,
None, data, zuul_event_id=build.zuul_event_id)
self.result_events[pipeline.tenant.name][pipeline.name].put(
self.result_events[manager.tenant.name][manager.pipeline.name].put(
started_event
)
@ -104,7 +105,7 @@ class ExecutorClient(object):
completed_event = BuildCompletedEvent(
build.uuid, build.build_set.uuid, job.uuid,
None, result, zuul_event_id=build.zuul_event_id)
self.result_events[pipeline.tenant.name][pipeline.name].put(
self.result_events[manager.tenant.name][manager.pipeline.name].put(
completed_event
)
@ -151,14 +152,14 @@ class ExecutorClient(object):
uuid=uuid,
build_set_uuid=build.build_set.uuid,
job_uuid=job.uuid,
tenant_name=build.build_set.item.pipeline.tenant.name,
pipeline_name=build.build_set.item.pipeline.name,
tenant_name=manager.tenant.name,
pipeline_name=manager.pipeline.name,
zone=executor_zone,
event_id=item.event.zuul_event_id,
precedence=PRIORITY_MAP[pipeline.precedence],
)
self.executor_api.submit(request, params)
build.updateAttributes(pipeline.manager.current_context,
build.updateAttributes(manager.current_context,
build_request_ref=request.path)
def cancel(self, build):
@ -168,7 +169,7 @@ class ExecutorClient(object):
log.info("Cancel build %s for job %s", build, build.job)
build.updateAttributes(
build.build_set.item.pipeline.manager.current_context,
build.build_set.item.manager.current_context,
canceled=True)
if not build.build_request_ref:
@ -199,8 +200,8 @@ class ExecutorClient(object):
self.executor_api.update(build_request)
result = {"result": "CANCELED", "end_time": time.time()}
tenant_name = build.build_set.item.pipeline.tenant.name
pipeline_name = build.build_set.item.pipeline.name
tenant_name = build.build_set.item.manager.tenant.name
pipeline_name = build.build_set.item.manager.pipeline.name
event = BuildCompletedEvent(
build_request.uuid, build_request.build_set_uuid,
build_request.job_uuid,
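
A hedged sketch of the event routing changed in this file: result events land in a per-tenant, per-pipeline queue, now keyed through the manager instead of item.pipeline. The dict-of-queues layout and SimpleNamespace stand-ins are assumptions for illustration, not Zuul's actual types.

from collections import defaultdict
from queue import Queue
from types import SimpleNamespace

# Nested mapping: tenant name -> pipeline name -> event queue.
result_events = defaultdict(lambda: defaultdict(Queue))

def put_result_event(manager, event):
    result_events[manager.tenant.name][manager.pipeline.name].put(event)

manager = SimpleNamespace(tenant=SimpleNamespace(name="example"),
                          pipeline=SimpleNamespace(name="gate"))
put_result_event(manager, {"result": "SUCCESS"})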

View File

@ -171,7 +171,7 @@ def construct_build_params(uuid, connections, job, item, pipeline,
for change in dependent_changes:
try:
(_, project) = item.pipeline.tenant.getProject(
(_, project) = item.manager.tenant.getProject(
change['project']['canonical_name'])
if not project:
raise KeyError()

View File

@ -56,8 +56,8 @@ class LauncherClient:
request = NodesetRequest.new(
ctx,
state=self._getInitialRequestState(job),
tenant_name=item.pipeline.tenant.name,
pipeline_name=item.pipeline.name,
tenant_name=item.manager.tenant.name,
pipeline_name=item.manager.pipeline.name,
buildset_uuid=buildset.uuid,
job_uuid=job.uuid,
job_name=job.name,

View File

@ -51,7 +51,7 @@ class DynamicChangeQueueContextManager(object):
if (self.allow_delete and
self.change_queue and
not self.change_queue.queue):
self.change_queue.pipeline.manager.state.removeQueue(
self.change_queue.manager.state.removeQueue(
self.change_queue)
@ -69,12 +69,13 @@ class StaticChangeQueueContextManager(object):
class PipelineManager(metaclass=ABCMeta):
"""Abstract Base Class for enqueing and processing Changes in a Pipeline"""
def __init__(self, sched, pipeline):
def __init__(self, sched, pipeline, tenant):
self.log = logging.getLogger("zuul.Pipeline.%s.%s" %
(pipeline.tenant.name,
(tenant.name,
pipeline.name,))
self.sched = sched
self.pipeline = pipeline
self.tenant = tenant
self.relative_priority_queues = {}
# Cached dynamic layouts (layout uuid -> layout)
self._layout_cache = {}
@ -86,7 +87,7 @@ class PipelineManager(metaclass=ABCMeta):
# The pipeline summary used by zuul-web that is updated by the
# schedulers after processing a pipeline.
self.summary = model.PipelineSummary()
self.summary._set(pipeline=self.pipeline)
self.summary._set(manager=self)
self.state = None
self.change_list = None
@ -111,7 +112,7 @@ class PipelineManager(metaclass=ABCMeta):
# haven't changed their pipeline participation.
self._layout_cache = {}
layout = self.pipeline.tenant.layout
layout = self.tenant.layout
self.buildChangeQueues(layout)
# Make sure we have state and change list objects. We
# don't actually ensure they exist in ZK here; these are
@ -124,9 +125,9 @@ class PipelineManager(metaclass=ABCMeta):
# These will be out of date until they are refreshed later.
self.state = PipelineState.create(
self.pipeline, self.pipeline.state)
self, self.state)
self.change_list = PipelineChangeList.create(
self.pipeline)
self)
# Now, try to acquire a non-blocking pipeline lock and refresh
# them for the side effect of initializing them if necessary.
@ -138,26 +139,26 @@ class PipelineManager(metaclass=ABCMeta):
# read errors elsewhere in that case anyway.
try:
with (pipeline_lock(
self.sched.zk_client, self.pipeline.tenant.name,
self.sched.zk_client, self.tenant.name,
self.pipeline.name, blocking=False) as lock,
self.sched.createZKContext(lock, self.log) as ctx,
self.currentContext(ctx)):
if not self.pipeline.state.exists(ctx):
if not self.state.exists(ctx):
# We only do this if the pipeline doesn't exist in
# ZK because in that case, this process should be
# fast since it's empty. If it does exist,
# refreshing it may be slow and since other actors
# won't encounter errors due to its absence, we
# would rather defer the work to later.
self.pipeline.state.refresh(ctx)
self.pipeline.change_list.refresh(ctx)
self.state.refresh(ctx)
self.change_list.refresh(ctx)
except LockException:
pass
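
The comments above describe a try-don't-wait initialization. Here is a generic sketch of that pattern with a plain threading.Lock; the names and structure are illustrative, not Zuul's pipeline_lock machinery.

import threading

lock = threading.Lock()

def try_initialize(initialize):
    # Try a non-blocking acquire; if another actor holds the lock,
    # defer the work rather than waiting, since readers tolerate the
    # object's absence.
    if lock.acquire(blocking=False):
        try:
            initialize()
        finally:
            lock.release()

try_initialize(lambda: print("initialized pipeline state"))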
def buildChangeQueues(self, layout):
self.log.debug("Building relative_priority queues")
change_queues = self.relative_priority_queues
tenant = self.pipeline.tenant
tenant = self.tenant
layout_project_configs = layout.project_configs
for project_name, project_configs in layout_project_configs.items():
@ -320,14 +321,14 @@ class PipelineManager(metaclass=ABCMeta):
def isChangeRelevantToPipeline(self, change):
# Checks if any version of the change or its deps matches any
# item in the pipeline.
for change_key in self.pipeline.change_list.getChangeKeys():
for change_key in self.change_list.getChangeKeys():
if change.cache_stat.key.isSameChange(change_key):
return True
if isinstance(change, model.Change):
for dep_change_ref in change.getNeedsChanges(
self.useDependenciesByTopic(change.project)):
dep_change_key = ChangeKey.fromReference(dep_change_ref)
for change_key in self.pipeline.change_list.getChangeKeys():
for change_key in self.change_list.getChangeKeys():
if change_key.isSameChange(dep_change_key):
return True
return False
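
A minimal sketch of the relevance test above, using plain strings as stand-ins for ChangeKey objects.

def is_relevant(change_key, dep_keys, pipeline_change_keys):
    # Relevant if any version of the change, or any of its
    # dependencies, matches a key already in the pipeline.
    if change_key in pipeline_change_keys:
        return True
    return any(dep in pipeline_change_keys for dep in dep_keys)

print(is_relevant("gerrit/12345", ["gerrit/11111"], {"gerrit/11111"}))
# -> True (a dependency is already in the pipeline)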
@ -366,7 +367,7 @@ class PipelineManager(metaclass=ABCMeta):
self.updateCommitDependencies(existing_change, event)
def reportEnqueue(self, item):
if not self.pipeline.state.disabled:
if not self.state.disabled:
self.log.info("Reporting enqueue, action %s item %s" %
(self.pipeline.enqueue_actions, item))
ret = self.sendReport(self.pipeline.enqueue_actions, item)
@ -375,7 +376,7 @@ class PipelineManager(metaclass=ABCMeta):
(item, ret))
def reportStart(self, item):
if not self.pipeline.state.disabled:
if not self.state.disabled:
self.log.info("Reporting start, action %s item %s" %
(self.pipeline.start_actions, item))
ret = self.sendReport(self.pipeline.start_actions, item)
@ -392,7 +393,7 @@ class PipelineManager(metaclass=ABCMeta):
final, result)
def reportDequeue(self, item, quiet=False):
if not (self.pipeline.state.disabled or quiet):
if not (self.state.disabled or quiet):
self.log.info(
"Reporting dequeue, action %s item%s",
self.pipeline.dequeue_actions,
@ -503,7 +504,7 @@ class PipelineManager(metaclass=ABCMeta):
def removeAbandonedChange(self, change, event):
log = get_annotated_logger(self.log, event)
log.debug("Change %s abandoned, removing", change)
for queue in self.pipeline.queues:
for queue in self.state.queues:
# Below we need to remove dependent changes of abandoned
# changes we remove here, but only if both are live.
# Therefore, track the changes we remove independently for
@ -574,7 +575,7 @@ class PipelineManager(metaclass=ABCMeta):
orig_ref = None
if item.event:
orig_ref = item.event.ref
event = EnqueueEvent(self.pipeline.tenant.name,
event = EnqueueEvent(self.tenant.name,
self.pipeline.name,
change.project.canonical_hostname,
change.project.name,
@ -582,7 +583,7 @@ class PipelineManager(metaclass=ABCMeta):
orig_ref=orig_ref)
event.zuul_event_id = item.event.zuul_event_id
self.sched.pipeline_management_events[
self.pipeline.tenant.name][self.pipeline.name].put(event)
self.tenant.name][self.pipeline.name].put(event)
@abstractmethod
def getChangeQueue(self, change, event, existing=None):
@ -807,11 +808,11 @@ class PipelineManager(metaclass=ABCMeta):
dependency_graph)
zuul_driver = self.sched.connections.drivers['zuul']
tenant = self.pipeline.tenant
tenant = self.tenant
with trace.use_span(tracing.restoreSpan(item.span_info)):
for c in item.changes:
zuul_driver.onChangeEnqueued(
tenant, c, self.pipeline, event)
tenant, c, self, event)
self.dequeueSupercededItems(item)
return True
@ -829,7 +830,7 @@ class PipelineManager(metaclass=ABCMeta):
# pipeline. Otherwise the change could be spammed by
# reports from unrelated pipelines.
try:
if self.pipeline.tenant.layout.getProjectPipelineConfig(
if self.tenant.layout.getProjectPipelineConfig(
ci, change):
self.sendReport(actions, ci)
except model.TemplateNotFoundError:
@ -915,9 +916,9 @@ class PipelineManager(metaclass=ABCMeta):
log.debug("%sNeeded change is merged", indent)
continue
if (self.pipeline.tenant.max_dependencies is not None and
if (self.tenant.max_dependencies is not None and
(len(dependency_graph) >
self.pipeline.tenant.max_dependencies)):
self.tenant.max_dependencies)):
log.info("%sDependency graph for change %s is too large",
indent, change)
raise exceptions.DependencyLimitExceededError(
@ -936,7 +937,7 @@ class PipelineManager(metaclass=ABCMeta):
history, quiet, indent + ' ')
def getQueueConfig(self, project):
layout = self.pipeline.tenant.layout
layout = self.tenant.layout
queue_name = None
for project_config in layout.getAllProjectConfigs(
project.canonical_name
@ -971,7 +972,7 @@ class PipelineManager(metaclass=ABCMeta):
return queue_config.dependencies_by_topic
def checkPipelineWithinLimits(self, cycle, history):
pipeline_max = self.pipeline.tenant.max_changes_per_pipeline
pipeline_max = self.tenant.max_changes_per_pipeline
if pipeline_max is None:
return True
additional = len(cycle) + len(history)
@ -1015,7 +1016,7 @@ class PipelineManager(metaclass=ABCMeta):
span_attrs = {
'zuul_event_id': item.event.zuul_event_id,
'zuul_tenant': self.pipeline.tenant.name,
'zuul_tenant': self.tenant.name,
'zuul_pipeline': self.pipeline.name,
}
for change in item.changes:
@ -1038,9 +1039,9 @@ class PipelineManager(metaclass=ABCMeta):
def dequeueSupercededItems(self, item):
for other_name in self.pipeline.supercedes:
other_pipeline = self.pipeline.tenant.layout.pipelines.get(
other_manager = self.tenant.layout.pipeline_managers.get(
other_name)
if not other_pipeline:
if not other_manager:
continue
for change in item.changes:
@ -1049,14 +1050,14 @@ class PipelineManager(metaclass=ABCMeta):
else None
)
event = model.SupercedeEvent(
other_pipeline.tenant.name,
other_pipeline.name,
other_manager.tenant.name,
other_manager.pipeline.name,
change.project.canonical_hostname,
change.project.name,
change_id,
change.ref)
self.sched.pipeline_trigger_events[
self.pipeline.tenant.name][other_pipeline.name
self.tenant.name][other_manager.pipeline.name
].put_supercede(event)
def updateCommitDependencies(self, change, event):
@ -1146,7 +1147,7 @@ class PipelineManager(metaclass=ABCMeta):
def provisionNodes(self, item):
log = item.annotateLogger(self.log)
jobs = item.findJobsToRequest(item.pipeline.tenant.semaphore_handler)
jobs = item.findJobsToRequest(self.tenant.semaphore_handler)
if not jobs:
return False
build_set = item.current_build_set
@ -1179,8 +1180,8 @@ class PipelineManager(metaclass=ABCMeta):
def _makeNodepoolRequest(self, log, build_set, job, relative_priority):
provider = self._getPausedParentProvider(build_set, job)
priority = self._calculateNodeRequestPriority(build_set, job)
tenant_name = build_set.item.pipeline.tenant.name
pipeline_name = build_set.item.pipeline.name
tenant_name = self.tenant.name
pipeline_name = self.pipeline.name
item = build_set.item
req = self.sched.nodepool.requestNodes(
build_set.uuid, job, tenant_name, pipeline_name, provider,
@ -1229,7 +1230,7 @@ class PipelineManager(metaclass=ABCMeta):
def _calculateNodeRequestPriority(self, build_set, job):
precedence_adjustment = 0
precedence = build_set.item.pipeline.precedence
precedence = self.pipeline.precedence
if self._getPausedParent(build_set, job):
precedence_adjustment = -1
initial_precedence = model.PRIORITY_MAP[precedence]
@ -1274,8 +1275,9 @@ class PipelineManager(metaclass=ABCMeta):
# If we hit an exception we don't have a build in the
# current item so a potentially acquired semaphore must be
# released as it won't be released on dequeue of the item.
pipeline = build_set.item.pipeline
tenant = pipeline.tenant
tenant = self.tenant
pipeline = self.pipeline
if COMPONENT_REGISTRY.model_api >= 33:
event_queue = self.sched.management_events[tenant.name]
else:
@ -1291,8 +1293,7 @@ class PipelineManager(metaclass=ABCMeta):
if not item.current_build_set.job_graph:
return False
jobs = item.findJobsToRun(
item.pipeline.tenant.semaphore_handler)
jobs = item.findJobsToRun(self.tenant.semaphore_handler)
if jobs:
self._executeJobs(item, jobs)
@ -1430,7 +1431,7 @@ class PipelineManager(metaclass=ABCMeta):
# config items ahead), so just use the current pipeline
# layout.
log.debug("Loading dynamic layout complete")
return item.queue.pipeline.tenant.layout
return item.manager.tenant.layout
# Untrusted layout only works with trusted updates
elif (trusted_layout and not trusted_errors and
untrusted_layout and untrusted_errors):
@ -1480,7 +1481,7 @@ class PipelineManager(metaclass=ABCMeta):
"change context. Error won't be reported.")
# We're a change to a config repo with errors not relevant
# to this repo. We use the pipeline layout.
return item.queue.pipeline.tenant.layout
return item.manager.tenant.layout
else:
raise Exception("We have reached a configuration error that is"
"not accounted for.")
@ -1505,7 +1506,7 @@ class PipelineManager(metaclass=ABCMeta):
def getFallbackLayout(self, item):
parent_item = item.item_ahead
if not parent_item:
return item.pipeline.tenant.layout
return self.tenant.layout
return self.getLayout(parent_item)
@ -1604,7 +1605,7 @@ class PipelineManager(metaclass=ABCMeta):
# If the involved projects exclude unprotected branches we should also
# exclude them from the merge and repo state except the branch of the
# change that is tested.
tenant = item.pipeline.tenant
tenant = self.tenant
items = list(item.items_ahead) + [item]
projects = {
change.project for i in items for change in i.changes
@ -1644,7 +1645,7 @@ class PipelineManager(metaclass=ABCMeta):
def scheduleGlobalRepoState(self, item):
log = item.annotateLogger(self.log)
tenant = item.pipeline.tenant
tenant = self.tenant
jobs = item.current_build_set.job_graph.getJobs()
project_cnames = set()
for job in jobs:
@ -1711,7 +1712,7 @@ class PipelineManager(metaclass=ABCMeta):
def prepareItem(self, item):
build_set = item.current_build_set
tenant = item.pipeline.tenant
tenant = self.tenant
# We always need to set the configuration of the item if it
# isn't already set.
if not build_set.ref:
@ -2045,7 +2046,7 @@ class PipelineManager(metaclass=ABCMeta):
self.log.debug("Starting queue processor: %s" % self.pipeline.name)
changed = False
change_keys = set()
for queue in self.pipeline.queues[:]:
for queue in self.state.queues[:]:
queue_changed = False
nnfi = None # Nearest non-failing item
for item in queue.queue[:]:
@ -2066,9 +2067,7 @@ class PipelineManager(metaclass=ABCMeta):
self.log.debug("Queue %s status is now:\n %s" %
(queue.name, status))
self.pipeline.change_list.setChangeKeys(
self.pipeline.manager.current_context,
change_keys)
self.change_list.setChangeKeys(self.current_context, change_keys)
self._maintainCache()
self.log.debug("Finished queue processor: %s (changed: %s)" %
(self.pipeline.name, changed))
@ -2161,13 +2160,12 @@ class PipelineManager(metaclass=ABCMeta):
log.debug("Build %s of %s completed", build, item)
pipeline = item.pipeline
if COMPONENT_REGISTRY.model_api >= 33:
event_queue = self.sched.management_events[pipeline.tenant.name]
event_queue = self.sched.management_events[self.tenant.name]
else:
event_queue = self.sched.pipeline_result_events[
pipeline.tenant.name][pipeline.name]
item.pipeline.tenant.semaphore_handler.release(
self.tenant.name][self.pipeline.name]
self.tenant.semaphore_handler.release(
event_queue, item, build.job)
if item.getJob(build.job.uuid) is None:
@ -2324,8 +2322,8 @@ class PipelineManager(metaclass=ABCMeta):
build_set.item.setNodeRequestFailure(
job, f'Node(set) request {request.id} failed')
self._resumeBuilds(build_set)
pipeline = build_set.item.pipeline
tenant = pipeline.tenant
pipeline = self.pipeline
tenant = self.tenant
if COMPONENT_REGISTRY.model_api >= 33:
event_queue = self.sched.management_events[tenant.name]
else:
@ -2498,23 +2496,23 @@ class PipelineManager(metaclass=ABCMeta):
action = 'success'
actions = self.pipeline.success_actions
item.setReportedResult('SUCCESS')
with self.pipeline.state.activeContext(self.current_context):
self.pipeline.state.consecutive_failures = 0
with self.state.activeContext(self.current_context):
self.state.consecutive_failures = 0
else:
action = 'failure'
actions = self.pipeline.failure_actions
item.setReportedResult('FAILURE')
with self.pipeline.state.activeContext(self.current_context):
self.pipeline.state.consecutive_failures += 1
if project_in_pipeline and self.pipeline.state.disabled:
with self.state.activeContext(self.current_context):
self.state.consecutive_failures += 1
if project_in_pipeline and self.state.disabled:
actions = self.pipeline.disabled_actions
# Check here if we should disable so that we only use the disabled
# reporters /after/ the last disable_at failure is still reported as
# normal.
if (self.pipeline.disable_at and not self.pipeline.state.disabled and
self.pipeline.state.consecutive_failures
if (self.pipeline.disable_at and not self.state.disabled and
self.state.consecutive_failures
>= self.pipeline.disable_at):
self.pipeline.state.updateAttributes(
self.state.updateAttributes(
self.current_context, disabled=True)
if actions:
log.info("Reporting item %s, actions: %s", item, actions)
@ -2539,7 +2537,7 @@ class PipelineManager(metaclass=ABCMeta):
for i in self.state.getAllItems())
# TODO(jeblair): add items keys like changes
tenant = self.pipeline.tenant
tenant = self.tenant
basekey = 'zuul.tenant.%s' % tenant.name
key = '%s.pipeline.%s' % (basekey, self.pipeline.name)
# stats.timers.zuul.tenant.<tenant>.pipeline.<pipeline>.resident_time
@ -2547,7 +2545,7 @@ class PipelineManager(metaclass=ABCMeta):
# stats.gauges.zuul.tenant.<tenant>.pipeline.<pipeline>.current_changes
# stats.gauges.zuul.tenant.<tenant>.pipeline.<pipeline>.window
self.sched.statsd.gauge(key + '.current_changes', changes)
self.sched.statsd.gauge(key + '.window', item.pipeline.window)
self.sched.statsd.gauge(key + '.window', self.pipeline.window)
if dt:
self.sched.statsd.timing(key + '.resident_time', dt)
self.sched.statsd.incr(key + '.total_changes', item_changes)
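
A hypothetical illustration of the statsd key shapes documented in the comments above; the client here is a stub, but the key patterns mirror the code.

def emit_pipeline_stats(statsd, tenant, pipeline, changes, window, dt=None):
    basekey = 'zuul.tenant.%s' % tenant
    key = '%s.pipeline.%s' % (basekey, pipeline)
    statsd.gauge(key + '.current_changes', changes)
    statsd.gauge(key + '.window', window)
    if dt:
        statsd.timing(key + '.resident_time', dt)

class PrintStatsd:
    def gauge(self, key, value):
        print('gauge', key, value)

    def timing(self, key, value):
        print('timing', key, value)

emit_pipeline_stats(PrintStatsd(), 'example', 'gate', changes=3, window=20)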

View File

@ -34,8 +34,8 @@ class DependentPipelineManager(SharedQueuePipelineManager):
def constructChangeQueue(self, queue_name):
p = self.pipeline
return model.ChangeQueue.new(
p.manager.current_context,
pipeline=p,
self.current_context,
manager=self,
window=p.window,
window_floor=p.window_floor,
window_ceiling=p.window_ceiling,
@ -87,7 +87,7 @@ class DependentPipelineManager(SharedQueuePipelineManager):
return
# For each project in the change queue, get changes from its source, then dedup.
projects = [self.pipeline.tenant.getProject(pcn)[1] for pcn, _ in
projects = [self.tenant.getProject(pcn)[1] for pcn, _ in
change_queue.project_branches]
sources = {p.source for p in projects}
@ -100,7 +100,7 @@ class DependentPipelineManager(SharedQueuePipelineManager):
log.debug(" Checking source: %s", source)
for c in source.getChangesDependingOn(change,
projects,
self.pipeline.tenant):
self.tenant):
if c not in seen:
seen.add(c)
needed_by_changes.append(c)
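
A generic sketch of the dedup loop above, with a callable standing in for a source's getChangesDependingOn.

def collect_needed_by(change, sources):
    # Gather changes depending on `change` from every source,
    # keeping first-seen order and dropping duplicates.
    seen = set()
    needed_by = []
    for source in sources:
        for c in source(change):
            if c not in seen:
                seen.add(c)
                needed_by.append(c)
    return needed_by

fake_source = lambda change: [change + "-child", change + "-child"]
print(collect_needed_by("I123", [fake_source]))  # ['I123-child']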

View File

@ -31,8 +31,8 @@ class IndependentPipelineManager(PipelineManager):
if existing:
return DynamicChangeQueueContextManager(existing)
change_queue = model.ChangeQueue.new(
self.pipeline.manager.current_context,
pipeline=self.pipeline,
self.current_context,
manager=self,
dynamic=True)
change_queue.addProject(change.project, None)
self.state.addQueue(change_queue)

View File

@ -22,8 +22,8 @@ class SerialPipelineManager(SharedQueuePipelineManager):
def constructChangeQueue(self, queue_name):
return model.ChangeQueue.new(
self.pipeline.manager.current_context,
pipeline=self.pipeline,
self.current_context,
manager=self,
window=1,
window_floor=1,
window_ceiling=1,

View File

@ -65,7 +65,7 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
def buildChangeQueues(self, layout):
self.log.debug("Building shared change queues")
change_queues_managers = {}
tenant = self.pipeline.tenant
tenant = self.tenant
layout_project_configs = layout.project_configs
for project_name, project_configs in layout_project_configs.items():
@ -139,8 +139,8 @@ class SharedQueuePipelineManager(PipelineManager, metaclass=ABCMeta):
# There is no existing queue for this change. Create a
# dynamic one for this one change's use
change_queue = model.ChangeQueue.new(
self.pipeline.manager.current_context,
pipeline=self.pipeline,
self.current_context,
manager=self,
dynamic=True)
change_queue.addProject(change.project, None)
self.state.addQueue(change_queue)

View File

@ -33,7 +33,7 @@ class SupercedentPipelineManager(PipelineManager):
# Don't use Pipeline.getQueue to find an existing queue
# because we're matching project and (branch or ref).
for queue in self.pipeline.queues:
for queue in self.state.queues:
if (queue.queue[-1].changes[0].project == change.project and
((hasattr(change, 'branch') and
hasattr(queue.queue[-1].changes[0], 'branch') and
@ -42,8 +42,8 @@ class SupercedentPipelineManager(PipelineManager):
log.debug("Found existing queue %s", queue)
return DynamicChangeQueueContextManager(queue)
change_queue = model.ChangeQueue.new(
self.pipeline.manager.current_context,
pipeline=self.pipeline,
self.current_context,
manager=self,
window=1,
window_floor=1,
window_ceiling=1,
@ -51,7 +51,7 @@ class SupercedentPipelineManager(PipelineManager):
window_decrease_type='none',
dynamic=True)
change_queue.addProject(change.project, None)
self.pipeline.manager.state.addQueue(change_queue)
self.state.addQueue(change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(
change_queue, allow_delete=True)
@ -62,7 +62,7 @@ class SupercedentPipelineManager(PipelineManager):
# between. This is what causes the last item to "supercede"
# any previously enqueued items (which we know aren't running
# jobs because the window size is 1).
for queue in self.pipeline.queues[:]:
for queue in self.state.queues[:]:
remove = queue.queue[1:-1]
for item in remove:
self.log.debug("Item %s is superceded by %s, removing" %

View File

@ -79,8 +79,8 @@ class MergeClient(object):
if build_set is not None:
build_set_uuid = build_set.uuid
tenant_name = build_set.item.pipeline.tenant.name
pipeline_name = build_set.item.pipeline.name
tenant_name = build_set.item.manager.tenant.name
pipeline_name = build_set.item.manager.pipeline.name
parent_span = tracing.restoreSpan(build_set.span_info)
with trace.use_span(parent_span):

View File

@ -577,7 +577,7 @@ class Pipeline(object):
self.post_review = False
self.dequeue_on_new_patchset = True
self.ignore_dependencies = False
self.manager = None
self.manager_name = None
self.precedence = PRECEDENCE_NORMAL
self.supercedes = []
self.triggers = []
@ -600,22 +600,6 @@ class Pipeline(object):
self.ref_filters = []
self.event_filters = []
@property
def queues(self):
return self.manager.state.queues
@property
def state(self):
return self.manager.state
@property
def change_list(self):
return self.manager.change_list
@property
def summary(self):
return self.manager.summary
@property
def actions(self):
return (
@ -647,20 +631,17 @@ class Pipeline(object):
this=self.name,
other=pipeline))
def setManager(self, manager):
self.manager = manager
def formatStatusJSON(self, websocket_url=None):
def formatStatusJSON(self, manager, websocket_url=None):
j_pipeline = dict(name=self.name,
description=self.description,
state=self.state.state,
manager=self.manager.type)
state=manager.state.state,
manager=manager.type)
j_pipeline['triggers'] = [
{'driver': t.driver.name} for t in self.triggers
]
j_queues = []
j_pipeline['change_queues'] = j_queues
for queue in self.queues:
for queue in manager.state.queues:
if not queue.queue:
continue
j_queue = dict(name=queue.name)
@ -702,8 +683,8 @@ class PipelineState(zkobject.ZKObject):
consecutive_failures=0,
disabled=False,
layout_uuid=None,
# Local pipeline reference (not persisted in Zookeeper)
pipeline=None,
# Local pipeline manager reference (not persisted in Zookeeper)
manager=None,
_read_only=False,
)
@ -716,24 +697,25 @@ class PipelineState(zkobject.ZKObject):
old_queues=[],
consecutive_failures=0,
disabled=False,
layout_uuid=self.pipeline.tenant.layout.uuid,
layout_uuid=self.manager.tenant.layout.uuid,
)
@classmethod
def fromZK(klass, context, path, pipeline, **kw):
def fromZK(klass, context, path, manager, **kw):
obj = klass()
obj._set(pipeline=pipeline, **kw)
# Bind the state to the pipeline, so child objects can access
obj._set(manager=manager, **kw)
# Bind the state to the manager, so child objects can access
# the full pipeline state.
pipeline.state = obj
manager.state = obj
obj._load(context, path=path)
return obj
@classmethod
def create(cls, pipeline, old_state=None):
def create(cls, manager, old_state=None):
# If we are resetting an existing pipeline, we will have an
# old_state, so just clean up the object references there and
# let the next refresh handle updating any data.
# TODO: This apparently hasn't been called in some time; fix.
if old_state:
old_state._resetObjectRefs()
return old_state
@ -742,26 +724,26 @@ class PipelineState(zkobject.ZKObject):
# seen before. It still might exist in ZK, but since we
# haven't seen it, we don't have any object references to
# clean up. We can just start with a clean object, set the
# pipeline reference, and let the next refresh deal with
# manager reference, and let the next refresh deal with
# whether there might be any data in ZK.
state = cls()
state._set(pipeline=pipeline)
state._set(manager=manager)
return state
def _resetObjectRefs(self):
# Update the pipeline references on the queue objects.
for queue in self.queues + self.old_queues:
queue.pipeline = self.pipeline
queue.manager = self.manager
def getPath(self):
if hasattr(self, '_path'):
return self._path
return self.pipelinePath(self.pipeline)
return self.pipelinePath(self.manager)
@classmethod
def pipelinePath(cls, pipeline):
safe_tenant = urllib.parse.quote_plus(pipeline.tenant.name)
safe_pipeline = urllib.parse.quote_plus(pipeline.name)
def pipelinePath(cls, manager):
safe_tenant = urllib.parse.quote_plus(manager.tenant.name)
safe_pipeline = urllib.parse.quote_plus(manager.pipeline.name)
return f"/zuul/tenant/{safe_tenant}/pipeline/{safe_pipeline}"
@classmethod
@ -795,7 +777,7 @@ class PipelineState(zkobject.ZKObject):
self.old_queues.remove(queue)
def addQueue(self, queue):
with self.activeContext(self.pipeline.manager.current_context):
with self.activeContext(self.manager.current_context):
self.queues.append(queue)
def getQueue(self, project_cname, branch):
@ -807,14 +789,14 @@ class PipelineState(zkobject.ZKObject):
def removeQueue(self, queue):
if queue in self.queues:
with self.activeContext(self.pipeline.manager.current_context):
with self.activeContext(self.manager.current_context):
self.queues.remove(queue)
queue.delete(self.pipeline.manager.current_context)
queue.delete(self.manager.current_context)
def promoteQueue(self, queue):
if queue not in self.queues:
return
with self.activeContext(self.pipeline.manager.current_context):
with self.activeContext(self.manager.current_context):
self.queues.remove(queue)
self.queues.insert(0, queue)
@ -871,7 +853,7 @@ class PipelineState(zkobject.ZKObject):
# deserialize() is used instead.
context.log.warning("Initializing pipeline state for %s; "
"this is expected only for new pipelines",
self.pipeline.name)
self.manager.pipeline.name)
self._set(**self._lateInitData())
self.internalCreate(context)
@ -879,7 +861,7 @@ class PipelineState(zkobject.ZKObject):
# We may have old change objects in the pipeline cache, so
# make sure they are the same objects we would get from the
# source change cache.
self.pipeline.manager.clearCache()
self.manager.clearCache()
# If the object doesn't exist we will get back an empty byte
# string. This happens because the postConfig call creates
@ -892,7 +874,7 @@ class PipelineState(zkobject.ZKObject):
if raw == b'':
context.log.warning("Initializing pipeline state for %s; "
"this is expected only for new pipelines",
self.pipeline.name)
self.manager.pipeline.name)
return self._lateInitData()
data = super().deserialize(raw, context)
@ -901,7 +883,7 @@ class PipelineState(zkobject.ZKObject):
# Skip this check if we're in a context where we want to
# read the state without updating it (in case we're not
# certain that the layout is up to date).
if data['layout_uuid'] != self.pipeline.tenant.layout.uuid:
if data['layout_uuid'] != self.manager.tenant.layout.uuid:
# The tenant layout has updated since our last state; we
# need to reset the state.
data = dict(
@ -910,7 +892,7 @@ class PipelineState(zkobject.ZKObject):
old_queues=data["old_queues"] + data["queues"],
consecutive_failures=0,
disabled=False,
layout_uuid=self.pipeline.tenant.layout.uuid,
layout_uuid=self.manager.tenant.layout.uuid,
)
existing_queues = {
@ -927,7 +909,7 @@ class PipelineState(zkobject.ZKObject):
queue.refresh(context)
else:
queue = ChangeQueue.fromZK(context, queue_path,
pipeline=self.pipeline)
manager=self.manager)
old_queues.append(queue)
queues = []
@ -937,18 +919,18 @@ class PipelineState(zkobject.ZKObject):
queue.refresh(context)
else:
queue = ChangeQueue.fromZK(context, queue_path,
pipeline=self.pipeline)
manager=self.manager)
queues.append(queue)
if hasattr(self.pipeline.manager, "change_queue_managers"):
if hasattr(self.manager, "change_queue_managers"):
# Clear out references to old queues
for cq_manager in self.pipeline.manager.change_queue_managers:
for cq_manager in self.manager.change_queue_managers:
cq_manager.created_for_branches.clear()
# Add queues to matching change queue managers
for queue in queues:
project_cname, branch = queue.project_branches[0]
for cq_manager in self.pipeline.manager.change_queue_managers:
for cq_manager in self.manager.change_queue_managers:
managed_projects = {
p.canonical_name for p in cq_manager.projects
}
@ -989,8 +971,8 @@ class PipelineState(zkobject.ZKObject):
items_referenced_by_builds.add(build.build_set.item.uuid)
stale_items = all_items - known_items - items_referenced_by_builds
for item_uuid in stale_items:
self.pipeline.manager.log.debug("Cleaning up stale item %s",
item_uuid)
self.manager.log.debug("Cleaning up stale item %s",
item_uuid)
context.client.delete(QueueItem.itemPath(pipeline_path, item_uuid),
recursive=True)
@ -1003,8 +985,8 @@ class PipelineState(zkobject.ZKObject):
known_queues = {q.uuid for q in (*self.old_queues, *self.queues)}
stale_queues = all_queues - known_queues
for queue_uuid in stale_queues:
self.pipeline.manager.log.debug("Cleaning up stale queue %s",
queue_uuid)
self.manager.log.debug("Cleaning up stale queue %s",
queue_uuid)
context.client.delete(
ChangeQueue.queuePath(pipeline_path, queue_uuid),
recursive=True)
@ -1050,7 +1032,7 @@ class PipelineChangeList(zkobject.ShardedZKObject):
context.log.warning(
"Initializing pipeline change list for %s; "
"this is expected only for new pipelines",
self.pipeline.name)
self.manager.pipeline.name)
self.internalCreate(context)
else:
# If we're called from a context where we can't
@ -1058,21 +1040,21 @@ class PipelineChangeList(zkobject.ShardedZKObject):
raise
def getPath(self):
return self.getChangeListPath(self.pipeline)
return self.getChangeListPath(self.manager)
@classmethod
def getChangeListPath(cls, pipeline):
pipeline_path = pipeline.state.getPath()
def getChangeListPath(cls, manager):
pipeline_path = manager.state.getPath()
return pipeline_path + '/change_list'
@classmethod
def create(cls, pipeline):
def create(cls, manager):
# This object may or may not exist in ZK, but we aren't using
# any of that data here. We can just start with a clean object, set
# the pipeline reference, and let the next refresh deal with
# the manager reference, and let the next refresh deal with
# whether there might be any data in ZK.
change_list = cls()
change_list._set(pipeline=pipeline)
change_list._set(manager=manager)
return change_list
def serialize(self, context):
@ -1116,10 +1098,12 @@ class PipelineSummary(zkobject.ShardedZKObject):
)
def getPath(self):
return f"{PipelineState.pipelinePath(self.pipeline)}/status"
return f"{PipelineState.pipelinePath(self.manager)}/status"
def update(self, context, zuul_globals):
status = self.pipeline.formatStatusJSON(zuul_globals.websocket_url)
status = self.manager.pipeline.formatStatusJSON(
self.manager,
zuul_globals.websocket_url)
self.updateAttributes(context, status=status)
def serialize(self, context):
@ -1167,7 +1151,7 @@ class ChangeQueue(zkobject.ZKObject):
super().__init__()
self._set(
uuid=uuid4().hex,
pipeline=None,
manager=None,
name="",
project_branches=[],
_jobs=set(),
@ -1250,7 +1234,7 @@ class ChangeQueue(zkobject.ZKObject):
return data
def getPath(self):
pipeline_path = self.pipeline.state.getPath()
pipeline_path = self.manager.state.getPath()
return self.queuePath(pipeline_path, self.uuid)
@classmethod
@ -1259,10 +1243,10 @@ class ChangeQueue(zkobject.ZKObject):
@property
def zk_context(self):
return self.pipeline.manager.current_context
return self.manager.current_context
def __repr__(self):
return '<ChangeQueue %s: %s>' % (self.pipeline.name, self.name)
return '<ChangeQueue %s: %s>' % (self.manager.pipeline.name, self.name)
def getJobs(self):
return self._jobs
@ -1295,7 +1279,7 @@ class ChangeQueue(zkobject.ZKObject):
elif getattr(event, 'orig_ref', None):
event_ref_cache_key = event.orig_ref
elif hasattr(event, 'canonical_project_name'):
trusted, project = self.pipeline.tenant.getProject(
trusted, project = self.manager.tenant.getProject(
event.canonical_project_name)
if project:
change_key = project.source.getChangeKey(event)
@ -1305,7 +1289,7 @@ class ChangeQueue(zkobject.ZKObject):
# above; it's unclear what other unhandled event would
# cause an enqueue, but if it happens, log and
# continue.
self.pipeline.manager.log.warning(
self.manager.log.warning(
"Unable to identify triggering ref from event %s",
event)
event_info = EventInfo.fromEvent(event, event_ref_cache_key)
@ -3532,7 +3516,7 @@ class FrozenJob(zkobject.ZKObject):
if self.waiting_status == status:
return
self.updateAttributes(
self.buildset.item.pipeline.manager.current_context,
self.buildset.item.manager.current_context,
waiting_status=status)
def _getJobData(self, name):
@ -3645,7 +3629,7 @@ class FrozenJob(zkobject.ZKObject):
return data
def setParentData(self, parent_data, secret_parent_data, artifact_data):
context = self.buildset.item.pipeline.manager.current_context
context = self.buildset.item.manager.current_context
kw = {}
if self.parent_data != parent_data:
kw['_parent_data'] = self._makeJobData(
@ -3658,11 +3642,11 @@ class FrozenJob(zkobject.ZKObject):
context, 'artifact_data', artifact_data)
if kw:
self.updateAttributes(
self.buildset.item.pipeline.manager.current_context,
self.buildset.item.manager.current_context,
**kw)
def setArtifactData(self, artifact_data):
context = self.buildset.item.pipeline.manager.current_context
context = self.buildset.item.manager.current_context
if self.artifact_data != artifact_data:
self.updateAttributes(
context,
@ -3924,7 +3908,7 @@ class Job(ConfigObject):
project_metadata.default_branch
else:
role['project_default_branch'] = 'master'
role_trusted, role_project = item.pipeline.tenant.getProject(
role_trusted, role_project = item.manager.tenant.getProject(
role['project_canonical_name'])
role_connection = role_project.source.connection
role['connection'] = role_connection.connection_name
@ -5607,7 +5591,7 @@ class Build(zkobject.ZKObject):
@property
def pipeline(self):
return self.build_set.item.pipeline
return self.build_set.item.manager.pipeline
@property
def log_url(self):
@ -6166,7 +6150,7 @@ class BuildSet(zkobject.ZKObject):
self.addBuilds([build])
def addBuilds(self, builds):
with self.activeContext(self.item.pipeline.manager.current_context):
with self.activeContext(self.item.manager.current_context):
for build in builds:
self._addBuild(build)
@ -6176,12 +6160,12 @@ class BuildSet(zkobject.ZKObject):
self.tries[build.job.uuid] = 1
def addRetryBuild(self, build):
with self.activeContext(self.item.pipeline.manager.current_context):
with self.activeContext(self.item.manager.current_context):
self.retry_builds.setdefault(
build.job.uuid, []).append(build)
def removeBuild(self, build):
with self.activeContext(self.item.pipeline.manager.current_context):
with self.activeContext(self.item.manager.current_context):
self.tries[build.job.uuid] += 1
del self.builds[build.job.uuid]
@ -6218,19 +6202,19 @@ class BuildSet(zkobject.ZKObject):
def setJobNodeSetInfo(self, job, nodeset_info):
if job.uuid in self.nodeset_info:
raise Exception("Prior node request for %s", job.name)
with self.activeContext(self.item.pipeline.manager.current_context):
with self.activeContext(self.item.manager.current_context):
self.nodeset_info[job.uuid] = nodeset_info
def removeJobNodeSetInfo(self, job):
if job.uuid not in self.nodeset_info:
raise Exception("No job nodeset for %s" % (job.name))
with self.activeContext(self.item.pipeline.manager.current_context):
with self.activeContext(self.item.manager.current_context):
del self.nodeset_info[job.uuid]
def setJobNodeRequestID(self, job, request_id):
if job.uuid in self.node_requests:
raise Exception("Prior node request for %s" % (job.name))
with self.activeContext(self.item.pipeline.manager.current_context):
with self.activeContext(self.item.manager.current_context):
self.node_requests[job.uuid] = request_id
def getJobNodeRequestID(self, job):
@ -6243,7 +6227,7 @@ class BuildSet(zkobject.ZKObject):
def removeJobNodeRequestID(self, job):
if job.uuid in self.node_requests:
with self.activeContext(
self.item.pipeline.manager.current_context):
self.item.manager.current_context):
del self.node_requests[job.uuid]
def getTries(self, job):
@ -6270,7 +6254,7 @@ class BuildSet(zkobject.ZKObject):
break
item = item.item_ahead
if not project_metadata:
layout = self.item.pipeline.tenant.layout
layout = self.item.manager.tenant.layout
if layout:
project_metadata = layout.getProjectMetadata(
project.canonical_name
@ -6378,9 +6362,9 @@ class QueueItem(zkobject.ZKObject):
)
@property
def pipeline(self):
def manager(self):
if self.queue:
return self.queue.pipeline
return self.queue.manager
return None
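
A stripped-down sketch (stand-in classes) of the delegation chain after this change: items reach their manager through the queue, and the pipeline and tenant only through the manager.

class TinyManager:
    def __init__(self, pipeline_name):
        self.pipeline_name = pipeline_name

class TinyQueue:
    def __init__(self, manager):
        self.manager = manager

class TinyItem:
    def __init__(self, queue=None):
        self.queue = queue

    @property
    def manager(self):
        # Mirrors QueueItem.manager above: delegate through the queue.
        if self.queue:
            return self.queue.manager
        return None

item = TinyItem(TinyQueue(TinyManager("gate")))
print(item.manager.pipeline_name)  # "gate"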
@classmethod
@ -6407,7 +6391,7 @@ class QueueItem(zkobject.ZKObject):
return obj
def getPath(self):
return self.itemPath(PipelineState.pipelinePath(self.pipeline),
return self.itemPath(PipelineState.pipelinePath(self.manager),
self.uuid)
@classmethod
@ -6462,7 +6446,7 @@ class QueueItem(zkobject.ZKObject):
self._set(uuid=data["uuid"])
event = EventInfo.fromDict(data["event"]["data"])
changes = self.pipeline.manager.resolveChangeReferences(
changes = self.manager.resolveChangeReferences(
data["changes"])
build_set = self.current_build_set
if build_set and build_set.getPath() == data["current_build_set"]:
@ -6488,8 +6472,8 @@ class QueueItem(zkobject.ZKObject):
return get_annotated_logger(logger, self.event)
def __repr__(self):
if self.pipeline:
pipeline = self.pipeline.name
if self.manager:
pipeline = self.manager.pipeline.name
else:
pipeline = None
if self.live:
@ -6500,7 +6484,7 @@ class QueueItem(zkobject.ZKObject):
self.uuid, live, self.changes, pipeline)
def resetAllBuilds(self):
context = self.pipeline.manager.current_context
context = self.manager.current_context
old_build_set = self.current_build_set
have_all_files = all(c.files is not None for c in self.changes)
files_state = (BuildSet.COMPLETE if have_all_files else BuildSet.NEW)
@ -6534,14 +6518,14 @@ class QueueItem(zkobject.ZKObject):
self.current_build_set.removeBuild(build)
def setReportedResult(self, result):
self.updateAttributes(self.pipeline.manager.current_context,
self.updateAttributes(self.manager.current_context,
report_time=time.time())
self.current_build_set.updateAttributes(
self.pipeline.manager.current_context, result=result)
self.manager.current_context, result=result)
def warning(self, msgs):
with self.current_build_set.activeContext(
self.pipeline.manager.current_context):
self.manager.current_context):
if not isinstance(msgs, list):
msgs = [msgs]
for msg in msgs:
@ -6695,7 +6679,7 @@ class QueueItem(zkobject.ZKObject):
trusted and untrusted configs"""
includes_trusted = False
includes_untrusted = False
tenant = self.pipeline.tenant
tenant = self.manager.tenant
item = self
while item:
@ -6716,8 +6700,8 @@ class QueueItem(zkobject.ZKObject):
def updatesConfig(self):
"""Returns whether the changes update the config"""
for change in self.changes:
if change.updatesConfig(self.pipeline.tenant):
tenant_project = self.pipeline.tenant.getProject(
if change.updatesConfig(self.manager.tenant):
tenant_project = self.manager.tenant.getProject(
change.project.canonical_name
)[1]
# If the cycle doesn't update the config or a change
@ -6754,13 +6738,13 @@ class QueueItem(zkobject.ZKObject):
self.log.debug("Checking DB for requirements")
requirements_tuple = tuple(sorted(requirements))
if requirements_tuple not in self._cached_sql_results:
conn = self.pipeline.manager.sched.connections.getSqlConnection()
conn = self.manager.sched.connections.getSqlConnection()
if conn:
for change in self.changes:
builds = conn.getBuilds(
tenant=self.pipeline.tenant.name,
tenant=self.manager.tenant.name,
project=change.project.name,
pipeline=self.pipeline.name,
pipeline=self.manager.pipeline.name,
change=change.number,
branch=change.branch,
patchset=change.patchset,
@ -6818,7 +6802,7 @@ class QueueItem(zkobject.ZKObject):
# Look for this item in other queues in the pipeline.
item = None
found = False
for item in self.pipeline.state.getAllItems():
for item in self.manager.state.getAllItems():
if item.live and set(item.changes) == set(self.changes):
found = True
break
@ -6878,13 +6862,13 @@ class QueueItem(zkobject.ZKObject):
return ret
except RequirementsError as e:
self.log.info(str(e))
fakebuild = Build.new(self.pipeline.manager.current_context,
fakebuild = Build.new(self.manager.current_context,
job=job, build_set=self.current_build_set,
error_detail=str(e), result='FAILURE')
self.addBuild(fakebuild)
self.pipeline.manager.sched.reportBuildEnd(
self.manager.sched.reportBuildEnd(
fakebuild,
tenant=self.pipeline.tenant.name,
tenant=self.manager.tenant.name,
final=True)
self.setResult(fakebuild)
return False
@ -7025,7 +7009,7 @@ class QueueItem(zkobject.ZKObject):
# it, set it here.
if job.queued is not True:
job.updateAttributes(
self.pipeline.manager.current_context,
self.manager.current_context,
queued=True)
# Attempt to request nodes for jobs in the order jobs appear
@ -7076,7 +7060,7 @@ class QueueItem(zkobject.ZKObject):
toreq.append(job)
if job.queued is not True:
job.updateAttributes(
self.pipeline.manager.current_context,
self.manager.current_context,
queued=True)
else:
sem_names = ','.join([s.name for s in job.semaphores])
@ -7133,21 +7117,21 @@ class QueueItem(zkobject.ZKObject):
child_build = self.current_build_set.getBuild(job)
if not child_build:
fake_builds.append(
Build.new(self.pipeline.manager.current_context,
Build.new(self.manager.current_context,
job=job,
build_set=self.current_build_set,
error_detail=skipped_reason,
result='SKIPPED'))
if fake_builds:
self.addBuilds(fake_builds)
self.pipeline.manager.sched.reportBuildEnds(
self.manager.sched.reportBuildEnds(
fake_builds,
tenant=self.pipeline.tenant.name,
tenant=self.manager.tenant.name,
final=True)
def setNodeRequestFailure(self, job, error):
fakebuild = Build.new(
self.pipeline.manager.current_context,
self.manager.current_context,
job=job,
build_set=self.current_build_set,
start_time=time.time(),
@ -7156,27 +7140,27 @@ class QueueItem(zkobject.ZKObject):
result='NODE_FAILURE',
)
self.addBuild(fakebuild)
self.pipeline.manager.sched.reportBuildEnd(
self.manager.sched.reportBuildEnd(
fakebuild,
tenant=self.pipeline.tenant.name,
tenant=self.manager.tenant.name,
final=True)
self.setResult(fakebuild)
def setDequeuedNeedingChange(self, msg):
self.updateAttributes(
self.pipeline.manager.current_context,
self.manager.current_context,
dequeued_needing_change=msg)
self._setAllJobsSkipped(msg)
def setDequeuedMissingRequirements(self):
self.updateAttributes(
self.pipeline.manager.current_context,
self.manager.current_context,
dequeued_missing_requirements=True)
self._setAllJobsSkipped('Missing pipeline requirements')
def setUnableToMerge(self, errors=None):
with self.current_build_set.activeContext(
self.pipeline.manager.current_context):
self.manager.current_context):
self.current_build_set.unable_to_merge = True
if errors:
for msg in errors:
@ -7200,7 +7184,7 @@ class QueueItem(zkobject.ZKObject):
self)
if self.current_build_set.config_errors != errors:
with self.current_build_set.activeContext(
self.pipeline.manager.current_context):
self.manager.current_context):
self.current_build_set.setConfigErrors(errors)
if [x for x in errors if x.severity == SEVERITY_ERROR]:
self._setAllJobsSkipped('Buildset configuration error')
@ -7209,14 +7193,14 @@ class QueueItem(zkobject.ZKObject):
fake_builds = []
for job in self.getJobs():
fake_builds.append(Build.new(
self.pipeline.manager.current_context,
self.manager.current_context,
job=job, build_set=self.current_build_set,
error_detail=msg, result='SKIPPED'))
if fake_builds:
self.addBuilds(fake_builds)
self.pipeline.manager.sched.reportBuildEnds(
self.manager.sched.reportBuildEnds(
fake_builds,
tenant=self.pipeline.tenant.name,
tenant=self.manager.tenant.name,
final=True)
def _setMissingJobsSkipped(self, msg):
@ -7227,14 +7211,14 @@ class QueueItem(zkobject.ZKObject):
# We already have a build for this job
continue
fake_builds.append(Build.new(
self.pipeline.manager.current_context,
self.manager.current_context,
job=job, build_set=self.current_build_set,
error_detail=msg, result='SKIPPED'))
if fake_builds:
self.addBuilds(fake_builds)
self.pipeline.manager.sched.reportBuildEnds(
self.manager.sched.reportBuildEnds(
fake_builds,
tenant=self.pipeline.tenant.name,
tenant=self.manager.tenant.name,
final=True)
def formatUrlPattern(self, url_pattern, job=None, build=None):
@ -7248,8 +7232,8 @@ class QueueItem(zkobject.ZKObject):
safe_change = change.getSafeAttributes()
else:
safe_change = self.changes[0].getSafeAttributes()
safe_pipeline = self.pipeline.getSafeAttributes()
safe_tenant = self.pipeline.tenant.getSafeAttributes()
safe_pipeline = self.manager.pipeline.getSafeAttributes()
safe_tenant = self.manager.tenant.getSafeAttributes()
safe_buildset = self.current_build_set.getSafeAttributes()
safe_job = job.getSafeAttributes() if job else {}
safe_build = build.getSafeAttributes() if build else {}
@ -7277,7 +7261,7 @@ class QueueItem(zkobject.ZKObject):
def formatJobResult(self, job, build=None):
if build is None:
build = self.current_build_set.getBuild(job)
pattern = urllib.parse.urljoin(self.pipeline.tenant.web_root,
pattern = urllib.parse.urljoin(self.manager.tenant.web_root,
'build/{build.uuid}')
url = self.formatUrlPattern(pattern, job, build)
result = build.result
@ -7291,7 +7275,7 @@ class QueueItem(zkobject.ZKObject):
def formatItemUrl(self):
# If we don't have a web root set, we can't format any url
if not self.pipeline.tenant.web_root:
if not self.manager.tenant.web_root:
# Apparently we have no website
return None
@ -7299,7 +7283,7 @@ class QueueItem(zkobject.ZKObject):
# We have reported (or are reporting) and so we should
# send the buildset page url
pattern = urllib.parse.urljoin(
self.pipeline.tenant.web_root, "buildset/{buildset.uuid}"
self.manager.tenant.web_root, "buildset/{buildset.uuid}"
)
return self.formatUrlPattern(pattern)
@ -7308,7 +7292,7 @@ class QueueItem(zkobject.ZKObject):
# url. TODO: require a database, insert buildsets into it
# when they are created, and remove this case.
pattern = urllib.parse.urljoin(
self.pipeline.tenant.web_root,
self.manager.tenant.web_root,
"status/change/{change.number},{change.patchset}",
)
return self.formatUrlPattern(pattern)
@ -7482,8 +7466,8 @@ class QueueItem(zkobject.ZKObject):
def updatesJobConfig(self, job, change, layout):
log = self.annotateLogger(self.log)
layout_ahead = None
if self.pipeline.manager:
layout_ahead = self.pipeline.manager.getFallbackLayout(self)
if self.manager:
layout_ahead = self.manager.getFallbackLayout(self)
if layout_ahead and layout and layout is not layout_ahead:
# This change updates the layout. Calculate the job as it
# would be if the layout had not changed.
@ -7512,7 +7496,7 @@ class QueueItem(zkobject.ZKObject):
if old_job is None:
log.debug("Found a newly created job")
return True # A newly created job
if (job.getConfigHash(self.pipeline.tenant) !=
if (job.getConfigHash(self.manager.tenant) !=
old_job.config_hash):
log.debug("Found an updated job")
return True # This job's configuration has changed
@ -7537,7 +7521,7 @@ class QueueItem(zkobject.ZKObject):
return None
if not self.event.ref:
return None
sched = self.pipeline.manager.sched
sched = self.manager.sched
key = ChangeKey.fromReference(self.event.ref)
source = sched.connections.getSource(key.connection_name)
return source.getChange(key)
@ -9396,6 +9380,7 @@ class Layout(object):
self.project_templates = {}
self.project_metadata = {}
self.pipelines = OrderedDict()
self.pipeline_managers = OrderedDict()
# This is a dictionary of name -> [jobs]. The first element
# of the list is the first job added with that name. It is
# the reference definition for a given job. Subsequent
@ -9598,18 +9583,13 @@ class Layout(object):
"Provider %s is already defined" % provider.name)
self.providers[provider.name] = provider
def addPipeline(self, pipeline):
if pipeline.tenant is not self.tenant:
raise Exception("Pipeline created for tenant %s "
"may not be added to %s" % (
pipeline.tenant,
self.tenant))
def addPipeline(self, pipeline, manager):
if pipeline.name in self.pipelines:
raise Exception(
"Pipeline %s is already defined" % pipeline.name)
self.pipelines[pipeline.name] = pipeline
self.pipeline_managers[pipeline.name] = manager
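
A sketch of the new registration contract, assuming a bare layout-like object: the configuration object and its manager are recorded side by side under the pipeline name, and duplicates are rejected.

from types import SimpleNamespace

class TinyLayout:
    def __init__(self):
        self.pipelines = {}
        self.pipeline_managers = {}

    def add_pipeline(self, pipeline, manager):
        # pipeline is the tenant/layout-independent config object;
        # manager is the root object holding state and summary.
        if pipeline.name in self.pipelines:
            raise Exception("Pipeline %s is already defined" % pipeline.name)
        self.pipelines[pipeline.name] = pipeline
        self.pipeline_managers[pipeline.name] = manager

layout = TinyLayout()
layout.add_pipeline(SimpleNamespace(name="check"), object())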
def addProjectTemplate(self, project_template):
for job in project_template.embeddedJobs():
@ -9709,7 +9689,8 @@ class Layout(object):
for template_name in pc.templates:
templates = self.getProjectTemplates(template_name)
for template in templates:
template_ppc = template.pipelines.get(item.pipeline.name)
template_ppc = template.pipelines.get(
item.manager.pipeline.name)
if template_ppc:
if not template.changeMatches(
self.tenant, change.project.canonical_name,
@ -9732,7 +9713,7 @@ class Layout(object):
# these again)
ppc.updateVariables(pc.variables)
project_ppc = pc.pipelines.get(item.pipeline.name)
project_ppc = pc.pipelines.get(item.manager.pipeline.name)
if project_ppc:
project_in_pipeline = True
ppc.update(project_ppc)
@ -9847,9 +9828,9 @@ class Layout(object):
skip_file_matcher, redact_secrets_and_keys,
debug_messages, pending_errors):
log = item.annotateLogger(self.log)
semaphore_handler = item.pipeline.tenant.semaphore_handler
semaphore_handler = item.manager.tenant.semaphore_handler
job_list = ppc.job_list
pipeline = item.pipeline
pipeline = item.manager.pipeline
add_debug_line(debug_messages, "Freezing job graph")
for jobname in job_list.jobs:
# This is the final job we are constructing

View File

@ -178,8 +178,8 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
debug = '\n '.join(item.current_build_set.debug_messages)
ret += '\nDebug information:\n ' + debug + '\n'
if item.pipeline.footer_message:
ret += '\n' + item.pipeline.footer_message
if item.manager.pipeline.footer_message:
ret += '\n' + item.manager.pipeline.footer_message
return ret
@ -188,8 +188,8 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
status_url = item.formatUrlPattern(status_url)
# change, changes, and status_url are deprecated
return item.pipeline.enqueue_message.format(
pipeline=item.pipeline.getSafeAttributes(),
return item.manager.pipeline.enqueue_message.format(
pipeline=item.manager.pipeline.getSafeAttributes(),
change=item.changes[0].getSafeAttributes(),
changes=[c.getSafeAttributes() for c in item.changes],
item_url=item.formatItemUrl(),
@ -200,15 +200,15 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
status_url = item.formatUrlPattern(status_url)
# change, changes, and status_url are deprecated
return item.pipeline.start_message.format(
pipeline=item.pipeline.getSafeAttributes(),
return item.manager.pipeline.start_message.format(
pipeline=item.manager.pipeline.getSafeAttributes(),
change=item.changes[0].getSafeAttributes(),
changes=[c.getSafeAttributes() for c in item.changes],
item_url=item.formatItemUrl(),
status_url=status_url)
def _formatItemReportSuccess(self, item, change, with_jobs=True):
msg = item.pipeline.success_message
msg = item.manager.pipeline.success_message
if with_jobs:
item_url = item.formatItemUrl()
if item_url is not None:
@ -254,12 +254,12 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
msg, self._formatItemReportOtherChanges(item,
change_annotations))
elif item.didMergerFail():
msg = item.pipeline.merge_conflict_message
msg = item.manager.pipeline.merge_conflict_message
elif item.current_build_set.has_blocking_errors:
msg = str(item.getConfigErrors(
errors=True, warnings=False)[0].error)
else:
msg = item.pipeline.failure_message
msg = item.manager.pipeline.failure_message
if with_jobs:
item_url = item.formatItemUrl()
if item_url is not None:
@ -281,7 +281,7 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
return ret
def _formatItemReportMergeConflict(self, item, change, with_jobs=True):
return item.pipeline.merge_conflict_message
return item.manager.pipeline.merge_conflict_message
def _formatItemReportMergeFailure(self, item, change, with_jobs=True):
return 'This change was not merged by the code review system.\n'
@ -298,8 +298,8 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
status_url = item.formatUrlPattern(status_url)
# change, changes, and status_url are deprecated
return item.pipeline.no_jobs_message.format(
pipeline=item.pipeline.getSafeAttributes(),
return item.manager.pipeline.no_jobs_message.format(
pipeline=item.manager.pipeline.getSafeAttributes(),
change=item.changes[0].getSafeAttributes(),
changes=[c.getSafeAttributes() for c in item.changes],
item_url=item.formatItemUrl(),
@ -314,7 +314,7 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
return self._formatItemReport(item, change)
def _formatItemReportDequeue(self, item, change, with_jobs=True):
msg = item.pipeline.dequeue_message
msg = item.manager.pipeline.dequeue_message
if with_jobs:
msg += '\n\n' + self._formatItemReportJobs(item)
return msg
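
A hypothetical illustration of the template formatting used by these reporters; the template string is invented for this sketch, while the access chain (item.manager.pipeline) mirrors the change.

# Real pipelines configure their own enqueue/start/dequeue messages;
# this template is an assumption for illustration.
enqueue_message = "Change {change[number]} enqueued in {pipeline[name]}"

def format_enqueue(pipeline_name, change_number):
    return enqueue_message.format(
        pipeline={"name": pipeline_name},
        change={"number": change_number})

print(format_enqueue("gate", 12345))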

View File

@ -617,7 +617,8 @@ class Scheduler(threading.Thread):
management_event_queues = (
self.pipeline_management_events[tenant.name]
)
for pipeline in tenant.layout.pipelines.values():
for manager in tenant.layout.pipeline_managers.values():
pipeline = manager.pipeline
base = f"zuul.tenant.{tenant.name}.pipeline.{pipeline.name}"
self.statsd.gauge(f"{base}.trigger_events",
len(trigger_event_queues[pipeline.name]))
@ -626,7 +627,7 @@ class Scheduler(threading.Thread):
self.statsd.gauge(f"{base}.management_events",
len(management_event_queues[pipeline.name]))
compressed_size, uncompressed_size =\
pipeline.state.estimateDataSize()
manager.state.estimateDataSize()
self.statsd.gauge(f'{base}.data_size_compressed',
compressed_size)
self.statsd.gauge(f'{base}.data_size_uncompressed',
@ -708,17 +709,17 @@ class Scheduler(threading.Thread):
# Get all the current node requests in the queues
outstanding_requests = set()
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
for manager in tenant.layout.pipeline_managers.values():
with pipeline_lock(
self.zk_client, tenant.name, pipeline.name,
self.zk_client, tenant.name, manager.pipeline.name,
) as lock:
with self.createZKContext(lock, self.log) as ctx:
with pipeline.manager.currentContext(ctx):
pipeline.change_list.refresh(ctx)
pipeline.state.refresh(ctx, read_only=True)
with manager.currentContext(ctx):
manager.change_list.refresh(ctx)
manager.state.refresh(ctx, read_only=True)
# In case we're in the middle of a reconfig,
# include the old queue items.
for item in pipeline.manager.state.getAllItems(
for item in manager.state.getAllItems(
include_old=True):
nrs = item.current_build_set.getNodeRequests()
for _, req_id in nrs:
@ -793,13 +794,13 @@ class Scheduler(threading.Thread):
"Skipping leaked pipeline cleanup for tenant %s",
tenant.name)
continue
valid_pipelines = tenant.layout.pipelines.values()
valid_managers = tenant.layout.pipeline_managers.values()
valid_state_paths = set(
p.state.getPath() for p in valid_pipelines)
m.state.getPath() for m in valid_managers)
valid_event_root_paths = set(
PIPELINE_NAME_ROOT.format(
tenant=p.tenant.name, pipeline=p.name)
for p in valid_pipelines)
tenant=m.tenant.name, pipeline=m.pipeline.name)
for m in valid_managers)
safe_tenant = urllib.parse.quote_plus(tenant.name)
state_root = f"/zuul/tenant/{safe_tenant}/pipeline"
@ -845,9 +846,11 @@ class Scheduler(threading.Thread):
self.zk_client, tenant.name,
pipeline.name) as lock,
self.createZKContext(lock, self.log) as ctx):
pipeline.state.refresh(ctx, read_only=True)
manager = tenant.layout.pipeline_managers.get(
pipeline.name)
manager.state.refresh(ctx, read_only=True)
# add any blobstore references
for item in pipeline.manager.state.getAllItems(
for item in manager.state.getAllItems(
include_old=True):
live_blobs.update(item.getBlobKeys())
with self.createZKContext(None, self.log) as ctx:
@ -928,8 +931,9 @@ class Scheduler(threading.Thread):
# result to the main event loop.
try:
if self.statsd and build.pipeline:
tenant = build.pipeline.tenant
item = build.build_set.item
manager = item.manager
tenant = manager.tenant
job = build.job
change = item.getChangeForJob(job)
jobname = job.name.replace('.', '_').replace('/', '_')
@ -940,7 +944,7 @@ class Scheduler(threading.Thread):
branchname = (getattr(change, 'branch', '').
replace('.', '_').replace('/', '_'))
basekey = 'zuul.tenant.%s' % tenant.name
pipekey = '%s.pipeline.%s' % (basekey, build.pipeline.name)
pipekey = '%s.pipeline.%s' % (basekey, manager.pipeline.name)
# zuul.tenant.<tenant>.pipeline.<pipeline>.all_jobs
key = '%s.all_jobs' % pipekey
self.statsd.incr(key)
@ -1660,20 +1664,21 @@ class Scheduler(threading.Thread):
return None
return source.getProject(project.name)
def _reenqueuePipeline(self, tenant, new_pipeline, context):
def _reenqueuePipeline(self, tenant, manager, context):
pipeline = manager.pipeline
self.log.debug("Re-enqueueing changes for pipeline %s",
new_pipeline.name)
pipeline.name)
# TODO(jeblair): This supports an undocumented and
# unanticipated hack to create a static window. If we
# really want to support this, maybe we should have a
# 'static' type? But it may be in use in the wild, so we
# should allow this at least until there's an upgrade
# path.
if (new_pipeline.window and
new_pipeline.window_increase_type == 'exponential' and
new_pipeline.window_decrease_type == 'exponential' and
new_pipeline.window_increase_factor == 1 and
new_pipeline.window_decrease_factor == 1):
if (pipeline.window and
pipeline.window_increase_type == 'exponential' and
pipeline.window_decrease_type == 'exponential' and
pipeline.window_increase_factor == 1 and
pipeline.window_decrease_factor == 1):
static_window = True
else:
static_window = False
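The window check above reads more easily as a predicate; a sketch, using only names from the diff:

def is_static_window(pipeline):
    # Exponential increase/decrease with a factor of 1 never changes
    # the window size, so such a window is effectively fixed.
    return bool(
        pipeline.window
        and pipeline.window_increase_type == 'exponential'
        and pipeline.window_decrease_type == 'exponential'
        and pipeline.window_increase_factor == 1
        and pipeline.window_decrease_factor == 1)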
@ -1681,7 +1686,7 @@ class Scheduler(threading.Thread):
items_to_remove = []
builds_to_cancel = []
requests_to_cancel = []
for shared_queue in list(new_pipeline.state.old_queues):
for shared_queue in list(manager.state.old_queues):
last_head = None
for item in shared_queue.queue:
# If the old item ahead made it in, re-enqueue
@ -1705,7 +1710,7 @@ class Scheduler(threading.Thread):
if not old_item_ahead or not last_head:
last_head = item
try:
reenqueued = new_pipeline.manager.reEnqueueItem(
reenqueued = manager.reEnqueueItem(
item, last_head, old_item_ahead,
item_ahead_valid=item_ahead_valid)
except Exception:
@ -1717,12 +1722,12 @@ class Scheduler(threading.Thread):
# Attempt to keep window sizes from shrinking where possible
project, branch = shared_queue.project_branches[0]
new_queue = new_pipeline.manager.state.getQueue(project, branch)
new_queue = manager.state.getQueue(project, branch)
if new_queue and shared_queue.window and (not static_window):
new_queue.updateAttributes(
context, window=max(shared_queue.window,
new_queue.window_floor))
new_pipeline.manager.state.removeOldQueue(context, shared_queue)
manager.state.removeOldQueue(context, shared_queue)
for item in items_to_remove:
log = get_annotated_logger(self.log, item.event)
log.info("Removing item %s during reconfiguration", item)
@ -1762,11 +1767,12 @@ class Scheduler(threading.Thread):
# layout lock
if old_tenant:
for name, old_pipeline in old_tenant.layout.pipelines.items():
new_pipeline = tenant.layout.pipelines.get(name)
if not new_pipeline:
with old_pipeline.manager.currentContext(context):
new_manager = tenant.layout.pipeline_managers.get(name)
old_manager = old_tenant.layout.pipeline_managers.get(name)
if not new_manager:
with old_manager.currentContext(context):
try:
self._reconfigureDeletePipeline(old_pipeline)
self._reconfigureDeletePipeline(old_manager)
except Exception:
self.log.exception(
"Failed to cleanup deleted pipeline %s:",
@ -1843,9 +1849,10 @@ class Scheduler(threading.Thread):
self.log.info("Removing tenant %s during reconfiguration" %
(tenant,))
for pipeline in tenant.layout.pipelines.values():
with pipeline.manager.currentContext(context):
manager = tenant.layout.pipeline_managers.get(pipeline.name)
with manager.currentContext(context):
try:
self._reconfigureDeletePipeline(pipeline)
self._reconfigureDeletePipeline(manager)
except Exception:
self.log.exception(
"Failed to cleanup deleted pipeline %s:", pipeline)
@ -1861,17 +1868,17 @@ class Scheduler(threading.Thread):
# periodic cleanup job.
pass
def _reconfigureDeletePipeline(self, pipeline):
self.log.info("Removing pipeline %s during reconfiguration" %
(pipeline,))
def _reconfigureDeletePipeline(self, manager):
self.log.info("Removing pipeline %s during reconfiguration",
manager.pipeline)
ctx = pipeline.manager.current_context
pipeline.state.refresh(ctx)
ctx = manager.current_context
manager.state.refresh(ctx)
builds_to_cancel = []
requests_to_cancel = []
for item in pipeline.manager.state.getAllItems():
with item.activeContext(pipeline.manager.current_context):
for item in manager.state.getAllItems():
with item.activeContext(manager.current_context):
item.item_ahead = None
item.items_behind = []
self.log.info(
@ -1919,29 +1926,29 @@ class Scheduler(threading.Thread):
try:
self.zk_client.client.delete(
PIPELINE_NAME_ROOT.format(
tenant=pipeline.tenant.name,
pipeline=pipeline.name),
tenant=manager.tenant.name,
pipeline=manager.pipeline.name),
recursive=True)
except Exception:
# In case a pipeline event has been submitted during
# reconfiguration, this cleanup will fail.
self.log.exception(
"Error removing event queues for deleted pipeline %s in "
"tenant %s", pipeline.name, pipeline.tenant.name)
"tenant %s", manager.pipeline.name, manager.tenant.name)
# Delete the pipeline root path in ZooKeeper to remove all pipeline
# state.
try:
self.zk_client.client.delete(pipeline.state.getPath(),
self.zk_client.client.delete(manager.state.getPath(),
recursive=True)
except Exception:
self.log.exception(
"Error removing state for deleted pipeline %s in tenant %s",
pipeline.name, pipeline.tenant.name)
manager.pipeline.name, manager.tenant.name)
def _doPromoteEvent(self, event):
tenant = self.abide.tenants.get(event.tenant_name)
pipeline = tenant.layout.pipelines[event.pipeline_name]
manager = tenant.layout.pipeline_managers[event.pipeline_name]
change_ids = [c.split(',') for c in event.change_ids]
items_to_enqueue = []
change_queue = None
@ -1952,7 +1959,7 @@ class Scheduler(threading.Thread):
promote_operations = OrderedDict()
for number, patchset in change_ids:
found = False
for shared_queue in pipeline.queues:
for shared_queue in manager.state.queues:
for item in shared_queue.queue:
if not item.live:
continue
@ -1995,18 +2002,18 @@ class Scheduler(threading.Thread):
if item not in items_to_enqueue:
items_to_enqueue.append(item)
head_same = False
pipeline.manager.cancelJobs(item)
pipeline.manager.dequeueItem(item)
manager.cancelJobs(item)
manager.dequeueItem(item)
for item in items_to_enqueue:
for item_change in item.changes:
pipeline.manager.addChange(
manager.addChange(
item_change, item.event,
enqueue_time=item.enqueue_time,
quiet=True,
ignore_requirements=True)
# Regardless, move this shared change queue to the head.
pipeline.manager.state.promoteQueue(change_queue)
manager.state.promoteQueue(change_queue)
def _doDequeueEvent(self, event):
tenant = self.abide.tenants.get(event.tenant_name)
@ -2015,6 +2022,7 @@ class Scheduler(threading.Thread):
pipeline = tenant.layout.pipelines.get(event.pipeline_name)
if pipeline is None:
raise ValueError('Unknown pipeline %s' % event.pipeline_name)
manager = tenant.layout.pipeline_managers.get(event.pipeline_name)
canonical_name = event.project_hostname + '/' + event.project_name
(trusted, project) = tenant.getProject(canonical_name)
if project is None:
@ -2028,7 +2036,7 @@ class Scheduler(threading.Thread):
item = 'Ref %s' % event.ref
raise Exception('%s does not belong to project "%s"'
% (item, project.name))
for shared_queue in pipeline.queues:
for shared_queue in manager.state.queues:
for item in shared_queue.queue:
if not item.live:
continue
@ -2039,7 +2047,7 @@ class Scheduler(threading.Thread):
item_change.number == change.number and
item_change.patchset == change.patchset) or\
(item_change.ref == change.ref):
pipeline.manager.removeItem(item)
manager.removeItem(item)
return
raise Exception("Unable to find shared change queue for %s:%s" %
(event.project_name,
@ -2052,6 +2060,7 @@ class Scheduler(threading.Thread):
pipeline = tenant.layout.pipelines.get(event.pipeline_name)
if pipeline is None:
raise ValueError(f'Unknown pipeline {event.pipeline_name}')
manager = tenant.layout.pipeline_managers.get(event.pipeline_name)
canonical_name = event.project_hostname + '/' + event.project_name
(trusted, project) = tenant.getProject(canonical_name)
if project is None:
@ -2068,18 +2077,18 @@ class Scheduler(threading.Thread):
f'Change {change} does not belong to project "{project.name}"')
self.log.debug("Event %s for change %s was directly assigned "
"to pipeline %s", event, change, self)
pipeline.manager.addChange(change, event, ignore_requirements=True)
manager.addChange(change, event, ignore_requirements=True)
def _doSupercedeEvent(self, event):
tenant = self.abide.tenants[event.tenant_name]
pipeline = tenant.layout.pipelines[event.pipeline_name]
manager = tenant.layout.pipeline_managers[event.pipeline_name]
canonical_name = f"{event.project_hostname}/{event.project_name}"
trusted, project = tenant.getProject(canonical_name)
if project is None:
return
change_key = project.source.getChangeKey(event)
change = project.source.getChange(change_key, event=event)
for shared_queue in pipeline.queues:
for shared_queue in manager.state.queues:
for item in shared_queue.queue:
if not item.live:
continue
@ -2092,7 +2101,7 @@ class Scheduler(threading.Thread):
) or (item_change.ref == change.ref)):
log = get_annotated_logger(self.log, item.event)
log.info("Item %s is superceded, dequeuing", item)
pipeline.manager.removeItem(item)
manager.removeItem(item)
return
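_doDequeueEvent and _doSupercedeEvent above apply the same match rule to live items; a hypothetical helper (not part of the change) makes it explicit:

def change_matches(item_change, change):
    # Match a review by (number, patchset); anything else by ref.
    same_review = (
        getattr(item_change, 'number', None) is not None
        and item_change.number == getattr(change, 'number', None)
        and item_change.patchset == getattr(change, 'patchset', None))
    return same_review or item_change.ref == change.ref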
def _doSemaphoreReleaseEvent(self, event, tenant, notified):
@ -2118,7 +2127,8 @@ class Scheduler(threading.Thread):
waiting = False
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
for item in pipeline.manager.state.getAllItems():
manager = tenant.layout.pipeline_managers[pipeline.name]
for item in manager.state.getAllItems():
for build in item.current_build_set.getBuilds():
if build.result is None:
self.log.debug("%s waiting on %s" %
@ -2266,7 +2276,8 @@ class Scheduler(threading.Thread):
loader.loadTPCs(self.abide, self.unparsed_abide)
def process_pipelines(self, tenant, tenant_lock):
for pipeline in tenant.layout.pipelines.values():
for manager in tenant.layout.pipeline_managers.values():
pipeline = manager.pipeline
if self._stopped:
return
self.abortIfPendingReconfig(tenant_lock)
@ -2278,16 +2289,16 @@ class Scheduler(threading.Thread):
self.createZKContext(lock, self.log) as ctx):
self.log.debug("Processing pipeline %s in tenant %s",
pipeline.name, tenant.name)
with pipeline.manager.currentContext(ctx):
with manager.currentContext(ctx):
if ((tenant.name, pipeline.name) in
self._profile_pipelines):
ctx.profile = True
with self.statsd_timer(f'{stats_key}.handling'):
refreshed = self._process_pipeline(
tenant, tenant_lock, pipeline)
tenant, tenant_lock, manager)
# Update pipeline summary for zuul-web
if refreshed:
pipeline.summary.update(ctx, self.globals)
manager.summary.update(ctx, self.globals)
if self.statsd:
self._contextStats(ctx, stats_key)
except PendingReconfiguration:
@ -2301,7 +2312,7 @@ class Scheduler(threading.Thread):
# In case this pipeline is locked for some reason
# other than processing events, we need to return
# to it to process them.
if self._pipelineHasEvents(tenant, pipeline):
if self._pipelineHasEvents(tenant, manager):
self.wake_event.set()
except Exception:
self.log.exception(
@ -2331,83 +2342,84 @@ class Scheduler(threading.Thread):
self.statsd.gauge(f'{stats_key}.write_bytes',
ctx.cumulative_write_bytes)
def _pipelineHasEvents(self, tenant, pipeline):
def _pipelineHasEvents(self, tenant, manager):
return any((
self.pipeline_trigger_events[
tenant.name][pipeline.name].hasEvents(),
tenant.name][manager.pipeline.name].hasEvents(),
self.pipeline_result_events[
tenant.name][pipeline.name].hasEvents(),
tenant.name][manager.pipeline.name].hasEvents(),
self.pipeline_management_events[
tenant.name][pipeline.name].hasEvents(),
pipeline.state.isDirty(self.zk_client.client),
tenant.name][manager.pipeline.name].hasEvents(),
manager.state.isDirty(self.zk_client.client),
))
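An equivalent, more explicit restatement of the check above (sketch only; not how the method is actually written):

def pipeline_has_events(self, tenant, manager):
    name = manager.pipeline.name
    queues = (self.pipeline_trigger_events,
              self.pipeline_result_events,
              self.pipeline_management_events)
    if any(q[tenant.name][name].hasEvents() for q in queues):
        return True
    # Dirty state indicates a previous processing run did not complete.
    return manager.state.isDirty(self.zk_client.client)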
def abortIfPendingReconfig(self, tenant_lock):
if tenant_lock.contender_present(RECONFIG_LOCK_ID):
raise PendingReconfiguration()
def _process_pipeline(self, tenant, tenant_lock, pipeline):
def _process_pipeline(self, tenant, tenant_lock, manager):
# Return whether or not we refreshed the pipeline.
pipeline = manager.pipeline
# We only need to process the pipeline if there are
# outstanding events.
if not self._pipelineHasEvents(tenant, pipeline):
if not self._pipelineHasEvents(tenant, manager):
self.log.debug("No events to process for pipeline %s in tenant %s",
pipeline.name, tenant.name)
return False
stats_key = f'zuul.tenant.{tenant.name}.pipeline.{pipeline.name}'
ctx = pipeline.manager.current_context
ctx = manager.current_context
with self.statsd_timer(f'{stats_key}.refresh'):
pipeline.change_list.refresh(ctx)
pipeline.summary.refresh(ctx)
pipeline.state.refresh(ctx)
manager.change_list.refresh(ctx)
manager.summary.refresh(ctx)
manager.state.refresh(ctx)
pipeline.state.setDirty(self.zk_client.client)
if pipeline.state.old_queues:
self._reenqueuePipeline(tenant, pipeline, ctx)
manager.state.setDirty(self.zk_client.client)
if manager.state.old_queues:
self._reenqueuePipeline(tenant, manager, ctx)
with self.statsd_timer(f'{stats_key}.event_process'):
self.process_pipeline_management_queue(
tenant, tenant_lock, pipeline)
tenant, tenant_lock, manager)
# Give result events priority -- they let us stop builds,
# whereas trigger events cause us to execute builds.
self.process_pipeline_result_queue(tenant, tenant_lock, pipeline)
self.process_pipeline_trigger_queue(tenant, tenant_lock, pipeline)
self.process_pipeline_result_queue(tenant, tenant_lock, manager)
self.process_pipeline_trigger_queue(tenant, tenant_lock, manager)
self.abortIfPendingReconfig(tenant_lock)
try:
with self.statsd_timer(f'{stats_key}.process'):
while not self._stopped and pipeline.manager.processQueue(
while not self._stopped and manager.processQueue(
tenant_lock):
self.abortIfPendingReconfig(tenant_lock)
pipeline.state.cleanup(ctx)
manager.state.cleanup(ctx)
except PendingReconfiguration:
# Don't do anything else here and let the next level up
# handle it.
raise
except Exception:
self.log.exception("Exception in pipeline processing:")
pipeline.state.updateAttributes(
manager.state.updateAttributes(
ctx, state=pipeline.STATE_ERROR)
# Continue processing other pipelines+tenants
else:
pipeline.state.updateAttributes(
manager.state.updateAttributes(
ctx, state=pipeline.STATE_NORMAL)
pipeline.state.clearDirty(self.zk_client.client)
manager.state.clearDirty(self.zk_client.client)
return True
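A condensed restatement of the control flow established above (hypothetical method; error handling, stats timers, and profiling omitted):

def process_pipeline_sketch(self, tenant, tenant_lock, manager, ctx):
    # Refresh ZK-backed objects, then mark the state dirty until the
    # run completes.
    manager.change_list.refresh(ctx)
    manager.summary.refresh(ctx)
    manager.state.refresh(ctx)
    manager.state.setDirty(self.zk_client.client)
    if manager.state.old_queues:        # leftovers from a reconfig
        self._reenqueuePipeline(tenant, manager, ctx)
    self.process_pipeline_management_queue(tenant, tenant_lock, manager)
    # Result events first: they can stop builds; trigger events start them.
    self.process_pipeline_result_queue(tenant, tenant_lock, manager)
    self.process_pipeline_trigger_queue(tenant, tenant_lock, manager)
    while not self._stopped and manager.processQueue(tenant_lock):
        self.abortIfPendingReconfig(tenant_lock)
    manager.state.cleanup(ctx)
    manager.state.clearDirty(self.zk_client.client)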
def _gatherConnectionCacheKeys(self):
relevant = set()
with self.createZKContext(None, self.log) as ctx:
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
for manager in tenant.layout.pipeline_managers.values():
self.log.debug("Gather relevant cache items for: %s %s",
tenant.name, pipeline.name)
tenant.name, manager.pipeline.name)
# This will raise an exception and abort the process if
# unable to refresh the change list.
pipeline.change_list.refresh(ctx, allow_init=False)
change_keys = pipeline.change_list.getChangeKeys()
relevant_changes = pipeline.manager.resolveChangeKeys(
manager.change_list.refresh(ctx, allow_init=False)
change_keys = manager.change_list.getChangeKeys()
relevant_changes = manager.resolveChangeKeys(
change_keys)
for change in relevant_changes:
relevant.add(change.cache_stat.key)
@ -2435,7 +2447,7 @@ class Scheduler(threading.Thread):
tenant.name)
# Update the pipeline changes
ctx = self.createZKContext(None, self.log)
for pipeline in tenant.layout.pipelines.values():
for manager in tenant.layout.pipeline_managers.values():
# This will raise an exception if it is unable to
# refresh the change list. We will proceed anyway
# and use our data from the last time we did
@ -2445,15 +2457,15 @@ class Scheduler(threading.Thread):
# pipeline but don't match the pipeline trigger
# criteria.
try:
pipeline.change_list.refresh(ctx, allow_init=False)
manager.change_list.refresh(ctx, allow_init=False)
except json.JSONDecodeError:
self.log.warning(
"Unable to refresh pipeline change list for %s",
pipeline.name)
manager.pipeline.name)
except Exception:
self.log.exception(
"Unable to refresh pipeline change list for %s",
pipeline.name)
manager.pipeline.name)
# Get the ltime of the last reconfiguration event
self.trigger_events[tenant.name].refreshMetadata()
@ -2567,7 +2579,7 @@ class Scheduler(threading.Thread):
span.set_attribute("reconfigure_tenant", reconfigure_tenant)
event.span_context = tracing.getSpanContext(span)
for pipeline in tenant.layout.pipelines.values():
for manager in tenant.layout.pipeline_managers.values():
# For most kinds of dependencies, it's sufficient to check
# if this change is already in the pipeline, because the
# only way to update a dependency cycle is to update one
@ -2584,17 +2596,18 @@ class Scheduler(threading.Thread):
# manager, but the result of the work goes into the change
# cache, so it's not wasted; it's just less parallelized.
if isinstance(change, Change):
pipeline.manager.updateCommitDependencies(change, event)
manager.updateCommitDependencies(change, event)
if (
pipeline.manager.eventMatches(event, change)
or pipeline.manager.isChangeRelevantToPipeline(change)
manager.eventMatches(event, change)
or manager.isChangeRelevantToPipeline(change)
):
self.pipeline_trigger_events[tenant.name][
pipeline.name
manager.pipeline.name
].put(event.driver_name, event)
def process_pipeline_trigger_queue(self, tenant, tenant_lock, pipeline):
for event in self.pipeline_trigger_events[tenant.name][pipeline.name]:
def process_pipeline_trigger_queue(self, tenant, tenant_lock, manager):
for event in self.pipeline_trigger_events[tenant.name][
manager.pipeline.name]:
log = get_annotated_logger(self.log, event.zuul_event_id)
if not isinstance(event, SupercedeEvent):
local_state = self.local_layout_state[tenant.name]
@ -2617,17 +2630,18 @@ class Scheduler(threading.Thread):
if isinstance(event, SupercedeEvent):
self._doSupercedeEvent(event)
else:
self._process_trigger_event(tenant, pipeline, event)
self._process_trigger_event(tenant, manager, event)
finally:
self.pipeline_trigger_events[tenant.name][
pipeline.name
manager.pipeline.name
].ack(event)
if self._stopped:
return
self.abortIfPendingReconfig(tenant_lock)
self.pipeline_trigger_events[tenant.name][pipeline.name].cleanup()
self.pipeline_trigger_events[
tenant.name][manager.pipeline.name].cleanup()
def _process_trigger_event(self, tenant, pipeline, event):
def _process_trigger_event(self, tenant, manager, event):
log = get_annotated_logger(
self.log, event.zuul_event_id
)
@ -2643,17 +2657,17 @@ class Scheduler(threading.Thread):
return
if event.isPatchsetCreated():
pipeline.manager.removeOldVersionsOfChange(change, event)
manager.removeOldVersionsOfChange(change, event)
elif event.isChangeAbandoned():
pipeline.manager.removeAbandonedChange(change, event)
manager.removeAbandonedChange(change, event)
# Let the pipeline update any dependencies that may need
# refreshing if this change has updated.
if event.isPatchsetCreated() or event.isMessageChanged():
pipeline.manager.refreshDeps(change, event)
manager.refreshDeps(change, event)
if pipeline.manager.eventMatches(event, change):
pipeline.manager.addChange(change, event)
if manager.eventMatches(event, change):
manager.addChange(change, event)
def process_tenant_management_queue(self, tenant):
try:
@ -2734,9 +2748,9 @@ class Scheduler(threading.Thread):
event.ack_ref.set()
self.reconfigure_event_queue.task_done()
def process_pipeline_management_queue(self, tenant, tenant_lock, pipeline):
def process_pipeline_management_queue(self, tenant, tenant_lock, manager):
for event in self.pipeline_management_events[tenant.name][
pipeline.name
manager.pipeline.name
]:
log = get_annotated_logger(self.log, event.zuul_event_id)
log.debug("Processing management event %s", event)
@ -2745,12 +2759,13 @@ class Scheduler(threading.Thread):
self._process_management_event(event)
finally:
self.pipeline_management_events[tenant.name][
pipeline.name
manager.pipeline.name
].ack(event)
if self._stopped:
return
self.abortIfPendingReconfig(tenant_lock)
self.pipeline_management_events[tenant.name][pipeline.name].cleanup()
self.pipeline_management_events[tenant.name][
manager.pipeline.name].cleanup()
def _process_management_event(self, event):
try:
@ -2775,8 +2790,9 @@ class Scheduler(threading.Thread):
"".join(traceback.format_exception(*sys.exc_info()))
)
def process_pipeline_result_queue(self, tenant, tenant_lock, pipeline):
for event in self.pipeline_result_events[tenant.name][pipeline.name]:
def process_pipeline_result_queue(self, tenant, tenant_lock, manager):
for event in self.pipeline_result_events[tenant.name][
manager.pipeline.name]:
log = get_annotated_logger(
self.log,
event=getattr(event, "zuul_event_id", None),
@ -2785,53 +2801,54 @@ class Scheduler(threading.Thread):
log.debug("Processing result event %s", event)
try:
if not self.disable_pipelines:
self._process_result_event(event, pipeline)
self._process_result_event(event, manager)
finally:
self.pipeline_result_events[tenant.name][
pipeline.name
manager.pipeline.name
].ack(event)
if self._stopped:
return
self.abortIfPendingReconfig(tenant_lock)
self.pipeline_result_events[tenant.name][pipeline.name].cleanup()
self.pipeline_result_events[
tenant.name][manager.pipeline.name].cleanup()
def _process_result_event(self, event, pipeline):
def _process_result_event(self, event, manager):
if isinstance(event, BuildStartedEvent):
self._doBuildStartedEvent(event, pipeline)
self._doBuildStartedEvent(event, manager)
elif isinstance(event, BuildStatusEvent):
self._doBuildStatusEvent(event, pipeline)
self._doBuildStatusEvent(event, manager)
elif isinstance(event, BuildPausedEvent):
self._doBuildPausedEvent(event, pipeline)
self._doBuildPausedEvent(event, manager)
elif isinstance(event, BuildCompletedEvent):
self._doBuildCompletedEvent(event, pipeline)
self._doBuildCompletedEvent(event, manager)
elif isinstance(event, MergeCompletedEvent):
self._doMergeCompletedEvent(event, pipeline)
self._doMergeCompletedEvent(event, manager)
elif isinstance(event, FilesChangesCompletedEvent):
self._doFilesChangesCompletedEvent(event, pipeline)
self._doFilesChangesCompletedEvent(event, manager)
elif isinstance(event, NodesProvisionedEvent):
self._doNodesProvisionedEvent(event, pipeline)
self._doNodesProvisionedEvent(event, manager)
elif isinstance(event, SemaphoreReleaseEvent):
# MODEL_API <= 32
# Kept for backward compatibility; semaphore release events
# are now processed in the management event queue.
self._doSemaphoreReleaseEvent(event, pipeline.tenant, set())
self._doSemaphoreReleaseEvent(event, manager.tenant, set())
else:
self.log.error("Unable to handle event %s", event)
def _getBuildSetFromPipeline(self, event, pipeline):
def _getBuildSetFromPipeline(self, event, manager):
log = get_annotated_logger(
self.log,
event=getattr(event, "zuul_event_id", None),
build=getattr(event, "build_uuid", None),
)
if not pipeline:
if not manager:
log.warning(
"Build set %s is not associated with a pipeline",
event.build_set_uuid,
)
return
for item in pipeline.manager.state.getAllItems():
for item in manager.state.getAllItems():
# If the provided buildset UUID doesn't match any current one,
# we assume that it's not current anymore.
if item.current_build_set.uuid == event.build_set_uuid:
@ -2839,10 +2856,10 @@ class Scheduler(threading.Thread):
log.warning("Build set %s is not current", event.build_set_uuid)
def _getBuildFromPipeline(self, event, pipeline):
def _getBuildFromPipeline(self, event, manager):
log = get_annotated_logger(
self.log, event.zuul_event_id, build=event.build_uuid)
build_set = self._getBuildSetFromPipeline(event, pipeline)
build_set = self._getBuildSetFromPipeline(event, manager)
if not build_set:
return
@ -2868,12 +2885,12 @@ class Scheduler(threading.Thread):
return build
def _doBuildStartedEvent(self, event, pipeline):
build = self._getBuildFromPipeline(event, pipeline)
def _doBuildStartedEvent(self, event, manager):
build = self._getBuildFromPipeline(event, manager)
if not build:
return
with build.activeContext(pipeline.manager.current_context):
with build.activeContext(manager.current_context):
build.start_time = event.data["start_time"]
log = get_annotated_logger(
@ -2883,7 +2900,7 @@ class Scheduler(threading.Thread):
job = build.job
change = item.getChangeForJob(job)
estimate = self.times.getEstimatedTime(
pipeline.tenant.name,
manager.tenant.name,
change.project.name,
getattr(change, 'branch', None),
job.name)
@ -2892,10 +2909,10 @@ class Scheduler(threading.Thread):
build.estimated_time = estimate
except Exception:
log.exception("Exception estimating build time:")
pipeline.manager.onBuildStarted(build)
manager.onBuildStarted(build)
def _doBuildStatusEvent(self, event, pipeline):
build = self._getBuildFromPipeline(event, pipeline)
def _doBuildStatusEvent(self, event, manager):
build = self._getBuildFromPipeline(event, manager)
if not build:
return
@ -2904,17 +2921,17 @@ class Scheduler(threading.Thread):
args['url'] = event.data['url']
if 'pre_fail' in event.data:
args['pre_fail'] = event.data['pre_fail']
build.updateAttributes(pipeline.manager.current_context,
build.updateAttributes(manager.current_context,
**args)
def _doBuildPausedEvent(self, event, pipeline):
build = self._getBuildFromPipeline(event, pipeline)
def _doBuildPausedEvent(self, event, manager):
build = self._getBuildFromPipeline(event, manager)
if not build:
return
# Setting paused is deferred to event processing stage to avoid a race
# with child job skipping.
with build.activeContext(pipeline.manager.current_context):
with build.activeContext(manager.current_context):
build.paused = True
build.addEvent(
BuildEvent(
@ -2924,12 +2941,12 @@ class Scheduler(threading.Thread):
event.data.get("data", {}),
event.data.get("secret_data", {}))
pipeline.manager.onBuildPaused(build)
manager.onBuildPaused(build)
def _doBuildCompletedEvent(self, event, pipeline):
def _doBuildCompletedEvent(self, event, manager):
log = get_annotated_logger(
self.log, event.zuul_event_id, build=event.build_uuid)
build = self._getBuildFromPipeline(event, pipeline)
build = self._getBuildFromPipeline(event, manager)
if not build:
self.log.error(
"Unable to find build %s. Creating a fake build to clean up "
@ -2963,7 +2980,7 @@ class Scheduler(threading.Thread):
self._cleanupCompletedBuild(build)
try:
self.sql.reportBuildEnd(
build, tenant=pipeline.tenant.name,
build, tenant=manager.tenant.name,
final=True)
except exceptions.MissingBuildsetError:
# If we have not reported start for this build, we
@ -2991,7 +3008,7 @@ class Scheduler(threading.Thread):
log.info("Build complete, result %s, warnings %s", result, warnings)
with build.activeContext(pipeline.manager.current_context):
with build.activeContext(manager.current_context):
build.error_detail = event_result.get("error_detail")
if result is None:
@ -3026,7 +3043,7 @@ class Scheduler(threading.Thread):
build.end_time = event_result["end_time"]
build.setResultData(result_data, secret_result_data)
build.build_set.updateAttributes(
pipeline.manager.current_context,
manager.current_context,
warning_messages=build.build_set.warning_messages + warnings)
build.held = event_result.get("held")
@ -3043,11 +3060,11 @@ class Scheduler(threading.Thread):
self._cleanupCompletedBuild(build)
try:
self.reportBuildEnd(
build, tenant=pipeline.tenant.name, final=(not build.retry))
build, tenant=manager.tenant.name, final=(not build.retry))
except Exception:
log.exception("Error reporting build completion to DB:")
pipeline.manager.onBuildCompleted(build)
manager.onBuildCompleted(build)
def _cleanupCompletedBuild(self, build):
# TODO (felix): Returning the nodes doesn't work in case the buildset
@ -3071,8 +3088,8 @@ class Scheduler(threading.Thread):
# internal dict after it's added to the report queue.
self.executor.removeBuild(build)
def _doMergeCompletedEvent(self, event, pipeline):
build_set = self._getBuildSetFromPipeline(event, pipeline)
def _doMergeCompletedEvent(self, event, manager):
build_set = self._getBuildSetFromPipeline(event, manager)
if not build_set:
return
@ -3084,10 +3101,10 @@ class Scheduler(threading.Thread):
"zuul_event_id": build_set.item.event.zuul_event_id,
}
)
pipeline.manager.onMergeCompleted(event, build_set)
manager.onMergeCompleted(event, build_set)
def _doFilesChangesCompletedEvent(self, event, pipeline):
build_set = self._getBuildSetFromPipeline(event, pipeline)
def _doFilesChangesCompletedEvent(self, event, manager):
build_set = self._getBuildSetFromPipeline(event, manager)
if not build_set:
return
@ -3099,9 +3116,9 @@ class Scheduler(threading.Thread):
"zuul_event_id": build_set.item.event.zuul_event_id,
}
)
pipeline.manager.onFilesChangesCompleted(event, build_set)
manager.onFilesChangesCompleted(event, build_set)
def _doNodesProvisionedEvent(self, event, pipeline):
def _doNodesProvisionedEvent(self, event, manager):
if self.nodepool.isNodeRequestID(event.request_id):
request = self.nodepool.zk_nodepool.getNodeRequest(
event.request_id)
@ -3115,7 +3132,7 @@ class Scheduler(threading.Thread):
log = get_annotated_logger(self.log, request)
# Look up the buildset to access the local node request object
build_set = self._getBuildSetFromPipeline(event, pipeline)
build_set = self._getBuildSetFromPipeline(event, manager)
if not build_set:
log.warning("Build set not found while processing"
"nodes provisioned event %s", event)
@ -3142,7 +3159,7 @@ class Scheduler(threading.Thread):
job = build_set.item.getJob(request.job_uuid)
if build_set.getJobNodeSetInfo(job) is None:
pipeline.manager.onNodesProvisioned(
manager.onNodesProvisioned(
request, nodeset_info, build_set)
else:
self.log.warning("Duplicate nodes provisioned event: %s",
@ -3194,7 +3211,7 @@ class Scheduler(threading.Thread):
if build.result is None:
build.updateAttributes(
buildset.item.pipeline.manager.current_context,
buildset.item.manager.current_context,
result='CANCELED')
if force:
@ -3205,7 +3222,7 @@ class Scheduler(threading.Thread):
self.executor.removeBuild(build)
try:
self.reportBuildEnd(
build, build.build_set.item.pipeline.tenant.name,
build, build.build_set.item.manager.tenant.name,
final=False)
except Exception:
self.log.exception(
@ -3216,19 +3233,19 @@ class Scheduler(threading.Thread):
# If final is set, make sure that the job is not resurrected
# later by re-requesting nodes.
fakebuild = Build.new(
buildset.item.pipeline.manager.current_context,
buildset.item.manager.current_context,
job=job, build_set=item.current_build_set,
result='CANCELED')
buildset.addBuild(fakebuild)
finally:
# Release the semaphore in any case
pipeline = buildset.item.pipeline
tenant = pipeline.tenant
manager = buildset.item.manager
tenant = manager.tenant
if COMPONENT_REGISTRY.model_api >= 33:
event_queue = self.management_events[tenant.name]
else:
event_queue = self.pipeline_result_events[
tenant.name][pipeline.name]
tenant.name][manager.pipeline.name]
tenant.semaphore_handler.release(event_queue, item, job)
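The finally block above routes the semaphore release through one of two queues depending on the model API; a hypothetical helper restating that choice:

def _semaphore_event_queue(self, tenant, manager):
    # COMPONENT_REGISTRY as imported by this module; API >= 33 uses the
    # tenant-wide management queue, older APIs the per-pipeline result
    # queue, now addressed via the manager.
    if COMPONENT_REGISTRY.model_api >= 33:
        return self.management_events[tenant.name]
    return self.pipeline_result_events[tenant.name][manager.pipeline.name]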
# Image related methods

View File

@ -37,5 +37,5 @@ class BaseTrigger(object, metaclass=abc.ABCMeta):
def onChangeMerged(self, change, source):
"""Called when a change has been merged."""
def onChangeEnqueued(self, change, pipeline, event):
def onChangeEnqueued(self, change, manager, event):
"""Called when a change has been enqueued."""

View File

@ -1722,14 +1722,14 @@ class ZuulWebAPI(object):
management_event_queues = self.zuulweb.pipeline_management_events[
tenant.name]
with self.zuulweb.zk_context as ctx:
for pipeline in tenant.layout.pipelines.values():
status = pipeline.summary.refresh(ctx)
for manager in tenant.layout.pipeline_managers.values():
status = manager.summary.refresh(ctx)
status['trigger_events'] = len(
trigger_event_queues[pipeline.name])
trigger_event_queues[manager.pipeline.name])
status['result_events'] = len(
result_event_queues[pipeline.name])
result_event_queues[manager.pipeline.name])
status['management_events'] = len(
management_event_queues[pipeline.name])
management_event_queues[manager.pipeline.name])
pipelines.append(status)
return data, json.dumps(data).encode('utf-8')
@ -2580,7 +2580,7 @@ class ZuulWebAPI(object):
uuid = "0" * 32
params = zuul.executor.common.construct_build_params(
uuid, self.zuulweb.connections, job, item, item.pipeline)
uuid, self.zuulweb.connections, job, item, item.manager.pipeline)
params['zuul'].update(zuul.executor.common.zuul_params_from_job(job))
del params['job_ref']
del params['parent_data']
@ -2615,15 +2615,15 @@ class ZuulWebAPI(object):
branch_name):
project = self._getProjectOrRaise(tenant, project_name)
pipeline = tenant.layout.pipelines.get(pipeline_name)
if not pipeline:
manager = tenant.layout.pipeline_managers.get(pipeline_name)
if not manager:
raise cherrypy.HTTPError(404, 'Unknown pipeline')
change = Branch(project)
change.branch = branch_name or "master"
change.cache_stat = FakeCacheKey()
with LocalZKContext(self.log) as context:
queue = ChangeQueue.new(context, pipeline=pipeline)
queue = ChangeQueue.new(context, manager=manager)
item = QueueItem.new(context, queue=queue, changes=[change])
item.freezeJobGraph(tenant.layout, context,
skip_file_matcher=True,