zuul/tests/unit/test_configloader.py

614 lines
27 KiB
Python

# Copyright 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from configparser import ConfigParser
import fixtures
import logging
import textwrap
from zuul.configloader import AuthorizationRuleParser
from tests.base import ZuulTestCase
class TenantParserTestCase(ZuulTestCase):
create_project_keys = True
CONFIG_SET = set(['pipeline', 'job', 'semaphore', 'project',
'project-template', 'nodeset', 'secret'])
UNTRUSTED_SET = CONFIG_SET - set(['pipeline'])
def setupAllProjectKeys(self, config: ConfigParser):
for project in ['common-config', 'org/project1', 'org/project2']:
self.setupProjectKeys('gerrit', project)
class TestTenantSimple(TenantParserTestCase):
tenant_config_file = 'config/tenant-parser/simple.yaml'
def test_tenant_simple(self):
tenant = self.sched.abide.tenants.get('tenant-one')
self.assertEqual(['common-config'],
[x.name for x in tenant.config_projects])
self.assertEqual(['org/project1', 'org/project2'],
[x.name for x in tenant.untrusted_projects])
project = tenant.config_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.CONFIG_SET, tpc.load_classes)
project = tenant.untrusted_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.UNTRUSTED_SET, tpc.load_classes)
project = tenant.untrusted_projects[1]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.UNTRUSTED_SET, tpc.load_classes)
self.assertTrue('common-config-job' in tenant.layout.jobs)
self.assertTrue('project1-job' in tenant.layout.jobs)
self.assertTrue('project2-job' in tenant.layout.jobs)
project1_config = tenant.layout.project_configs.get(
'review.example.com/org/project1')
self.assertTrue('common-config-job' in
project1_config[0].pipelines['check'].job_list.jobs)
self.assertTrue('project1-job' in
project1_config[1].pipelines['check'].job_list.jobs)
project2_config = tenant.layout.project_configs.get(
'review.example.com/org/project2')
self.assertTrue('common-config-job' in
project2_config[0].pipelines['check'].job_list.jobs)
self.assertTrue('project2-job' in
project2_config[1].pipelines['check'].job_list.jobs)
def test_variant_description(self):
tenant = self.sched.abide.tenants.get('tenant-one')
job = tenant.layout.jobs.get("project2-job")
self.assertEqual(job[0].variant_description, "")
self.assertEqual(job[1].variant_description, "stable")
class TestTenantOverride(TenantParserTestCase):
tenant_config_file = 'config/tenant-parser/override.yaml'
def test_tenant_override(self):
tenant = self.sched.abide.tenants.get('tenant-one')
self.assertEqual(['common-config'],
[x.name for x in tenant.config_projects])
self.assertEqual(['org/project1', 'org/project2', 'org/project4'],
[x.name for x in tenant.untrusted_projects])
project = tenant.config_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.CONFIG_SET, tpc.load_classes)
project = tenant.untrusted_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.UNTRUSTED_SET - set(['project']),
tpc.load_classes)
project = tenant.untrusted_projects[1]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(set(['job']), tpc.load_classes)
self.assertTrue('common-config-job' in tenant.layout.jobs)
self.assertTrue('project1-job' in tenant.layout.jobs)
self.assertTrue('project2-job' in tenant.layout.jobs)
project1_config = tenant.layout.project_configs.get(
'review.example.com/org/project1')
self.assertTrue('common-config-job' in
project1_config[0].pipelines['check'].job_list.jobs)
self.assertFalse('project1-job' in
project1_config[0].pipelines['check'].job_list.jobs)
project2_config = tenant.layout.project_configs.get(
'review.example.com/org/project2')
self.assertTrue('common-config-job' in
project2_config[0].pipelines['check'].job_list.jobs)
self.assertFalse('project2-job' in
project2_config[0].pipelines['check'].job_list.jobs)
class TestTenantGroups(TenantParserTestCase):
tenant_config_file = 'config/tenant-parser/groups.yaml'
def test_tenant_groups(self):
tenant = self.sched.abide.tenants.get('tenant-one')
self.assertEqual(['common-config'],
[x.name for x in tenant.config_projects])
self.assertEqual(['org/project1', 'org/project2'],
[x.name for x in tenant.untrusted_projects])
project = tenant.config_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.CONFIG_SET, tpc.load_classes)
project = tenant.untrusted_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.UNTRUSTED_SET - set(['project']),
tpc.load_classes)
project = tenant.untrusted_projects[1]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.UNTRUSTED_SET - set(['project']),
tpc.load_classes)
self.assertTrue('common-config-job' in tenant.layout.jobs)
self.assertTrue('project1-job' in tenant.layout.jobs)
self.assertTrue('project2-job' in tenant.layout.jobs)
project1_config = tenant.layout.project_configs.get(
'review.example.com/org/project1')
self.assertTrue('common-config-job' in
project1_config[0].pipelines['check'].job_list.jobs)
self.assertFalse('project1-job' in
project1_config[0].pipelines['check'].job_list.jobs)
project2_config = tenant.layout.project_configs.get(
'review.example.com/org/project2')
self.assertTrue('common-config-job' in
project2_config[0].pipelines['check'].job_list.jobs)
self.assertFalse('project2-job' in
project2_config[0].pipelines['check'].job_list.jobs)
class TestTenantGroups2(TenantParserTestCase):
tenant_config_file = 'config/tenant-parser/groups2.yaml'
def test_tenant_groups2(self):
tenant = self.sched.abide.tenants.get('tenant-one')
self.assertEqual(['common-config'],
[x.name for x in tenant.config_projects])
self.assertEqual(['org/project1', 'org/project2', 'org/project3'],
[x.name for x in tenant.untrusted_projects])
project = tenant.config_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.CONFIG_SET, tpc.load_classes)
project = tenant.untrusted_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.UNTRUSTED_SET - set(['project']),
tpc.load_classes)
project = tenant.untrusted_projects[1]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.UNTRUSTED_SET - set(['project', 'job']),
tpc.load_classes)
self.assertTrue('common-config-job' in tenant.layout.jobs)
self.assertTrue('project1-job' in tenant.layout.jobs)
self.assertFalse('project2-job' in tenant.layout.jobs)
project1_config = tenant.layout.project_configs.get(
'review.example.com/org/project1')
self.assertTrue('common-config-job' in
project1_config[0].pipelines['check'].job_list.jobs)
self.assertFalse('project1-job' in
project1_config[0].pipelines['check'].job_list.jobs)
project2_config = tenant.layout.project_configs.get(
'review.example.com/org/project2')
self.assertTrue('common-config-job' in
project2_config[0].pipelines['check'].job_list.jobs)
self.assertFalse('project2-job' in
project2_config[0].pipelines['check'].job_list.jobs)
class TestTenantGroups3(TenantParserTestCase):
tenant_config_file = 'config/tenant-parser/groups3.yaml'
def test_tenant_groups3(self):
tenant = self.sched.abide.tenants.get('tenant-one')
self.assertEqual(False, tenant.exclude_unprotected_branches)
self.assertEqual(['common-config'],
[x.name for x in tenant.config_projects])
self.assertEqual(['org/project1', 'org/project2'],
[x.name for x in tenant.untrusted_projects])
project = tenant.config_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.CONFIG_SET, tpc.load_classes)
project = tenant.untrusted_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(set(['job']), tpc.load_classes)
project = tenant.untrusted_projects[1]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(set(['project', 'job']), tpc.load_classes)
self.assertTrue('common-config-job' in tenant.layout.jobs)
self.assertTrue('project1-job' in tenant.layout.jobs)
self.assertTrue('project2-job' in tenant.layout.jobs)
project1_config = tenant.layout.project_configs.get(
'review.example.com/org/project1')
self.assertTrue('common-config-job' in
project1_config[0].pipelines['check'].job_list.jobs)
self.assertFalse('project1-job' in
project1_config[0].pipelines['check'].job_list.jobs)
project2_config = tenant.layout.project_configs.get(
'review.example.com/org/project2')
self.assertTrue('common-config-job' in
project2_config[0].pipelines['check'].job_list.jobs)
self.assertTrue('project2-job' in
project2_config[1].pipelines['check'].job_list.jobs)
class TestTenantGroups4(TenantParserTestCase):
tenant_config_file = 'config/tenant-parser/groups4.yaml'
def test_tenant_groups(self):
tenant = self.sched.abide.tenants.get('tenant-one')
self.assertEqual(['common-config'],
[x.name for x in tenant.config_projects])
self.assertEqual(['org/project1', 'org/project2'],
[x.name for x in tenant.untrusted_projects])
project = tenant.config_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.CONFIG_SET, tpc.load_classes)
project = tenant.untrusted_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(set([]),
tpc.load_classes)
project = tenant.untrusted_projects[1]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(set([]),
tpc.load_classes)
# Check that only one merger:cat job was requested
# org/project1 and org/project2 have an empty load_classes
cat_jobs = [job for job in self.gearman_server.jobs_history
if job.name == b'merger:cat']
self.assertEqual(1, len(cat_jobs))
old_layout = tenant.layout
# Check that creating a change in project1 doesn't cause a
# reconfiguration (due to a mistaken belief that we need to
# load config from it since there is none in memory).
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
tenant = self.sched.abide.tenants.get('tenant-one')
new_layout = tenant.layout
self.assertEqual(old_layout, new_layout)
class TestTenantGroups5(TenantParserTestCase):
tenant_config_file = 'config/tenant-parser/groups5.yaml'
def test_tenant_single_projet_exclude(self):
tenant = self.sched.abide.tenants.get('tenant-one')
self.assertEqual(['common-config'],
[x.name for x in tenant.config_projects])
self.assertEqual(['org/project1'],
[x.name for x in tenant.untrusted_projects])
project = tenant.config_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(self.CONFIG_SET, tpc.load_classes)
project = tenant.untrusted_projects[0]
tpc = tenant.project_configs[project.canonical_name]
self.assertEqual(set([]),
tpc.load_classes)
# Check that only one merger:cat job was requested
# org/project1 and org/project2 have an empty load_classes
cat_jobs = [job for job in self.gearman_server.jobs_history
if job.name == b'merger:cat']
self.assertEqual(1, len(cat_jobs))
class TestTenantFromScript(TestTenantSimple):
tenant_config_file = None
tenant_config_script_file = 'config/tenant-parser/tenant_config_script.py'
def test_tenant_simple(self):
TestTenantSimple.test_tenant_simple(self)
class TestTenantUnprotectedBranches(TenantParserTestCase):
tenant_config_file = 'config/tenant-parser/unprotected-branches.yaml'
def test_tenant_unprotected_branches(self):
tenant = self.sched.abide.tenants.get('tenant-one')
self.assertEqual(True, tenant.exclude_unprotected_branches)
self.assertEqual(['common-config'],
[x.name for x in tenant.config_projects])
self.assertEqual(['org/project1', 'org/project2'],
[x.name for x in tenant.untrusted_projects])
tpc = tenant.project_configs
project_name = tenant.config_projects[0].canonical_name
self.assertEqual(False, tpc[project_name].exclude_unprotected_branches)
project_name = tenant.untrusted_projects[0].canonical_name
self.assertIsNone(tpc[project_name].exclude_unprotected_branches)
project_name = tenant.untrusted_projects[1].canonical_name
self.assertIsNone(tpc[project_name].exclude_unprotected_branches)
class TestTenantExcludeAll(TenantParserTestCase):
tenant_config_file = 'config/tenant-parser/exclude-all.yaml'
def test_tenant_exclude_all(self):
"""
Tests that excluding all configuration of project1 in tenant-one
doesn't remove the configuration of project1 in tenant-two.
"""
# The config in org/project5 depends on config in org/project1 so
# validate that there are no config errors in that tenant.
tenant_two = self.sched.abide.tenants.get('tenant-two')
self.assertEquals(
len(tenant_two.layout.loading_errors), 0,
"No error should have been accumulated")
class TestSplitConfig(ZuulTestCase):
tenant_config_file = 'config/split-config/main.yaml'
def test_split_config(self):
tenant = self.sched.abide.tenants.get('tenant-one')
self.assertIn('project-test1', tenant.layout.jobs)
self.assertIn('project-test2', tenant.layout.jobs)
test1 = tenant.layout.getJob('project-test1')
self.assertEqual(test1.source_context.project.name, 'common-config')
self.assertEqual(test1.source_context.branch, 'master')
self.assertEqual(test1.source_context.path, 'zuul.d/jobs.yaml')
self.assertEqual(test1.source_context.trusted, True)
test2 = tenant.layout.getJob('project-test2')
self.assertEqual(test2.source_context.project.name, 'common-config')
self.assertEqual(test2.source_context.branch, 'master')
self.assertEqual(test2.source_context.path, 'zuul.d/more-jobs.yaml')
self.assertEqual(test2.source_context.trusted, True)
self.assertNotEqual(test1.source_context, test2.source_context)
self.assertTrue(test1.source_context.isSameProject(
test2.source_context))
project_config = tenant.layout.project_configs.get(
'review.example.com/org/project')
self.assertIn('project-test1',
project_config[0].pipelines['check'].job_list.jobs)
project1_config = tenant.layout.project_configs.get(
'review.example.com/org/project1')
self.assertIn('project1-project2-integration',
project1_config[0].pipelines['check'].job_list.jobs)
def test_dynamic_split_config(self):
in_repo_conf = textwrap.dedent(
"""
- project:
name: org/project1
check:
jobs:
- project-test1
""")
file_dict = {'.zuul.d/gate.yaml': in_repo_conf}
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A',
files=file_dict)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
# project1-project2-integration test removed, only want project-test1
self.assertHistory([
dict(name='project-test1', result='SUCCESS', changes='1,1')])
def test_config_path_conflict(self):
def add_file(project, path):
new_file = textwrap.dedent(
"""
- job:
name: test-job
"""
)
file_dict = {path: new_file}
A = self.fake_gerrit.addFakeChange(project, 'master', 'A',
files=file_dict)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
log_fixture = self.useFixture(
fixtures.FakeLogger(level=logging.WARNING))
log_fixture._output.truncate(0)
add_file("common-config", "zuul.yaml")
self.assertIn("Multiple configuration", log_fixture.output)
log_fixture._output.truncate(0)
add_file("org/project1", ".zuul.yaml")
self.assertIn("Multiple configuration", log_fixture.output)
class TestConfigConflict(ZuulTestCase):
tenant_config_file = 'config/conflict-config/main.yaml'
def test_conflict_config(self):
tenant = self.sched.abide.tenants.get('tenant-one')
jobs = sorted(tenant.layout.jobs.keys())
self.assertEqual(
['base', 'noop', 'trusted-zuul.yaml-job',
'untrusted-zuul.yaml-job'],
jobs)
class TestAuthorizationRuleParser(ZuulTestCase):
tenant_config_file = 'config/tenant-parser/authorizations.yaml'
def test_rules_are_loaded(self):
rules = self.sched.abide.admin_rules
self.assertTrue('auth-rule-one' in rules, self.sched.abide)
self.assertTrue('auth-rule-two' in rules, self.sched.abide)
claims_1 = {'sub': 'venkman'}
claims_2 = {'sub': 'gozer',
'iss': 'another_dimension'}
self.assertTrue(rules['auth-rule-one'](claims_1))
self.assertTrue(not rules['auth-rule-one'](claims_2))
self.assertTrue(not rules['auth-rule-two'](claims_1))
self.assertTrue(rules['auth-rule-two'](claims_2))
def test_parse_simplest_rule_from_yaml(self):
rule_d = {'name': 'my-rule',
'conditions': {'sub': 'user1'}
}
rule = AuthorizationRuleParser().fromYaml(rule_d)
self.assertEqual('my-rule', rule.name)
claims = {'iss': 'my-idp',
'sub': 'user1',
'groups': ['admin', 'ghostbusters']}
self.assertTrue(rule(claims))
claims = {'iss': 'my-2nd-idp',
'sub': 'user2',
'groups': ['admin', 'ghostbusters']}
self.assertFalse(rule(claims))
def test_parse_AND_rule_from_yaml(self):
rule_d = {'name': 'my-rule',
'conditions': {'sub': 'user1',
'iss': 'my-idp'}
}
rule = AuthorizationRuleParser().fromYaml(rule_d)
self.assertEqual('my-rule', rule.name)
claims = {'iss': 'my-idp',
'sub': 'user1',
'groups': ['admin', 'ghostbusters']}
self.assertTrue(rule(claims))
claims = {'iss': 'my-2nd-idp',
'sub': 'user1',
'groups': ['admin', 'ghostbusters']}
self.assertFalse(rule(claims))
def test_parse_OR_rule_from_yaml(self):
rule_d = {'name': 'my-rule',
'conditions': [{'sub': 'user1',
'iss': 'my-idp'},
{'sub': 'user2',
'iss': 'my-2nd-idp'}
]
}
rule = AuthorizationRuleParser().fromYaml(rule_d)
self.assertEqual('my-rule', rule.name)
claims = {'iss': 'my-idp',
'sub': 'user1',
'groups': ['admin', 'ghostbusters']}
self.assertTrue(rule(claims))
claims = {'iss': 'my-2nd-idp',
'sub': 'user1',
'groups': ['admin', 'ghostbusters']}
self.assertFalse(rule(claims))
claims = {'iss': 'my-2nd-idp',
'sub': 'user2',
'groups': ['admin', 'ghostbusters']}
self.assertTrue(rule(claims))
def test_parse_rule_with_list_claim_from_yaml(self):
rule_d = {'name': 'my-rule',
'conditions': [{'groups': 'ghostbusters',
'iss': 'my-idp'},
{'sub': 'user2',
'iss': 'my-2nd-idp'}
],
}
rule = AuthorizationRuleParser().fromYaml(rule_d)
self.assertEqual('my-rule', rule.name)
claims = {'iss': 'my-idp',
'sub': 'user1',
'groups': ['admin', 'ghostbusters']}
self.assertTrue(rule(claims))
claims = {'iss': 'my-idp',
'sub': 'user1',
'groups': ['admin', 'ghostbeaters']}
self.assertFalse(rule(claims))
claims = {'iss': 'my-2nd-idp',
'sub': 'user2',
'groups': ['admin', 'ghostbusters']}
self.assertTrue(rule(claims))
def test_check_complex_rule_from_yaml_jsonpath(self):
rule_d = {'name': 'my-rule',
'conditions': [{'hello.this.is': 'a complex value'},
],
}
rule = AuthorizationRuleParser().fromYaml(rule_d)
self.assertEqual('my-rule', rule.name)
claims = {'iss': 'my-idp',
'hello': {
'this': {
'is': 'a complex value'
},
'and': {
'this one': 'too'
}
}
}
self.assertTrue(rule(claims))
def test_check_complex_rule_from_yaml_nested_dict(self):
rule_d = {'name': 'my-rule',
'conditions': [{'hello': {'this': {'is': 'a complex value'
}
}
},
],
}
rule = AuthorizationRuleParser().fromYaml(rule_d)
self.assertEqual('my-rule', rule.name)
claims = {'iss': 'my-idp',
'hello': {
'this': {
'is': 'a complex value'
},
'and': {
'this one': 'too'
}
}
}
self.assertTrue(rule(claims))
class TestTenantExtra(TenantParserTestCase):
tenant_config_file = 'config/tenant-parser/extra.yaml'
def test_tenant_extra(self):
tenant = self.sched.abide.tenants.get('tenant-one')
self.assertTrue('project2-extra-file' in tenant.layout.jobs)
self.assertTrue('project2-extra-dir' in tenant.layout.jobs)
def test_dynamic_extra(self):
in_repo_conf = textwrap.dedent(
"""
- job:
name: project2-extra-file2
parent: common-config-job
- project:
name: org/project2
check:
jobs:
- project2-extra-file2
""")
file_dict = {'extra.yaml': in_repo_conf, '.zuul.yaml': ''}
A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A',
files=file_dict)
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([
dict(name='common-config-job', result='SUCCESS', changes='1,1'),
dict(name='project2-extra-file2', result='SUCCESS', changes='1,1'),
], ordered=False)
def test_extra_reconfigure(self):
in_repo_conf = textwrap.dedent(
"""
- job:
name: project2-extra-file2
parent: common-config-job
- project:
name: org/project2
check:
jobs:
- project2-extra-file2
""")
file_dict = {'extra.yaml': in_repo_conf}
A = self.fake_gerrit.addFakeChange('org/project2', 'master', 'A',
files=file_dict)
A.setMerged()
self.fake_gerrit.addEvent(A.getChangeMergedEvent())
self.waitUntilSettled()
self.fake_gerrit.addEvent(A.getRefUpdatedEvent())
self.waitUntilSettled()
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
self.waitUntilSettled()
self.assertHistory([
dict(name='common-config-job', result='SUCCESS', changes='2,1'),
dict(name='project2-job', result='SUCCESS', changes='2,1'),
dict(name='project2-extra-file2', result='SUCCESS', changes='2,1'),
], ordered=False)