From 8b1dc3fb22e30989ee1f94b5cce4d02ac04b588d Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 5 Jul 2016 16:49:00 -0700 Subject: [PATCH] Add dynamic reconfiguration If a change alters .zuul.yaml in a repo that is permitted to use in-repo configuration, create a shadow configuration layout specifically for that and any following changes with the new configuration in place. Such configuration changes extend only to altering jobs and job trees. More substantial changes such as altering pipelines will be ignored. This only applies to "project" repos (ie, the repositories under test which may incidentally have .zuul.yaml files) rather than "config" repos (repositories specifically designed to hold Zuul configuration in zuul.yaml files). This is to avoid the situation where a user might propose a change to a config repository (and Zuul would therefore run) that would perform actions that the gatekeepers of that repository would not normally permit. This change also corrects an issue with job inheritance in that the Job instances attached to the project pipeline job trees (ie, those that represent the job as invoked in the specific pipeline configuration for a project) were inheriting attributes at configuration time rather than when job trees are frozen when a change is enqueued. This could mean that they would inherit attributes from the wrong variant of a job. Change-Id: If3cd47094e6c6914abf0ffaeca45997c132b8e32 --- tests/base.py | 62 ++++--- .../config/in-repo/git/org_project/.zuul.yaml | 8 + .../config/in-repo/git/org_project/README | 1 + .../multi-tenant/git/org_project1/README | 1 + .../multi-tenant/git/org_project2/README | 1 + .../project-template/git/org_project/README | 1 + tests/test_model.py | 90 ++++++++++- tests/test_v3.py | 42 +++-- zuul/configloader.py | 153 +++++++++++------- zuul/launcher/server.py | 15 ++ zuul/manager/__init__.py | 103 +++++++++++- zuul/merger/client.py | 10 +- zuul/merger/merger.py | 20 ++- zuul/merger/server.py | 9 +- zuul/model.py | 151 ++++++++++++----- zuul/reporter/__init__.py | 2 +- zuul/scheduler.py | 15 +- 17 files changed, 505 insertions(+), 179 deletions(-) create mode 100644 tests/fixtures/config/in-repo/git/org_project/.zuul.yaml create mode 100644 tests/fixtures/config/in-repo/git/org_project/README create mode 100644 tests/fixtures/config/multi-tenant/git/org_project1/README create mode 100644 tests/fixtures/config/multi-tenant/git/org_project2/README create mode 100644 tests/fixtures/config/project-template/git/org_project/README diff --git a/tests/base.py b/tests/base.py index 6321fe9784..a75d36bc2c 100755 --- a/tests/base.py +++ b/tests/base.py @@ -108,7 +108,7 @@ class FakeChange(object): 'VRFY': ('Verified', -2, 2)} def __init__(self, gerrit, number, project, branch, subject, - status='NEW', upstream_root=None): + status='NEW', upstream_root=None, files={}): self.gerrit = gerrit self.reported = 0 self.queried = 0 @@ -142,11 +142,11 @@ class FakeChange(object): 'url': 'https://hostname/%s' % number} self.upstream_root = upstream_root - self.addPatchset() + self.addPatchset(files=files) self.data['submitRecords'] = self.getSubmitRecords() self.open = status == 'NEW' - def add_fake_change_to_repo(self, msg, fn, large): + def addFakeChangeToRepo(self, msg, files, large): path = os.path.join(self.upstream_root, self.project) repo = git.Repo(path) ref = ChangeReference.create(repo, '1/%s/%s' % (self.number, @@ -158,12 +158,11 @@ class FakeChange(object): path = os.path.join(self.upstream_root, self.project) if not large: - fn = os.path.join(path, fn) - f = open(fn, 'w') - f.write("test %s %s %s\n" % - (self.branch, self.number, self.latest_patchset)) - f.close() - repo.index.add([fn]) + for fn, content in files.items(): + fn = os.path.join(path, fn) + with open(fn, 'w') as f: + f.write(content) + repo.index.add([fn]) else: for fni in range(100): fn = os.path.join(path, str(fni)) @@ -180,19 +179,20 @@ class FakeChange(object): repo.heads['master'].checkout() return r - def addPatchset(self, files=[], large=False): + def addPatchset(self, files=None, large=False): self.latest_patchset += 1 - if files: - fn = files[0] - else: + if not files: fn = '%s-%s' % (self.branch.replace('/', '_'), self.number) + data = ("test %s %s %s\n" % + (self.branch, self.number, self.latest_patchset)) + files = {fn: data} msg = self.subject + '-' + str(self.latest_patchset) - c = self.add_fake_change_to_repo(msg, fn, large) + c = self.addFakeChangeToRepo(msg, files, large) ps_files = [{'file': '/COMMIT_MSG', 'type': 'ADDED'}, {'file': 'README', 'type': 'MODIFIED'}] - for f in files: + for f in files.keys(): ps_files.append({'file': f, 'type': 'ADDED'}) d = {'approvals': [], 'createdOn': time.time(), @@ -400,11 +400,12 @@ class FakeGerritConnection(zuul.connection.gerrit.GerritConnection): self.queries = [] self.upstream_root = upstream_root - def addFakeChange(self, project, branch, subject, status='NEW'): + def addFakeChange(self, project, branch, subject, status='NEW', + files=None): self.change_number += 1 c = FakeChange(self, self.change_number, project, branch, subject, upstream_root=self.upstream_root, - status=status) + status=status, files=files) self.changes[self.change_number] = c return c @@ -937,9 +938,8 @@ class ZuulTestCase(BaseTestCase): self.config.set('zuul', 'state_dir', self.state_root) # For each project in config: - self.init_repo("org/project") - self.init_repo("org/project1") - self.init_repo("org/project2") + # TODOv3(jeblair): remove these and replace with new git + # filesystem fixtures self.init_repo("org/project3") self.init_repo("org/project4") self.init_repo("org/project5") @@ -1107,13 +1107,12 @@ class ZuulTestCase(BaseTestCase): 'git') if os.path.exists(git_path): for reponame in os.listdir(git_path): - self.copyDirToRepo(reponame, + project = reponame.replace('_', '/') + self.copyDirToRepo(project, os.path.join(git_path, reponame)) def copyDirToRepo(self, project, source_path): - repo_path = os.path.join(self.upstream_root, project) - if not os.path.exists(repo_path): - self.init_repo(project) + self.init_repo(project) files = {} for (dirpath, dirnames, filenames) in os.walk(source_path): @@ -1126,7 +1125,7 @@ class ZuulTestCase(BaseTestCase): content = f.read() files[relative_filepath] = content self.addCommitToRepo(project, 'add content from fixture', - files, branch='master') + files, branch='master', tag='init') def setup_repos(self): """Subclasses can override to manipulate repos before tests""" @@ -1176,21 +1175,13 @@ class ZuulTestCase(BaseTestCase): config_writer.set_value('user', 'email', 'user@example.com') config_writer.set_value('user', 'name', 'User Name') - fn = os.path.join(path, 'README') - f = open(fn, 'w') - f.write("test\n") - f.close() - repo.index.add([fn]) repo.index.commit('initial commit') master = repo.create_head('master') - repo.create_tag('init') repo.head.reference = master zuul.merger.merger.reset_repo_to_head(repo) repo.git.clean('-x', '-f', '-d') - self.create_branch(project, 'mp') - def create_branch(self, project, branch): path = os.path.join(self.upstream_root, project) repo = git.Repo.init(path) @@ -1452,7 +1443,8 @@ tenants: f.close() self.config.set('zuul', 'tenant_config', f.name) - def addCommitToRepo(self, project, message, files, branch='master'): + def addCommitToRepo(self, project, message, files, + branch='master', tag=None): path = os.path.join(self.upstream_root, project) repo = git.Repo(path) repo.head.reference = branch @@ -1467,3 +1459,5 @@ tenants: repo.head.reference = branch repo.git.clean('-x', '-f', '-d') repo.heads[branch].checkout() + if tag: + repo.create_tag(tag) diff --git a/tests/fixtures/config/in-repo/git/org_project/.zuul.yaml b/tests/fixtures/config/in-repo/git/org_project/.zuul.yaml new file mode 100644 index 0000000000..d6f083d3ac --- /dev/null +++ b/tests/fixtures/config/in-repo/git/org_project/.zuul.yaml @@ -0,0 +1,8 @@ +- job: + name: project-test1 + +- project: + name: org/project + tenant-one-gate: + jobs: + - project-test1 diff --git a/tests/fixtures/config/in-repo/git/org_project/README b/tests/fixtures/config/in-repo/git/org_project/README new file mode 100644 index 0000000000..9daeafb986 --- /dev/null +++ b/tests/fixtures/config/in-repo/git/org_project/README @@ -0,0 +1 @@ +test diff --git a/tests/fixtures/config/multi-tenant/git/org_project1/README b/tests/fixtures/config/multi-tenant/git/org_project1/README new file mode 100644 index 0000000000..9daeafb986 --- /dev/null +++ b/tests/fixtures/config/multi-tenant/git/org_project1/README @@ -0,0 +1 @@ +test diff --git a/tests/fixtures/config/multi-tenant/git/org_project2/README b/tests/fixtures/config/multi-tenant/git/org_project2/README new file mode 100644 index 0000000000..9daeafb986 --- /dev/null +++ b/tests/fixtures/config/multi-tenant/git/org_project2/README @@ -0,0 +1 @@ +test diff --git a/tests/fixtures/config/project-template/git/org_project/README b/tests/fixtures/config/project-template/git/org_project/README new file mode 100644 index 0000000000..9daeafb986 --- /dev/null +++ b/tests/fixtures/config/project-template/git/org_project/README @@ -0,0 +1 @@ +test diff --git a/tests/test_model.py b/tests/test_model.py index 145c119185..e22351b84e 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -51,6 +51,11 @@ class TestJob(BaseTestCase): def test_job_inheritance(self): layout = model.Layout() + + pipeline = model.Pipeline('gate', layout) + layout.addPipeline(pipeline) + queue = model.ChangeQueue(pipeline) + base = configloader.JobParser.fromYaml(layout, { 'name': 'base', 'timeout': 30, @@ -71,17 +76,21 @@ class TestJob(BaseTestCase): }) layout.addJob(python27diablo) - pipeline = model.Pipeline('gate', layout) - layout.addPipeline(pipeline) - queue = model.ChangeQueue(pipeline) + project_config = configloader.ProjectParser.fromYaml(layout, { + 'name': 'project', + 'gate': { + 'jobs': [ + 'python27' + ] + } + }) + layout.addProjectConfig(project_config, update_pipeline=False) project = model.Project('project') - tree = pipeline.addProject(project) - tree.addJob(layout.getJob('python27')) - change = model.Change(project) change.branch = 'master' item = queue.enqueueChange(change) + item.current_build_set.layout = layout self.assertTrue(base.changeMatches(change)) self.assertTrue(python27.changeMatches(change)) @@ -94,6 +103,8 @@ class TestJob(BaseTestCase): self.assertEqual(job.timeout, 40) change.branch = 'stable/diablo' + item = queue.enqueueChange(change) + item.current_build_set.layout = layout self.assertTrue(base.changeMatches(change)) self.assertTrue(python27.changeMatches(change)) @@ -105,6 +116,73 @@ class TestJob(BaseTestCase): self.assertEqual(job.name, 'python27') self.assertEqual(job.timeout, 50) + def test_job_inheritance_job_tree(self): + layout = model.Layout() + + pipeline = model.Pipeline('gate', layout) + layout.addPipeline(pipeline) + queue = model.ChangeQueue(pipeline) + + base = configloader.JobParser.fromYaml(layout, { + 'name': 'base', + 'timeout': 30, + }) + layout.addJob(base) + python27 = configloader.JobParser.fromYaml(layout, { + 'name': 'python27', + 'parent': 'base', + 'timeout': 40, + }) + layout.addJob(python27) + python27diablo = configloader.JobParser.fromYaml(layout, { + 'name': 'python27', + 'branches': [ + 'stable/diablo' + ], + 'timeout': 50, + }) + layout.addJob(python27diablo) + + project_config = configloader.ProjectParser.fromYaml(layout, { + 'name': 'project', + 'gate': { + 'jobs': [ + {'python27': {'timeout': 70}} + ] + } + }) + layout.addProjectConfig(project_config, update_pipeline=False) + + project = model.Project('project') + change = model.Change(project) + change.branch = 'master' + item = queue.enqueueChange(change) + item.current_build_set.layout = layout + + self.assertTrue(base.changeMatches(change)) + self.assertTrue(python27.changeMatches(change)) + self.assertFalse(python27diablo.changeMatches(change)) + + item.freezeJobTree() + self.assertEqual(len(item.getJobs()), 1) + job = item.getJobs()[0] + self.assertEqual(job.name, 'python27') + self.assertEqual(job.timeout, 70) + + change.branch = 'stable/diablo' + item = queue.enqueueChange(change) + item.current_build_set.layout = layout + + self.assertTrue(base.changeMatches(change)) + self.assertTrue(python27.changeMatches(change)) + self.assertTrue(python27diablo.changeMatches(change)) + + item.freezeJobTree() + self.assertEqual(len(item.getJobs()), 1) + job = item.getJobs()[0] + self.assertEqual(job.name, 'python27') + self.assertEqual(job.timeout, 70) + class TestJobTimeData(BaseTestCase): def setUp(self): diff --git a/tests/test_v3.py b/tests/test_v3.py index 8874015bdf..50e20c8bda 100644 --- a/tests/test_v3.py +++ b/tests/test_v3.py @@ -74,22 +74,6 @@ class TestInRepoConfig(ZuulTestCase): tenant_config_file = 'config/in-repo/main.yaml' - def setup_repos(self): - in_repo_conf = textwrap.dedent( - """ - - job: - name: project-test1 - - - project: - name: org/project - tenant-one-gate: - jobs: - - project-test1 - """) - - self.addCommitToRepo('org/project', 'add zuul conf', - {'.zuul.yaml': in_repo_conf}) - def test_in_repo_config(self): A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') A.addApproval('CRVW', 2) @@ -103,6 +87,32 @@ class TestInRepoConfig(ZuulTestCase): self.assertIn('tenant-one-gate', A.messages[1], "A should transit tenant-one gate") + def test_dynamic_config(self): + in_repo_conf = textwrap.dedent( + """ + - job: + name: project-test2 + + - project: + name: org/project + tenant-one-gate: + jobs: + - project-test2 + """) + + A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A', + files={'.zuul.yaml': in_repo_conf}) + A.addApproval('CRVW', 2) + self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) + self.waitUntilSettled() + self.assertEqual(self.getJobFromHistory('project-test2').result, + 'SUCCESS') + self.assertEqual(A.data['status'], 'MERGED') + self.assertEqual(A.reported, 2, + "A should report start and success") + self.assertIn('tenant-one-gate', A.messages[1], + "A should transit tenant-one gate") + class TestProjectTemplate(ZuulTestCase): tenant_config_file = 'config/project-template/main.yaml' diff --git a/zuul/configloader.py b/zuul/configloader.py index 2748c94357..5021856b69 100644 --- a/zuul/configloader.py +++ b/zuul/configloader.py @@ -18,7 +18,6 @@ import yaml import voluptuous as vs from zuul import model -import zuul.manager import zuul.manager.dependent import zuul.manager.independent from zuul import change_matcher @@ -73,8 +72,6 @@ class JobParser(object): 'irrelevant-files': to_list(str), 'nodes': [node], 'timeout': int, - '_project_source': str, # used internally - '_project_name': str, # used internally } return vs.Schema(job) @@ -99,12 +96,6 @@ class JobParser(object): # accumulate onto any previously applied tags from # metajobs. job.tags = job.tags.union(set(tags)) - if not job.project_source: - # Thes attributes may not be overidden -- the first - # reference definition of a job is in the repo where it is - # first defined. - job.project_source = conf.get('_project_source') - job.project_name = conf.get('_project_name') job.failure_message = conf.get('failure-message', job.failure_message) job.success_message = conf.get('success-message', job.success_message) job.failure_url = conf.get('failure-url', job.failure_url) @@ -161,12 +152,12 @@ class ProjectTemplateParser(object): tree = model.JobTree(None) for conf_job in conf: if isinstance(conf_job, six.string_types): - tree.addJob(layout.getJob(conf_job)) + tree.addJob(model.Job(conf_job)) elif isinstance(conf_job, dict): # A dictionary in a job tree may override params, or # be the root of a sub job tree, or both. - jobname, attrs = dict.items()[0] - jobs = attrs.pop('jobs') + jobname, attrs = conf_job.items()[0] + jobs = attrs.pop('jobs', None) if attrs: # We are overriding params, so make a new job def attrs['name'] = jobname @@ -457,43 +448,64 @@ class TenantParser(object): def fromYaml(base, connections, scheduler, merger, conf): TenantParser.getSchema(connections)(conf) tenant = model.Tenant(conf['name']) - tenant_config = model.UnparsedTenantConfig() - incdata = TenantParser._loadTenantInRepoLayouts(merger, connections, - conf) - tenant_config.extend(incdata) - tenant.layout = TenantParser._parseLayout(base, tenant_config, + unparsed_config = model.UnparsedTenantConfig() + tenant.config_repos, tenant.project_repos = \ + TenantParser._loadTenantConfigRepos(connections, conf) + tenant.config_repos_config, tenant.project_repos_config = \ + TenantParser._loadTenantInRepoLayouts( + merger, connections, tenant.config_repos, tenant.project_repos) + unparsed_config.extend(tenant.config_repos_config) + unparsed_config.extend(tenant.project_repos_config) + tenant.layout = TenantParser._parseLayout(base, unparsed_config, scheduler, connections) + tenant.layout.tenant = tenant return tenant @staticmethod - def _loadTenantInRepoLayouts(merger, connections, conf_tenant): - config = model.UnparsedTenantConfig() - jobs = [] + def _loadTenantConfigRepos(connections, conf_tenant): + config_repos = [] + project_repos = [] + for source_name, conf_source in conf_tenant.get('source', {}).items(): source = connections.getSource(source_name) - # Get main config files. These files are permitted the - # full range of configuration. for conf_repo in conf_source.get('config-repos', []): project = source.getProject(conf_repo) - url = source.getGitUrl(project) - job = merger.getFiles(project.name, url, 'master', - files=['zuul.yaml', '.zuul.yaml']) - job.project = project - job.config_repo = True - jobs.append(job) + config_repos.append((source, project)) - # Get in-project-repo config files which have a restricted - # set of options. for conf_repo in conf_source.get('project-repos', []): project = source.getProject(conf_repo) - url = source.getGitUrl(project) - # TODOv3(jeblair): config should be branch specific - job = merger.getFiles(project.name, url, 'master', - files=['.zuul.yaml']) - job.project = project - job.config_repo = False - jobs.append(job) + project_repos.append((source, project)) + + return config_repos, project_repos + + @staticmethod + def _loadTenantInRepoLayouts(merger, connections, config_repos, + project_repos): + config_repos_config = model.UnparsedTenantConfig() + project_repos_config = model.UnparsedTenantConfig() + jobs = [] + + for (source, project) in config_repos: + # Get main config files. These files are permitted the + # full range of configuration. + url = source.getGitUrl(project) + job = merger.getFiles(project.name, url, 'master', + files=['zuul.yaml', '.zuul.yaml']) + job.project = project + job.config_repo = True + jobs.append(job) + + for (source, project) in project_repos: + # Get in-project-repo config files which have a restricted + # set of options. + url = source.getGitUrl(project) + # TODOv3(jeblair): config should be branch specific + job = merger.getFiles(project.name, url, 'master', + files=['.zuul.yaml']) + job.project = project + job.config_repo = False + jobs.append(job) for job in jobs: # Note: this is an ordered list -- we wait for cat jobs to @@ -509,38 +521,30 @@ class TenantParser(object): (job.project, fn)) if job.config_repo: incdata = TenantParser._parseConfigRepoLayout( - job.files[fn], source_name, job.project.name) + job.files[fn]) + config_repos_config.extend(incdata) else: incdata = TenantParser._parseProjectRepoLayout( - job.files[fn], source_name, job.project.name) - config.extend(incdata) - return config + job.files[fn]) + project_repos_config.extend(incdata) + job.project.unparsed_config = incdata + return config_repos_config, project_repos_config @staticmethod - def _parseConfigRepoLayout(data, source_name, project_name): + def _parseConfigRepoLayout(data): # This is the top-level configuration for a tenant. config = model.UnparsedTenantConfig() config.extend(yaml.load(data)) - # Remember where this job was defined - for conf_job in config.jobs: - conf_job['_project_source'] = source_name - conf_job['_project_name'] = project_name - return config @staticmethod - def _parseProjectRepoLayout(data, source_name, project_name): + def _parseProjectRepoLayout(data): # TODOv3(jeblair): this should implement some rules to protect # aspects of the config that should not be changed in-repo config = model.UnparsedTenantConfig() config.extend(yaml.load(data)) - # Remember where this job was defined - for conf_job in config.jobs: - conf_job['_project_source'] = source_name - conf_job['_project_name'] = project_name - return config @staticmethod @@ -572,14 +576,18 @@ class TenantParser(object): class ConfigLoader(object): log = logging.getLogger("zuul.ConfigLoader") + def expandConfigPath(self, config_path): + if config_path: + config_path = os.path.expanduser(config_path) + if not os.path.exists(config_path): + raise Exception("Unable to read tenant config file at %s" % + config_path) + return config_path + def loadConfig(self, config_path, scheduler, merger, connections): abide = model.Abide() - if config_path: - config_path = os.path.expanduser(config_path) - if not os.path.exists(config_path): - raise Exception("Unable to read tenant config file at %s" % - config_path) + config_path = self.expandConfigPath(config_path) with open(config_path) as config_file: self.log.info("Loading configuration from %s" % (config_path,)) data = yaml.load(config_file) @@ -592,3 +600,32 @@ class ConfigLoader(object): merger, conf_tenant) abide.tenants[tenant.name] = tenant return abide + + def createDynamicLayout(self, tenant, files): + config = tenant.config_repos_config.copy() + for source, project in tenant.project_repos: + # TODOv3(jeblair): config should be branch specific + data = files.getFile(project.name, 'master', '.zuul.yaml') + if not data: + data = project.unparsed_config + if not data: + continue + incdata = TenantParser._parseProjectRepoLayout(data) + config.extend(incdata) + + layout = model.Layout() + # TODOv3(jeblair): copying the pipelines could be dangerous/confusing. + layout.pipelines = tenant.layout.pipelines + + for config_job in config.jobs: + layout.addJob(JobParser.fromYaml(layout, config_job)) + + for config_template in config.project_templates: + layout.addProjectTemplate(ProjectTemplateParser.fromYaml( + layout, config_template)) + + for config_project in config.projects: + layout.addProjectConfig(ProjectParser.fromYaml( + layout, config_project), update_pipeline=False) + + return layout diff --git a/zuul/launcher/server.py b/zuul/launcher/server.py index c6a1b23dee..89969cd9c3 100644 --- a/zuul/launcher/server.py +++ b/zuul/launcher/server.py @@ -157,6 +157,7 @@ class LaunchServer(object): def register(self): self.worker.registerFunction("launcher:launch") # TODOv3: abort + self.worker.registerFunction("merger:merge") self.worker.registerFunction("merger:cat") def stop(self): @@ -202,6 +203,9 @@ class LaunchServer(object): elif job.name == 'merger:cat': self.log.debug("Got cat job: %s" % job.unique) self.cat(job) + elif job.name == 'merger:merge': + self.log.debug("Got merge job: %s" % job.unique) + self.merge(job) else: self.log.error("Unable to handle job %s" % job.name) job.sendWorkFail() @@ -294,3 +298,14 @@ class LaunchServer(object): files=files, zuul_url=self.zuul_url) job.sendWorkComplete(json.dumps(result)) + + def merge(self, job): + args = json.loads(job.arguments) + ret = self.merger.mergeChanges(args['items'], args.get('files')) + result = dict(merged=(ret is not None), + zuul_url=self.zuul_url) + if args.get('files'): + result['commit'], result['files'] = ret + else: + result['commit'] = ret + job.sendWorkComplete(json.dumps(result)) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index a0ddb929ad..181b599d7e 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -14,6 +14,7 @@ import extras import logging from zuul import exceptions +import zuul.configloader from zuul.model import NullChange statsd = extras.try_import('statsd.statsd') @@ -363,7 +364,15 @@ class BasePipelineManager(object): def launchJobs(self, item): # TODO(jeblair): This should return a value indicating a job # was launched. Appears to be a longstanding bug. - jobs = self.pipeline.findJobsToRun(item, self.sched.mutex) + if not item.current_build_set.layout: + return False + + # We may be working with a dynamic layout. Get a pipeline + # object from *that* layout to find out which jobs we should + # run. + layout = item.current_build_set.layout + pipeline = layout.pipelines[self.pipeline.name] + jobs = pipeline.findJobsToRun(item, self.sched.mutex) if jobs: self._launchJobs(item, jobs) @@ -392,6 +401,78 @@ class BasePipelineManager(object): canceled = True return canceled + def _makeMergerItem(self, item): + # Create a dictionary with all info about the item needed by + # the merger. + number = None + patchset = None + oldrev = None + newrev = None + if hasattr(item.change, 'number'): + number = item.change.number + patchset = item.change.patchset + elif hasattr(item.change, 'newrev'): + oldrev = item.change.oldrev + newrev = item.change.newrev + connection_name = self.pipeline.source.connection.connection_name + return dict(project=item.change.project.name, + url=self.pipeline.source.getGitUrl( + item.change.project), + connection_name=connection_name, + merge_mode=item.change.project.merge_mode, + refspec=item.change.refspec, + branch=item.change.branch, + ref=item.current_build_set.ref, + number=number, + patchset=patchset, + oldrev=oldrev, + newrev=newrev, + ) + + def getLayout(self, item): + if not item.change.updatesConfig(): + if item.item_ahead: + return item.item_ahead.current_build_set.layout + else: + return item.queue.pipeline.layout + # This item updates the config, ask the merger for the result. + build_set = item.current_build_set + if build_set.merge_state == build_set.PENDING: + return None + if build_set.merge_state == build_set.COMPLETE: + if build_set.unable_to_merge: + return None + # Load layout + loader = zuul.configloader.ConfigLoader() + self.log.debug("Load dynamic layout with %s" % build_set.files) + layout = loader.createDynamicLayout(item.pipeline.layout.tenant, + build_set.files) + return layout + build_set.merge_state = build_set.PENDING + self.log.debug("Preparing dynamic layout for: %s" % item.change) + dependent_items = self.getDependentItems(item) + dependent_items.reverse() + all_items = dependent_items + [item] + merger_items = map(self._makeMergerItem, all_items) + self.sched.merger.mergeChanges(merger_items, + item.current_build_set, + ['.zuul.yaml'], + self.pipeline.precedence) + + def prepareLayout(self, item): + # Get a copy of the layout in the context of the current + # queue. + # Returns True if the ref is ready, false otherwise + if not item.current_build_set.ref: + item.current_build_set.setConfiguration() + if not item.current_build_set.layout: + item.current_build_set.layout = self.getLayout(item) + if not item.current_build_set.layout: + return False + if not item.job_tree: + item.freezeJobTree() + return True + def _processOneItem(self, item, nnfi): changed = False item_ahead = item.item_ahead @@ -416,6 +497,7 @@ class BasePipelineManager(object): dep_items = self.getFailingDependentItems(item) actionable = change_queue.isActionable(item) item.active = actionable + ready = False if dep_items: failing_reasons.append('a needed change is failing') self.cancelJobs(item, prime=False) @@ -433,13 +515,14 @@ class BasePipelineManager(object): change_queue.moveItem(item, nnfi) changed = True self.cancelJobs(item) - if actionable: - if not item.current_build_set.ref: - item.current_build_set.setConfiguration() - if self.provisionNodes(item): - changed = True - if self.launchJobs(item): - changed = True + if actionable: + ready = self.prepareLayout(item) + if item.current_build_set.unable_to_merge: + failing_reasons.append("it has a merge conflict") + if ready and self.provisionNodes(item): + changed = True + if actionable and ready and self.launchJobs(item): + changed = True if self.pipeline.didAnyJobFail(item): failing_reasons.append("at least one job failed") if (not item.live) and (not item.items_behind): @@ -533,6 +616,7 @@ class BasePipelineManager(object): build_set.zuul_url = event.zuul_url if event.merged: build_set.commit = event.commit + build_set.files.setFiles(event.files) elif event.updated: if not isinstance(item.change, NullChange): build_set.commit = item.change.newrev @@ -591,6 +675,9 @@ class BasePipelineManager(object): actions = self.pipeline.success_actions item.setReportedResult('SUCCESS') self.pipeline._consecutive_failures = 0 + elif not self.pipeline.didMergerSucceed(item): + actions = self.pipeline.merge_failure_actions + item.setReportedResult('MERGER_FAILURE') else: actions = self.pipeline.failure_actions item.setReportedResult('FAILURE') diff --git a/zuul/merger/client.py b/zuul/merger/client.py index ce04795bfa..1e98532b75 100644 --- a/zuul/merger/client.py +++ b/zuul/merger/client.py @@ -107,9 +107,10 @@ class MergeClient(object): timeout=300) return job - def mergeChanges(self, items, build_set, + def mergeChanges(self, items, build_set, files=None, precedence=zuul.model.PRECEDENCE_NORMAL): - data = dict(items=items) + data = dict(items=items, + files=files) self.submitJob('merger:merge', data, build_set, precedence) def updateRepo(self, project, url, build_set, @@ -133,14 +134,15 @@ class MergeClient(object): merged = data.get('merged', False) updated = data.get('updated', False) commit = data.get('commit') - job.files = data.get('files', {}) + files = data.get('files', {}) + job.files = files self.log.info("Merge %s complete, merged: %s, updated: %s, " "commit: %s" % (job, merged, updated, commit)) job.setComplete() if job.build_set: self.sched.onMergeCompleted(job.build_set, zuul_url, - merged, updated, commit) + merged, updated, commit, files) # The test suite expects the job to be removed from the # internal account after the wake flag is set. self.jobs.remove(job) diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py index 46fc468223..a3bccc0f84 100644 --- a/zuul/merger/merger.py +++ b/zuul/merger/merger.py @@ -180,11 +180,14 @@ class Repo(object): origin = repo.remotes.origin origin.update() - def getFiles(self, branch, files): + def getFiles(self, files, branch=None, commit=None): ret = {} repo = self.createRepoObject() - for fn in files: + if branch: tree = repo.heads[branch].commit.tree + else: + tree = repo.commit(commit).tree + for fn in files: if fn in tree: ret[fn] = tree[fn].data_stream.read() else: @@ -335,9 +338,10 @@ class Merger(object): return None return commit - def mergeChanges(self, items): + def mergeChanges(self, items, files=None): recent = {} commit = None + read_files = [] for item in items: if item.get("number") and item.get("patchset"): self.log.debug("Merging for change %s,%s." % @@ -348,8 +352,16 @@ class Merger(object): commit = self._mergeItem(item, recent) if not commit: return None + if files: + repo = self.getRepo(item['project'], item['url']) + repo_files = repo.getFiles(files, commit=commit) + read_files.append(dict(project=item['project'], + branch=item['branch'], + files=repo_files)) + if files: + return commit.hexsha, read_files return commit.hexsha def getFiles(self, project, url, branch, files): repo = self.getRepo(project, url) - return repo.getFiles(branch, files) + return repo.getFiles(files, branch=branch) diff --git a/zuul/merger/server.py b/zuul/merger/server.py index 44606e7af5..750d560f01 100644 --- a/zuul/merger/server.py +++ b/zuul/merger/server.py @@ -105,10 +105,13 @@ class MergeServer(object): def merge(self, job): args = json.loads(job.arguments) - commit = self.merger.mergeChanges(args['items']) - result = dict(merged=(commit is not None), - commit=commit, + ret = self.merger.mergeChanges(args['items'], args.get('files')) + result = dict(merged=(ret is not None), zuul_url=self.zuul_url) + if args.get('files'): + result['commit'], result['files'] = ret + else: + result['commit'] = ret job.sendWorkComplete(json.dumps(result)) def update(self, job): diff --git a/zuul/model.py b/zuul/model.py index 1c0c30932f..fc67e67623 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -146,6 +146,8 @@ class Pipeline(object): return tree def getJobs(self, item): + # TODOv3(jeblair): can this be removed in favor of the frozen + # job list in item? if not item.live: return [] tree = self.getJobTree(item.change.project) @@ -213,21 +215,27 @@ class Pipeline(object): return self._findJobsToRequest(tree.job_trees, item) def haveAllJobsStarted(self, item): - for job in self.getJobs(item): + if not item.hasJobTree(): + return False + for job in item.getJobs(): build = item.current_build_set.getBuild(job.name) if not build or not build.start_time: return False return True def areAllJobsComplete(self, item): - for job in self.getJobs(item): + if not item.hasJobTree(): + return False + for job in item.getJobs(): build = item.current_build_set.getBuild(job.name) if not build or not build.result: return False return True def didAllJobsSucceed(self, item): - for job in self.getJobs(item): + if not item.hasJobTree(): + return False + for job in item.getJobs(): if not job.voting: continue build = item.current_build_set.getBuild(job.name) @@ -243,7 +251,9 @@ class Pipeline(object): return True def didAnyJobFail(self, item): - for job in self.getJobs(item): + if not item.hasJobTree(): + return False + for job in item.getJobs(): if not job.voting: continue build = item.current_build_set.getBuild(job.name) @@ -254,7 +264,9 @@ class Pipeline(object): def isHoldingFollowingChanges(self, item): if not item.live: return False - for job in self.getJobs(item): + if not item.hasJobTree(): + return False + for job in item.getJobs(): if not job.hold_following_changes: continue build = item.current_build_set.getBuild(job.name) @@ -375,7 +387,6 @@ class ChangeQueue(object): def enqueueChange(self, change): item = QueueItem(self, change) - item.freezeJobTree() self.enqueueItem(item) item.enqueue_time = time.time() return item @@ -457,6 +468,7 @@ class Project(object): # of layout projects, this should matter # when deciding whether to enqueue their changes self.foreign = foreign + self.unparsed_config = None def __str__(self): return self.name @@ -489,8 +501,6 @@ class Job(object): pre_run=None, post_run=None, voting=None, - project_source=None, - project_name=None, failure_message=None, success_message=None, failure_url=None, @@ -652,6 +662,27 @@ class Worker(object): return '' % self.name +class RepoFiles(object): + # When we ask a merger to prepare a future multiple-repo state and + # collect files so that we can dynamically load our configuration, + # this class provides easy access to that data. + def __init__(self): + self.projects = {} + + def __repr__(self): + return '' % self.projects + + def setFiles(self, items): + self.projects = {} + for item in items: + project = self.projects.setdefault(item['project'], {}) + branch = project.setdefault(item['branch'], {}) + branch.update(item['files']) + + def getFile(self, project, branch, fn): + return self.projects.get(project, {}).get(branch, {}).get(fn) + + class BuildSet(object): # Merge states: NEW = 1 @@ -679,6 +710,8 @@ class BuildSet(object): self.merge_state = self.NEW self.nodes = {} # job -> nodes self.node_requests = {} # job -> reqs + self.files = RepoFiles() + self.layout = None def __repr__(self): return '' % ( @@ -754,6 +787,7 @@ class QueueItem(object): self.reported = False self.active = False # Whether an item is within an active window self.live = True # Whether an item is intended to be processed at all + self.layout = None # This item's shadow layout self.job_tree = None def __repr__(self): @@ -782,37 +816,15 @@ class QueueItem(object): def setReportedResult(self, result): self.current_build_set.result = result - def _createJobTree(self, job_trees, parent): - for tree in job_trees: - job = tree.job - if not job.changeMatches(self.change): - continue - frozen_job = Job(job.name) - frozen_tree = JobTree(frozen_job) - inherited = set() - for variant in self.pipeline.layout.getJobs(job.name): - if variant.changeMatches(self.change): - if variant not in inherited: - frozen_job.inheritFrom(variant) - inherited.add(variant) - if job not in inherited: - # Only update from the job in the tree if it is - # unique, otherwise we might unset an attribute we - # have overloaded. - frozen_job.inheritFrom(job) - parent.job_trees.append(frozen_tree) - self._createJobTree(tree.job_trees, frozen_tree) - - def createJobTree(self): - project_tree = self.pipeline.getJobTree(self.change.project) - ret = JobTree(None) - self._createJobTree(project_tree.job_trees, ret) - return ret - def freezeJobTree(self): """Find or create actual matching jobs for this item's change and store the resulting job tree.""" - self.job_tree = self.createJobTree() + layout = self.current_build_set.layout + self.job_tree = layout.createJobTree(self) + + def hasJobTree(self): + """Returns True if the item has a job tree.""" + return self.job_tree is not None def getJobs(self): if not self.live or not self.job_tree: @@ -877,7 +889,7 @@ class QueueItem(object): else: ret['owner'] = None max_remaining = 0 - for job in self.pipeline.getJobs(self): + for job in self.getJobs(): now = time.time() build = self.current_build_set.getBuild(job.name) elapsed = None @@ -957,7 +969,7 @@ class QueueItem(object): changeish.project.name, changeish._id(), self.item_ahead) - for job in self.pipeline.getJobs(self): + for job in self.getJobs(): build = self.current_build_set.getBuild(job.name) if build: result = build.result @@ -1008,6 +1020,9 @@ class Changeish(object): def getRelatedChanges(self): return set() + def updatesConfig(self): + return False + class Change(Changeish): def __init__(self, project): @@ -1059,6 +1074,11 @@ class Change(Changeish): related.update(c.getRelatedChanges()) return related + def updatesConfig(self): + if 'zuul.yaml' in self.files or '.zuul.yaml' in self.files: + return True + return False + class Ref(Changeish): def __init__(self, project): @@ -1511,6 +1531,14 @@ class UnparsedTenantConfig(object): self.project_templates = [] self.projects = [] + def copy(self): + r = UnparsedTenantConfig() + r.pipelines = copy.deepcopy(self.pipelines) + r.jobs = copy.deepcopy(self.jobs) + r.project_templates = copy.deepcopy(self.project_templates) + r.projects = copy.deepcopy(self.projects) + return r + def extend(self, conf): if isinstance(conf, UnparsedTenantConfig): self.pipelines.extend(conf.pipelines) @@ -1549,6 +1577,7 @@ class UnparsedTenantConfig(object): class Layout(object): def __init__(self): + self.tenant = None self.projects = {} self.project_configs = {} self.project_templates = {} @@ -1581,20 +1610,62 @@ class Layout(object): def addProjectTemplate(self, project_template): self.project_templates[project_template.name] = project_template - def addProjectConfig(self, project_config): + def addProjectConfig(self, project_config, update_pipeline=True): self.project_configs[project_config.name] = project_config # TODOv3(jeblair): tidy up the relationship between pipelines - # and projects and projectconfigs + # and projects and projectconfigs. Specifically, move + # job_trees out of the pipeline since they are more dynamic + # than pipelines. Remove the update_pipeline argument + if not update_pipeline: + return for pipeline_name, pipeline_config in project_config.pipelines.items(): pipeline = self.pipelines[pipeline_name] project = pipeline.source.getProject(project_config.name) pipeline.job_trees[project] = pipeline_config.job_tree + def _createJobTree(self, change, job_trees, parent): + for tree in job_trees: + job = tree.job + if not job.changeMatches(change): + continue + frozen_job = Job(job.name) + frozen_tree = JobTree(frozen_job) + inherited = set() + for variant in self.getJobs(job.name): + if variant.changeMatches(change): + if variant not in inherited: + frozen_job.inheritFrom(variant) + inherited.add(variant) + if job not in inherited: + # Only update from the job in the tree if it is + # unique, otherwise we might unset an attribute we + # have overloaded. + frozen_job.inheritFrom(job) + parent.job_trees.append(frozen_tree) + self._createJobTree(change, tree.job_trees, frozen_tree) + + def createJobTree(self, item): + project_config = self.project_configs[item.change.project.name] + project_tree = project_config.pipelines[item.pipeline.name].job_tree + ret = JobTree(None) + self._createJobTree(item.change, project_tree.job_trees, ret) + return ret + class Tenant(object): def __init__(self, name): self.name = name self.layout = None + # The list of repos from which we will read main + # configuration. (source, project) + self.config_repos = [] + # The unparsed config from those repos. + self.config_repos_config = None + # The list of projects from which we will read in-repo + # configuration. (source, project) + self.project_repos = [] + # The unparsed config from those repos. + self.project_repos_config = None class Abide(object): diff --git a/zuul/reporter/__init__.py b/zuul/reporter/__init__.py index 6352fbb59c..97dfabce56 100644 --- a/zuul/reporter/__init__.py +++ b/zuul/reporter/__init__.py @@ -108,7 +108,7 @@ class BaseReporter(object): else: url_pattern = None - for job in pipeline.getJobs(item): + for job in item.getJobs(): build = item.current_build_set.getBuild(job.name) (result, url) = item.formatJobResult(job, url_pattern) if not job.voting: diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 60dafbe3ac..a34be2248a 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -191,12 +191,14 @@ class MergeCompletedEvent(ResultEvent): :arg str commit: The SHA of the merged commit (changes with refs). """ - def __init__(self, build_set, zuul_url, merged, updated, commit): + def __init__(self, build_set, zuul_url, merged, updated, commit, + files): self.build_set = build_set self.zuul_url = zuul_url self.merged = merged self.updated = updated self.commit = commit + self.files = files class NodesProvisionedEvent(ResultEvent): @@ -358,11 +360,12 @@ class Scheduler(threading.Thread): self.wake_event.set() self.log.debug("Done adding complete event for build: %s" % build) - def onMergeCompleted(self, build_set, zuul_url, merged, updated, commit): + def onMergeCompleted(self, build_set, zuul_url, merged, updated, + commit, files): self.log.debug("Adding merge complete event for build set: %s" % build_set) - event = MergeCompletedEvent(build_set, zuul_url, - merged, updated, commit) + event = MergeCompletedEvent(build_set, zuul_url, merged, + updated, commit, files) self.result_event_queue.put(event) self.wake_event.set() @@ -606,6 +609,9 @@ class Scheduler(threading.Thread): def _areAllBuildsComplete(self): self.log.debug("Checking if all builds are complete") + if self.merger.areMergesOutstanding(): + self.log.debug("Waiting on merger") + return False waiting = False for pipeline in self.layout.pipelines.values(): for item in pipeline.getAllItems(): @@ -617,7 +623,6 @@ class Scheduler(threading.Thread): if not waiting: self.log.debug("All builds are complete") return True - self.log.debug("All builds are not complete") return False def run(self):