diff --git a/tests/base.py b/tests/base.py index 497d706883..8efdfd13ba 100755 --- a/tests/base.py +++ b/tests/base.py @@ -50,6 +50,7 @@ import zuul.webapp import zuul.rpclistener import zuul.launcher.gearman import zuul.lib.swift +import zuul.lib.connections import zuul.merger.client import zuul.merger.merger import zuul.merger.server @@ -864,6 +865,7 @@ class BaseTestCase(testtools.TestCase): class ZuulTestCase(BaseTestCase): + config_file = 'zuul.conf' def setUp(self): super(ZuulTestCase, self).setUp() @@ -907,6 +909,8 @@ class ZuulTestCase(BaseTestCase): self.init_repo("org/experimental-project") self.init_repo("org/no-jobs-project") + self.setup_repos() + self.statsd = FakeStatsd() # note, use 127.0.0.1 rather than localhost to avoid getting ipv6 # see: https://github.com/jsocol/pystatsd/issues/61 @@ -940,7 +944,7 @@ class ZuulTestCase(BaseTestCase): self.sched.trigger_event_queue ] - self.configure_connections() + self.configure_connections(self.sched) self.sched.registerConnections(self.connections) def URLOpenerFactory(*args, **kw): @@ -979,7 +983,7 @@ class ZuulTestCase(BaseTestCase): self.addCleanup(self.assertFinalState) self.addCleanup(self.shutdown) - def configure_connections(self): + def configure_connections(self, sched): # Register connections from the config self.smtp_messages = [] @@ -993,7 +997,7 @@ class ZuulTestCase(BaseTestCase): # a virtual canonical database given by the configured hostname self.gerrit_changes_dbs = {} self.gerrit_queues_dbs = {} - self.connections = {} + self.connections = zuul.lib.connections.ConnectionRegistry(sched) for section_name in self.config.sections(): con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$', @@ -1018,15 +1022,16 @@ class ZuulTestCase(BaseTestCase): Queue.Queue() self.event_queues.append( self.gerrit_queues_dbs[con_config['server']]) - self.connections[con_name] = FakeGerritConnection( + self.connections.connections[con_name] = FakeGerritConnection( con_name, con_config, changes_db=self.gerrit_changes_dbs[con_config['server']], queues_db=self.gerrit_queues_dbs[con_config['server']], upstream_root=self.upstream_root ) - setattr(self, 'fake_' + con_name, self.connections[con_name]) + setattr(self, 'fake_' + con_name, + self.connections.connections[con_name]) elif con_driver == 'smtp': - self.connections[con_name] = \ + self.connections.connections[con_name] = \ zuul.connection.smtp.SMTPConnection(con_name, con_config) else: raise Exception("Unknown driver, %s, for connection %s" @@ -1039,20 +1044,24 @@ class ZuulTestCase(BaseTestCase): self.gerrit_changes_dbs['gerrit'] = {} self.gerrit_queues_dbs['gerrit'] = Queue.Queue() self.event_queues.append(self.gerrit_queues_dbs['gerrit']) - self.connections['gerrit'] = FakeGerritConnection( + self.connections.connections['gerrit'] = FakeGerritConnection( '_legacy_gerrit', dict(self.config.items('gerrit')), changes_db=self.gerrit_changes_dbs['gerrit'], queues_db=self.gerrit_queues_dbs['gerrit']) if 'smtp' in self.config.sections(): - self.connections['smtp'] = \ + self.connections.connections['smtp'] = \ zuul.connection.smtp.SMTPConnection( '_legacy_smtp', dict(self.config.items('smtp'))) - def setup_config(self, config_file='zuul.conf'): + def setup_config(self): """Per test config object. Override to set different config.""" self.config = ConfigParser.ConfigParser() - self.config.read(os.path.join(FIXTURE_DIR, config_file)) + self.config.read(os.path.join(FIXTURE_DIR, self.config_file)) + + def setup_repos(self): + """Subclasses can override to manipulate repos before tests""" + pass def assertFinalState(self): # Make sure that git.Repo objects have been garbage collected. @@ -1063,10 +1072,10 @@ class ZuulTestCase(BaseTestCase): repos.append(obj) self.assertEqual(len(repos), 0) self.assertEmptyQueues() + ipm = zuul.manager.independent.IndependentPipelineManager for tenant in self.sched.abide.tenants.values(): for pipeline in tenant.layout.pipelines.values(): - if isinstance(pipeline.manager, - zuul.scheduler.IndependentPipelineManager): + if isinstance(pipeline.manager, ipm): self.assertEqual(len(pipeline.queues), 0) def shutdown(self): diff --git a/tests/fixtures/config/in-repo/common.yaml b/tests/fixtures/config/in-repo/common.yaml index 96aebd6316..f38406b56b 100644 --- a/tests/fixtures/config/in-repo/common.yaml +++ b/tests/fixtures/config/in-repo/common.yaml @@ -1,6 +1,6 @@ pipelines: - name: check - manager: IndependentPipelineManager + manager: independent source: gerrit trigger: @@ -14,7 +14,7 @@ pipelines: verified: -1 - name: tenant-one-gate - manager: DependentPipelineManager + manager: dependent success-message: Build succeeded (tenant-one-gate). source: gerrit diff --git a/tests/fixtures/config/in-repo/zuul.conf b/tests/fixtures/config/in-repo/zuul.conf index 14708aa791..1910084e33 100644 --- a/tests/fixtures/config/in-repo/zuul.conf +++ b/tests/fixtures/config/in-repo/zuul.conf @@ -2,7 +2,7 @@ server=127.0.0.1 [zuul] -tenant_config=tests/fixtures/config/in-repo/main.yaml +tenant_config=config/in-repo/main.yaml url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number} job_name_in_report=true diff --git a/tests/fixtures/config/multi-tenant/common.yaml b/tests/fixtures/config/multi-tenant/common.yaml index d36448ed22..8fc3bba98b 100644 --- a/tests/fixtures/config/multi-tenant/common.yaml +++ b/tests/fixtures/config/multi-tenant/common.yaml @@ -1,6 +1,6 @@ pipelines: - name: check - manager: IndependentPipelineManager + manager: independent source: gerrit trigger: diff --git a/tests/fixtures/config/multi-tenant/tenant-one.yaml b/tests/fixtures/config/multi-tenant/tenant-one.yaml index 7b2298c6a0..c9096ef1c6 100644 --- a/tests/fixtures/config/multi-tenant/tenant-one.yaml +++ b/tests/fixtures/config/multi-tenant/tenant-one.yaml @@ -1,6 +1,6 @@ pipelines: - name: tenant-one-gate - manager: DependentPipelineManager + manager: dependent success-message: Build succeeded (tenant-one-gate). source: gerrit @@ -21,6 +21,10 @@ pipelines: verified: 0 precedence: high +jobs: + - name: + project1-test1 + projects: - name: org/project1 check: diff --git a/tests/fixtures/config/multi-tenant/tenant-two.yaml b/tests/fixtures/config/multi-tenant/tenant-two.yaml index 57ad64de07..6cb2d9a52a 100644 --- a/tests/fixtures/config/multi-tenant/tenant-two.yaml +++ b/tests/fixtures/config/multi-tenant/tenant-two.yaml @@ -1,6 +1,6 @@ pipelines: - name: tenant-two-gate - manager: DependentPipelineManager + manager: dependent success-message: Build succeeded (tenant-two-gate). source: gerrit @@ -21,6 +21,10 @@ pipelines: verified: 0 precedence: high +jobs: + - name: + project2-test1 + projects: - name: org/project2 check: diff --git a/tests/fixtures/config/multi-tenant/zuul.conf b/tests/fixtures/config/multi-tenant/zuul.conf index ceb3903d2e..346450e67c 100644 --- a/tests/fixtures/config/multi-tenant/zuul.conf +++ b/tests/fixtures/config/multi-tenant/zuul.conf @@ -2,7 +2,7 @@ server=127.0.0.1 [zuul] -tenant_config=tests/fixtures/config/multi-tenant/main.yaml +tenant_config=config/multi-tenant/main.yaml url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number} job_name_in_report=true diff --git a/tests/fixtures/layout.yaml b/tests/fixtures/layout.yaml index 99b135c0c2..e30147f7ce 100644 --- a/tests/fixtures/layout.yaml +++ b/tests/fixtures/layout.yaml @@ -3,7 +3,7 @@ includes: pipelines: - name: check - manager: IndependentPipelineManager + manager: independent source: gerrit trigger: @@ -17,7 +17,7 @@ pipelines: verified: -1 - name: post - manager: IndependentPipelineManager + manager: independent source: gerrit trigger: @@ -26,7 +26,7 @@ pipelines: ref: ^(?!refs/).*$ - name: gate - manager: DependentPipelineManager + manager: dependent failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures source: gerrit @@ -48,7 +48,7 @@ pipelines: precedence: high - name: unused - manager: IndependentPipelineManager + manager: independent dequeue-on-new-patchset: false source: gerrit @@ -59,7 +59,7 @@ pipelines: - approved: 1 - name: dup1 - manager: IndependentPipelineManager + manager: independent source: gerrit trigger: @@ -73,7 +73,7 @@ pipelines: verified: -1 - name: dup2 - manager: IndependentPipelineManager + manager: independent source: gerrit trigger: @@ -87,7 +87,7 @@ pipelines: verified: -1 - name: conflict - manager: DependentPipelineManager + manager: dependent failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures source: gerrit @@ -108,7 +108,7 @@ pipelines: verified: 0 - name: experimental - manager: IndependentPipelineManager + manager: independent source: gerrit trigger: diff --git a/tests/test_layoutvalidator.py b/tests/test_layoutvalidator.py index 3dc3234a88..bd507d19a0 100644 --- a/tests/test_layoutvalidator.py +++ b/tests/test_layoutvalidator.py @@ -31,6 +31,9 @@ LAYOUT_RE = re.compile(r'^(good|bad)_.*\.yaml$') class TestLayoutValidator(testtools.TestCase): + def setUp(self): + self.skip("Disabled for early v3 development") + def test_layouts(self): """Test layout file validation""" print diff --git a/tests/test_merger_repo.py b/tests/test_merger_repo.py index 454f3ccf33..7bf08eed01 100644 --- a/tests/test_merger_repo.py +++ b/tests/test_merger_repo.py @@ -34,8 +34,11 @@ class TestMergerRepo(ZuulTestCase): workspace_root = None def setUp(self): - super(TestMergerRepo, self).setUp() - self.workspace_root = os.path.join(self.test_root, 'workspace') + self.skip("Disabled for early v3 development") + + # def setUp(self): + # super(TestMergerRepo, self).setUp() + # self.workspace_root = os.path.join(self.test_root, 'workspace') def test_ensure_cloned(self): parent_path = os.path.join(self.upstream_root, 'org/project1') diff --git a/tests/test_model.py b/tests/test_model.py index d4c7880001..f8f74dc793 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -12,8 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. -from zuul import change_matcher as cm from zuul import model +from zuul import configloader from tests.base import BaseTestCase @@ -22,11 +22,12 @@ class TestJob(BaseTestCase): @property def job(self): - job = model.Job('job') - job.skip_if_matcher = cm.MatchAll([ - cm.ProjectMatcher('^project$'), - cm.MatchAllFiles([cm.FileMatcher('^docs/.*$')]), - ]) + layout = model.Layout() + job = configloader.JobParser.fromYaml(layout, { + 'name': 'job', + 'irrelevant-files': [ + '^docs/.*$' + ]}) return job def test_change_matches_returns_false_for_matched_skip_if(self): @@ -39,10 +40,61 @@ class TestJob(BaseTestCase): change.files = ['foo'] self.assertTrue(self.job.changeMatches(change)) - def _assert_job_booleans_are_not_none(self, job): - self.assertIsNotNone(job.voting) - self.assertIsNotNone(job.hold_following_changes) - def test_job_sets_defaults_for_boolean_attributes(self): - job = model.Job('job') - self._assert_job_booleans_are_not_none(job) + self.assertIsNotNone(self.job.voting) + + def test_job_inheritance(self): + layout = model.Layout() + base = configloader.JobParser.fromYaml(layout, { + 'name': 'base', + 'timeout': 30, + }) + layout.addJob(base) + python27 = configloader.JobParser.fromYaml(layout, { + 'name': 'python27', + 'parent': 'base', + 'timeout': 40, + }) + layout.addJob(python27) + python27diablo = configloader.JobParser.fromYaml(layout, { + 'name': 'python27', + 'branches': [ + 'stable/diablo' + ], + 'timeout': 50, + }) + layout.addJob(python27diablo) + + pipeline = model.Pipeline('gate', layout) + layout.addPipeline(pipeline) + queue = model.ChangeQueue(pipeline) + + project = model.Project('project') + tree = pipeline.addProject(project) + tree.addJob(layout.getJob('python27')) + + change = model.Change(project) + change.branch = 'master' + item = queue.enqueueChange(change) + + self.assertTrue(base.changeMatches(change)) + self.assertTrue(python27.changeMatches(change)) + self.assertFalse(python27diablo.changeMatches(change)) + + item.freezeJobTree() + self.assertEqual(len(item.getJobs()), 1) + job = item.getJobs()[0] + self.assertEqual(job.name, 'python27') + self.assertEqual(job.timeout, 40) + + change.branch = 'stable/diablo' + + self.assertTrue(base.changeMatches(change)) + self.assertTrue(python27.changeMatches(change)) + self.assertTrue(python27diablo.changeMatches(change)) + + item.freezeJobTree() + self.assertEqual(len(item.getJobs()), 1) + job = item.getJobs()[0] + self.assertEqual(job.name, 'python27') + self.assertEqual(job.timeout, 50) diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 7ef166c420..85ac6006ef 100755 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -46,6 +46,9 @@ logging.basicConfig(level=logging.DEBUG, class TestSchedulerConfigParsing(BaseTestCase): + def setUp(self): + self.skip("Disabled for early v3 development") + def test_parse_skip_if(self): job_yaml = """ jobs: diff --git a/tests/test_v3.py b/tests/test_v3.py index 69e66a0113..73efcc99d3 100644 --- a/tests/test_v3.py +++ b/tests/test_v3.py @@ -26,13 +26,12 @@ logging.basicConfig(level=logging.DEBUG, '%(levelname)-8s %(message)s') -class TestV3(ZuulTestCase): +class TestMultipleTenants(ZuulTestCase): # A temporary class to hold new tests while others are disabled - def test_multiple_tenants(self): - self.setup_config('config/multi-tenant/zuul.conf') - self.sched.reconfigure(self.config) + config_file = 'config/multi-tenant/zuul.conf' + def test_multiple_tenants(self): A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') A.addApproval('CRVW', 2) self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) @@ -64,9 +63,18 @@ class TestV3(ZuulTestCase): self.assertEqual(A.reported, 2, "Activity in tenant two should" "not affect tenant one") - def test_in_repo_config(self): + +class TestInRepoConfig(ZuulTestCase): + # A temporary class to hold new tests while others are disabled + + config_file = 'config/in-repo/zuul.conf' + + def setup_repos(self): in_repo_conf = textwrap.dedent( """ + jobs: + - name: project-test1 + projects: - name: org/project tenant-one-gate: @@ -76,9 +84,7 @@ class TestV3(ZuulTestCase): self.addCommitToRepo('org/project', 'add zuul conf', {'.zuul.yaml': in_repo_conf}) - self.setup_config('config/in-repo/zuul.conf') - self.sched.reconfigure(self.config) - + def test_in_repo_config(self): A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A') A.addApproval('CRVW', 2) self.fake_gerrit.addEvent(A.addApproval('APRV', 1)) diff --git a/zuul/cmd/__init__.py b/zuul/cmd/__init__.py index 2902c50630..f6b9fe1e5d 100644 --- a/zuul/cmd/__init__.py +++ b/zuul/cmd/__init__.py @@ -90,6 +90,6 @@ class ZuulApp(object): else: logging.basicConfig(level=logging.DEBUG) - def configure_connections(self): - self.connections = zuul.lib.connections.configure_connections( - self.config) + def configure_connections(self, sched): + self.connections = zuul.lib.connections.ConnectionRegistry() + self.connections.configure(self.config, sched) diff --git a/zuul/cmd/server.py b/zuul/cmd/server.py index 2aca4f2c02..2c891b50e2 100755 --- a/zuul/cmd/server.py +++ b/zuul/cmd/server.py @@ -177,7 +177,7 @@ class Server(zuul.cmd.ZuulApp): webapp = zuul.webapp.WebApp(self.sched, cache_expiry=cache_expiry) rpc = zuul.rpclistener.RPCListener(self.config, self.sched) - self.configure_connections() + self.configure_connections(self.sched) self.sched.setLauncher(gearman) self.sched.setMerger(merger) diff --git a/zuul/configloader.py b/zuul/configloader.py new file mode 100644 index 0000000000..d22106d955 --- /dev/null +++ b/zuul/configloader.py @@ -0,0 +1,449 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import logging +import yaml + +import voluptuous as vs + +import model +import zuul.manager +import zuul.manager.dependent +import zuul.manager.independent +from zuul import change_matcher + + +# Several forms accept either a single item or a list, this makes +# specifying that in the schema easy (and explicit). +def to_list(x): + return vs.Any([x], x) + + +def as_list(item): + if not item: + return [] + if isinstance(item, list): + return item + return [item] + + +def extend_dict(a, b): + """Extend dictionary a (which will be modified in place) with the + contents of b. This is designed for Zuul yaml files which are + typically dictionaries of lists of dictionaries, e.g., + {'pipelines': ['name': 'gate']}. If two such dictionaries each + define a pipeline, the result will be a single dictionary with + a pipelines entry whose value is a two-element list.""" + + for k, v in b.items(): + if k not in a: + a[k] = v + elif isinstance(v, dict) and isinstance(a[k], dict): + extend_dict(a[k], v) + elif isinstance(v, list) and isinstance(a[k], list): + a[k] += v + elif isinstance(v, list): + a[k] = [a[k]] + v + elif isinstance(a[k], list): + a[k] += [v] + else: + raise Exception("Unhandled case in extend_dict at %s" % (k,)) + + +def deep_format(obj, paramdict): + """Apply the paramdict via str.format() to all string objects found within + the supplied obj. Lists and dicts are traversed recursively. + + Borrowed from Jenkins Job Builder project""" + if isinstance(obj, str): + ret = obj.format(**paramdict) + elif isinstance(obj, list): + ret = [] + for item in obj: + ret.append(deep_format(item, paramdict)) + elif isinstance(obj, dict): + ret = {} + for item in obj: + exp_item = item.format(**paramdict) + + ret[exp_item] = deep_format(obj[item], paramdict) + else: + ret = obj + return ret + + +class JobParser(object): + @staticmethod + def getSchema(): + # TODOv3(jeblair, jhesketh): move to auth + swift = {vs.Required('name'): str, + 'container': str, + 'expiry': int, + 'max_file_size': int, + 'max-file-size': int, + 'max_file_count': int, + 'max-file-count': int, + 'logserver_prefix': str, + 'logserver-prefix': str, + } + + job = {vs.Required('name'): str, + 'parent': str, + 'queue-name': str, + 'failure-message': str, + 'success-message': str, + 'failure-url': str, + 'success-url': str, + 'voting': bool, + 'branches': to_list(str), + 'files': to_list(str), + 'swift': to_list(swift), + 'irrelevant-files': to_list(str), + 'timeout': int, + } + + return vs.Schema(job) + + @staticmethod + def fromYaml(layout, conf): + JobParser.getSchema()(conf) + job = model.Job(conf['name']) + if 'parent' in conf: + parent = layout.getJob(conf['parent']) + job.inheritFrom(parent) + job.timeout = conf.get('timeout', job.timeout) + job.workspace = conf.get('workspace', job.workspace) + job.pre_run = as_list(conf.get('pre-run', job.pre_run)) + job.post_run = as_list(conf.get('post-run', job.post_run)) + job.voting = conf.get('voting', True) + + job.failure_message = conf.get('failure-message', job.failure_message) + job.success_message = conf.get('success-message', job.success_message) + job.failure_url = conf.get('failure-url', job.failure_url) + job.success_url = conf.get('success-url', job.success_url) + if 'branches' in conf: + matchers = [] + for branch in as_list(conf['branches']): + matchers.append(change_matcher.BranchMatcher(branch)) + job.branch_matcher = change_matcher.MatchAny(matchers) + if 'files' in conf: + matchers = [] + for fn in as_list(conf['files']): + matchers.append(change_matcher.FileMatcher(fn)) + job.file_matcher = change_matcher.MatchAny(matchers) + if 'irrelevant-files' in conf: + matchers = [] + for fn in as_list(conf['irrelevant-files']): + matchers.append(change_matcher.FileMatcher(fn)) + job.irrelevant_file_matcher = change_matcher.MatchAllFiles( + matchers) + return job + + +class AbideValidator(object): + tenant_source = vs.Schema({'repos': [str]}) + + def validateTenantSources(self, connections): + def v(value, path=[]): + if isinstance(value, dict): + for k, val in value.items(): + connections.getSource(k) + self.validateTenantSource(val, path + [k]) + else: + raise vs.Invalid("Invalid tenant source", path) + return v + + def validateTenantSource(self, value, path=[]): + self.tenant_source(value) + + def getSchema(self, connections=None): + tenant = {vs.Required('name'): str, + 'include': to_list(str), + 'source': self.validateTenantSources(connections)} + + schema = vs.Schema({'tenants': [tenant]}) + + return schema + + def validate(self, data, connections=None): + schema = self.getSchema(connections) + schema(data) + + +class ConfigLoader(object): + log = logging.getLogger("zuul.ConfigLoader") + + # A set of reporter configuration keys to action mapping + reporter_actions = { + 'start': 'start_actions', + 'success': 'success_actions', + 'failure': 'failure_actions', + 'merge-failure': 'merge_failure_actions', + 'disabled': 'disabled_actions', + } + + def loadConfig(self, config_path, scheduler, merger, connections): + abide = model.Abide() + + if config_path: + config_path = os.path.expanduser(config_path) + if not os.path.exists(config_path): + raise Exception("Unable to read tenant config file at %s" % + config_path) + with open(config_path) as config_file: + self.log.info("Loading configuration from %s" % (config_path,)) + data = yaml.load(config_file) + base = os.path.dirname(os.path.realpath(config_path)) + + validator = AbideValidator() + validator.validate(data, connections) + + for conf_tenant in data['tenants']: + tenant = model.Tenant(conf_tenant['name']) + abide.tenants[tenant.name] = tenant + tenant_config = {} + for fn in conf_tenant.get('include', []): + if not os.path.isabs(fn): + fn = os.path.join(base, fn) + fn = os.path.expanduser(fn) + with open(fn) as config_file: + self.log.info("Loading configuration from %s" % (fn,)) + incdata = yaml.load(config_file) + extend_dict(tenant_config, incdata) + incdata = self._loadTenantInRepoLayouts(merger, connections, + conf_tenant) + extend_dict(tenant_config, incdata) + tenant.layout = self._parseLayout(base, tenant_config, + scheduler, connections) + return abide + + def _parseLayout(self, base, data, scheduler, connections): + layout = model.Layout() + project_templates = {} + + # TODOv3(jeblair): add validation + # validator = layoutvalidator.LayoutValidator() + # validator.validate(data, connections) + + config_env = {} + for include in data.get('includes', []): + if 'python-file' in include: + fn = include['python-file'] + if not os.path.isabs(fn): + fn = os.path.join(base, fn) + fn = os.path.expanduser(fn) + execfile(fn, config_env) + + for conf_pipeline in data.get('pipelines', []): + pipeline = model.Pipeline(conf_pipeline['name'], layout) + pipeline.description = conf_pipeline.get('description') + + pipeline.source = connections.getSource(conf_pipeline['source']) + + precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')] + pipeline.precedence = precedence + pipeline.failure_message = conf_pipeline.get('failure-message', + "Build failed.") + pipeline.merge_failure_message = conf_pipeline.get( + 'merge-failure-message', "Merge Failed.\n\nThis change or one " + "of its cross-repo dependencies was unable to be " + "automatically merged with the current state of its " + "repository. Please rebase the change and upload a new " + "patchset.") + pipeline.success_message = conf_pipeline.get('success-message', + "Build succeeded.") + pipeline.footer_message = conf_pipeline.get('footer-message', "") + pipeline.dequeue_on_new_patchset = conf_pipeline.get( + 'dequeue-on-new-patchset', True) + pipeline.ignore_dependencies = conf_pipeline.get( + 'ignore-dependencies', False) + + for conf_key, action in self.reporter_actions.items(): + reporter_set = [] + if conf_pipeline.get(conf_key): + for reporter_name, params \ + in conf_pipeline.get(conf_key).items(): + reporter = connections.getReporter(reporter_name, + params) + reporter.setAction(conf_key) + reporter_set.append(reporter) + setattr(pipeline, action, reporter_set) + + # If merge-failure actions aren't explicit, use the failure actions + if not pipeline.merge_failure_actions: + pipeline.merge_failure_actions = pipeline.failure_actions + + pipeline.disable_at = conf_pipeline.get( + 'disable-after-consecutive-failures', None) + + pipeline.window = conf_pipeline.get('window', 20) + pipeline.window_floor = conf_pipeline.get('window-floor', 3) + pipeline.window_increase_type = conf_pipeline.get( + 'window-increase-type', 'linear') + pipeline.window_increase_factor = conf_pipeline.get( + 'window-increase-factor', 1) + pipeline.window_decrease_type = conf_pipeline.get( + 'window-decrease-type', 'exponential') + pipeline.window_decrease_factor = conf_pipeline.get( + 'window-decrease-factor', 2) + + manager_name = conf_pipeline['manager'] + if manager_name == 'dependent': + manager = zuul.manager.dependent.DependentPipelineManager( + scheduler, pipeline) + elif manager_name == 'independent': + manager = zuul.manager.independent.IndependentPipelineManager( + scheduler, pipeline) + + pipeline.setManager(manager) + layout.pipelines[conf_pipeline['name']] = pipeline + + if 'require' in conf_pipeline or 'reject' in conf_pipeline: + require = conf_pipeline.get('require', {}) + reject = conf_pipeline.get('reject', {}) + f = model.ChangeishFilter( + open=require.get('open'), + current_patchset=require.get('current-patchset'), + statuses=to_list(require.get('status')), + required_approvals=to_list(require.get('approval')), + reject_approvals=to_list(reject.get('approval')) + ) + manager.changeish_filters.append(f) + + for trigger_name, trigger_config\ + in conf_pipeline.get('trigger').items(): + trigger = connections.getTrigger(trigger_name, trigger_config) + pipeline.triggers.append(trigger) + + # TODO: move + manager.event_filters += trigger.getEventFilters( + conf_pipeline['trigger'][trigger_name]) + + for project_template in data.get('project-templates', []): + # Make sure the template only contains valid pipelines + tpl = dict( + (pipe_name, project_template.get(pipe_name)) + for pipe_name in layout.pipelines.keys() + if pipe_name in project_template + ) + project_templates[project_template.get('name')] = tpl + + for config_job in data.get('jobs', []): + layout.addJob(JobParser.fromYaml(layout, config_job)) + + def add_jobs(job_tree, config_jobs): + for job in config_jobs: + if isinstance(job, list): + for x in job: + add_jobs(job_tree, x) + if isinstance(job, dict): + for parent, children in job.items(): + parent_tree = job_tree.addJob(layout.getJob(parent)) + add_jobs(parent_tree, children) + if isinstance(job, str): + job_tree.addJob(layout.getJob(job)) + + for config_project in data.get('projects', []): + shortname = config_project['name'].split('/')[-1] + + # This is reversed due to the prepend operation below, so + # the ultimate order is templates (in order) followed by + # statically defined jobs. + for requested_template in reversed( + config_project.get('template', [])): + # Fetch the template from 'project-templates' + tpl = project_templates.get( + requested_template.get('name')) + # Expand it with the project context + requested_template['name'] = shortname + expanded = deep_format(tpl, requested_template) + # Finally merge the expansion with whatever has been + # already defined for this project. Prepend our new + # jobs to existing ones (which may have been + # statically defined or defined by other templates). + for pipeline in layout.pipelines.values(): + if pipeline.name in expanded: + config_project.update( + {pipeline.name: expanded[pipeline.name] + + config_project.get(pipeline.name, [])}) + + mode = config_project.get('merge-mode', 'merge-resolve') + for pipeline in layout.pipelines.values(): + if pipeline.name in config_project: + project = pipeline.source.getProject( + config_project['name']) + project.merge_mode = model.MERGER_MAP[mode] + job_tree = pipeline.addProject(project) + config_jobs = config_project[pipeline.name] + add_jobs(job_tree, config_jobs) + + for pipeline in layout.pipelines.values(): + pipeline.manager._postConfig(layout) + + return layout + + def _loadTenantInRepoLayouts(self, merger, connections, conf_tenant): + config = {} + jobs = [] + for source_name, conf_source in conf_tenant.get('source', {}).items(): + source = connections.getSource(source_name) + for conf_repo in conf_source.get('repos'): + project = source.getProject(conf_repo) + url = source.getGitUrl(project) + # TODOv3(jeblair): config should be branch specific + job = merger.getFiles(project.name, url, 'master', + files=['.zuul.yaml']) + job.project = project + jobs.append(job) + for job in jobs: + self.log.debug("Waiting for cat job %s" % (job,)) + job.wait() + if job.files.get('.zuul.yaml'): + self.log.info("Loading configuration from %s/.zuul.yaml" % + (job.project,)) + incdata = self._parseInRepoLayout(job.files['.zuul.yaml']) + extend_dict(config, incdata) + return config + + def _parseInRepoLayout(self, data): + # TODOv3(jeblair): this should implement some rules to protect + # aspects of the config that should not be changed in-repo + return yaml.load(data) + + def _parseSkipIf(self, config_job): + cm = change_matcher + skip_matchers = [] + + for config_skip in config_job.get('skip-if', []): + nested_matchers = [] + + project_regex = config_skip.get('project') + if project_regex: + nested_matchers.append(cm.ProjectMatcher(project_regex)) + + branch_regex = config_skip.get('branch') + if branch_regex: + nested_matchers.append(cm.BranchMatcher(branch_regex)) + + file_regexes = to_list(config_skip.get('all-files-match-any')) + if file_regexes: + file_matchers = [cm.FileMatcher(x) for x in file_regexes] + all_files_matcher = cm.MatchAllFiles(file_matchers) + nested_matchers.append(all_files_matcher) + + # All patterns need to match a given skip-if predicate + skip_matchers.append(cm.MatchAll(nested_matchers)) + + if skip_matchers: + # Any skip-if predicate can be matched to trigger a skip + return cm.MatchAny(skip_matchers) diff --git a/zuul/connection/gerrit.py b/zuul/connection/gerrit.py index 8d2e771a83..a203c24820 100644 --- a/zuul/connection/gerrit.py +++ b/zuul/connection/gerrit.py @@ -25,7 +25,7 @@ import voluptuous as v import urllib2 from zuul.connection import BaseConnection -from zuul.model import TriggerEvent +from zuul.model import TriggerEvent, Project class GerritEventConnector(threading.Thread): @@ -221,8 +221,14 @@ class GerritConnection(BaseConnection): 'https://%s' % self.server) self._change_cache = {} + self.projects = {} self.gerrit_event_connector = None + def getProject(self, name): + if name not in self.projects: + self.projects[name] = Project(name) + return self.projects[name] + def getCachedChange(self, key): if key in self._change_cache: return self._change_cache.get(key) diff --git a/zuul/lib/connections.py b/zuul/lib/connections.py index cb26ba509c..5f42c3ac4f 100644 --- a/zuul/lib/connections.py +++ b/zuul/lib/connections.py @@ -18,49 +18,113 @@ import zuul.connection.gerrit import zuul.connection.smtp -def configure_connections(config): - # Register connections from the config +class ConnectionRegistry(object): + """A registry of connections""" - # TODO(jhesketh): import connection modules dynamically - connections = {} + def __init__(self, sched): + self.connections = {} + self.sched = sched # TODOv3(jeblair): remove (abstraction violation) - for section_name in config.sections(): - con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$', - section_name, re.I) - if not con_match: - continue - con_name = con_match.group(2) - con_config = dict(config.items(section_name)) + def registerScheduler(self, sched): + for connection_name, connection in self.connections.items(): + connection.registerScheduler(sched) + connection.onLoad() - if 'driver' not in con_config: - raise Exception("No driver specified for connection %s." - % con_name) + def stop(self): + for connection_name, connection in self.connections.items(): + connection.onStop() - con_driver = con_config['driver'] + def configure(self, config): + # Register connections from the config + # TODO(jhesketh): import connection modules dynamically + connections = {} - # TODO(jhesketh): load the required class automatically - if con_driver == 'gerrit': - connections[con_name] = \ - zuul.connection.gerrit.GerritConnection(con_name, - con_config) - elif con_driver == 'smtp': - connections[con_name] = \ - zuul.connection.smtp.SMTPConnection(con_name, con_config) + for section_name in config.sections(): + con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$', + section_name, re.I) + if not con_match: + continue + con_name = con_match.group(2) + con_config = dict(config.items(section_name)) + + if 'driver' not in con_config: + raise Exception("No driver specified for connection %s." + % con_name) + + con_driver = con_config['driver'] + + # TODO(jhesketh): load the required class automatically + if con_driver == 'gerrit': + connections[con_name] = \ + zuul.connection.gerrit.GerritConnection(con_name, + con_config) + elif con_driver == 'smtp': + connections[con_name] = \ + zuul.connection.smtp.SMTPConnection(con_name, con_config) + else: + raise Exception("Unknown driver, %s, for connection %s" + % (con_config['driver'], con_name)) + + # If the [gerrit] or [smtp] sections still exist, load them in as a + # connection named 'gerrit' or 'smtp' respectfully + + if 'gerrit' in config.sections(): + connections['gerrit'] = \ + zuul.connection.gerrit.GerritConnection( + '_legacy_gerrit', dict(config.items('gerrit'))) + + if 'smtp' in config.sections(): + connections['smtp'] = \ + zuul.connection.smtp.SMTPConnection( + '_legacy_smtp', dict(config.items('smtp'))) + + self.connections = connections + + def _getDriver(self, dtype, connection_name, driver_config={}): + # Instantiate a driver such as a trigger, source or reporter + # TODO(jhesketh): Make this list dynamic or use entrypoints etc. + # Stevedore was not a good fit here due to the nature of triggers. + # Specifically we don't want to load a trigger per a pipeline as one + # trigger can listen to a stream (from gerrit, for example) and the + # scheduler decides which eventfilter to use. As such we want to load + # trigger+connection pairs uniquely. + drivers = { + 'source': { + 'gerrit': 'zuul.source.gerrit:GerritSource', + }, + 'trigger': { + 'gerrit': 'zuul.trigger.gerrit:GerritTrigger', + 'timer': 'zuul.trigger.timer:TimerTrigger', + 'zuul': 'zuul.trigger.zuultrigger:ZuulTrigger', + }, + 'reporter': { + 'gerrit': 'zuul.reporter.gerrit:GerritReporter', + 'smtp': 'zuul.reporter.smtp:SMTPReporter', + }, + } + + # TODO(jhesketh): Check the connection_name exists + if connection_name in self.connections.keys(): + driver_name = self.connections[connection_name].driver_name + connection = self.connections[connection_name] else: - raise Exception("Unknown driver, %s, for connection %s" - % (con_config['driver'], con_name)) + # In some cases a driver may not be related to a connection. For + # example, the 'timer' or 'zuul' triggers. + driver_name = connection_name + connection = None + driver = drivers[dtype][driver_name].split(':') + driver_instance = getattr( + __import__(driver[0], fromlist=['']), driver[1])( + driver_config, self.sched, connection + ) - # If the [gerrit] or [smtp] sections still exist, load them in as a - # connection named 'gerrit' or 'smtp' respectfully + return driver_instance - if 'gerrit' in config.sections(): - connections['gerrit'] = \ - zuul.connection.gerrit.GerritConnection( - '_legacy_gerrit', dict(config.items('gerrit'))) + def getSource(self, connection_name): + return self._getDriver('source', connection_name) - if 'smtp' in config.sections(): - connections['smtp'] = \ - zuul.connection.smtp.SMTPConnection( - '_legacy_smtp', dict(config.items('smtp'))) + def getReporter(self, connection_name, driver_config={}): + return self._getDriver('reporter', connection_name, driver_config) - return connections + def getTrigger(self, connection_name, driver_config={}): + return self._getDriver('trigger', connection_name, driver_config) diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py new file mode 100644 index 0000000000..00e949b679 --- /dev/null +++ b/zuul/manager/__init__.py @@ -0,0 +1,773 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import extras +import logging + +from zuul import exceptions +from zuul.model import NullChange + +statsd = extras.try_import('statsd.statsd') + + +class DynamicChangeQueueContextManager(object): + def __init__(self, change_queue): + self.change_queue = change_queue + + def __enter__(self): + return self.change_queue + + def __exit__(self, etype, value, tb): + if self.change_queue and not self.change_queue.queue: + self.change_queue.pipeline.removeQueue(self.change_queue) + + +class StaticChangeQueueContextManager(object): + def __init__(self, change_queue): + self.change_queue = change_queue + + def __enter__(self): + return self.change_queue + + def __exit__(self, etype, value, tb): + pass + + +class BasePipelineManager(object): + log = logging.getLogger("zuul.BasePipelineManager") + + def __init__(self, sched, pipeline): + self.sched = sched + self.pipeline = pipeline + self.event_filters = [] + self.changeish_filters = [] + + def __str__(self): + return "<%s %s>" % (self.__class__.__name__, self.pipeline.name) + + def _postConfig(self, layout): + self.log.info("Configured Pipeline Manager %s" % self.pipeline.name) + self.log.info(" Source: %s" % self.pipeline.source) + self.log.info(" Requirements:") + for f in self.changeish_filters: + self.log.info(" %s" % f) + self.log.info(" Events:") + for e in self.event_filters: + self.log.info(" %s" % e) + self.log.info(" Projects:") + + def log_jobs(tree, indent=0): + istr = ' ' + ' ' * indent + if tree.job: + efilters = '' + for b in tree.job._branches: + efilters += str(b) + for f in tree.job._files: + efilters += str(f) + if tree.job.skip_if_matcher: + efilters += str(tree.job.skip_if_matcher) + if efilters: + efilters = ' ' + efilters + hold = '' + if tree.job.hold_following_changes: + hold = ' [hold]' + voting = '' + if not tree.job.voting: + voting = ' [nonvoting]' + self.log.info("%s%s%s%s%s" % (istr, repr(tree.job), + efilters, hold, voting)) + for x in tree.job_trees: + log_jobs(x, indent + 2) + + for p in layout.projects.values(): + tree = self.pipeline.getJobTree(p) + if tree: + self.log.info(" %s" % p) + log_jobs(tree) + self.log.info(" On start:") + self.log.info(" %s" % self.pipeline.start_actions) + self.log.info(" On success:") + self.log.info(" %s" % self.pipeline.success_actions) + self.log.info(" On failure:") + self.log.info(" %s" % self.pipeline.failure_actions) + self.log.info(" On merge-failure:") + self.log.info(" %s" % self.pipeline.merge_failure_actions) + self.log.info(" When disabled:") + self.log.info(" %s" % self.pipeline.disabled_actions) + + def getSubmitAllowNeeds(self): + # Get a list of code review labels that are allowed to be + # "needed" in the submit records for a change, with respect + # to this queue. In other words, the list of review labels + # this queue itself is likely to set before submitting. + allow_needs = set() + for action_reporter in self.pipeline.success_actions: + allow_needs.update(action_reporter.getSubmitAllowNeeds()) + return allow_needs + + def eventMatches(self, event, change): + if event.forced_pipeline: + if event.forced_pipeline == self.pipeline.name: + self.log.debug("Event %s for change %s was directly assigned " + "to pipeline %s" % (event, change, self)) + return True + else: + return False + for ef in self.event_filters: + if ef.matches(event, change): + self.log.debug("Event %s for change %s matched %s " + "in pipeline %s" % (event, change, ef, self)) + return True + return False + + def isChangeAlreadyInPipeline(self, change): + # Checks live items in the pipeline + for item in self.pipeline.getAllItems(): + if item.live and change.equals(item.change): + return True + return False + + def isChangeAlreadyInQueue(self, change, change_queue): + # Checks any item in the specified change queue + for item in change_queue.queue: + if change.equals(item.change): + return True + return False + + def reportStart(self, item): + if not self.pipeline._disabled: + try: + self.log.info("Reporting start, action %s item %s" % + (self.pipeline.start_actions, item)) + ret = self.sendReport(self.pipeline.start_actions, + self.pipeline.source, item) + if ret: + self.log.error("Reporting item start %s received: %s" % + (item, ret)) + except: + self.log.exception("Exception while reporting start:") + + def sendReport(self, action_reporters, source, item, + message=None): + """Sends the built message off to configured reporters. + + Takes the action_reporters, item, message and extra options and + sends them to the pluggable reporters. + """ + report_errors = [] + if len(action_reporters) > 0: + for reporter in action_reporters: + ret = reporter.report(source, self.pipeline, item) + if ret: + report_errors.append(ret) + if len(report_errors) == 0: + return + return report_errors + + def isChangeReadyToBeEnqueued(self, change): + return True + + def enqueueChangesAhead(self, change, quiet, ignore_requirements, + change_queue): + return True + + def enqueueChangesBehind(self, change, quiet, ignore_requirements, + change_queue): + return True + + def checkForChangesNeededBy(self, change, change_queue): + return True + + def getFailingDependentItems(self, item): + return None + + def getDependentItems(self, item): + orig_item = item + items = [] + while item.item_ahead: + items.append(item.item_ahead) + item = item.item_ahead + self.log.info("Change %s depends on changes %s" % + (orig_item.change, + [x.change for x in items])) + return items + + def getItemForChange(self, change): + for item in self.pipeline.getAllItems(): + if item.change.equals(change): + return item + return None + + def findOldVersionOfChangeAlreadyInQueue(self, change): + for item in self.pipeline.getAllItems(): + if not item.live: + continue + if change.isUpdateOf(item.change): + return item + return None + + def removeOldVersionsOfChange(self, change): + if not self.pipeline.dequeue_on_new_patchset: + return + old_item = self.findOldVersionOfChangeAlreadyInQueue(change) + if old_item: + self.log.debug("Change %s is a new version of %s, removing %s" % + (change, old_item.change, old_item)) + self.removeItem(old_item) + + def removeAbandonedChange(self, change): + self.log.debug("Change %s abandoned, removing." % change) + for item in self.pipeline.getAllItems(): + if not item.live: + continue + if item.change.equals(change): + self.removeItem(item) + + def reEnqueueItem(self, item, last_head): + with self.getChangeQueue(item.change, last_head.queue) as change_queue: + if change_queue: + self.log.debug("Re-enqueing change %s in queue %s" % + (item.change, change_queue)) + change_queue.enqueueItem(item) + + # Re-set build results in case any new jobs have been + # added to the tree. + for build in item.current_build_set.getBuilds(): + if build.result: + self.pipeline.setResult(item, build) + # Similarly, reset the item state. + if item.current_build_set.unable_to_merge: + self.pipeline.setUnableToMerge(item) + if item.dequeued_needing_change: + self.pipeline.setDequeuedNeedingChange(item) + + self.reportStats(item) + return True + else: + self.log.error("Unable to find change queue for project %s" % + item.change.project) + return False + + def addChange(self, change, quiet=False, enqueue_time=None, + ignore_requirements=False, live=True, + change_queue=None): + self.log.debug("Considering adding change %s" % change) + + # If we are adding a live change, check if it's a live item + # anywhere in the pipeline. Otherwise, we will perform the + # duplicate check below on the specific change_queue. + if live and self.isChangeAlreadyInPipeline(change): + self.log.debug("Change %s is already in pipeline, " + "ignoring" % change) + return True + + if not self.isChangeReadyToBeEnqueued(change): + self.log.debug("Change %s is not ready to be enqueued, ignoring" % + change) + return False + + if not ignore_requirements: + for f in self.changeish_filters: + if not f.matches(change): + self.log.debug("Change %s does not match pipeline " + "requirement %s" % (change, f)) + return False + + with self.getChangeQueue(change, change_queue) as change_queue: + if not change_queue: + self.log.debug("Unable to find change queue for " + "change %s in project %s" % + (change, change.project)) + return False + + if not self.enqueueChangesAhead(change, quiet, ignore_requirements, + change_queue): + self.log.debug("Failed to enqueue changes " + "ahead of %s" % change) + return False + + if self.isChangeAlreadyInQueue(change, change_queue): + self.log.debug("Change %s is already in queue, " + "ignoring" % change) + return True + + self.log.debug("Adding change %s to queue %s" % + (change, change_queue)) + item = change_queue.enqueueChange(change) + if enqueue_time: + item.enqueue_time = enqueue_time + item.live = live + self.reportStats(item) + if not quiet: + if len(self.pipeline.start_actions) > 0: + self.reportStart(item) + self.enqueueChangesBehind(change, quiet, ignore_requirements, + change_queue) + for trigger in self.sched.triggers.values(): + trigger.onChangeEnqueued(item.change, self.pipeline) + return True + + def dequeueItem(self, item): + self.log.debug("Removing change %s from queue" % item.change) + item.queue.dequeueItem(item) + + def removeItem(self, item): + # Remove an item from the queue, probably because it has been + # superseded by another change. + self.log.debug("Canceling builds behind change: %s " + "because it is being removed." % item.change) + self.cancelJobs(item) + self.dequeueItem(item) + self.reportStats(item) + + def _makeMergerItem(self, item): + # Create a dictionary with all info about the item needed by + # the merger. + number = None + patchset = None + oldrev = None + newrev = None + if hasattr(item.change, 'number'): + number = item.change.number + patchset = item.change.patchset + elif hasattr(item.change, 'newrev'): + oldrev = item.change.oldrev + newrev = item.change.newrev + connection_name = self.pipeline.source.connection.connection_name + return dict(project=item.change.project.name, + url=self.pipeline.source.getGitUrl( + item.change.project), + connection_name=connection_name, + merge_mode=item.change.project.merge_mode, + refspec=item.change.refspec, + branch=item.change.branch, + ref=item.current_build_set.ref, + number=number, + patchset=patchset, + oldrev=oldrev, + newrev=newrev, + ) + + def prepareRef(self, item): + # Returns True if the ref is ready, false otherwise + build_set = item.current_build_set + if build_set.merge_state == build_set.COMPLETE: + return True + if build_set.merge_state == build_set.PENDING: + return False + build_set.merge_state = build_set.PENDING + ref = build_set.ref + if hasattr(item.change, 'refspec') and not ref: + self.log.debug("Preparing ref for: %s" % item.change) + item.current_build_set.setConfiguration() + dependent_items = self.getDependentItems(item) + dependent_items.reverse() + all_items = dependent_items + [item] + merger_items = map(self._makeMergerItem, all_items) + self.sched.merger.mergeChanges(merger_items, + item.current_build_set, + self.pipeline.precedence) + else: + self.log.debug("Preparing update repo for: %s" % item.change) + url = self.pipeline.source.getGitUrl(item.change.project) + self.sched.merger.updateRepo(item.change.project.name, + url, build_set, + self.pipeline.precedence) + return False + + def _launchJobs(self, item, jobs): + self.log.debug("Launching jobs for change %s" % item.change) + dependent_items = self.getDependentItems(item) + for job in jobs: + self.log.debug("Found job %s for change %s" % (job, item.change)) + try: + build = self.sched.launcher.launch(job, item, + self.pipeline, + dependent_items) + self.log.debug("Adding build %s of job %s to item %s" % + (build, job, item)) + item.addBuild(build) + except: + self.log.exception("Exception while launching job %s " + "for change %s:" % (job, item.change)) + + def launchJobs(self, item): + # TODO(jeblair): This should return a value indicating a job + # was launched. Appears to be a longstanding bug. + jobs = self.pipeline.findJobsToRun(item) + if jobs: + self._launchJobs(item, jobs) + + def cancelJobs(self, item, prime=True): + self.log.debug("Cancel jobs for change %s" % item.change) + canceled = False + old_build_set = item.current_build_set + if prime and item.current_build_set.ref: + item.resetAllBuilds() + for build in old_build_set.getBuilds(): + try: + self.sched.launcher.cancel(build) + except: + self.log.exception("Exception while canceling build %s " + "for change %s" % (build, item.change)) + build.result = 'CANCELED' + canceled = True + self.updateBuildDescriptions(old_build_set) + for item_behind in item.items_behind: + self.log.debug("Canceling jobs for change %s, behind change %s" % + (item_behind.change, item.change)) + if self.cancelJobs(item_behind, prime=prime): + canceled = True + return canceled + + def _processOneItem(self, item, nnfi): + changed = False + item_ahead = item.item_ahead + if item_ahead and (not item_ahead.live): + item_ahead = None + change_queue = item.queue + failing_reasons = [] # Reasons this item is failing + + if self.checkForChangesNeededBy(item.change, change_queue) is not True: + # It's not okay to enqueue this change, we should remove it. + self.log.info("Dequeuing change %s because " + "it can no longer merge" % item.change) + self.cancelJobs(item) + self.dequeueItem(item) + self.pipeline.setDequeuedNeedingChange(item) + if item.live: + try: + self.reportItem(item) + except exceptions.MergeFailure: + pass + return (True, nnfi) + dep_items = self.getFailingDependentItems(item) + actionable = change_queue.isActionable(item) + item.active = actionable + ready = False + if dep_items: + failing_reasons.append('a needed change is failing') + self.cancelJobs(item, prime=False) + else: + item_ahead_merged = False + if (item_ahead and item_ahead.change.is_merged): + item_ahead_merged = True + if (item_ahead != nnfi and not item_ahead_merged): + # Our current base is different than what we expected, + # and it's not because our current base merged. Something + # ahead must have failed. + self.log.info("Resetting builds for change %s because the " + "item ahead, %s, is not the nearest non-failing " + "item, %s" % (item.change, item_ahead, nnfi)) + change_queue.moveItem(item, nnfi) + changed = True + self.cancelJobs(item) + if actionable: + ready = self.prepareRef(item) + if item.current_build_set.unable_to_merge: + failing_reasons.append("it has a merge conflict") + ready = False + if actionable and ready and self.launchJobs(item): + changed = True + if self.pipeline.didAnyJobFail(item): + failing_reasons.append("at least one job failed") + if (not item.live) and (not item.items_behind): + failing_reasons.append("is a non-live item with no items behind") + self.dequeueItem(item) + changed = True + if ((not item_ahead) and self.pipeline.areAllJobsComplete(item) + and item.live): + try: + self.reportItem(item) + except exceptions.MergeFailure: + failing_reasons.append("it did not merge") + for item_behind in item.items_behind: + self.log.info("Resetting builds for change %s because the " + "item ahead, %s, failed to merge" % + (item_behind.change, item)) + self.cancelJobs(item_behind) + self.dequeueItem(item) + changed = True + elif not failing_reasons and item.live: + nnfi = item + item.current_build_set.failing_reasons = failing_reasons + if failing_reasons: + self.log.debug("%s is a failing item because %s" % + (item, failing_reasons)) + return (changed, nnfi) + + def processQueue(self): + # Do whatever needs to be done for each change in the queue + self.log.debug("Starting queue processor: %s" % self.pipeline.name) + changed = False + for queue in self.pipeline.queues: + queue_changed = False + nnfi = None # Nearest non-failing item + for item in queue.queue[:]: + item_changed, nnfi = self._processOneItem( + item, nnfi) + if item_changed: + queue_changed = True + self.reportStats(item) + if queue_changed: + changed = True + status = '' + for item in queue.queue: + status += item.formatStatus() + if status: + self.log.debug("Queue %s status is now:\n %s" % + (queue.name, status)) + self.log.debug("Finished queue processor: %s (changed: %s)" % + (self.pipeline.name, changed)) + return changed + + def updateBuildDescriptions(self, build_set): + for build in build_set.getBuilds(): + desc = self.formatDescription(build) + self.sched.launcher.setBuildDescription(build, desc) + + if build_set.previous_build_set: + for build in build_set.previous_build_set.getBuilds(): + desc = self.formatDescription(build) + self.sched.launcher.setBuildDescription(build, desc) + + def onBuildStarted(self, build): + self.log.debug("Build %s started" % build) + return True + + def onBuildCompleted(self, build): + self.log.debug("Build %s completed" % build) + item = build.build_set.item + + self.pipeline.setResult(item, build) + self.log.debug("Item %s status is now:\n %s" % + (item, item.formatStatus())) + return True + + def onMergeCompleted(self, event): + build_set = event.build_set + item = build_set.item + build_set.merge_state = build_set.COMPLETE + build_set.zuul_url = event.zuul_url + if event.merged: + build_set.commit = event.commit + elif event.updated: + if not isinstance(item, NullChange): + build_set.commit = item.change.newrev + if not build_set.commit: + self.log.info("Unable to merge change %s" % item.change) + self.pipeline.setUnableToMerge(item) + + def reportItem(self, item): + if not item.reported: + # _reportItem() returns True if it failed to report. + item.reported = not self._reportItem(item) + if self.changes_merge: + succeeded = self.pipeline.didAllJobsSucceed(item) + merged = item.reported + if merged: + merged = self.pipeline.source.isMerged(item.change, + item.change.branch) + self.log.info("Reported change %s status: all-succeeded: %s, " + "merged: %s" % (item.change, succeeded, merged)) + change_queue = item.queue + if not (succeeded and merged): + self.log.debug("Reported change %s failed tests or failed " + "to merge" % (item.change)) + change_queue.decreaseWindowSize() + self.log.debug("%s window size decreased to %s" % + (change_queue, change_queue.window)) + raise exceptions.MergeFailure( + "Change %s failed to merge" % item.change) + else: + change_queue.increaseWindowSize() + self.log.debug("%s window size increased to %s" % + (change_queue, change_queue.window)) + + for trigger in self.sched.triggers.values(): + trigger.onChangeMerged(item.change, self.pipeline.source) + + def _reportItem(self, item): + self.log.debug("Reporting change %s" % item.change) + ret = True # Means error as returned by trigger.report + if not self.pipeline.getJobs(item): + # We don't send empty reports with +1, + # and the same for -1's (merge failures or transient errors) + # as they cannot be followed by +1's + self.log.debug("No jobs for change %s" % item.change) + actions = [] + elif self.pipeline.didAllJobsSucceed(item): + self.log.debug("success %s" % (self.pipeline.success_actions)) + actions = self.pipeline.success_actions + item.setReportedResult('SUCCESS') + self.pipeline._consecutive_failures = 0 + elif not self.pipeline.didMergerSucceed(item): + actions = self.pipeline.merge_failure_actions + item.setReportedResult('MERGER_FAILURE') + else: + actions = self.pipeline.failure_actions + item.setReportedResult('FAILURE') + self.pipeline._consecutive_failures += 1 + if self.pipeline._disabled: + actions = self.pipeline.disabled_actions + # Check here if we should disable so that we only use the disabled + # reporters /after/ the last disable_at failure is still reported as + # normal. + if (self.pipeline.disable_at and not self.pipeline._disabled and + self.pipeline._consecutive_failures >= self.pipeline.disable_at): + self.pipeline._disabled = True + if actions: + try: + self.log.info("Reporting item %s, actions: %s" % + (item, actions)) + ret = self.sendReport(actions, self.pipeline.source, item) + if ret: + self.log.error("Reporting item %s received: %s" % + (item, ret)) + except: + self.log.exception("Exception while reporting:") + item.setReportedResult('ERROR') + self.updateBuildDescriptions(item.current_build_set) + return ret + + def formatDescription(self, build): + concurrent_changes = '' + concurrent_builds = '' + other_builds = '' + + for change in build.build_set.other_changes: + concurrent_changes += '
+ Triggered by change:
+ {change.number},{change.patchset}
+ Branch: {change.branch}
+ Pipeline: {self.pipeline.name}
+
+ Triggered by reference:
+ {change.ref}
+ Old revision: {change.oldrev}
+ New revision: {change.newrev}
+ Pipeline: {self.pipeline.name}
+
+ Other changes tested concurrently with this change: +
+ All builds for this change set: +
+ Other build sets for this change: +
+ Reported result: {result} +
+""" + + ret = ret.format(**locals()) + return ret + + def reportStats(self, item): + if not statsd: + return + try: + # Update the gauge on enqueue and dequeue, but timers only + # when dequeing. + if item.dequeue_time: + dt = int((item.dequeue_time - item.enqueue_time) * 1000) + else: + dt = None + items = len(self.pipeline.getAllItems()) + + # stats.timers.zuul.pipeline.NAME.resident_time + # stats_counts.zuul.pipeline.NAME.total_changes + # stats.gauges.zuul.pipeline.NAME.current_changes + key = 'zuul.pipeline.%s' % self.pipeline.name + statsd.gauge(key + '.current_changes', items) + if dt: + statsd.timing(key + '.resident_time', dt) + statsd.incr(key + '.total_changes') + + # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time + # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes + project_name = item.change.project.name.replace('/', '.') + key += '.%s' % project_name + if dt: + statsd.timing(key + '.resident_time', dt) + statsd.incr(key + '.total_changes') + except: + self.log.exception("Exception reporting pipeline stats") diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py new file mode 100644 index 0000000000..02dc9f6ae7 --- /dev/null +++ b/zuul/manager/dependent.py @@ -0,0 +1,198 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from zuul import model +from zuul.manager import BasePipelineManager, StaticChangeQueueContextManager + + +class DependentPipelineManager(BasePipelineManager): + log = logging.getLogger("zuul.DependentPipelineManager") + changes_merge = True + + def __init__(self, *args, **kwargs): + super(DependentPipelineManager, self).__init__(*args, **kwargs) + + def _postConfig(self, layout): + super(DependentPipelineManager, self)._postConfig(layout) + self.buildChangeQueues() + + def buildChangeQueues(self): + self.log.debug("Building shared change queues") + change_queues = [] + + for project in self.pipeline.getProjects(): + change_queue = model.ChangeQueue( + self.pipeline, + window=self.pipeline.window, + window_floor=self.pipeline.window_floor, + window_increase_type=self.pipeline.window_increase_type, + window_increase_factor=self.pipeline.window_increase_factor, + window_decrease_type=self.pipeline.window_decrease_type, + window_decrease_factor=self.pipeline.window_decrease_factor) + change_queue.addProject(project) + change_queues.append(change_queue) + self.log.debug("Created queue: %s" % change_queue) + + # Iterate over all queues trying to combine them, and keep doing + # so until they can not be combined further. + last_change_queues = change_queues + while True: + new_change_queues = self.combineChangeQueues(last_change_queues) + if len(last_change_queues) == len(new_change_queues): + break + last_change_queues = new_change_queues + + self.log.info(" Shared change queues:") + for queue in new_change_queues: + self.pipeline.addQueue(queue) + self.log.info(" %s containing %s" % ( + queue, queue.generated_name)) + + def combineChangeQueues(self, change_queues): + self.log.debug("Combining shared queues") + new_change_queues = [] + for a in change_queues: + merged_a = False + for b in new_change_queues: + if not a.getJobs().isdisjoint(b.getJobs()): + self.log.debug("Merging queue %s into %s" % (a, b)) + b.mergeChangeQueue(a) + merged_a = True + break # this breaks out of 'for b' and continues 'for a' + if not merged_a: + self.log.debug("Keeping queue %s" % (a)) + new_change_queues.append(a) + return new_change_queues + + def getChangeQueue(self, change, existing=None): + if existing: + return StaticChangeQueueContextManager(existing) + return StaticChangeQueueContextManager( + self.pipeline.getQueue(change.project)) + + def isChangeReadyToBeEnqueued(self, change): + if not self.pipeline.source.canMerge(change, + self.getSubmitAllowNeeds()): + self.log.debug("Change %s can not merge, ignoring" % change) + return False + return True + + def enqueueChangesBehind(self, change, quiet, ignore_requirements, + change_queue): + to_enqueue = [] + self.log.debug("Checking for changes needing %s:" % change) + if not hasattr(change, 'needed_by_changes'): + self.log.debug(" Changeish does not support dependencies") + return + for other_change in change.needed_by_changes: + with self.getChangeQueue(other_change) as other_change_queue: + if other_change_queue != change_queue: + self.log.debug(" Change %s in project %s can not be " + "enqueued in the target queue %s" % + (other_change, other_change.project, + change_queue)) + continue + if self.pipeline.source.canMerge(other_change, + self.getSubmitAllowNeeds()): + self.log.debug(" Change %s needs %s and is ready to merge" % + (other_change, change)) + to_enqueue.append(other_change) + + if not to_enqueue: + self.log.debug(" No changes need %s" % change) + + for other_change in to_enqueue: + self.addChange(other_change, quiet=quiet, + ignore_requirements=ignore_requirements, + change_queue=change_queue) + + def enqueueChangesAhead(self, change, quiet, ignore_requirements, + change_queue): + ret = self.checkForChangesNeededBy(change, change_queue) + if ret in [True, False]: + return ret + self.log.debug(" Changes %s must be merged ahead of %s" % + (ret, change)) + for needed_change in ret: + r = self.addChange(needed_change, quiet=quiet, + ignore_requirements=ignore_requirements, + change_queue=change_queue) + if not r: + return False + return True + + def checkForChangesNeededBy(self, change, change_queue): + self.log.debug("Checking for changes needed by %s:" % change) + # Return true if okay to proceed enqueing this change, + # false if the change should not be enqueued. + if not hasattr(change, 'needs_changes'): + self.log.debug(" Changeish does not support dependencies") + return True + if not change.needs_changes: + self.log.debug(" No changes needed") + return True + changes_needed = [] + # Ignore supplied change_queue + with self.getChangeQueue(change) as change_queue: + for needed_change in change.needs_changes: + self.log.debug(" Change %s needs change %s:" % ( + change, needed_change)) + if needed_change.is_merged: + self.log.debug(" Needed change is merged") + continue + with self.getChangeQueue(needed_change) as needed_change_queue: + if needed_change_queue != change_queue: + self.log.debug(" Change %s in project %s does not " + "share a change queue with %s " + "in project %s" % + (needed_change, needed_change.project, + change, change.project)) + return False + if not needed_change.is_current_patchset: + self.log.debug(" Needed change is not the " + "current patchset") + return False + if self.isChangeAlreadyInQueue(needed_change, change_queue): + self.log.debug(" Needed change is already ahead " + "in the queue") + continue + if self.pipeline.source.canMerge(needed_change, + self.getSubmitAllowNeeds()): + self.log.debug(" Change %s is needed" % needed_change) + if needed_change not in changes_needed: + changes_needed.append(needed_change) + continue + # The needed change can't be merged. + self.log.debug(" Change %s is needed but can not be merged" % + needed_change) + return False + if changes_needed: + return changes_needed + return True + + def getFailingDependentItems(self, item): + if not hasattr(item.change, 'needs_changes'): + return None + if not item.change.needs_changes: + return None + failing_items = set() + for needed_change in item.change.needs_changes: + needed_item = self.getItemForChange(needed_change) + if not needed_item: + continue + if needed_item.current_build_set.failing_reasons: + failing_items.add(needed_item) + if failing_items: + return failing_items + return None diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py new file mode 100644 index 0000000000..56901898df --- /dev/null +++ b/zuul/manager/independent.py @@ -0,0 +1,95 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from zuul import model +from zuul.manager import BasePipelineManager, DynamicChangeQueueContextManager + + +class IndependentPipelineManager(BasePipelineManager): + log = logging.getLogger("zuul.IndependentPipelineManager") + changes_merge = False + + def _postConfig(self, layout): + super(IndependentPipelineManager, self)._postConfig(layout) + + def getChangeQueue(self, change, existing=None): + # creates a new change queue for every change + if existing: + return DynamicChangeQueueContextManager(existing) + if change.project not in self.pipeline.getProjects(): + self.pipeline.addProject(change.project) + change_queue = model.ChangeQueue(self.pipeline) + change_queue.addProject(change.project) + self.pipeline.addQueue(change_queue) + self.log.debug("Dynamically created queue %s", change_queue) + return DynamicChangeQueueContextManager(change_queue) + + def enqueueChangesAhead(self, change, quiet, ignore_requirements, + change_queue): + ret = self.checkForChangesNeededBy(change, change_queue) + if ret in [True, False]: + return ret + self.log.debug(" Changes %s must be merged ahead of %s" % + (ret, change)) + for needed_change in ret: + # This differs from the dependent pipeline by enqueuing + # changes ahead as "not live", that is, not intended to + # have jobs run. Also, pipeline requirements are always + # ignored (which is safe because the changes are not + # live). + r = self.addChange(needed_change, quiet=True, + ignore_requirements=True, + live=False, change_queue=change_queue) + if not r: + return False + return True + + def checkForChangesNeededBy(self, change, change_queue): + if self.pipeline.ignore_dependencies: + return True + self.log.debug("Checking for changes needed by %s:" % change) + # Return true if okay to proceed enqueing this change, + # false if the change should not be enqueued. + if not hasattr(change, 'needs_changes'): + self.log.debug(" Changeish does not support dependencies") + return True + if not change.needs_changes: + self.log.debug(" No changes needed") + return True + changes_needed = [] + for needed_change in change.needs_changes: + self.log.debug(" Change %s needs change %s:" % ( + change, needed_change)) + if needed_change.is_merged: + self.log.debug(" Needed change is merged") + continue + if self.isChangeAlreadyInQueue(needed_change, change_queue): + self.log.debug(" Needed change is already ahead in the queue") + continue + self.log.debug(" Change %s is needed" % needed_change) + if needed_change not in changes_needed: + changes_needed.append(needed_change) + continue + # This differs from the dependent pipeline check in not + # verifying that the dependent change is mergable. + if changes_needed: + return changes_needed + return True + + def dequeueItem(self, item): + super(IndependentPipelineManager, self).dequeueItem(item) + # An independent pipeline manager dynamically removes empty + # queues + if not item.queue.queue: + self.pipeline.removeQueue(item.queue) diff --git a/zuul/merger/merger.py b/zuul/merger/merger.py index b7e1842d38..eaa5721f72 100644 --- a/zuul/merger/merger.py +++ b/zuul/merger/merger.py @@ -209,7 +209,7 @@ class Merger(object): self.username = username def _makeSSHWrappers(self, working_root, connections): - for connection_name, connection in connections.items(): + for connection_name, connection in connections.connections.items(): sshkey = connection.connection_config.get('sshkey') if sshkey: self._makeSSHWrapper(sshkey, working_root, connection_name) diff --git a/zuul/model.py b/zuul/model.py index 2893d6f306..aa21f859c9 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -67,8 +67,9 @@ def normalizeCategory(name): class Pipeline(object): """A top-level pipeline such as check, gate, post, etc.""" - def __init__(self, name): + def __init__(self, name, layout): self.name = name + self.layout = layout self.description = None self.failure_message = None self.merge_failure_message = None @@ -81,6 +82,7 @@ class Pipeline(object): self.queues = [] self.precedence = PRECEDENCE_NORMAL self.source = None + self.triggers = [] self.start_actions = [] self.success_actions = [] self.failure_actions = [] @@ -96,6 +98,16 @@ class Pipeline(object): self.window_decrease_type = None self.window_decrease_factor = None + @property + def actions(self): + return ( + self.start_actions + + self.success_actions + + self.failure_actions + + self.merge_failure_actions + + self.disabled_actions + ) + def __repr__(self): return '
- Triggered by change:
- {change.number},{change.patchset}
- Branch: {change.branch}
- Pipeline: {self.pipeline.name}
-
- Triggered by reference:
- {change.ref}
- Old revision: {change.oldrev}
- New revision: {change.newrev}
- Pipeline: {self.pipeline.name}
-
- Other changes tested concurrently with this change: -
- All builds for this change set: -
- Other build sets for this change: -
- Reported result: {result} -
-""" - - ret = ret.format(**locals()) - return ret - - def reportStats(self, item): - if not statsd: - return - try: - # Update the gauge on enqueue and dequeue, but timers only - # when dequeing. - if item.dequeue_time: - dt = int((item.dequeue_time - item.enqueue_time) * 1000) - else: - dt = None - items = len(self.pipeline.getAllItems()) - - # stats.timers.zuul.pipeline.NAME.resident_time - # stats_counts.zuul.pipeline.NAME.total_changes - # stats.gauges.zuul.pipeline.NAME.current_changes - key = 'zuul.pipeline.%s' % self.pipeline.name - statsd.gauge(key + '.current_changes', items) - if dt: - statsd.timing(key + '.resident_time', dt) - statsd.incr(key + '.total_changes') - - # stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time - # stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes - project_name = item.change.project.name.replace('/', '.') - key += '.%s' % project_name - if dt: - statsd.timing(key + '.resident_time', dt) - statsd.incr(key + '.total_changes') - except: - self.log.exception("Exception reporting pipeline stats") - - -class DynamicChangeQueueContextManager(object): - def __init__(self, change_queue): - self.change_queue = change_queue - - def __enter__(self): - return self.change_queue - - def __exit__(self, etype, value, tb): - if self.change_queue and not self.change_queue.queue: - self.change_queue.pipeline.removeQueue(self.change_queue.queue) - - -class IndependentPipelineManager(BasePipelineManager): - log = logging.getLogger("zuul.IndependentPipelineManager") - changes_merge = False - - def _postConfig(self, layout): - super(IndependentPipelineManager, self)._postConfig(layout) - - def getChangeQueue(self, change, existing=None): - # creates a new change queue for every change - if existing: - return DynamicChangeQueueContextManager(existing) - if change.project not in self.pipeline.getProjects(): - self.pipeline.addProject(change.project) - change_queue = ChangeQueue(self.pipeline) - change_queue.addProject(change.project) - self.pipeline.addQueue(change_queue) - self.log.debug("Dynamically created queue %s", change_queue) - return DynamicChangeQueueContextManager(change_queue) - - def enqueueChangesAhead(self, change, quiet, ignore_requirements, - change_queue): - ret = self.checkForChangesNeededBy(change, change_queue) - if ret in [True, False]: - return ret - self.log.debug(" Changes %s must be merged ahead of %s" % - (ret, change)) - for needed_change in ret: - # This differs from the dependent pipeline by enqueuing - # changes ahead as "not live", that is, not intended to - # have jobs run. Also, pipeline requirements are always - # ignored (which is safe because the changes are not - # live). - r = self.addChange(needed_change, quiet=True, - ignore_requirements=True, - live=False, change_queue=change_queue) - if not r: - return False - return True - - def checkForChangesNeededBy(self, change, change_queue): - if self.pipeline.ignore_dependencies: - return True - self.log.debug("Checking for changes needed by %s:" % change) - # Return true if okay to proceed enqueing this change, - # false if the change should not be enqueued. - if not hasattr(change, 'needs_changes'): - self.log.debug(" Changeish does not support dependencies") - return True - if not change.needs_changes: - self.log.debug(" No changes needed") - return True - changes_needed = [] - for needed_change in change.needs_changes: - self.log.debug(" Change %s needs change %s:" % ( - change, needed_change)) - if needed_change.is_merged: - self.log.debug(" Needed change is merged") - continue - if self.isChangeAlreadyInQueue(needed_change, change_queue): - self.log.debug(" Needed change is already ahead in the queue") - continue - self.log.debug(" Change %s is needed" % needed_change) - if needed_change not in changes_needed: - changes_needed.append(needed_change) - continue - # This differs from the dependent pipeline check in not - # verifying that the dependent change is mergable. - if changes_needed: - return changes_needed - return True - - def dequeueItem(self, item): - super(IndependentPipelineManager, self).dequeueItem(item) - # An independent pipeline manager dynamically removes empty - # queues - if not item.queue.queue: - self.pipeline.removeQueue(item.queue) - - -class StaticChangeQueueContextManager(object): - def __init__(self, change_queue): - self.change_queue = change_queue - - def __enter__(self): - return self.change_queue - - def __exit__(self, etype, value, tb): - pass - - -class DependentPipelineManager(BasePipelineManager): - log = logging.getLogger("zuul.DependentPipelineManager") - changes_merge = True - - def __init__(self, *args, **kwargs): - super(DependentPipelineManager, self).__init__(*args, **kwargs) - - def _postConfig(self, layout): - super(DependentPipelineManager, self)._postConfig(layout) - self.buildChangeQueues() - - def buildChangeQueues(self): - self.log.debug("Building shared change queues") - change_queues = [] - - for project in self.pipeline.getProjects(): - change_queue = ChangeQueue( - self.pipeline, - window=self.pipeline.window, - window_floor=self.pipeline.window_floor, - window_increase_type=self.pipeline.window_increase_type, - window_increase_factor=self.pipeline.window_increase_factor, - window_decrease_type=self.pipeline.window_decrease_type, - window_decrease_factor=self.pipeline.window_decrease_factor) - change_queue.addProject(project) - change_queues.append(change_queue) - self.log.debug("Created queue: %s" % change_queue) - - # Iterate over all queues trying to combine them, and keep doing - # so until they can not be combined further. - last_change_queues = change_queues - while True: - new_change_queues = self.combineChangeQueues(last_change_queues) - if len(last_change_queues) == len(new_change_queues): - break - last_change_queues = new_change_queues - - self.log.info(" Shared change queues:") - for queue in new_change_queues: - self.pipeline.addQueue(queue) - self.log.info(" %s containing %s" % ( - queue, queue.generated_name)) - - def combineChangeQueues(self, change_queues): - self.log.debug("Combining shared queues") - new_change_queues = [] - for a in change_queues: - merged_a = False - for b in new_change_queues: - if not a.getJobs().isdisjoint(b.getJobs()): - self.log.debug("Merging queue %s into %s" % (a, b)) - b.mergeChangeQueue(a) - merged_a = True - break # this breaks out of 'for b' and continues 'for a' - if not merged_a: - self.log.debug("Keeping queue %s" % (a)) - new_change_queues.append(a) - return new_change_queues - - def getChangeQueue(self, change, existing=None): - if existing: - return StaticChangeQueueContextManager(existing) - return StaticChangeQueueContextManager( - self.pipeline.getQueue(change.project)) - - def isChangeReadyToBeEnqueued(self, change): - if not self.pipeline.source.canMerge(change, - self.getSubmitAllowNeeds()): - self.log.debug("Change %s can not merge, ignoring" % change) - return False - return True - - def enqueueChangesBehind(self, change, quiet, ignore_requirements, - change_queue): - to_enqueue = [] - self.log.debug("Checking for changes needing %s:" % change) - if not hasattr(change, 'needed_by_changes'): - self.log.debug(" Changeish does not support dependencies") - return - for other_change in change.needed_by_changes: - with self.getChangeQueue(other_change) as other_change_queue: - if other_change_queue != change_queue: - self.log.debug(" Change %s in project %s can not be " - "enqueued in the target queue %s" % - (other_change, other_change.project, - change_queue)) - continue - if self.pipeline.source.canMerge(other_change, - self.getSubmitAllowNeeds()): - self.log.debug(" Change %s needs %s and is ready to merge" % - (other_change, change)) - to_enqueue.append(other_change) - - if not to_enqueue: - self.log.debug(" No changes need %s" % change) - - for other_change in to_enqueue: - self.addChange(other_change, quiet=quiet, - ignore_requirements=ignore_requirements, - change_queue=change_queue) - - def enqueueChangesAhead(self, change, quiet, ignore_requirements, - change_queue): - ret = self.checkForChangesNeededBy(change, change_queue) - if ret in [True, False]: - return ret - self.log.debug(" Changes %s must be merged ahead of %s" % - (ret, change)) - for needed_change in ret: - r = self.addChange(needed_change, quiet=quiet, - ignore_requirements=ignore_requirements, - change_queue=change_queue) - if not r: - return False - return True - - def checkForChangesNeededBy(self, change, change_queue): - self.log.debug("Checking for changes needed by %s:" % change) - # Return true if okay to proceed enqueing this change, - # false if the change should not be enqueued. - if not hasattr(change, 'needs_changes'): - self.log.debug(" Changeish does not support dependencies") - return True - if not change.needs_changes: - self.log.debug(" No changes needed") - return True - changes_needed = [] - # Ignore supplied change_queue - with self.getChangeQueue(change) as change_queue: - for needed_change in change.needs_changes: - self.log.debug(" Change %s needs change %s:" % ( - change, needed_change)) - if needed_change.is_merged: - self.log.debug(" Needed change is merged") - continue - with self.getChangeQueue(needed_change) as needed_change_queue: - if needed_change_queue != change_queue: - self.log.debug(" Change %s in project %s does not " - "share a change queue with %s " - "in project %s" % - (needed_change, needed_change.project, - change, change.project)) - return False - if not needed_change.is_current_patchset: - self.log.debug(" Needed change is not the " - "current patchset") - return False - if self.isChangeAlreadyInQueue(needed_change, change_queue): - self.log.debug(" Needed change is already ahead " - "in the queue") - continue - if self.pipeline.source.canMerge(needed_change, - self.getSubmitAllowNeeds()): - self.log.debug(" Change %s is needed" % needed_change) - if needed_change not in changes_needed: - changes_needed.append(needed_change) - continue - # The needed change can't be merged. - self.log.debug(" Change %s is needed but can not be merged" % - needed_change) - return False - if changes_needed: - return changes_needed - return True - - def getFailingDependentItems(self, item): - if not hasattr(item.change, 'needs_changes'): - return None - if not item.change.needs_changes: - return None - failing_items = set() - for needed_change in item.change.needs_changes: - needed_item = self.getItemForChange(needed_change) - if not needed_item: - continue - if needed_item.current_build_set.failing_reasons: - failing_items.add(needed_item) - if failing_items: - return failing_items - return None diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py index 610ae6e5f6..bb79e9d7ec 100644 --- a/zuul/source/__init__.py +++ b/zuul/source/__init__.py @@ -31,7 +31,6 @@ class BaseSource(object): self.source_config = source_config self.sched = sched self.connection = connection - self.projects = {} @abc.abstractmethod def getRefSha(self, project, ref): diff --git a/zuul/source/gerrit.py b/zuul/source/gerrit.py index 883d77ad80..e1a137ac9a 100644 --- a/zuul/source/gerrit.py +++ b/zuul/source/gerrit.py @@ -16,7 +16,7 @@ import logging import re import time from zuul import exceptions -from zuul.model import Change, Ref, NullChange, Project +from zuul.model import Change, Ref, NullChange from zuul.source import BaseSource @@ -127,9 +127,7 @@ class GerritSource(BaseSource): pass def getProject(self, name): - if name not in self.projects: - self.projects[name] = Project(name) - return self.projects[name] + return self.connection.getProject(name) def getChange(self, event): if event.change_number: