Add dynamic reconfiguration

If a change alters .zuul.yaml in a repo that is permitted to use in-repo
configuration, create a shadow configuration layout specifically for that
and any following changes with the new configuration in place.

Such configuration changes extend only to altering jobs and job trees.
More substantial changes such as altering pipelines will be ignored.  This
only applies to "project" repos (ie, the repositories under test which may
incidentally have .zuul.yaml files) rather than "config" repos (repositories
specifically designed to hold Zuul configuration in zuul.yaml files).  This
is to avoid the situation where a user might propose a change to a config
repository (and Zuul would therefore run) that would perform actions that
the gatekeepers of that repository would not normally permit.

This change also corrects an issue with job inheritance in that the Job
instances attached to the project pipeline job trees (ie, those that
represent the job as invoked in the specific pipeline configuration for
a project) were inheriting attributes at configuration time rather than
when job trees are frozen when a change is enqueued.  This could mean that
they would inherit attributes from the wrong variant of a job.

Change-Id: If3cd47094e6c6914abf0ffaeca45997c132b8e32
changes/97/340597/6
James E. Blair 6 years ago
parent 8d692398f1
commit 8b1dc3fb22
  1. 62
      tests/base.py
  2. 8
      tests/fixtures/config/in-repo/git/org_project/.zuul.yaml
  3. 1
      tests/fixtures/config/in-repo/git/org_project/README
  4. 1
      tests/fixtures/config/multi-tenant/git/org_project1/README
  5. 1
      tests/fixtures/config/multi-tenant/git/org_project2/README
  6. 1
      tests/fixtures/config/project-template/git/org_project/README
  7. 88
      tests/test_model.py
  8. 28
      tests/test_v3.py
  9. 153
      zuul/configloader.py
  10. 15
      zuul/launcher/server.py
  11. 103
      zuul/manager/__init__.py
  12. 10
      zuul/merger/client.py
  13. 20
      zuul/merger/merger.py
  14. 9
      zuul/merger/server.py
  15. 151
      zuul/model.py
  16. 2
      zuul/reporter/__init__.py
  17. 15
      zuul/scheduler.py

@ -108,7 +108,7 @@ class FakeChange(object):
'VRFY': ('Verified', -2, 2)}
def __init__(self, gerrit, number, project, branch, subject,
status='NEW', upstream_root=None):
status='NEW', upstream_root=None, files={}):
self.gerrit = gerrit
self.reported = 0
self.queried = 0
@ -142,11 +142,11 @@ class FakeChange(object):
'url': 'https://hostname/%s' % number}
self.upstream_root = upstream_root
self.addPatchset()
self.addPatchset(files=files)
self.data['submitRecords'] = self.getSubmitRecords()
self.open = status == 'NEW'
def add_fake_change_to_repo(self, msg, fn, large):
def addFakeChangeToRepo(self, msg, files, large):
path = os.path.join(self.upstream_root, self.project)
repo = git.Repo(path)
ref = ChangeReference.create(repo, '1/%s/%s' % (self.number,
@ -158,12 +158,11 @@ class FakeChange(object):
path = os.path.join(self.upstream_root, self.project)
if not large:
fn = os.path.join(path, fn)
f = open(fn, 'w')
f.write("test %s %s %s\n" %
(self.branch, self.number, self.latest_patchset))
f.close()
repo.index.add([fn])
for fn, content in files.items():
fn = os.path.join(path, fn)
with open(fn, 'w') as f:
f.write(content)
repo.index.add([fn])
else:
for fni in range(100):
fn = os.path.join(path, str(fni))
@ -180,19 +179,20 @@ class FakeChange(object):
repo.heads['master'].checkout()
return r
def addPatchset(self, files=[], large=False):
def addPatchset(self, files=None, large=False):
self.latest_patchset += 1
if files:
fn = files[0]
else:
if not files:
fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
data = ("test %s %s %s\n" %
(self.branch, self.number, self.latest_patchset))
files = {fn: data}
msg = self.subject + '-' + str(self.latest_patchset)
c = self.add_fake_change_to_repo(msg, fn, large)
c = self.addFakeChangeToRepo(msg, files, large)
ps_files = [{'file': '/COMMIT_MSG',
'type': 'ADDED'},
{'file': 'README',
'type': 'MODIFIED'}]
for f in files:
for f in files.keys():
ps_files.append({'file': f, 'type': 'ADDED'})
d = {'approvals': [],
'createdOn': time.time(),
@ -400,11 +400,12 @@ class FakeGerritConnection(zuul.connection.gerrit.GerritConnection):
self.queries = []
self.upstream_root = upstream_root
def addFakeChange(self, project, branch, subject, status='NEW'):
def addFakeChange(self, project, branch, subject, status='NEW',
files=None):
self.change_number += 1
c = FakeChange(self, self.change_number, project, branch, subject,
upstream_root=self.upstream_root,
status=status)
status=status, files=files)
self.changes[self.change_number] = c
return c
@ -937,9 +938,8 @@ class ZuulTestCase(BaseTestCase):
self.config.set('zuul', 'state_dir', self.state_root)
# For each project in config:
self.init_repo("org/project")
self.init_repo("org/project1")
self.init_repo("org/project2")
# TODOv3(jeblair): remove these and replace with new git
# filesystem fixtures
self.init_repo("org/project3")
self.init_repo("org/project4")
self.init_repo("org/project5")
@ -1107,13 +1107,12 @@ class ZuulTestCase(BaseTestCase):
'git')
if os.path.exists(git_path):
for reponame in os.listdir(git_path):
self.copyDirToRepo(reponame,
project = reponame.replace('_', '/')
self.copyDirToRepo(project,
os.path.join(git_path, reponame))
def copyDirToRepo(self, project, source_path):
repo_path = os.path.join(self.upstream_root, project)
if not os.path.exists(repo_path):
self.init_repo(project)
self.init_repo(project)
files = {}
for (dirpath, dirnames, filenames) in os.walk(source_path):
@ -1126,7 +1125,7 @@ class ZuulTestCase(BaseTestCase):
content = f.read()
files[relative_filepath] = content
self.addCommitToRepo(project, 'add content from fixture',
files, branch='master')
files, branch='master', tag='init')
def setup_repos(self):
"""Subclasses can override to manipulate repos before tests"""
@ -1176,21 +1175,13 @@ class ZuulTestCase(BaseTestCase):
config_writer.set_value('user', 'email', 'user@example.com')
config_writer.set_value('user', 'name', 'User Name')
fn = os.path.join(path, 'README')
f = open(fn, 'w')
f.write("test\n")
f.close()
repo.index.add([fn])
repo.index.commit('initial commit')
master = repo.create_head('master')
repo.create_tag('init')
repo.head.reference = master
zuul.merger.merger.reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
self.create_branch(project, 'mp')
def create_branch(self, project, branch):
path = os.path.join(self.upstream_root, project)
repo = git.Repo.init(path)
@ -1452,7 +1443,8 @@ tenants:
f.close()
self.config.set('zuul', 'tenant_config', f.name)
def addCommitToRepo(self, project, message, files, branch='master'):
def addCommitToRepo(self, project, message, files,
branch='master', tag=None):
path = os.path.join(self.upstream_root, project)
repo = git.Repo(path)
repo.head.reference = branch
@ -1467,3 +1459,5 @@ tenants:
repo.head.reference = branch
repo.git.clean('-x', '-f', '-d')
repo.heads[branch].checkout()
if tag:
repo.create_tag(tag)

@ -0,0 +1,8 @@
- job:
name: project-test1
- project:
name: org/project
tenant-one-gate:
jobs:
- project-test1

@ -51,6 +51,11 @@ class TestJob(BaseTestCase):
def test_job_inheritance(self):
layout = model.Layout()
pipeline = model.Pipeline('gate', layout)
layout.addPipeline(pipeline)
queue = model.ChangeQueue(pipeline)
base = configloader.JobParser.fromYaml(layout, {
'name': 'base',
'timeout': 30,
@ -71,17 +76,88 @@ class TestJob(BaseTestCase):
})
layout.addJob(python27diablo)
project_config = configloader.ProjectParser.fromYaml(layout, {
'name': 'project',
'gate': {
'jobs': [
'python27'
]
}
})
layout.addProjectConfig(project_config, update_pipeline=False)
project = model.Project('project')
change = model.Change(project)
change.branch = 'master'
item = queue.enqueueChange(change)
item.current_build_set.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
self.assertFalse(python27diablo.changeMatches(change))
item.freezeJobTree()
self.assertEqual(len(item.getJobs()), 1)
job = item.getJobs()[0]
self.assertEqual(job.name, 'python27')
self.assertEqual(job.timeout, 40)
change.branch = 'stable/diablo'
item = queue.enqueueChange(change)
item.current_build_set.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
self.assertTrue(python27diablo.changeMatches(change))
item.freezeJobTree()
self.assertEqual(len(item.getJobs()), 1)
job = item.getJobs()[0]
self.assertEqual(job.name, 'python27')
self.assertEqual(job.timeout, 50)
def test_job_inheritance_job_tree(self):
layout = model.Layout()
pipeline = model.Pipeline('gate', layout)
layout.addPipeline(pipeline)
queue = model.ChangeQueue(pipeline)
project = model.Project('project')
tree = pipeline.addProject(project)
tree.addJob(layout.getJob('python27'))
base = configloader.JobParser.fromYaml(layout, {
'name': 'base',
'timeout': 30,
})
layout.addJob(base)
python27 = configloader.JobParser.fromYaml(layout, {
'name': 'python27',
'parent': 'base',
'timeout': 40,
})
layout.addJob(python27)
python27diablo = configloader.JobParser.fromYaml(layout, {
'name': 'python27',
'branches': [
'stable/diablo'
],
'timeout': 50,
})
layout.addJob(python27diablo)
project_config = configloader.ProjectParser.fromYaml(layout, {
'name': 'project',
'gate': {
'jobs': [
{'python27': {'timeout': 70}}
]
}
})
layout.addProjectConfig(project_config, update_pipeline=False)
project = model.Project('project')
change = model.Change(project)
change.branch = 'master'
item = queue.enqueueChange(change)
item.current_build_set.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
@ -91,9 +167,11 @@ class TestJob(BaseTestCase):
self.assertEqual(len(item.getJobs()), 1)
job = item.getJobs()[0]
self.assertEqual(job.name, 'python27')
self.assertEqual(job.timeout, 40)
self.assertEqual(job.timeout, 70)
change.branch = 'stable/diablo'
item = queue.enqueueChange(change)
item.current_build_set.layout = layout
self.assertTrue(base.changeMatches(change))
self.assertTrue(python27.changeMatches(change))
@ -103,7 +181,7 @@ class TestJob(BaseTestCase):
self.assertEqual(len(item.getJobs()), 1)
job = item.getJobs()[0]
self.assertEqual(job.name, 'python27')
self.assertEqual(job.timeout, 50)
self.assertEqual(job.timeout, 70)
class TestJobTimeData(BaseTestCase):

@ -74,28 +74,38 @@ class TestInRepoConfig(ZuulTestCase):
tenant_config_file = 'config/in-repo/main.yaml'
def setup_repos(self):
def test_in_repo_config(self):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
self.assertEqual(self.getJobFromHistory('project-test1').result,
'SUCCESS')
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2,
"A should report start and success")
self.assertIn('tenant-one-gate', A.messages[1],
"A should transit tenant-one gate")
def test_dynamic_config(self):
in_repo_conf = textwrap.dedent(
"""
- job:
name: project-test1
name: project-test2
- project:
name: org/project
tenant-one-gate:
jobs:
- project-test1
- project-test2
""")
self.addCommitToRepo('org/project', 'add zuul conf',
{'.zuul.yaml': in_repo_conf})
def test_in_repo_config(self):
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A',
files={'.zuul.yaml': in_repo_conf})
A.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
self.assertEqual(self.getJobFromHistory('project-test1').result,
self.assertEqual(self.getJobFromHistory('project-test2').result,
'SUCCESS')
self.assertEqual(A.data['status'], 'MERGED')
self.assertEqual(A.reported, 2,

@ -18,7 +18,6 @@ import yaml
import voluptuous as vs
from zuul import model
import zuul.manager
import zuul.manager.dependent
import zuul.manager.independent
from zuul import change_matcher
@ -73,8 +72,6 @@ class JobParser(object):
'irrelevant-files': to_list(str),
'nodes': [node],
'timeout': int,
'_project_source': str, # used internally
'_project_name': str, # used internally
}
return vs.Schema(job)
@ -99,12 +96,6 @@ class JobParser(object):
# accumulate onto any previously applied tags from
# metajobs.
job.tags = job.tags.union(set(tags))
if not job.project_source:
# Thes attributes may not be overidden -- the first
# reference definition of a job is in the repo where it is
# first defined.
job.project_source = conf.get('_project_source')
job.project_name = conf.get('_project_name')
job.failure_message = conf.get('failure-message', job.failure_message)
job.success_message = conf.get('success-message', job.success_message)
job.failure_url = conf.get('failure-url', job.failure_url)
@ -161,12 +152,12 @@ class ProjectTemplateParser(object):
tree = model.JobTree(None)
for conf_job in conf:
if isinstance(conf_job, six.string_types):
tree.addJob(layout.getJob(conf_job))
tree.addJob(model.Job(conf_job))
elif isinstance(conf_job, dict):
# A dictionary in a job tree may override params, or
# be the root of a sub job tree, or both.
jobname, attrs = dict.items()[0]
jobs = attrs.pop('jobs')
jobname, attrs = conf_job.items()[0]
jobs = attrs.pop('jobs', None)
if attrs:
# We are overriding params, so make a new job def
attrs['name'] = jobname
@ -457,43 +448,64 @@ class TenantParser(object):
def fromYaml(base, connections, scheduler, merger, conf):
TenantParser.getSchema(connections)(conf)
tenant = model.Tenant(conf['name'])
tenant_config = model.UnparsedTenantConfig()
incdata = TenantParser._loadTenantInRepoLayouts(merger, connections,
conf)
tenant_config.extend(incdata)
tenant.layout = TenantParser._parseLayout(base, tenant_config,
unparsed_config = model.UnparsedTenantConfig()
tenant.config_repos, tenant.project_repos = \
TenantParser._loadTenantConfigRepos(connections, conf)
tenant.config_repos_config, tenant.project_repos_config = \
TenantParser._loadTenantInRepoLayouts(
merger, connections, tenant.config_repos, tenant.project_repos)
unparsed_config.extend(tenant.config_repos_config)
unparsed_config.extend(tenant.project_repos_config)
tenant.layout = TenantParser._parseLayout(base, unparsed_config,
scheduler, connections)
tenant.layout.tenant = tenant
return tenant
@staticmethod
def _loadTenantInRepoLayouts(merger, connections, conf_tenant):
config = model.UnparsedTenantConfig()
jobs = []
def _loadTenantConfigRepos(connections, conf_tenant):
config_repos = []
project_repos = []
for source_name, conf_source in conf_tenant.get('source', {}).items():
source = connections.getSource(source_name)
# Get main config files. These files are permitted the
# full range of configuration.
for conf_repo in conf_source.get('config-repos', []):
project = source.getProject(conf_repo)
url = source.getGitUrl(project)
job = merger.getFiles(project.name, url, 'master',
files=['zuul.yaml', '.zuul.yaml'])
job.project = project
job.config_repo = True
jobs.append(job)
config_repos.append((source, project))
# Get in-project-repo config files which have a restricted
# set of options.
for conf_repo in conf_source.get('project-repos', []):
project = source.getProject(conf_repo)
url = source.getGitUrl(project)
# TODOv3(jeblair): config should be branch specific
job = merger.getFiles(project.name, url, 'master',
files=['.zuul.yaml'])
job.project = project
job.config_repo = False
jobs.append(job)
project_repos.append((source, project))
return config_repos, project_repos
@staticmethod
def _loadTenantInRepoLayouts(merger, connections, config_repos,
project_repos):
config_repos_config = model.UnparsedTenantConfig()
project_repos_config = model.UnparsedTenantConfig()
jobs = []
for (source, project) in config_repos:
# Get main config files. These files are permitted the
# full range of configuration.
url = source.getGitUrl(project)
job = merger.getFiles(project.name, url, 'master',
files=['zuul.yaml', '.zuul.yaml'])
job.project = project
job.config_repo = True
jobs.append(job)
for (source, project) in project_repos:
# Get in-project-repo config files which have a restricted
# set of options.
url = source.getGitUrl(project)
# TODOv3(jeblair): config should be branch specific
job = merger.getFiles(project.name, url, 'master',
files=['.zuul.yaml'])
job.project = project
job.config_repo = False
jobs.append(job)
for job in jobs:
# Note: this is an ordered list -- we wait for cat jobs to
@ -509,38 +521,30 @@ class TenantParser(object):
(job.project, fn))
if job.config_repo:
incdata = TenantParser._parseConfigRepoLayout(
job.files[fn], source_name, job.project.name)
job.files[fn])
config_repos_config.extend(incdata)
else:
incdata = TenantParser._parseProjectRepoLayout(
job.files[fn], source_name, job.project.name)
config.extend(incdata)
return config
job.files[fn])
project_repos_config.extend(incdata)
job.project.unparsed_config = incdata
return config_repos_config, project_repos_config
@staticmethod
def _parseConfigRepoLayout(data, source_name, project_name):
def _parseConfigRepoLayout(data):
# This is the top-level configuration for a tenant.
config = model.UnparsedTenantConfig()
config.extend(yaml.load(data))
# Remember where this job was defined
for conf_job in config.jobs:
conf_job['_project_source'] = source_name
conf_job['_project_name'] = project_name
return config
@staticmethod
def _parseProjectRepoLayout(data, source_name, project_name):
def _parseProjectRepoLayout(data):
# TODOv3(jeblair): this should implement some rules to protect
# aspects of the config that should not be changed in-repo
config = model.UnparsedTenantConfig()
config.extend(yaml.load(data))
# Remember where this job was defined
for conf_job in config.jobs:
conf_job['_project_source'] = source_name
conf_job['_project_name'] = project_name
return config
@staticmethod
@ -572,14 +576,18 @@ class TenantParser(object):
class ConfigLoader(object):
log = logging.getLogger("zuul.ConfigLoader")
def expandConfigPath(self, config_path):
if config_path:
config_path = os.path.expanduser(config_path)
if not os.path.exists(config_path):
raise Exception("Unable to read tenant config file at %s" %
config_path)
return config_path
def loadConfig(self, config_path, scheduler, merger, connections):
abide = model.Abide()
if config_path:
config_path = os.path.expanduser(config_path)
if not os.path.exists(config_path):
raise Exception("Unable to read tenant config file at %s" %
config_path)
config_path = self.expandConfigPath(config_path)
with open(config_path) as config_file:
self.log.info("Loading configuration from %s" % (config_path,))
data = yaml.load(config_file)
@ -592,3 +600,32 @@ class ConfigLoader(object):
merger, conf_tenant)
abide.tenants[tenant.name] = tenant
return abide
def createDynamicLayout(self, tenant, files):
config = tenant.config_repos_config.copy()
for source, project in tenant.project_repos:
# TODOv3(jeblair): config should be branch specific
data = files.getFile(project.name, 'master', '.zuul.yaml')
if not data:
data = project.unparsed_config
if not data:
continue
incdata = TenantParser._parseProjectRepoLayout(data)
config.extend(incdata)
layout = model.Layout()
# TODOv3(jeblair): copying the pipelines could be dangerous/confusing.
layout.pipelines = tenant.layout.pipelines
for config_job in config.jobs:
layout.addJob(JobParser.fromYaml(layout, config_job))
for config_template in config.project_templates:
layout.addProjectTemplate(ProjectTemplateParser.fromYaml(
layout, config_template))
for config_project in config.projects:
layout.addProjectConfig(ProjectParser.fromYaml(
layout, config_project), update_pipeline=False)
return layout

@ -157,6 +157,7 @@ class LaunchServer(object):
def register(self):
self.worker.registerFunction("launcher:launch")
# TODOv3: abort
self.worker.registerFunction("merger:merge")
self.worker.registerFunction("merger:cat")
def stop(self):
@ -202,6 +203,9 @@ class LaunchServer(object):
elif job.name == 'merger:cat':
self.log.debug("Got cat job: %s" % job.unique)
self.cat(job)
elif job.name == 'merger:merge':
self.log.debug("Got merge job: %s" % job.unique)
self.merge(job)
else:
self.log.error("Unable to handle job %s" % job.name)
job.sendWorkFail()
@ -294,3 +298,14 @@ class LaunchServer(object):
files=files,
zuul_url=self.zuul_url)
job.sendWorkComplete(json.dumps(result))
def merge(self, job):
args = json.loads(job.arguments)
ret = self.merger.mergeChanges(args['items'], args.get('files'))
result = dict(merged=(ret is not None),
zuul_url=self.zuul_url)
if args.get('files'):
result['commit'], result['files'] = ret
else:
result['commit'] = ret
job.sendWorkComplete(json.dumps(result))

@ -14,6 +14,7 @@ import extras
import logging
from zuul import exceptions
import zuul.configloader
from zuul.model import NullChange
statsd = extras.try_import('statsd.statsd')
@ -363,7 +364,15 @@ class BasePipelineManager(object):
def launchJobs(self, item):
# TODO(jeblair): This should return a value indicating a job
# was launched. Appears to be a longstanding bug.
jobs = self.pipeline.findJobsToRun(item, self.sched.mutex)
if not item.current_build_set.layout:
return False
# We may be working with a dynamic layout. Get a pipeline
# object from *that* layout to find out which jobs we should
# run.
layout = item.current_build_set.layout
pipeline = layout.pipelines[self.pipeline.name]
jobs = pipeline.findJobsToRun(item, self.sched.mutex)
if jobs:
self._launchJobs(item, jobs)
@ -392,6 +401,78 @@ class BasePipelineManager(object):
canceled = True
return canceled
def _makeMergerItem(self, item):
# Create a dictionary with all info about the item needed by
# the merger.
number = None
patchset = None
oldrev = None
newrev = None
if hasattr(item.change, 'number'):
number = item.change.number
patchset = item.change.patchset
elif hasattr(item.change, 'newrev'):
oldrev = item.change.oldrev
newrev = item.change.newrev
connection_name = self.pipeline.source.connection.connection_name
return dict(project=item.change.project.name,
url=self.pipeline.source.getGitUrl(
item.change.project),
connection_name=connection_name,
merge_mode=item.change.project.merge_mode,
refspec=item.change.refspec,
branch=item.change.branch,
ref=item.current_build_set.ref,
number=number,
patchset=patchset,
oldrev=oldrev,
newrev=newrev,
)
def getLayout(self, item):
if not item.change.updatesConfig():
if item.item_ahead:
return item.item_ahead.current_build_set.layout
else:
return item.queue.pipeline.layout
# This item updates the config, ask the merger for the result.
build_set = item.current_build_set
if build_set.merge_state == build_set.PENDING:
return None
if build_set.merge_state == build_set.COMPLETE:
if build_set.unable_to_merge:
return None
# Load layout
loader = zuul.configloader.ConfigLoader()
self.log.debug("Load dynamic layout with %s" % build_set.files)
layout = loader.createDynamicLayout(item.pipeline.layout.tenant,
build_set.files)
return layout
build_set.merge_state = build_set.PENDING
self.log.debug("Preparing dynamic layout for: %s" % item.change)
dependent_items = self.getDependentItems(item)
dependent_items.reverse()
all_items = dependent_items + [item]
merger_items = map(self._makeMergerItem, all_items)
self.sched.merger.mergeChanges(merger_items,
item.current_build_set,
['.zuul.yaml'],
self.pipeline.precedence)
def prepareLayout(self, item):
# Get a copy of the layout in the context of the current
# queue.
# Returns True if the ref is ready, false otherwise
if not item.current_build_set.ref:
item.current_build_set.setConfiguration()
if not item.current_build_set.layout:
item.current_build_set.layout = self.getLayout(item)
if not item.current_build_set.layout:
return False
if not item.job_tree:
item.freezeJobTree()
return True
def _processOneItem(self, item, nnfi):
changed = False
item_ahead = item.item_ahead
@ -416,6 +497,7 @@ class BasePipelineManager(object):
dep_items = self.getFailingDependentItems(item)
actionable = change_queue.isActionable(item)
item.active = actionable
ready = False
if dep_items:
failing_reasons.append('a needed change is failing')
self.cancelJobs(item, prime=False)
@ -433,13 +515,14 @@ class BasePipelineManager(object):
change_queue.moveItem(item, nnfi)
changed = True
self.cancelJobs(item)
if actionable:
if not item.current_build_set.ref:
item.current_build_set.setConfiguration()
if self.provisionNodes(item):
changed = True
if self.launchJobs(item):
changed = True
if actionable:
ready = self.prepareLayout(item)
if item.current_build_set.unable_to_merge:
failing_reasons.append("it has a merge conflict")
if ready and self.provisionNodes(item):
changed = True
if actionable and ready and self.launchJobs(item):
changed = True
if self.pipeline.didAnyJobFail(item):
failing_reasons.append("at least one job failed")
if (not item.live) and (not item.items_behind):
@ -533,6 +616,7 @@ class BasePipelineManager(object):
build_set.zuul_url = event.zuul_url
if event.merged:
build_set.commit = event.commit
build_set.files.setFiles(event.files)
elif event.updated:
if not isinstance(item.change, NullChange):
build_set.commit = item.change.newrev
@ -591,6 +675,9 @@ class BasePipelineManager(object):
actions = self.pipeline.success_actions
item.setReportedResult('SUCCESS')
self.pipeline._consecutive_failures = 0
elif not self.pipeline.didMergerSucceed(item):
actions = self.pipeline.merge_failure_actions
item.setReportedResult('MERGER_FAILURE')
else:
actions = self.pipeline.failure_actions
item.setReportedResult('FAILURE')

@ -107,9 +107,10 @@ class MergeClient(object):
timeout=300)
return job
def mergeChanges(self, items, build_set,
def mergeChanges(self, items, build_set, files=None,
precedence=zuul.model.PRECEDENCE_NORMAL):
data = dict(items=items)
data = dict(items=items,
files=files)
self.submitJob('merger:merge', data, build_set, precedence)
def updateRepo(self, project, url, build_set,
@ -133,14 +134,15 @@ class MergeClient(object):
merged = data.get('merged', False)
updated = data.get('updated', False)
commit = data.get('commit')
job.files = data.get('files', {})
files = data.get('files', {})
job.files = files
self.log.info("Merge %s complete, merged: %s, updated: %s, "
"commit: %s" %
(job, merged, updated, commit))
job.setComplete()
if job.build_set:
self.sched.onMergeCompleted(job.build_set, zuul_url,
merged, updated, commit)
merged, updated, commit, files)
# The test suite expects the job to be removed from the
# internal account after the wake flag is set.
self.jobs.remove(job)

@ -180,11 +180,14 @@ class Repo(object):
origin = repo.remotes.origin
origin.update()
def getFiles(self, branch, files):
def getFiles(self, files, branch=None, commit=None):
ret = {}
repo = self.createRepoObject()
for fn in files:
if branch:
tree = repo.heads[branch].commit.tree
else:
tree = repo.commit(commit).tree
for fn in files:
if fn in tree:
ret[fn] = tree[fn].data_stream.read()
else:
@ -335,9 +338,10 @@ class Merger(object):
return None
return commit
def mergeChanges(self, items):
def mergeChanges(self, items, files=None):
recent = {}
commit = None
read_files = []
for item in items:
if item.get("number") and item.get("patchset"):
self.log.debug("Merging for change %s,%s." %
@ -348,8 +352,16 @@ class Merger(object):
commit = self._mergeItem(item, recent)
if not commit:
return None
if files:
repo = self.getRepo(item['project'], item['url'])
repo_files = repo.getFiles(files, commit=commit)
read_files.append(dict(project=item['project'],
branch=item['branch'],
files=repo_files))
if files:
return commit.hexsha, read_files
return commit.hexsha
def getFiles(self, project, url, branch, files):
repo = self.getRepo(project, url)
return repo.getFiles(branch, files)
return repo.getFiles(files, branch=branch)

@ -105,10 +105,13 @@ class MergeServer(object):
def merge(self, job):
args = json.loads(job.arguments)
commit = self.merger.mergeChanges(args['items'])
result = dict(merged=(commit is not None),
commit=commit,
ret = self.merger.mergeChanges(args['items'], args.get('files'))
result = dict(merged=(ret is not None),
zuul_url=self.zuul_url)
if args.get('files'):
result['commit'], result['files'] = ret
else:
result['commit'] = ret
job.sendWorkComplete(json.dumps(result))
def update(self, job):

@ -146,6 +146,8 @@ class Pipeline(object):
return tree
def getJobs(self, item):
# TODOv3(jeblair): can this be removed in favor of the frozen
# job list in item?
if not item.live:
return []
tree = self.getJobTree(item.change.project)
@ -213,21 +215,27 @@ class Pipeline(object):
return self._findJobsToRequest(tree.job_trees, item)
def haveAllJobsStarted(self, item):
for job in self.getJobs(item):
if not item.hasJobTree():
return False
for job in item.getJobs():
build = item.current_build_set.getBuild(job.name)
if not build or not build.start_time:
return False
return True
def areAllJobsComplete(self, item):
for job in self.getJobs(item):
if not item.hasJobTree():
return False
for job in item.getJobs():
build = item.current_build_set.getBuild(job.name)
if not build or not build.result:
return False
return True
def didAllJobsSucceed(self, item):
for job in self.getJobs(item):
if not item.hasJobTree():
return False
for job in item.getJobs():
if not job.voting:
continue
build = item.current_build_set.getBuild(job.name)
@ -243,7 +251,9 @@ class Pipeline(object):
return True
def didAnyJobFail(self, item):
for job in self.getJobs(item):
if not item.hasJobTree():
return False
for job in item.getJobs():
if not job.voting:
continue
build = item.current_build_set.getBuild(job.name)
@ -254,7 +264,9 @@ class Pipeline(object):
def isHoldingFollowingChanges(self, item):
if not item.live:
return False
for job in self.getJobs(item):
if not item.hasJobTree():
return False
for job in item.getJobs():
if not job.hold_following_changes:
continue
build = item.current_build_set.getBuild(job.name)
@ -375,7 +387,6 @@ class ChangeQueue(object):
def enqueueChange(self, change):
item = QueueItem(self, change)
item.freezeJobTree()
self.enqueueItem(item)
item.enqueue_time = time.time()
return item
@ -457,6 +468,7 @@ class Project(object):
# of layout projects, this should matter
# when deciding whether to enqueue their changes
self.foreign = foreign
self.unparsed_config = None
def __str__(self):
return self.name
@ -489,8 +501,6 @@ class Job(object):
pre_run=None,
post_run=None,
voting=None,
project_source=None,
project_name=None,
failure_message=None,
success_message=None,
failure_url=None,
@ -652,6 +662,27 @@ class Worker(object):
return '<Worker %s>' % self.name
class RepoFiles(object):
# When we ask a merger to prepare a future multiple-repo state and
# collect files so that we can dynamically load our configuration,
# this class provides easy access to that data.
def __init__(self):
self.projects = {}
def __repr__(self):
return '<RepoFiles %s>' % self.projects
def setFiles(self, items):
self.projects = {}
for item in items:
project = self.projects.setdefault(item['project'], {})
branch = project.setdefault(item['branch'], {})
branch.update(item['files'])
def getFile(self, project, branch, fn):
return self.projects.get(project, {}).get(branch, {}).get(fn)
class BuildSet(object):
# Merge states:
NEW = 1
@ -679,6 +710,8 @@ class BuildSet(object):
self.merge_state = self.NEW
self.nodes = {} # job -> nodes
self.node_requests = {} # job -> reqs
self.files = RepoFiles()
self.layout = None
def __repr__(self):
return '<BuildSet item: %s #builds: %s merge state: %s>' % (
@ -754,6 +787,7 @@ class QueueItem(object):
self.reported = False
self.active = False # Whether an item is within an active window
self.live = True # Whether an item is intended to be processed at all
self.layout = None # This item's shadow layout
self.job_tree = None
def __repr__(self):
@ -782,37 +816,15 @@ class QueueItem(object):
def setReportedResult(self, result):
self.current_build_set.result = result
def _createJobTree(self, job_trees, parent):
for tree in job_trees:
job = tree.job
if not job.changeMatches(self.change):
continue
frozen_job = Job(job.name)
frozen_tree = JobTree(frozen_job)
inherited = set()
for variant in self.pipeline.layout.getJobs(job.name):
if variant.changeMatches(self.change):
if variant not in inherited:
frozen_job.inheritFrom(variant)
inherited.add(variant)
if job not in inherited:
# Only update from the job in the tree if it is
# unique, otherwise we might unset an attribute we
# have overloaded.
frozen_job.inheritFrom(job)
parent.job_trees.append(frozen_tree)
self._createJobTree(tree.job_trees, frozen_tree)
def createJobTree(self):
project_tree = self.pipeline.getJobTree(self.change.project)
ret = JobTree(None)
self._createJobTree(project_tree.job_trees, ret)
return ret
def freezeJobTree(self):
"""Find or create actual matching jobs for this item's change and
store the resulting job tree."""
self.job_tree = self.createJobTree()
layout = self.current_build_set.layout
self.job_tree = layout.createJobTree(self)
def hasJobTree(self):
"""Returns True if the item has a job tree."""
return self.job_tree is not None
def getJobs(self):
if not self.live or not self.job_tree:
@ -877,7 +889,7 @@ class QueueItem(object):
else:
ret['owner'] = None
max_remaining = 0
for job in self.pipeline.getJobs(self):
for job in self.getJobs():
now = time.time()
build = self.current_build_set.getBuild(job.name)
elapsed = None
@ -957,7 +969,7 @@ class QueueItem(object):
changeish.project.name,
changeish._id(),
self.item_ahead)
for job in self.pipeline.getJobs(self):
for job in self.getJobs():
build = self.current_build_set.getBuild(job.name)
if build:
result = build.result
@ -1008,6 +1020,9 @@ class Changeish(object):
def getRelatedChanges(self):
return set()
def updatesConfig(self):
return False
class Change(Changeish):
def __init__(self, project):
@ -1059,6 +1074,11 @@ class Change(Changeish):
related.update(c.getRelatedChanges())
return related
def updatesConfig(self):
if 'zuul.yaml' in self.files or '.zuul.yaml' in self.files:
return True
return False
class Ref(Changeish):
def __init__(self, project):
@ -1511,6 +1531,14 @@ class UnparsedTenantConfig(object):
self.project_templates = []
self.projects = []
def copy(self):
r = UnparsedTenantConfig()
r.pipelines = copy.deepcopy(self.pipelines)
r.jobs = copy.deepcopy(self.jobs)
r.project_templates = copy.deepcopy(self.project_templates)
r.projects = copy.deepcopy(self.projects)
return r
def extend(self, conf):
if isinstance(conf, UnparsedTenantConfig):
self.pipelines.extend(conf.pipelines)
@ -1549,6 +1577,7 @@ class UnparsedTenantConfig(object):
class Layout(object):
def __init__(self):
self.tenant = None
self.projects = {}
self.project_configs = {}
self.project_templates = {}
@ -1581,20 +1610,62 @@ class Layout(object):
def addProjectTemplate(self, project_template):
self.project_templates[project_template.name] = project_template
def addProjectConfig(self, project_config):
def addProjectConfig(self, project_config, update_pipeline=True):
self.project_configs[project_config.name] = project_config
# TODOv3(jeblair): tidy up the relationship between pipelines
# and projects and projectconfigs
# and projects and projectconfigs. Specifically, move
# job_trees out of the pipeline since they are more dynamic
# than pipelines. Remove the update_pipeline argument
if not update_pipeline:
return
for pipeline_name, pipeline_config in project_config.pipelines.items():
pipeline = self.pipelines[pipeline_name]
project = pipeline.source.getProject(project_config.name)
pipeline.job_trees[project] = pipeline_config.job_tree
def _createJobTree(self, change, job_trees, parent):
for tree in job_trees:
job = tree.job
if not job.changeMatches(change):
continue
frozen_job = Job(job.name)
frozen_tree = JobTree(frozen_job)
inherited = set()
for variant in self.getJobs(job.name):
if variant.changeMatches(change):
if variant not in inherited:
frozen_job.inheritFrom(variant)
inherited.add(variant)
if job not in inherited:
# Only update from the job in the tree if it is
# unique, otherwise we might unset an attribute we
# have overloaded.
frozen_job.inheritFrom(job)
parent.job_trees.append(frozen_tree)
self._createJobTree(change, tree.job_trees, frozen_tree)
def createJobTree(self, item):
project_config = self.project_configs[item.change.project.name]
project_tree = project_config.pipelines[item.pipeline.name].job_tree
ret = JobTree(None)
self._createJobTree(item.change, project_tree.job_trees, ret)
return ret
class Tenant(object):
def __init__(self, name):
self.name = name
self.layout = None
# The list of repos from which we will read main
# configuration. (source, project)
self.config_repos = []
# The unparsed config from those repos.
self.config_repos_config = None
# The list of projects from which we will read in-repo
# configuration. (source, project)
self.project_repos = []
# The unparsed config from those repos.
self.project_repos_config = None
class Abide(object):

2