Remove layout.pipelines

We have two parallel data structures, layout.pipelines and
layout.pipeline_managers.  Most activity involves the managers,
and there is a reference to the pipeline from the manager, so
lets simplify the layout by removing the pipelines data structure.

Change-Id: I4102e8c3b90ef7a8d8867f0d3fe119247bed3f18
This commit is contained in:
James E. Blair
2025-03-25 14:24:03 -07:00
parent bb53e772d3
commit 3967e10ccb
13 changed files with 126 additions and 112 deletions

View File

@ -3311,7 +3311,7 @@ class ZuulTestCase(BaseTestCase):
self.log.debug(
f"Tenant trigger queue {tenant.name} not empty")
return False
for pipeline_name in tenant.layout.pipelines:
for pipeline_name in tenant.layout.pipeline_managers:
if sched.pipeline_management_events[tenant.name][
pipeline_name
].hasEvents():
@ -3432,10 +3432,9 @@ class ZuulTestCase(BaseTestCase):
ctx = None
for tenant in sched.abide.tenants.values():
with tenant_read_lock(self.zk_client, tenant.name):
for pipeline in tenant.layout.pipelines.values():
manager = tenant.layout.pipeline_managers[pipeline.name]
for manager in tenant.layout.pipeline_managers.values():
with (pipeline_lock(self.zk_client, tenant.name,
pipeline.name) as lock,
manager.pipeline.name) as lock,
self.createZKContext(lock) as ctx):
with manager.currentContext(ctx):
manager.state.refresh(ctx)
@ -3480,8 +3479,7 @@ class ZuulTestCase(BaseTestCase):
for build in self.builds:
self.log.info("Running build: %s" % build)
for tenant in self.scheds.first.sched.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
manager = tenant.layout.pipeline_managers.get(pipeline.name)
for manager in tenant.layout.pipeline_managers.values():
for pipeline_queue in manager.state.queues:
if len(pipeline_queue.queue) != 0:
status = ''
@ -3489,7 +3487,7 @@ class ZuulTestCase(BaseTestCase):
status += item.formatStatus()
self.log.info(
'Tenant %s pipeline %s queue %s contents:' % (
tenant.name, pipeline.name,
tenant.name, manager.pipeline.name,
pipeline_queue.name))
for l in status.split('\n'):
if l.strip():
@ -3527,12 +3525,11 @@ class ZuulTestCase(BaseTestCase):
def assertEmptyQueues(self):
# Make sure there are no orphaned jobs
for tenant in self.scheds.first.sched.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
manager = tenant.layout.pipeline_managers.get(pipeline.name)
for manager in tenant.layout.pipeline_managers.values():
for pipeline_queue in manager.state.queues:
if len(pipeline_queue.queue) != 0:
print('pipeline %s queue %s contents %s' % (
pipeline.name, pipeline_queue.name,
manager.pipeline.name, pipeline_queue.name,
pipeline_queue.queue))
self.assertEqual(len(pipeline_queue.queue), 0,
"Pipelines queues should be empty")

View File

@ -381,9 +381,9 @@ class TestSQLConnectionMysql(ZuulTestCase):
# Check the results
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
pipeline = tenant.layout.pipelines['check']
manager = tenant.layout.pipeline_managers['check']
reporter = self.scheds.first.connections.getSqlReporter(
pipeline)
manager.pipeline)
with self.scheds.first.connections.getSqlConnection().\
engine.connect() as conn:

View File

@ -83,7 +83,7 @@ class TestJob(BaseTestCase):
self.manager.current_context = self.zk_context
self.manager.state = model.PipelineState()
self.manager.state._set(manager=self.manager)
self.layout.addPipeline(self.pipeline, self.manager)
self.layout.addPipelineManager(self.manager)
with self.zk_context as ctx:
self.queue = model.ChangeQueue.new(
ctx, manager=self.manager)

View File

@ -141,9 +141,9 @@ class TestReporting(ZuulTestCase):
self.assertHistory([])
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
pipeline = tenant.layout.pipelines['check']
manager = tenant.layout.pipeline_managers['check']
reporter = self.scheds.first.connections.getSqlReporter(
pipeline)
manager.pipeline)
with self.scheds.first.connections.getSqlConnection().\
engine.connect() as conn:
@ -170,9 +170,9 @@ class TestReporting(ZuulTestCase):
self.assertHistory([])
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
pipeline = tenant.layout.pipelines['check']
manager = tenant.layout.pipeline_managers['check']
reporter = self.scheds.first.connections.getSqlReporter(
pipeline)
manager.pipeline)
with self.scheds.first.connections.getSqlConnection().\
engine.connect() as conn:

View File

@ -1158,11 +1158,9 @@ class TestScheduler(ZuulTestCase):
time.sleep(2)
found_job = None
pipeline = self.scheds.first.sched.abide.tenants[
'tenant-one'].layout.pipelines['gate']
manager = self.scheds.first.sched.abide.tenants[
'tenant-one'].layout.pipeline_managers['gate']
pipeline_status = pipeline.formatStatusJSON(
pipeline_status = manager.pipeline.formatStatusJSON(
manager,
self.scheds.first.sched.globals.websocket_url)
for queue in pipeline_status['change_queues']:
@ -4298,7 +4296,7 @@ class TestScheduler(ZuulTestCase):
], ordered=False)
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines), 0)
self.assertEqual(len(tenant.layout.pipeline_managers), 0)
def test_live_reconfiguration_del_tenant(self):
# Test tenant deletion while changes are enqueued
@ -4451,9 +4449,9 @@ class TestScheduler(ZuulTestCase):
def get_job():
tenant = self.scheds.first.sched.abide.tenants['tenant-one']
for pipeline in tenant.layout.pipelines.values():
pipeline_status = pipeline.formatStatusJSON(
tenant.layout.pipeline_managers[pipeline.name],
for manager in tenant.layout.pipeline_managers.values():
pipeline_status = manager.pipeline.formatStatusJSON(
manager,
self.scheds.first.sched.globals.websocket_url)
for queue in pipeline_status['change_queues']:
for head in queue['heads']:
@ -4684,9 +4682,8 @@ class TestScheduler(ZuulTestCase):
# Ensure that the status json has the ref so we can render it in the
# web ui.
tenant = self.scheds.first.sched.abide.tenants['tenant-one']
pipeline = tenant.layout.pipelines['periodic']
manager = tenant.layout.pipeline_managers['periodic']
pipeline_status = pipeline.formatStatusJSON(
pipeline_status = manager.pipeline.formatStatusJSON(
manager,
self.scheds.first.sched.globals.websocket_url)
@ -4860,7 +4857,7 @@ class TestScheduler(ZuulTestCase):
report_mock.side_effect = Exception('Gerrit failed to report')
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
check = tenant.layout.pipelines['check']
check = tenant.layout.pipeline_managers['check'].pipeline
check.success_actions = sorted(check.success_actions,
key=lambda x: x.name)
@ -5499,34 +5496,39 @@ For CI problems and help debugging, contact ci@example.org"""
"dependencies was unable to be automatically merged with the "
"current state of its repository. Please rebase the change and "
"upload a new patchset.",
tenant.layout.pipelines['check'].merge_conflict_message)
tenant.layout.pipeline_managers['check'].
pipeline.merge_conflict_message)
self.assertEqual(
"The merge failed! For more information...",
tenant.layout.pipelines['gate'].merge_conflict_message)
tenant.layout.pipeline_managers['gate'].
pipeline.merge_conflict_message)
self.assertEqual(
len(tenant.layout.pipelines['check'].merge_conflict_actions), 1)
len(tenant.layout.pipeline_managers['check'].
pipeline.merge_conflict_actions), 1)
self.assertEqual(
len(tenant.layout.pipelines['gate'].merge_conflict_actions), 2)
len(tenant.layout.pipeline_managers['gate'].
pipeline.merge_conflict_actions), 2)
self.assertTrue(isinstance(
tenant.layout.pipelines['check'].merge_conflict_actions[0],
tenant.layout.pipeline_managers['check'].
pipeline.merge_conflict_actions[0],
gerritreporter.GerritReporter))
self.assertTrue(
(
isinstance(tenant.layout.pipelines['gate'].
merge_conflict_actions[0],
isinstance(tenant.layout.pipeline_managers['gate'].
pipeline.merge_conflict_actions[0],
zuul.driver.smtp.smtpreporter.SMTPReporter) and
isinstance(tenant.layout.pipelines['gate'].
merge_conflict_actions[1],
isinstance(tenant.layout.pipeline_managers['gate'].
pipeline.merge_conflict_actions[1],
gerritreporter.GerritReporter)
) or (
isinstance(tenant.layout.pipelines['gate'].
merge_conflict_actions[0],
isinstance(tenant.layout.pipeline_managers['gate'].
pipeline.merge_conflict_actions[0],
gerritreporter.GerritReporter) and
isinstance(tenant.layout.pipelines['gate'].
merge_conflict_actions[1],
isinstance(tenant.layout.pipeline_managers['gate'].
pipeline.merge_conflict_actions[1],
zuul.driver.smtp.smtpreporter.SMTPReporter)
)
)
@ -5542,34 +5544,39 @@ For CI problems and help debugging, contact ci@example.org"""
"dependencies was unable to be automatically merged with the "
"current state of its repository. Please rebase the change and "
"upload a new patchset.",
tenant.layout.pipelines['check'].merge_conflict_message)
tenant.layout.pipeline_managers['check'].
pipeline.merge_conflict_message)
self.assertEqual(
"The merge failed! For more information...",
tenant.layout.pipelines['gate'].merge_conflict_message)
tenant.layout.pipeline_managers['gate'].
pipeline.merge_conflict_message)
self.assertEqual(
len(tenant.layout.pipelines['check'].merge_conflict_actions), 1)
len(tenant.layout.pipeline_managers['check'].
pipeline.merge_conflict_actions), 1)
self.assertEqual(
len(tenant.layout.pipelines['gate'].merge_conflict_actions), 2)
len(tenant.layout.pipeline_managers['gate'].
pipeline.merge_conflict_actions), 2)
self.assertTrue(isinstance(
tenant.layout.pipelines['check'].merge_conflict_actions[0],
tenant.layout.pipeline_managers['check'].
pipeline.merge_conflict_actions[0],
gerritreporter.GerritReporter))
self.assertTrue(
(
isinstance(tenant.layout.pipelines['gate'].
merge_conflict_actions[0],
isinstance(tenant.layout.pipeline_managers['gate'].
pipeline.merge_conflict_actions[0],
zuul.driver.smtp.smtpreporter.SMTPReporter) and
isinstance(tenant.layout.pipelines['gate'].
merge_conflict_actions[1],
isinstance(tenant.layout.pipeline_managers['gate'].
pipeline.merge_conflict_actions[1],
gerritreporter.GerritReporter)
) or (
isinstance(tenant.layout.pipelines['gate'].
merge_conflict_actions[0],
isinstance(tenant.layout.pipeline_managers['gate'].
pipeline.merge_conflict_actions[0],
gerritreporter.GerritReporter) and
isinstance(tenant.layout.pipelines['gate'].
merge_conflict_actions[1],
isinstance(tenant.layout.pipeline_managers['gate'].
pipeline.merge_conflict_actions[1],
zuul.driver.smtp.smtpreporter.SMTPReporter)
)
)
@ -5713,7 +5720,8 @@ For CI problems and help debugging, contact ci@example.org"""
"Test a pipeline will only report to the disabled trigger when failing"
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(3, tenant.layout.pipelines['check'].disable_at)
self.assertEqual(
3, tenant.layout.pipeline_managers['check'].pipeline.disable_at)
self.assertEqual(
0,
tenant.layout.pipeline_managers[
@ -5825,7 +5833,8 @@ For CI problems and help debugging, contact ci@example.org"""
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(3, tenant.layout.pipelines['check'].disable_at)
self.assertEqual(
3, tenant.layout.pipeline_managers['check'].pipeline.disable_at)
self.assertEqual(
0,
tenant.layout.pipeline_managers[
@ -8193,7 +8202,8 @@ class TestSemaphore(ZuulTestCase):
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
status = tenant.layout.pipelines["check"].formatStatusJSON(
status = tenant.layout.pipeline_managers[
"check"].pipeline.formatStatusJSON(
tenant.layout.pipeline_managers["check"])
jobs = status["change_queues"][0]["heads"][0][0]["jobs"]
self.assertEqual(jobs[0]["waiting_status"],

View File

@ -384,7 +384,7 @@ class TestScaleOutScheduler(ZuulTestCase):
], ordered=False)
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
self.assertEqual(len(tenant.layout.pipelines), 0)
self.assertEqual(len(tenant.layout.pipeline_managers), 0)
stat = self.zk_client.client.exists(pipeline_zk_path)
self.assertIsNone(stat)

View File

@ -6354,7 +6354,8 @@ class TestDataReturn(AnsibleZuulTestCase):
# Make sure skipped jobs are not reported as failing
tenant = self.scheds.first.sched.abide.tenants.get("tenant-one")
status = tenant.layout.pipelines["check"].formatStatusJSON(
status = tenant.layout.pipeline_managers[
"check"].pipeline.formatStatusJSON(
tenant.layout.pipeline_managers["check"])
self.assertEqual(
status["change_queues"][0]["heads"][0][0]["failing_reasons"], [])
@ -9397,7 +9398,8 @@ class TestProvidesRequiresMysql(ZuulTestCase):
# Verify the waiting status for both jobs is "repo state"
tenant = self.scheds.first.sched.abide.tenants.get('tenant-one')
status = tenant.layout.pipelines["gate"].formatStatusJSON(
status = tenant.layout.pipeline_managers[
"gate"].pipeline.formatStatusJSON(
tenant.layout.pipeline_managers["gate"])
jobs = status["change_queues"][0]["heads"][0][0]["jobs"]
self.assertEqual(jobs[0]["waiting_status"], 'repo state')
@ -9410,7 +9412,8 @@ class TestProvidesRequiresMysql(ZuulTestCase):
self.waitUntilSettled()
# Verify the nodepool waiting status
status = tenant.layout.pipelines["gate"].formatStatusJSON(
status = tenant.layout.pipeline_managers[
"gate"].pipeline.formatStatusJSON(
tenant.layout.pipeline_managers["gate"])
jobs = status["change_queues"][0]["heads"][0][0]["jobs"]
self.assertEqual(jobs[0]["waiting_status"],
@ -9425,7 +9428,8 @@ class TestProvidesRequiresMysql(ZuulTestCase):
self.waitUntilSettled()
# Verify the executor waiting status
status = tenant.layout.pipelines["gate"].formatStatusJSON(
status = tenant.layout.pipeline_managers[
"gate"].pipeline.formatStatusJSON(
tenant.layout.pipeline_managers["gate"])
jobs = status["change_queues"][0]["heads"][0][0]["jobs"]
self.assertEqual(jobs[0]["waiting_status"], 'executor')
@ -9438,7 +9442,8 @@ class TestProvidesRequiresMysql(ZuulTestCase):
self.executor_api.release()
self.waitUntilSettled()
status = tenant.layout.pipelines["gate"].formatStatusJSON(
status = tenant.layout.pipeline_managers[
"gate"].pipeline.formatStatusJSON(
tenant.layout.pipeline_managers["gate"])
jobs = status["change_queues"][0]["heads"][0][0]["jobs"]
self.assertIsNone(jobs[0]["waiting_status"])
@ -9454,7 +9459,8 @@ class TestProvidesRequiresMysql(ZuulTestCase):
self.assertEqual(len(self.builds), 1)
status = tenant.layout.pipelines["gate"].formatStatusJSON(
status = tenant.layout.pipeline_managers[
"gate"].pipeline.formatStatusJSON(
tenant.layout.pipeline_managers["gate"])
# First change

View File

@ -2788,7 +2788,7 @@ class TenantParser(object):
with parse_context.accumulator.catchErrors():
manager = self.createManager(parse_context, pipeline,
tenant)
layout.addPipeline(pipeline, manager)
layout.addPipelineManager(manager)
for nodeset in parsed_config.nodesets:
with parse_context.errorContext(stanza='nodeset', conf=nodeset):
@ -2820,7 +2820,8 @@ class TenantParser(object):
with parse_context.errorContext(stanza='job', conf=job):
with parse_context.accumulator.catchErrors():
job.validateReferences(layout)
for pipeline in layout.pipelines.values():
for manager in layout.pipeline_managers.values():
pipeline = manager.pipeline
with parse_context.errorContext(stanza='pipeline', conf=pipeline):
with parse_context.accumulator.catchErrors():
pipeline.validateReferences(layout)
@ -3367,7 +3368,7 @@ class ConfigLoader(object):
# have their own version of reality. We do not support
# creating, updating, or deleting pipelines in dynamic
# layout changes.
layout.pipelines = tenant.layout.pipelines
layout.pipeline_managers = tenant.layout.pipeline_managers
# NOTE: the semaphore definitions are copied from the
# static layout here. For semaphores there should be no

View File

@ -29,7 +29,8 @@ class GerritDriver(Driver, ConnectionInterface, TriggerInterface,
def reconfigure(self, tenant):
connection_checker_map = {}
connection_filter_map = {}
for pipeline in tenant.layout.pipelines.values():
for manager in tenant.layout.pipeline_managers.values():
pipeline = manager.pipeline
for trigger in pipeline.triggers:
if isinstance(trigger, gerrittrigger.GerritTrigger):
con = trigger.connection

View File

@ -108,7 +108,8 @@ class TimerDriver(Driver, TriggerInterface):
def _addJobs(self, tenant):
jobs = {}
for pipeline in tenant.layout.pipelines.values():
for manager in tenant.layout.pipeline_managers.values():
pipeline = manager.pipeline
for ef in pipeline.event_filters:
if not isinstance(ef.trigger, timertrigger.TimerTrigger):
continue

View File

@ -624,7 +624,7 @@ class Pipeline(object):
# valid.
for pipeline in self.supercedes:
if not layout.pipelines.get(pipeline):
if not layout.pipeline_managers.get(pipeline):
raise Exception(
'The pipeline "{this}" supercedes an unknown pipeline '
'{other}.'.format(
@ -9583,13 +9583,12 @@ class Layout(object):
"Provider %s is already defined" % provider.name)
self.providers[provider.name] = provider
def addPipeline(self, pipeline, manager):
if pipeline.name in self.pipelines:
def addPipelineManager(self, manager):
if manager.pipeline.name in self.pipeline_managers:
raise Exception(
"Pipeline %s is already defined" % pipeline.name)
"Pipeline %s is already defined" % manager.pipeline.name)
self.pipelines[pipeline.name] = pipeline
self.pipeline_managers[pipeline.name] = manager
self.pipeline_managers[manager.pipeline.name] = manager
def addProjectTemplate(self, project_template):
for job in project_template.embeddedJobs():

View File

@ -841,13 +841,11 @@ class Scheduler(threading.Thread):
start_ltime = self.zk_client.getCurrentLtime()
# lock and refresh the pipeline
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
for manager in tenant.layout.pipeline_managers.values():
with (pipeline_lock(
self.zk_client, tenant.name,
pipeline.name) as lock,
manager.pipeline.name) as lock,
self.createZKContext(lock, self.log) as ctx):
manager = tenant.layout.pipeline_managers.get(
pipeline.name)
manager.state.refresh(ctx, read_only=True)
# add any blobstore references
for item in manager.state.getAllItems(
@ -906,8 +904,8 @@ class Scheduler(threading.Thread):
tenant = self.abide.tenants.get(tenant_name)
if tenant is None:
raise ValueError(f'Unknown tenant {event.tenant_name}')
pipeline = tenant.layout.pipelines.get(pipeline_name)
if pipeline is None:
manager = tenant.layout.pipeline_managers.get(pipeline_name)
if manager is None:
raise ValueError(f'Unknown pipeline {event.pipeline_name}')
self.pipeline_management_events[tenant_name][pipeline_name].put(
@ -1017,11 +1015,11 @@ class Scheduler(threading.Thread):
if not self.statsd:
return
try:
for pipeline in tenant.layout.pipelines.values():
for manager in tenant.layout.pipeline_managers.values():
# stats.gauges.zuul.tenant.<tenant>.pipeline.
# <pipeline>.current_changes
key = 'zuul.tenant.%s.pipeline.%s' % (
tenant.name, pipeline.name)
tenant.name, manager.pipeline.name)
self.statsd.gauge(key + '.current_changes', 0)
except Exception:
self.log.exception("Exception reporting initial "
@ -1766,9 +1764,9 @@ class Scheduler(threading.Thread):
# This is called from _doReconfigureEvent while holding the
# layout lock
if old_tenant:
for name, old_pipeline in old_tenant.layout.pipelines.items():
for name, old_manager in \
old_tenant.layout.pipeline_managers.items():
new_manager = tenant.layout.pipeline_managers.get(name)
old_manager = old_tenant.layout.pipeline_managers.get(name)
if not new_manager:
with old_manager.currentContext(context):
try:
@ -1776,14 +1774,15 @@ class Scheduler(threading.Thread):
except Exception:
self.log.exception(
"Failed to cleanup deleted pipeline %s:",
old_pipeline)
old_manager.pipeline)
self.management_events[tenant.name].initialize()
self.trigger_events[tenant.name].initialize()
self.connections.reconfigureDrivers(tenant)
# TODOv3(jeblair): remove postconfig calls?
for pipeline in tenant.layout.pipelines.values():
for manager in tenant.layout.pipeline_managers.values():
pipeline = manager.pipeline
self.pipeline_management_events[tenant.name][
pipeline.name].initialize()
self.pipeline_trigger_events[tenant.name][
@ -1848,14 +1847,14 @@ class Scheduler(threading.Thread):
# Called when a tenant is deleted during reconfiguration
self.log.info("Removing tenant %s during reconfiguration" %
(tenant,))
for pipeline in tenant.layout.pipelines.values():
manager = tenant.layout.pipeline_managers.get(pipeline.name)
for manager in tenant.layout.pipeline_managers.values():
with manager.currentContext(context):
try:
self._reconfigureDeletePipeline(manager)
except Exception:
self.log.exception(
"Failed to cleanup deleted pipeline %s:", pipeline)
"Failed to cleanup deleted pipeline %s:",
manager.pipeline)
# Delete the tenant root path for this tenant in ZooKeeper to remove
# all tenant specific event queues
@ -2019,8 +2018,8 @@ class Scheduler(threading.Thread):
tenant = self.abide.tenants.get(event.tenant_name)
if tenant is None:
raise ValueError('Unknown tenant %s' % event.tenant_name)
pipeline = tenant.layout.pipelines.get(event.pipeline_name)
if pipeline is None:
manager = tenant.layout.pipeline_managers.get(event.pipeline_name)
if manager is None:
raise ValueError('Unknown pipeline %s' % event.pipeline_name)
manager = tenant.layout.pipeline_managers.get(event.pipeline_name)
canonical_name = event.project_hostname + '/' + event.project_name
@ -2057,10 +2056,9 @@ class Scheduler(threading.Thread):
tenant = self.abide.tenants.get(event.tenant_name)
if tenant is None:
raise ValueError(f'Unknown tenant {event.tenant_name}')
pipeline = tenant.layout.pipelines.get(event.pipeline_name)
if pipeline is None:
raise ValueError(f'Unknown pipeline {event.pipeline_name}')
manager = tenant.layout.pipeline_managers.get(event.pipeline_name)
if manager is None:
raise ValueError(f'Unknown pipeline {event.pipeline_name}')
canonical_name = event.project_hostname + '/' + event.project_name
(trusted, project) = tenant.getProject(canonical_name)
if project is None:
@ -2115,7 +2113,7 @@ class Scheduler(threading.Thread):
for tenant in tenants:
if tenant.name in notified:
continue
for pipeline_name in tenant.layout.pipelines.keys():
for pipeline_name in tenant.layout.pipeline_managers.keys():
event = PipelineSemaphoreReleaseEvent()
self.pipeline_management_events[
tenant.name][pipeline_name].put(
@ -2126,13 +2124,12 @@ class Scheduler(threading.Thread):
self.log.debug("Checking if all builds are complete")
waiting = False
for tenant in self.abide.tenants.values():
for pipeline in tenant.layout.pipelines.values():
manager = tenant.layout.pipeline_managers[pipeline.name]
for manager in tenant.layout.pipeline_managers.values():
for item in manager.state.getAllItems():
for build in item.current_build_set.getBuilds():
if build.result is None:
self.log.debug("%s waiting on %s" %
(pipeline.manager, build))
(manager, build))
waiting = True
if not waiting:
self.log.debug("All builds are complete")
@ -2719,11 +2716,11 @@ class Scheduler(threading.Thread):
tenant = self.abide.tenants.get(event.tenant_name)
if tenant is None:
raise ValueError(f'Unknown tenant {event.tenant_name}')
pipeline = tenant.layout.pipelines.get(event.pipeline_name)
if pipeline is None:
manager = tenant.layout.pipeline_managers.get(event.pipeline_name)
if manager is None:
raise ValueError(f'Unknown pipeline {event.pipeline_name}')
self.pipeline_management_events[tenant.name][
pipeline.name
manager.pipeline.name
].put(event)
event_forwarded = True
except Exception:

View File

@ -1161,8 +1161,8 @@ class ZuulWebAPI(object):
# in the pipeline management event queue and don't need to
# take the detour via the tenant management event queue.
pipeline_name = body['pipeline']
pipeline = tenant.layout.pipelines.get(pipeline_name)
if pipeline is None:
manager = tenant.layout.pipeline_managers.get(pipeline_name)
if manager is None:
raise cherrypy.HTTPError(400, 'Unknown pipeline')
event = DequeueEvent(
@ -1196,14 +1196,16 @@ class ZuulWebAPI(object):
# in the pipeline management event queue and don't need to
# take the detour via the tenant management event queue.
pipeline_name = body['pipeline']
pipeline = tenant.layout.pipelines.get(pipeline_name)
if pipeline is None:
manager = tenant.layout.pipeline_managers.get(pipeline_name)
if manager is None:
raise cherrypy.HTTPError(400, 'Unknown pipeline')
if 'change' in body:
return self._enqueue(tenant, project, pipeline, body['change'])
return self._enqueue(tenant, project, manager.pipeline,
body['change'])
elif all(p in body for p in ['ref', 'oldrev', 'newrev']):
return self._enqueue_ref(tenant, project, pipeline, body['ref'],
return self._enqueue_ref(tenant, project,
manager.pipeline, body['ref'],
body['oldrev'], body['newrev'])
else:
raise cherrypy.HTTPError(400, 'Invalid request body')
@ -1248,8 +1250,8 @@ class ZuulWebAPI(object):
# Validate the pipeline so we can enqueue the event directly
# in the pipeline management event queue and don't need to
# take the detour via the tenant management event queue.
pipeline = tenant.layout.pipelines.get(pipeline_name)
if pipeline is None:
manager = tenant.layout.pipeline_managers.get(pipeline_name)
if manager is None:
raise cherrypy.HTTPError(400, 'Unknown pipeline')
event = PromoteEvent(tenant_name, pipeline_name, changes)
@ -1982,9 +1984,9 @@ class ZuulWebAPI(object):
@cherrypy.tools.check_tenant_auth()
def pipelines(self, tenant_name, tenant, auth):
ret = []
for pipeline, pipeline_config in tenant.layout.pipelines.items():
for pipeline_name, manager in tenant.layout.pipeline_managers.items():
triggers = []
for trigger in pipeline_config.triggers:
for trigger in manager.pipeline.triggers:
if isinstance(trigger.connection, BaseConnection):
name = trigger.connection.connection_name
else:
@ -1994,7 +1996,7 @@ class ZuulWebAPI(object):
"name": name,
"driver": trigger.driver.name,
})
ret.append({"name": pipeline, "triggers": triggers})
ret.append({"name": pipeline_name, "triggers": triggers})
return ret