Remove source from pipelines (1/2)

Now that sources are associated with projects, remove the source
attribute from pipelines altogether.

For ease of review, the test fixtures are updated in a subsequent
commit.

Story: 2000953
Change-Id: Ie6b3b6be1b11b0d4462188902121d9cd29c91284
This commit is contained in:
James E. Blair 2017-04-05 11:27:11 -07:00
parent 109da3f9a7
commit 6053de449b
9 changed files with 88 additions and 74 deletions

View File

@ -906,7 +906,8 @@ class TestScheduler(ZuulTestCase):
# TODO: move to test_gerrit (this is a unit test!)
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
tenant = self.sched.abide.tenants.get('tenant-one')
source = tenant.layout.pipelines['gate'].source
(trusted, project) = tenant.getProject('org/project')
source = project.source
# TODO(pabelanger): As we add more source / trigger APIs we should make
# it easier for users to create events for testing.
@ -2169,9 +2170,8 @@ class TestScheduler(ZuulTestCase):
def test_queue_names(self):
"Test shared change queue names"
tenant = self.sched.abide.tenants.get('tenant-one')
source = tenant.layout.pipelines['gate'].source
project1 = source.getProject('org/project1')
project2 = source.getProject('org/project2')
(trusted, project1) = tenant.getProject('org/project1')
(trusted, project2) = tenant.getProject('org/project2')
q1 = tenant.layout.pipelines['gate'].getQueue(project1)
q2 = tenant.layout.pipelines['gate'].getQueue(project2)
self.assertEqual(q1.name, 'integrated')
@ -4415,7 +4415,8 @@ For CI problems and help debugging, contact ci@example.org"""
# processing.
tenant = self.sched.abide.tenants.get('tenant-one')
source = tenant.layout.pipelines['gate'].source
(trusted, project) = tenant.getProject('org/project')
source = project.source
# TODO(pabelanger): As we add more source / trigger APIs we should make
# it easier for users to create events for testing.

View File

@ -647,8 +647,6 @@ class PipelineParser(object):
pipeline = model.Pipeline(conf['name'], layout)
pipeline.description = conf.get('description')
pipeline.source = connections.getSource(conf['source'])
precedence = model.PRECEDENCE_MAP[conf.get('precedence')]
pipeline.precedence = precedence
pipeline.failure_message = conf.get('failure-message',

View File

@ -45,12 +45,12 @@ def make_merger_item(item):
oldrev = None
newrev = None
branch = None
connection_name = item.pipeline.source.connection.connection_name
source = item.change.project.source
connection_name = source.connection.connection_name
project = item.change.project.name
return dict(project=project,
url=item.pipeline.source.getGitUrl(
item.change.project),
url=source.getGitUrl(item.change.project),
connection_name=connection_name,
merge_mode=item.current_build_set.getMergeMode(),
refspec=refspec,
@ -209,6 +209,7 @@ class ExecutorClient(object):
return False
def execute(self, job, item, pipeline, dependent_items=[]):
tenant = pipeline.layout.tenant
uuid = str(uuid4().hex)
self.log.info(
"Execute job %s (uuid: %s) on nodes %s for change %s "
@ -319,18 +320,22 @@ class ExecutorClient(object):
projects = set()
if job.repos:
for repo in job.repos:
project = item.pipeline.source.getProject(repo)
(trusted, project) = tenant.getProject(repo)
connection = project.source.connection
params['projects'].append(
dict(name=repo,
url=item.pipeline.source.getGitUrl(project)))
dict(name=project.name,
connection_name=connection.connection_name,
url=project.source.getGitUrl(project)))
projects.add(project)
for item in all_items:
if item.change.project not in projects:
project = item.change.project
connection = item.change.project.source.connection
params['projects'].append(
dict(name=item.change.project.name,
url=item.pipeline.source.getGitUrl(
item.change.project)))
projects.add(item.change.project)
dict(name=project.name,
connection_name=connection.connection_name,
url=project.source.getGitUrl(project)))
projects.add(project)
build = Build(job, uuid)
build.parameters = params

View File

@ -35,6 +35,7 @@ class ConnectionRegistry(object):
def __init__(self):
self.connections = {}
self.sources = {}
self.drivers = {}
self.registerDriver(zuul.driver.zuul.ZuulDriver())
@ -71,6 +72,7 @@ class ConnectionRegistry(object):
# Register connections from the config
# TODO(jhesketh): import connection modules dynamically
connections = {}
sources = {}
for section_name in config.sections():
con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
@ -92,6 +94,9 @@ class ConnectionRegistry(object):
driver = self.drivers[con_driver]
connection = driver.getConnection(con_name, con_config)
connections[con_name] = connection
if hasattr(driver, 'getSource'):
source = driver.getSource(connection)
sources[source.canonical_hostname] = source
# If the [gerrit] or [smtp] sections still exist, load them in as a
# connection named 'gerrit' or 'smtp' respectfully
@ -126,6 +131,10 @@ class ConnectionRegistry(object):
driver, driver.name, {})
self.connections = connections
self.sources = sources
def getSourceByHostname(self, hostname):
return self.sources[hostname]
def getSource(self, connection_name):
connection = self.connections[connection_name]

View File

@ -54,7 +54,6 @@ class PipelineManager(object):
def _postConfig(self, layout):
self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
self.log.info(" Source: %s" % self.pipeline.source)
self.log.info(" Requirements:")
for f in self.changeish_filters:
self.log.info(" %s" % f)
@ -147,11 +146,12 @@ class PipelineManager(object):
def reportStart(self, item):
if not self.pipeline._disabled:
source = item.change.project.source
try:
self.log.info("Reporting start, action %s item %s" %
(self.pipeline.start_actions, item))
ret = self.sendReport(self.pipeline.start_actions,
self.pipeline.source, item)
source, item)
if ret:
self.log.error("Reporting item start %s received: %s" %
(item, ret))
@ -454,12 +454,12 @@ class PipelineManager(object):
elif hasattr(item.change, 'newrev'):
oldrev = item.change.oldrev
newrev = item.change.newrev
connection_name = self.pipeline.source.connection.connection_name
source = item.change.project.source
connection_name = source.connection.connection_name
project = item.change.project.name
return dict(project=project,
url=self.pipeline.source.getGitUrl(
item.change.project),
url=source.getGitUrl(item.change.project),
connection_name=connection_name,
merge_mode=item.current_build_set.getMergeMode(),
refspec=refspec,
@ -742,9 +742,9 @@ class PipelineManager(object):
if self.changes_merge:
succeeded = item.didAllJobsSucceed()
merged = item.reported
source = item.change.project.source
if merged:
merged = self.pipeline.source.isMerged(item.change,
item.change.branch)
merged = source.isMerged(item.change, item.change.branch)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (item.change, succeeded, merged))
change_queue = item.queue
@ -763,11 +763,11 @@ class PipelineManager(object):
zuul_driver = self.sched.connections.drivers['zuul']
tenant = self.pipeline.layout.tenant
zuul_driver.onChangeMerged(tenant, item.change,
self.pipeline.source)
zuul_driver.onChangeMerged(tenant, item.change, source)
def _reportItem(self, item):
self.log.debug("Reporting change %s" % item.change)
source = item.change.project.source
ret = True # Means error as returned by trigger.report
if item.getConfigError():
self.log.debug("Invalid config for change %s" % item.change)
@ -802,7 +802,7 @@ class PipelineManager(object):
try:
self.log.info("Reporting item %s, actions: %s" %
(item, actions))
ret = self.sendReport(actions, self.pipeline.source, item)
ret = self.sendReport(actions, source, item)
if ret:
self.log.error("Reporting item %s received: %s" %
(item, ret))

View File

@ -79,16 +79,17 @@ class DependentPipelineManager(PipelineManager):
self.pipeline.getQueue(change.project))
def isChangeReadyToBeEnqueued(self, change):
if not self.pipeline.source.canMerge(change,
self.getSubmitAllowNeeds()):
source = change.project.source
if not source.canMerge(change, self.getSubmitAllowNeeds()):
self.log.debug("Change %s can not merge, ignoring" % change)
return False
return True
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
change_queue):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
to_enqueue = []
source = change.project.source
if not hasattr(change, 'needed_by_changes'):
self.log.debug(" %s does not support dependencies" % type(change))
return
@ -100,8 +101,7 @@ class DependentPipelineManager(PipelineManager):
(other_change, other_change.project,
change_queue))
continue
if self.pipeline.source.canMerge(other_change,
self.getSubmitAllowNeeds()):
if source.canMerge(other_change, self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
(other_change, change))
to_enqueue.append(other_change)
@ -131,6 +131,7 @@ class DependentPipelineManager(PipelineManager):
def checkForChangesNeededBy(self, change, change_queue):
self.log.debug("Checking for changes needed by %s:" % change)
source = change.project.source
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
if not hasattr(change, 'needs_changes'):
@ -164,8 +165,7 @@ class DependentPipelineManager(PipelineManager):
self.log.debug(" Needed change is already ahead "
"in the queue")
continue
if self.pipeline.source.canMerge(needed_change,
self.getSubmitAllowNeeds()):
if source.canMerge(needed_change, self.getSubmitAllowNeeds()):
self.log.debug(" Change %s is needed" % needed_change)
if needed_change not in changes_needed:
changes_needed.append(needed_change)

View File

@ -106,11 +106,7 @@ class Attributes(object):
class Pipeline(object):
"""A configuration that ties triggers, reporters, managers and sources.
Source
Where changes should come from. It is a named connection to
an external service defined in zuul.conf
"""A configuration that ties together triggers, reporters and managers
Trigger
A description of which events should be processed
@ -136,7 +132,6 @@ class Pipeline(object):
self.manager = None
self.queues = []
self.precedence = PRECEDENCE_NORMAL
self.source = None
self.triggers = []
self.start_actions = []
self.success_actions = []

View File

@ -120,15 +120,15 @@ class RPCListener(object):
else:
errors += 'Invalid tenant: %s\n' % (args['tenant'],)
return (args, event, errors, pipeline, project)
return (args, event, errors, project)
def handle_enqueue(self, job):
(args, event, errors, pipeline, project) = self._common_enqueue(job)
(args, event, errors, project) = self._common_enqueue(job)
if not errors:
event.change_number, event.patch_number = args['change'].split(',')
try:
pipeline.source.getChange(event, project)
project.source.getChange(event, project)
except Exception:
errors += 'Invalid change: %s\n' % (args['change'],)
@ -139,7 +139,7 @@ class RPCListener(object):
job.sendWorkComplete()
def handle_enqueue_ref(self, job):
(args, event, errors, pipeline, project) = self._common_enqueue(job)
(args, event, errors, project) = self._common_enqueue(job)
if not errors:
event.ref = args['ref']

View File

@ -486,6 +486,19 @@ class Scheduler(threading.Thread):
finally:
self.layout_lock.release()
def _reenqueueGetProject(self, tenant, project):
# Attempt to get the same project as the one passed in. If
# the project is now found on a different connection, return
# the new version of the project. If it is no longer
# available (due to a connection being removed), return None.
project_name = project.canonical_name
(trusted, new_project) = tenant.getProject(project_name)
if new_project:
return new_project
source = self.connections.getSourceByHostname(
project.canonical_hostname)
return source.getProject(project.name)
def _reenqueueTenant(self, old_tenant, tenant):
for name, new_pipeline in tenant.layout.pipelines.items():
old_pipeline = old_tenant.layout.pipelines.get(name)
@ -505,11 +518,11 @@ class Scheduler(threading.Thread):
item.items_behind = []
item.pipeline = None
item.queue = None
project_name = item.change.project.name
item.change.project = new_pipeline.source.getProject(
project_name)
if new_pipeline.manager.reEnqueueItem(item,
last_head):
item.change.project = self._reenqueueGetProject(
tenant, item.change.project)
if (item.change.project and
new_pipeline.manager.reEnqueueItem(item,
last_head)):
for build in item.current_build_set.getBuilds():
new_job = item.getJob(build.job.name)
if new_job:
@ -553,7 +566,6 @@ class Scheduler(threading.Thread):
# TODOv3(jeblair): remove postconfig calls?
for pipeline in tenant.layout.pipelines.values():
pipeline.source.postConfig()
for trigger in pipeline.triggers:
trigger.postConfig(pipeline)
for reporter in pipeline.actions:
@ -613,7 +625,7 @@ class Scheduler(threading.Thread):
tenant = self.abide.tenants.get(event.tenant_name)
(trusted, project) = tenant.getProject(event.project_name)
pipeline = tenant.layout.pipelines[event.forced_pipeline]
change = pipeline.source.getChange(event, project)
change = project.source.getChange(event, project)
self.log.debug("Event %s for change %s was directly assigned "
"to pipeline %s" % (event, change, self))
pipeline.manager.addChange(change, ignore_requirements=True)
@ -702,31 +714,25 @@ class Scheduler(threading.Thread):
event = self.trigger_event_queue.get()
self.log.debug("Processing trigger event %s" % event)
try:
source = self.connections.getSourceByHostname(
event.project_hostname)
try:
change = source.getChange(event)
except exceptions.ChangeNotFound as e:
self.log.debug("Unable to get change %s from "
"source %s",
e.change, source)
return
for tenant in self.abide.tenants.values():
reconfigured_tenant = False
if (event.type == 'change-merged' and
hasattr(change, 'files') and
change.updatesConfig()):
# The change that just landed updates the config.
# Clear out cached data for this project and
# perform a reconfiguration.
change.project.unparsed_config = None
self.reconfigureTenant(tenant)
for pipeline in tenant.layout.pipelines.values():
# Get the change even if the project is unknown to
# us for the use of updating the cache if there is
# another change depending on this foreign one.
try:
change = pipeline.source.getChange(event)
except exceptions.ChangeNotFound as e:
self.log.debug("Unable to get change %s from "
"source %s (most likely looking "
"for a change from another "
"connection trigger)",
e.change, pipeline.source)
continue
if (event.type == 'change-merged' and
hasattr(change, 'files') and
not reconfigured_tenant and
change.updatesConfig()):
# The change that just landed updates the config.
# Clear out cached data for this project and
# perform a reconfiguration.
change.project.unparsed_config = None
self.reconfigureTenant(tenant)
reconfigured_tenant = True
if event.type == 'patchset-created':
pipeline.manager.removeOldVersionsOfChange(change)
elif event.type == 'change-abandoned':