Add job inheritance and start refactoring

This begins a series of related changes refactoring config loading,
the data model, and other components; the refactoring will continue in
subsequent changes.

Change-Id: I2ca52a079a837555c1f668e29d5a2fe0a80c1af5
James E. Blair 2015-12-11 14:46:03 -08:00
parent 07bf7c0bcb
commit 8300578a2a
26 changed files with 1918 additions and 1585 deletions
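
At the core of this series is per-job inheritance: a job may name a parent
whose attributes it inherits and then selectively overrides. A minimal sketch
using the JobParser added in this commit (mirroring the new
test_job_inheritance test below):

    from zuul import model, configloader

    layout = model.Layout()
    base = configloader.JobParser.fromYaml(layout, {
        'name': 'base',
        'timeout': 30,
    })
    layout.addJob(base)

    # python27 copies base's attributes via Job.inheritFrom(), then
    # overrides the timeout with its own value.
    python27 = configloader.JobParser.fromYaml(layout, {
        'name': 'python27',
        'parent': 'base',
        'timeout': 40,
    })
    layout.addJob(python27)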


@@ -50,6 +50,7 @@ import zuul.webapp
 import zuul.rpclistener
 import zuul.launcher.gearman
 import zuul.lib.swift
+import zuul.lib.connections
 import zuul.merger.client
 import zuul.merger.merger
 import zuul.merger.server
@@ -864,6 +865,7 @@ class BaseTestCase(testtools.TestCase):

 class ZuulTestCase(BaseTestCase):
+    config_file = 'zuul.conf'

     def setUp(self):
         super(ZuulTestCase, self).setUp()
@@ -907,6 +909,8 @@ class ZuulTestCase(BaseTestCase):
         self.init_repo("org/experimental-project")
         self.init_repo("org/no-jobs-project")

+        self.setup_repos()
+
         self.statsd = FakeStatsd()
         # note, use 127.0.0.1 rather than localhost to avoid getting ipv6
         # see: https://github.com/jsocol/pystatsd/issues/61
@@ -940,7 +944,7 @@ class ZuulTestCase(BaseTestCase):
             self.sched.trigger_event_queue
         ]

-        self.configure_connections()
+        self.configure_connections(self.sched)
         self.sched.registerConnections(self.connections)

         def URLOpenerFactory(*args, **kw):
@@ -979,7 +983,7 @@ class ZuulTestCase(BaseTestCase):
         self.addCleanup(self.assertFinalState)
         self.addCleanup(self.shutdown)

-    def configure_connections(self):
+    def configure_connections(self, sched):
         # Register connections from the config
         self.smtp_messages = []
@@ -993,7 +997,7 @@ class ZuulTestCase(BaseTestCase):
         # a virtual canonical database given by the configured hostname
         self.gerrit_changes_dbs = {}
         self.gerrit_queues_dbs = {}
-        self.connections = {}
+        self.connections = zuul.lib.connections.ConnectionRegistry(sched)

         for section_name in self.config.sections():
             con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
@@ -1018,15 +1022,16 @@ class ZuulTestCase(BaseTestCase):
                         Queue.Queue()
                     self.event_queues.append(
                         self.gerrit_queues_dbs[con_config['server']])
-                self.connections[con_name] = FakeGerritConnection(
+                self.connections.connections[con_name] = FakeGerritConnection(
                     con_name, con_config,
                     changes_db=self.gerrit_changes_dbs[con_config['server']],
                     queues_db=self.gerrit_queues_dbs[con_config['server']],
                     upstream_root=self.upstream_root
                 )
-                setattr(self, 'fake_' + con_name, self.connections[con_name])
+                setattr(self, 'fake_' + con_name,
+                        self.connections.connections[con_name])
             elif con_driver == 'smtp':
-                self.connections[con_name] = \
+                self.connections.connections[con_name] = \
                     zuul.connection.smtp.SMTPConnection(con_name, con_config)
             else:
                 raise Exception("Unknown driver, %s, for connection %s"
@@ -1039,20 +1044,24 @@ class ZuulTestCase(BaseTestCase):
             self.gerrit_changes_dbs['gerrit'] = {}
             self.gerrit_queues_dbs['gerrit'] = Queue.Queue()
             self.event_queues.append(self.gerrit_queues_dbs['gerrit'])
-            self.connections['gerrit'] = FakeGerritConnection(
+            self.connections.connections['gerrit'] = FakeGerritConnection(
                 '_legacy_gerrit', dict(self.config.items('gerrit')),
                 changes_db=self.gerrit_changes_dbs['gerrit'],
                 queues_db=self.gerrit_queues_dbs['gerrit'])

         if 'smtp' in self.config.sections():
-            self.connections['smtp'] = \
+            self.connections.connections['smtp'] = \
                 zuul.connection.smtp.SMTPConnection(
                     '_legacy_smtp', dict(self.config.items('smtp')))

-    def setup_config(self, config_file='zuul.conf'):
+    def setup_config(self):
         """Per test config object. Override to set different config."""
         self.config = ConfigParser.ConfigParser()
-        self.config.read(os.path.join(FIXTURE_DIR, config_file))
+        self.config.read(os.path.join(FIXTURE_DIR, self.config_file))
+
+    def setup_repos(self):
+        """Subclasses can override to manipulate repos before tests"""
+        pass

     def assertFinalState(self):
         # Make sure that git.Repo objects have been garbage collected.
@@ -1063,10 +1072,10 @@ class ZuulTestCase(BaseTestCase):
                     repos.append(obj)
         self.assertEqual(len(repos), 0)
         self.assertEmptyQueues()
+        ipm = zuul.manager.independent.IndependentPipelineManager
         for tenant in self.sched.abide.tenants.values():
             for pipeline in tenant.layout.pipelines.values():
-                if isinstance(pipeline.manager,
-                              zuul.scheduler.IndependentPipelineManager):
+                if isinstance(pipeline.manager, ipm):
                     self.assertEqual(len(pipeline.queues), 0)

     def shutdown(self):
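
Taken together, the new config_file attribute and setup_repos() hook let a
test declare its configuration and prime its repositories declaratively,
replacing the old per-test setup_config()/reconfigure() dance. A minimal
sketch (the class name and fixture contents are illustrative; the hooks are
from this diff):

    class MyTenantTest(ZuulTestCase):
        config_file = 'config/multi-tenant/zuul.conf'

        def setup_repos(self):
            # Called by setUp() after the init_repo() calls, so the
            # repositories are primed before the test body runs.
            self.addCommitToRepo('org/project', 'seed config',
                                 {'.zuul.yaml': '...'})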


@@ -1,6 +1,6 @@
 pipelines:
   - name: check
-    manager: IndependentPipelineManager
+    manager: independent
     source:
       gerrit
     trigger:
@@ -14,7 +14,7 @@ pipelines:
         verified: -1

   - name: tenant-one-gate
-    manager: DependentPipelineManager
+    manager: dependent
     success-message: Build succeeded (tenant-one-gate).
     source:
       gerrit


@@ -2,7 +2,7 @@
 server=127.0.0.1

 [zuul]
-tenant_config=tests/fixtures/config/in-repo/main.yaml
+tenant_config=config/in-repo/main.yaml
 url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
 job_name_in_report=true


@@ -1,6 +1,6 @@
 pipelines:
   - name: check
-    manager: IndependentPipelineManager
+    manager: independent
     source:
       gerrit
     trigger:


@@ -1,6 +1,6 @@
 pipelines:
   - name: tenant-one-gate
-    manager: DependentPipelineManager
+    manager: dependent
     success-message: Build succeeded (tenant-one-gate).
     source:
       gerrit
@@ -21,6 +21,10 @@ pipelines:
             verified: 0
     precedence: high

+jobs:
+  - name:
+      project1-test1
+
 projects:
   - name: org/project1
     check:


@@ -1,6 +1,6 @@
 pipelines:
   - name: tenant-two-gate
-    manager: DependentPipelineManager
+    manager: dependent
     success-message: Build succeeded (tenant-two-gate).
     source:
       gerrit
@@ -21,6 +21,10 @@ pipelines:
             verified: 0
     precedence: high

+jobs:
+  - name:
+      project2-test1
+
 projects:
   - name: org/project2
     check:


@@ -2,7 +2,7 @@
 server=127.0.0.1

 [zuul]
-tenant_config=tests/fixtures/config/multi-tenant/main.yaml
+tenant_config=config/multi-tenant/main.yaml
 url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
 job_name_in_report=true


@@ -3,7 +3,7 @@ includes:

 pipelines:
   - name: check
-    manager: IndependentPipelineManager
+    manager: independent
     source:
       gerrit
     trigger:
@@ -17,7 +17,7 @@ pipelines:
         verified: -1

   - name: post
-    manager: IndependentPipelineManager
+    manager: independent
     source:
       gerrit
     trigger:
@@ -26,7 +26,7 @@ pipelines:
           ref: ^(?!refs/).*$

   - name: gate
-    manager: DependentPipelineManager
+    manager: dependent
     failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
     source:
       gerrit
@@ -48,7 +48,7 @@ pipelines:
     precedence: high

   - name: unused
-    manager: IndependentPipelineManager
+    manager: independent
     dequeue-on-new-patchset: false
     source:
       gerrit
@@ -59,7 +59,7 @@ pipelines:
           - approved: 1

   - name: dup1
-    manager: IndependentPipelineManager
+    manager: independent
     source:
       gerrit
     trigger:
@@ -73,7 +73,7 @@ pipelines:
         verified: -1

   - name: dup2
-    manager: IndependentPipelineManager
+    manager: independent
     source:
       gerrit
     trigger:
@@ -87,7 +87,7 @@ pipelines:
         verified: -1

   - name: conflict
-    manager: DependentPipelineManager
+    manager: dependent
     failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
     source:
       gerrit
@@ -108,7 +108,7 @@ pipelines:
             verified: 0

   - name: experimental
-    manager: IndependentPipelineManager
+    manager: independent
     source:
       gerrit
     trigger:


@@ -31,6 +31,9 @@ LAYOUT_RE = re.compile(r'^(good|bad)_.*\.yaml$')

 class TestLayoutValidator(testtools.TestCase):
+    def setUp(self):
+        self.skip("Disabled for early v3 development")
+
     def test_layouts(self):
         """Test layout file validation"""
         print


@@ -34,8 +34,11 @@ class TestMergerRepo(ZuulTestCase):
     workspace_root = None

     def setUp(self):
-        super(TestMergerRepo, self).setUp()
-        self.workspace_root = os.path.join(self.test_root, 'workspace')
+        self.skip("Disabled for early v3 development")
+
+    # def setUp(self):
+    #     super(TestMergerRepo, self).setUp()
+    #     self.workspace_root = os.path.join(self.test_root, 'workspace')

     def test_ensure_cloned(self):
         parent_path = os.path.join(self.upstream_root, 'org/project1')


@@ -12,8 +12,8 @@
 # License for the specific language governing permissions and limitations
 # under the License.

-from zuul import change_matcher as cm
 from zuul import model
+from zuul import configloader

 from tests.base import BaseTestCase

@@ -22,11 +22,12 @@ class TestJob(BaseTestCase):

     @property
     def job(self):
-        job = model.Job('job')
-        job.skip_if_matcher = cm.MatchAll([
-            cm.ProjectMatcher('^project$'),
-            cm.MatchAllFiles([cm.FileMatcher('^docs/.*$')]),
-        ])
+        layout = model.Layout()
+        job = configloader.JobParser.fromYaml(layout, {
+            'name': 'job',
+            'irrelevant-files': [
+                '^docs/.*$'
+            ]})
         return job

     def test_change_matches_returns_false_for_matched_skip_if(self):
@@ -39,10 +40,61 @@ class TestJob(BaseTestCase):
         change.files = ['foo']
         self.assertTrue(self.job.changeMatches(change))

-    def _assert_job_booleans_are_not_none(self, job):
-        self.assertIsNotNone(job.voting)
-        self.assertIsNotNone(job.hold_following_changes)
-
     def test_job_sets_defaults_for_boolean_attributes(self):
-        job = model.Job('job')
-        self._assert_job_booleans_are_not_none(job)
+        self.assertIsNotNone(self.job.voting)
+
+    def test_job_inheritance(self):
+        layout = model.Layout()
+
+        base = configloader.JobParser.fromYaml(layout, {
+            'name': 'base',
+            'timeout': 30,
+        })
+        layout.addJob(base)
+        python27 = configloader.JobParser.fromYaml(layout, {
+            'name': 'python27',
+            'parent': 'base',
+            'timeout': 40,
+        })
+        layout.addJob(python27)
+        python27diablo = configloader.JobParser.fromYaml(layout, {
+            'name': 'python27',
+            'branches': [
+                'stable/diablo'
+            ],
+            'timeout': 50,
+        })
+        layout.addJob(python27diablo)
+
+        pipeline = model.Pipeline('gate', layout)
+        layout.addPipeline(pipeline)
+        queue = model.ChangeQueue(pipeline)
+
+        project = model.Project('project')
+        tree = pipeline.addProject(project)
+        tree.addJob(layout.getJob('python27'))
+
+        change = model.Change(project)
+        change.branch = 'master'
+        item = queue.enqueueChange(change)
+
+        self.assertTrue(base.changeMatches(change))
+        self.assertTrue(python27.changeMatches(change))
+        self.assertFalse(python27diablo.changeMatches(change))
+
+        item.freezeJobTree()
+        self.assertEqual(len(item.getJobs()), 1)
+        job = item.getJobs()[0]
+        self.assertEqual(job.name, 'python27')
+        self.assertEqual(job.timeout, 40)
+
+        change.branch = 'stable/diablo'
+
+        self.assertTrue(base.changeMatches(change))
+        self.assertTrue(python27.changeMatches(change))
+        self.assertTrue(python27diablo.changeMatches(change))
+
+        item.freezeJobTree()
+        self.assertEqual(len(item.getJobs()), 1)
+        job = item.getJobs()[0]
+        self.assertEqual(job.name, 'python27')
+        self.assertEqual(job.timeout, 50)


@@ -46,6 +46,9 @@ logging.basicConfig(level=logging.DEBUG,

 class TestSchedulerConfigParsing(BaseTestCase):

+    def setUp(self):
+        self.skip("Disabled for early v3 development")
+
     def test_parse_skip_if(self):
         job_yaml = """
 jobs:


@@ -26,13 +26,12 @@ logging.basicConfig(level=logging.DEBUG,
                     '%(levelname)-8s %(message)s')


-class TestV3(ZuulTestCase):
+class TestMultipleTenants(ZuulTestCase):
     # A temporary class to hold new tests while others are disabled

-    def test_multiple_tenants(self):
-        self.setup_config('config/multi-tenant/zuul.conf')
-        self.sched.reconfigure(self.config)
+    config_file = 'config/multi-tenant/zuul.conf'

+    def test_multiple_tenants(self):
         A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
         A.addApproval('CRVW', 2)
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
@@ -64,9 +63,18 @@ class TestV3(ZuulTestCase):
         self.assertEqual(A.reported, 2, "Activity in tenant two should"
                          "not affect tenant one")

-    def test_in_repo_config(self):
+
+class TestInRepoConfig(ZuulTestCase):
+    # A temporary class to hold new tests while others are disabled
+
+    config_file = 'config/in-repo/zuul.conf'
+
+    def setup_repos(self):
         in_repo_conf = textwrap.dedent(
             """
+            jobs:
+              - name: project-test1
+
             projects:
               - name: org/project
                 tenant-one-gate:
@@ -76,9 +84,7 @@ class TestV3(ZuulTestCase):
         self.addCommitToRepo('org/project', 'add zuul conf',
                              {'.zuul.yaml': in_repo_conf})

-        self.setup_config('config/in-repo/zuul.conf')
-        self.sched.reconfigure(self.config)
-
+    def test_in_repo_config(self):
         A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
         A.addApproval('CRVW', 2)
         self.fake_gerrit.addEvent(A.addApproval('APRV', 1))


@@ -90,6 +90,6 @@ class ZuulApp(object):
         else:
             logging.basicConfig(level=logging.DEBUG)

-    def configure_connections(self):
-        self.connections = zuul.lib.connections.configure_connections(
-            self.config)
+    def configure_connections(self, sched):
+        self.connections = zuul.lib.connections.ConnectionRegistry(sched)
+        self.connections.configure(self.config)


@@ -177,7 +177,7 @@ class Server(zuul.cmd.ZuulApp):
         webapp = zuul.webapp.WebApp(self.sched, cache_expiry=cache_expiry)
         rpc = zuul.rpclistener.RPCListener(self.config, self.sched)

-        self.configure_connections()
+        self.configure_connections(self.sched)

         self.sched.setLauncher(gearman)
         self.sched.setMerger(merger)

zuul/configloader.py (new file, 449 lines)

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import logging
import yaml
import voluptuous as vs
import model
import zuul.manager
import zuul.manager.dependent
import zuul.manager.independent
from zuul import change_matcher
# Several forms accept either a single item or a list; this makes
# specifying that in the schema easy (and explicit).
def to_list(x):
return vs.Any([x], x)
def as_list(item):
if not item:
return []
if isinstance(item, list):
return item
return [item]
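
# Illustrative (editorial note, not part of this change): to_list(str)
# produces a voluptuous schema accepting either 'foo' or ['foo', 'bar'],
# while as_list normalizes the parsed value:
#   as_list('foo')      -> ['foo']
#   as_list(['a', 'b']) -> ['a', 'b']
#   as_list(None)       -> []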
def extend_dict(a, b):
"""Extend dictionary a (which will be modified in place) with the
contents of b. This is designed for Zuul yaml files which are
typically dictionaries of lists of dictionaries, e.g.,
{'pipelines': [{'name': 'gate'}]}. If two such dictionaries each
define a pipeline, the result will be a single dictionary with
a pipelines entry whose value is a two-element list."""
for k, v in b.items():
if k not in a:
a[k] = v
elif isinstance(v, dict) and isinstance(a[k], dict):
extend_dict(a[k], v)
elif isinstance(v, list) and isinstance(a[k], list):
a[k] += v
elif isinstance(v, list):
a[k] = [a[k]] + v
elif isinstance(a[k], list):
a[k] += [v]
else:
raise Exception("Unhandled case in extend_dict at %s" % (k,))
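
# Illustrative (editorial note, not part of this change): merging two
# include files that each define one pipeline:
#   a = {'pipelines': [{'name': 'gate'}]}
#   extend_dict(a, {'pipelines': [{'name': 'check'}]})
#   # a == {'pipelines': [{'name': 'gate'}, {'name': 'check'}]}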
def deep_format(obj, paramdict):
"""Apply the paramdict via str.format() to all string objects found within
the supplied obj. Lists and dicts are traversed recursively.
Borrowed from Jenkins Job Builder project"""
if isinstance(obj, str):
ret = obj.format(**paramdict)
elif isinstance(obj, list):
ret = []
for item in obj:
ret.append(deep_format(item, paramdict))
elif isinstance(obj, dict):
ret = {}
for item in obj:
exp_item = item.format(**paramdict)
ret[exp_item] = deep_format(obj[item], paramdict)
else:
ret = obj
return ret
class JobParser(object):
@staticmethod
def getSchema():
# TODOv3(jeblair, jhesketh): move to auth
swift = {vs.Required('name'): str,
'container': str,
'expiry': int,
'max_file_size': int,
'max-file-size': int,
'max_file_count': int,
'max-file-count': int,
'logserver_prefix': str,
'logserver-prefix': str,
}
job = {vs.Required('name'): str,
'parent': str,
'queue-name': str,
'failure-message': str,
'success-message': str,
'failure-url': str,
'success-url': str,
'voting': bool,
'branches': to_list(str),
'files': to_list(str),
'swift': to_list(swift),
'irrelevant-files': to_list(str),
'timeout': int,
}
return vs.Schema(job)
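
    # Illustrative (editorial note, not part of this change): a job
    # definition that validates against this schema; 'branches' uses
    # to_list(str), so a single string and a list of strings both pass:
    #   JobParser.getSchema()({
    #       'name': 'python27',
    #       'parent': 'base',
    #       'branches': 'stable/.*',
    #       'timeout': 40,
    #   })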
@staticmethod
def fromYaml(layout, conf):
JobParser.getSchema()(conf)
job = model.Job(conf['name'])
if 'parent' in conf:
parent = layout.getJob(conf['parent'])
job.inheritFrom(parent)
job.timeout = conf.get('timeout', job.timeout)
job.workspace = conf.get('workspace', job.workspace)
job.pre_run = as_list(conf.get('pre-run', job.pre_run))
job.post_run = as_list(conf.get('post-run', job.post_run))
job.voting = conf.get('voting', True)
job.failure_message = conf.get('failure-message', job.failure_message)
job.success_message = conf.get('success-message', job.success_message)
job.failure_url = conf.get('failure-url', job.failure_url)
job.success_url = conf.get('success-url', job.success_url)
if 'branches' in conf:
matchers = []
for branch in as_list(conf['branches']):
matchers.append(change_matcher.BranchMatcher(branch))
job.branch_matcher = change_matcher.MatchAny(matchers)
if 'files' in conf:
matchers = []
for fn in as_list(conf['files']):
matchers.append(change_matcher.FileMatcher(fn))
job.file_matcher = change_matcher.MatchAny(matchers)
if 'irrelevant-files' in conf:
matchers = []
for fn in as_list(conf['irrelevant-files']):
matchers.append(change_matcher.FileMatcher(fn))
job.irrelevant_file_matcher = change_matcher.MatchAllFiles(
matchers)
return job
class AbideValidator(object):
tenant_source = vs.Schema({'repos': [str]})
def validateTenantSources(self, connections):
def v(value, path=[]):
if isinstance(value, dict):
for k, val in value.items():
connections.getSource(k)
self.validateTenantSource(val, path + [k])
else:
raise vs.Invalid("Invalid tenant source", path)
return v
def validateTenantSource(self, value, path=[]):
self.tenant_source(value)
def getSchema(self, connections=None):
tenant = {vs.Required('name'): str,
'include': to_list(str),
'source': self.validateTenantSources(connections)}
schema = vs.Schema({'tenants': [tenant]})
return schema
def validate(self, data, connections=None):
schema = self.getSchema(connections)
schema(data)
class ConfigLoader(object):
log = logging.getLogger("zuul.ConfigLoader")
# A mapping of reporter configuration keys to pipeline action attributes
reporter_actions = {
'start': 'start_actions',
'success': 'success_actions',
'failure': 'failure_actions',
'merge-failure': 'merge_failure_actions',
'disabled': 'disabled_actions',
}
def loadConfig(self, config_path, scheduler, merger, connections):
abide = model.Abide()
if config_path:
config_path = os.path.expanduser(config_path)
if not os.path.exists(config_path):
raise Exception("Unable to read tenant config file at %s" %
config_path)
with open(config_path) as config_file:
self.log.info("Loading configuration from %s" % (config_path,))
data = yaml.load(config_file)
base = os.path.dirname(os.path.realpath(config_path))
validator = AbideValidator()
validator.validate(data, connections)
for conf_tenant in data['tenants']:
tenant = model.Tenant(conf_tenant['name'])
abide.tenants[tenant.name] = tenant
tenant_config = {}
for fn in conf_tenant.get('include', []):
if not os.path.isabs(fn):
fn = os.path.join(base, fn)
fn = os.path.expanduser(fn)
with open(fn) as config_file:
self.log.info("Loading configuration from %s" % (fn,))
incdata = yaml.load(config_file)
extend_dict(tenant_config, incdata)
incdata = self._loadTenantInRepoLayouts(merger, connections,
conf_tenant)
extend_dict(tenant_config, incdata)
tenant.layout = self._parseLayout(base, tenant_config,
scheduler, connections)
return abide
def _parseLayout(self, base, data, scheduler, connections):
layout = model.Layout()
project_templates = {}
# TODOv3(jeblair): add validation
# validator = layoutvalidator.LayoutValidator()
# validator.validate(data, connections)
config_env = {}
for include in data.get('includes', []):
if 'python-file' in include:
fn = include['python-file']
if not os.path.isabs(fn):
fn = os.path.join(base, fn)
fn = os.path.expanduser(fn)
execfile(fn, config_env)
for conf_pipeline in data.get('pipelines', []):
pipeline = model.Pipeline(conf_pipeline['name'], layout)
pipeline.description = conf_pipeline.get('description')
pipeline.source = connections.getSource(conf_pipeline['source'])
precedence = model.PRECEDENCE_MAP[conf_pipeline.get('precedence')]
pipeline.precedence = precedence
pipeline.failure_message = conf_pipeline.get('failure-message',
"Build failed.")
pipeline.merge_failure_message = conf_pipeline.get(
'merge-failure-message', "Merge Failed.\n\nThis change or one "
"of its cross-repo dependencies was unable to be "
"automatically merged with the current state of its "
"repository. Please rebase the change and upload a new "
"patchset.")
pipeline.success_message = conf_pipeline.get('success-message',
"Build succeeded.")
pipeline.footer_message = conf_pipeline.get('footer-message', "")
pipeline.dequeue_on_new_patchset = conf_pipeline.get(
'dequeue-on-new-patchset', True)
pipeline.ignore_dependencies = conf_pipeline.get(
'ignore-dependencies', False)
for conf_key, action in self.reporter_actions.items():
reporter_set = []
if conf_pipeline.get(conf_key):
for reporter_name, params \
in conf_pipeline.get(conf_key).items():
reporter = connections.getReporter(reporter_name,
params)
reporter.setAction(conf_key)
reporter_set.append(reporter)
setattr(pipeline, action, reporter_set)
# If merge-failure actions aren't explicit, use the failure actions
if not pipeline.merge_failure_actions:
pipeline.merge_failure_actions = pipeline.failure_actions
pipeline.disable_at = conf_pipeline.get(
'disable-after-consecutive-failures', None)
pipeline.window = conf_pipeline.get('window', 20)
pipeline.window_floor = conf_pipeline.get('window-floor', 3)
pipeline.window_increase_type = conf_pipeline.get(
'window-increase-type', 'linear')
pipeline.window_increase_factor = conf_pipeline.get(
'window-increase-factor', 1)
pipeline.window_decrease_type = conf_pipeline.get(
'window-decrease-type', 'exponential')
pipeline.window_decrease_factor = conf_pipeline.get(
'window-decrease-factor', 2)
manager_name = conf_pipeline['manager']
if manager_name == 'dependent':
manager = zuul.manager.dependent.DependentPipelineManager(
scheduler, pipeline)
elif manager_name == 'independent':
manager = zuul.manager.independent.IndependentPipelineManager(
scheduler, pipeline)
pipeline.setManager(manager)
layout.pipelines[conf_pipeline['name']] = pipeline
if 'require' in conf_pipeline or 'reject' in conf_pipeline:
require = conf_pipeline.get('require', {})
reject = conf_pipeline.get('reject', {})
f = model.ChangeishFilter(
open=require.get('open'),
current_patchset=require.get('current-patchset'),
statuses=to_list(require.get('status')),
required_approvals=to_list(require.get('approval')),
reject_approvals=to_list(reject.get('approval'))
)
manager.changeish_filters.append(f)
for trigger_name, trigger_config\
in conf_pipeline.get('trigger').items():
trigger = connections.getTrigger(trigger_name, trigger_config)
pipeline.triggers.append(trigger)
# TODO: move
manager.event_filters += trigger.getEventFilters(
conf_pipeline['trigger'][trigger_name])
for project_template in data.get('project-templates', []):
# Make sure the template only contains valid pipelines
tpl = dict(
(pipe_name, project_template.get(pipe_name))
for pipe_name in layout.pipelines.keys()
if pipe_name in project_template
)
project_templates[project_template.get('name')] = tpl
for config_job in data.get('jobs', []):
layout.addJob(JobParser.fromYaml(layout, config_job))
def add_jobs(job_tree, config_jobs):
for job in config_jobs:
if isinstance(job, list):
for x in job:
add_jobs(job_tree, x)
if isinstance(job, dict):
for parent, children in job.items():
parent_tree = job_tree.addJob(layout.getJob(parent))
add_jobs(parent_tree, children)
if isinstance(job, str):
job_tree.addJob(layout.getJob(job))
for config_project in data.get('projects', []):
shortname = config_project['name'].split('/')[-1]
# This is reversed due to the prepend operation below, so
# the ultimate order is templates (in order) followed by
# statically defined jobs.
for requested_template in reversed(
config_project.get('template', [])):
# Fetch the template from 'project-templates'
tpl = project_templates.get(
requested_template.get('name'))
# Expand it with the project context
requested_template['name'] = shortname
expanded = deep_format(tpl, requested_template)
# Finally merge the expansion with whatever has been
# already defined for this project. Prepend our new
# jobs to existing ones (which may have been
# statically defined or defined by other templates).
for pipeline in layout.pipelines.values():
if pipeline.name in expanded:
config_project.update(
{pipeline.name: expanded[pipeline.name] +
config_project.get(pipeline.name, [])})
mode = config_project.get('merge-mode', 'merge-resolve')
for pipeline in layout.pipelines.values():
if pipeline.name in config_project:
project = pipeline.source.getProject(
config_project['name'])
project.merge_mode = model.MERGER_MAP[mode]
job_tree = pipeline.addProject(project)
config_jobs = config_project[pipeline.name]
add_jobs(job_tree, config_jobs)
for pipeline in layout.pipelines.values():
pipeline.manager._postConfig(layout)
return layout
def _loadTenantInRepoLayouts(self, merger, connections, conf_tenant):
config = {}
jobs = []
for source_name, conf_source in conf_tenant.get('source', {}).items():
source = connections.getSource(source_name)
for conf_repo in conf_source.get('repos'):
project = source.getProject(conf_repo)
url = source.getGitUrl(project)
# TODOv3(jeblair): config should be branch specific
job = merger.getFiles(project.name, url, 'master',
files=['.zuul.yaml'])
job.project = project
jobs.append(job)
for job in jobs:
self.log.debug("Waiting for cat job %s" % (job,))
job.wait()
if job.files.get('.zuul.yaml'):
self.log.info("Loading configuration from %s/.zuul.yaml" %
(job.project,))
incdata = self._parseInRepoLayout(job.files['.zuul.yaml'])
extend_dict(config, incdata)
return config
def _parseInRepoLayout(self, data):
# TODOv3(jeblair): this should implement some rules to protect
# aspects of the config that should not be changed in-repo
return yaml.load(data)
def _parseSkipIf(self, config_job):
cm = change_matcher
skip_matchers = []
for config_skip in config_job.get('skip-if', []):
nested_matchers = []
project_regex = config_skip.get('project')
if project_regex:
nested_matchers.append(cm.ProjectMatcher(project_regex))
branch_regex = config_skip.get('branch')
if branch_regex:
nested_matchers.append(cm.BranchMatcher(branch_regex))
file_regexes = to_list(config_skip.get('all-files-match-any'))
if file_regexes:
file_matchers = [cm.FileMatcher(x) for x in file_regexes]
all_files_matcher = cm.MatchAllFiles(file_matchers)
nested_matchers.append(all_files_matcher)
# All patterns need to match a given skip-if predicate
skip_matchers.append(cm.MatchAll(nested_matchers))
if skip_matchers:
# Any skip-if predicate can be matched to trigger a skip
return cm.MatchAny(skip_matchers)
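
For orientation (an editorial sketch, not part of this change): the scheduler
is expected to drive this loader roughly as follows, yielding an Abide whose
tenants each carry a fully parsed Layout. The config path and tenant name are
illustrative.

    loader = ConfigLoader()
    abide = loader.loadConfig('/etc/zuul/main.yaml',
                              scheduler, merger, connections)
    layout = abide.tenants['tenant-one'].layout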


@@ -25,7 +25,7 @@ import voluptuous as v
 import urllib2

 from zuul.connection import BaseConnection
-from zuul.model import TriggerEvent
+from zuul.model import TriggerEvent, Project


 class GerritEventConnector(threading.Thread):
@@ -221,8 +221,14 @@ class GerritConnection(BaseConnection):
                                'https://%s' % self.server)
         self._change_cache = {}
+        self.projects = {}
         self.gerrit_event_connector = None

+    def getProject(self, name):
+        if name not in self.projects:
+            self.projects[name] = Project(name)
+        return self.projects[name]
+
     def getCachedChange(self, key):
         if key in self._change_cache:
             return self._change_cache.get(key)


@@ -18,9 +18,24 @@ import zuul.connection.gerrit
 import zuul.connection.smtp


-def configure_connections(config):
-    # Register connections from the config
+class ConnectionRegistry(object):
+    """A registry of connections"""
+
+    def __init__(self, sched):
+        self.connections = {}
+        self.sched = sched  # TODOv3(jeblair): remove (abstraction violation)
+
+    def registerScheduler(self, sched):
+        for connection_name, connection in self.connections.items():
+            connection.registerScheduler(sched)
+            connection.onLoad()
+
+    def stop(self):
+        for connection_name, connection in self.connections.items():
+            connection.onStop()
+
+    def configure(self, config):
+        # Register connections from the config
         # TODO(jhesketh): import connection modules dynamically
         connections = {}
@@ -63,4 +78,53 @@ def configure_connections(config):
                 zuul.connection.smtp.SMTPConnection(
                     '_legacy_smtp', dict(config.items('smtp')))

-    return connections
+        self.connections = connections
+
+    def _getDriver(self, dtype, connection_name, driver_config={}):
+        # Instantiate a driver such as a trigger, source or reporter
+        # TODO(jhesketh): Make this list dynamic or use entrypoints etc.
+        # Stevedore was not a good fit here due to the nature of triggers.
+        # Specifically, we don't want to load a trigger per pipeline, as one
+        # trigger can listen to a stream (from gerrit, for example) and the
+        # scheduler decides which eventfilter to use. As such we want to load
+        # trigger+connection pairs uniquely.
+        drivers = {
+            'source': {
+                'gerrit': 'zuul.source.gerrit:GerritSource',
+            },
+            'trigger': {
+                'gerrit': 'zuul.trigger.gerrit:GerritTrigger',
+                'timer': 'zuul.trigger.timer:TimerTrigger',
+                'zuul': 'zuul.trigger.zuultrigger:ZuulTrigger',
+            },
+            'reporter': {
+                'gerrit': 'zuul.reporter.gerrit:GerritReporter',
+                'smtp': 'zuul.reporter.smtp:SMTPReporter',
+            },
+        }
+
+        # TODO(jhesketh): Check the connection_name exists
+        if connection_name in self.connections.keys():
+            driver_name = self.connections[connection_name].driver_name
+            connection = self.connections[connection_name]
+        else:
+            # In some cases a driver may not be related to a connection. For
+            # example, the 'timer' or 'zuul' triggers.
+            driver_name = connection_name
+            connection = None
+        driver = drivers[dtype][driver_name].split(':')
+        driver_instance = getattr(
+            __import__(driver[0], fromlist=['']), driver[1])(
+            driver_config, self.sched, connection
+        )
+
+        return driver_instance
+
+    def getSource(self, connection_name):
+        return self._getDriver('source', connection_name)
+
+    def getReporter(self, connection_name, driver_config={}):
+        return self._getDriver('reporter', connection_name, driver_config)
+
+    def getTrigger(self, connection_name, driver_config={}):
+        return self._getDriver('trigger', connection_name, driver_config)
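
For orientation (an editorial sketch, not part of this change): a connection
name resolves through its configured driver_name, while driver-only names
such as 'timer' fall back to the driver itself. The timer config shown is
illustrative.

    registry = ConnectionRegistry(sched)
    registry.configure(config)              # reads 'connection ...' sections
    source = registry.getSource('gerrit')
    timer = registry.getTrigger('timer', {'time': '0 * * * *'})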

zuul/manager/__init__.py (new file, 773 lines)

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import extras
import logging
from zuul import exceptions
from zuul.model import NullChange
statsd = extras.try_import('statsd.statsd')
class DynamicChangeQueueContextManager(object):
def __init__(self, change_queue):
self.change_queue = change_queue
def __enter__(self):
return self.change_queue
def __exit__(self, etype, value, tb):
if self.change_queue and not self.change_queue.queue:
self.change_queue.pipeline.removeQueue(self.change_queue)
class StaticChangeQueueContextManager(object):
def __init__(self, change_queue):
self.change_queue = change_queue
def __enter__(self):
return self.change_queue
def __exit__(self, etype, value, tb):
pass
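
# Illustrative (editorial note, not part of this change): pipeline managers
# hand out change queues through these context managers, e.g.:
#
#   with self.getChangeQueue(change) as change_queue:
#       if change_queue:
#           change_queue.enqueueChange(change)
#
# The dynamic variant removes a queue left empty on exit; the static variant
# leaves pre-built shared queues alone.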
class BasePipelineManager(object):
log = logging.getLogger("zuul.BasePipelineManager")
def __init__(self, sched, pipeline):
self.sched = sched
self.pipeline = pipeline
self.event_filters = []
self.changeish_filters = []
def __str__(self):
return "<%s %s>" % (self.__class__.__name__, self.pipeline.name)
def _postConfig(self, layout):
self.log.info("Configured Pipeline Manager %s" % self.pipeline.name)
self.log.info(" Source: %s" % self.pipeline.source)
self.log.info(" Requirements:")
for f in self.changeish_filters:
self.log.info(" %s" % f)
self.log.info(" Events:")
for e in self.event_filters:
self.log.info(" %s" % e)
self.log.info(" Projects:")
def log_jobs(tree, indent=0):
istr = ' ' + ' ' * indent
if tree.job:
efilters = ''
for b in tree.job._branches:
efilters += str(b)
for f in tree.job._files:
efilters += str(f)
if tree.job.skip_if_matcher:
efilters += str(tree.job.skip_if_matcher)
if efilters:
efilters = ' ' + efilters
hold = ''
if tree.job.hold_following_changes:
hold = ' [hold]'
voting = ''
if not tree.job.voting:
voting = ' [nonvoting]'
self.log.info("%s%s%s%s%s" % (istr, repr(tree.job),
efilters, hold, voting))
for x in tree.job_trees:
log_jobs(x, indent + 2)
for p in layout.projects.values():
tree = self.pipeline.getJobTree(p)
if tree:
self.log.info(" %s" % p)
log_jobs(tree)
self.log.info(" On start:")
self.log.info(" %s" % self.pipeline.start_actions)
self.log.info(" On success:")
self.log.info(" %s" % self.pipeline.success_actions)
self.log.info(" On failure:")
self.log.info(" %s" % self.pipeline.failure_actions)
self.log.info(" On merge-failure:")
self.log.info(" %s" % self.pipeline.merge_failure_actions)
self.log.info(" When disabled:")
self.log.info(" %s" % self.pipeline.disabled_actions)
def getSubmitAllowNeeds(self):
# Get a list of code review labels that are allowed to be
# "needed" in the submit records for a change, with respect
# to this queue. In other words, the list of review labels
# this queue itself is likely to set before submitting.
allow_needs = set()
for action_reporter in self.pipeline.success_actions:
allow_needs.update(action_reporter.getSubmitAllowNeeds())
return allow_needs
def eventMatches(self, event, change):
if event.forced_pipeline:
if event.forced_pipeline == self.pipeline.name:
self.log.debug("Event %s for change %s was directly assigned "
"to pipeline %s" % (event, change, self))
return True
else:
return False
for ef in self.event_filters:
if ef.matches(event, change):
self.log.debug("Event %s for change %s matched %s "
"in pipeline %s" % (event, change, ef, self))
return True
return False
def isChangeAlreadyInPipeline(self, change):
# Checks live items in the pipeline
for item in self.pipeline.getAllItems():
if item.live and change.equals(item.change):
return True
return False
def isChangeAlreadyInQueue(self, change, change_queue):
# Checks any item in the specified change queue
for item in change_queue.queue:
if change.equals(item.change):
return True
return False
def reportStart(self, item):
if not self.pipeline._disabled:
try:
self.log.info("Reporting start, action %s item %s" %
(self.pipeline.start_actions, item))
ret = self.sendReport(self.pipeline.start_actions,
self.pipeline.source, item)
if ret:
self.log.error("Reporting item start %s received: %s" %
(item, ret))
except:
self.log.exception("Exception while reporting start:")
def sendReport(self, action_reporters, source, item,
message=None):
"""Sends the built message off to configured reporters.
Takes the action_reporters, item, message and extra options and
sends them to the pluggable reporters.
"""
report_errors = []
if len(action_reporters) > 0:
for reporter in action_reporters:
ret = reporter.report(source, self.pipeline, item)
if ret:
report_errors.append(ret)
if len(report_errors) == 0:
return
return report_errors
def isChangeReadyToBeEnqueued(self, change):
return True
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
change_queue):
return True
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
change_queue):
return True
def checkForChangesNeededBy(self, change, change_queue):
return True
def getFailingDependentItems(self, item):
return None
def getDependentItems(self, item):
orig_item = item
items = []
while item.item_ahead:
items.append(item.item_ahead)
item = item.item_ahead
self.log.info("Change %s depends on changes %s" %
(orig_item.change,
[x.change for x in items]))
return items
def getItemForChange(self, change):
for item in self.pipeline.getAllItems():
if item.change.equals(change):
return item
return None
def findOldVersionOfChangeAlreadyInQueue(self, change):
for item in self.pipeline.getAllItems():
if not item.live:
continue
if change.isUpdateOf(item.change):
return item
return None
def removeOldVersionsOfChange(self, change):
if not self.pipeline.dequeue_on_new_patchset:
return
old_item = self.findOldVersionOfChangeAlreadyInQueue(change)
if old_item:
self.log.debug("Change %s is a new version of %s, removing %s" %
(change, old_item.change, old_item))
self.removeItem(old_item)
def removeAbandonedChange(self, change):
self.log.debug("Change %s abandoned, removing." % change)
for item in self.pipeline.getAllItems():
if not item.live:
continue
if item.change.equals(change):
self.removeItem(item)
def reEnqueueItem(self, item, last_head):
with self.getChangeQueue(item.change, last_head.queue) as change_queue:
if change_queue:
self.log.debug("Re-enqueing change %s in queue %s" %
(item.change, change_queue))
change_queue.enqueueItem(item)
# Re-set build results in case any new jobs have been
# added to the tree.
for build in item.current_build_set.getBuilds():
if build.result:
self.pipeline.setResult(item, build)
# Similarly, reset the item state.
if item.current_build_set.unable_to_merge:
self.pipeline.setUnableToMerge(item)
if item.dequeued_needing_change:
self.pipeline.setDequeuedNeedingChange(item)
self.reportStats(item)
return True
else:
self.log.error("Unable to find change queue for project %s" %
item.change.project)
return False
def addChange(self, change, quiet=False, enqueue_time=None,
ignore_requirements=False, live=True,
change_queue=None):
self.log.debug("Considering adding change %s" % change)
# If we are adding a live change, check if it's a live item
# anywhere in the pipeline. Otherwise, we will perform the
# duplicate check below on the specific change_queue.
if live and self.isChangeAlreadyInPipeline(change):
self.log.debug("Change %s is already in pipeline, "
"ignoring" % change)
return True
if not self.isChangeReadyToBeEnqueued(change):
self.log.debug("Change %s is not ready to be enqueued, ignoring" %
change)
return False
if not ignore_requirements:
for f in self.changeish_filters:
if not f.matches(change):
self.log.debug("Change %s does not match pipeline "
"requirement %s" % (change, f))
return False
with self.getChangeQueue(change, change_queue) as change_queue:
if not change_queue:
self.log.debug("Unable to find change queue for "
"change %s in project %s" %
(change, change.project))
return False
if not self.enqueueChangesAhead(change, quiet, ignore_requirements,
change_queue):
self.log.debug("Failed to enqueue changes "
"ahead of %s" % change)
return False
if self.isChangeAlreadyInQueue(change, change_queue):
self.log.debug("Change %s is already in queue, "
"ignoring" % change)
return True
self.log.debug("Adding change %s to queue %s" %
(change, change_queue))
item = change_queue.enqueueChange(change)
if enqueue_time:
item.enqueue_time = enqueue_time
item.live = live
self.reportStats(item)
if not quiet:
if len(self.pipeline.start_actions) > 0:
self.reportStart(item)
self.enqueueChangesBehind(change, quiet, ignore_requirements,
change_queue)
for trigger in self.sched.triggers.values():
trigger.onChangeEnqueued(item.change, self.pipeline)
return True
def dequeueItem(self, item):
self.log.debug("Removing change %s from queue" % item.change)
item.queue.dequeueItem(item)
def removeItem(self, item):
# Remove an item from the queue, probably because it has been
# superseded by another change.
self.log.debug("Canceling builds behind change: %s "
"because it is being removed." % item.change)
self.cancelJobs(item)
self.dequeueItem(item)
self.reportStats(item)
def _makeMergerItem(self, item):
# Create a dictionary with all info about the item needed by
# the merger.
number = None
patchset = None
oldrev = None
newrev = None
if hasattr(item.change, 'number'):
number = item.change.number
patchset = item.change.patchset
elif hasattr(item.change, 'newrev'):
oldrev = item.change.oldrev
newrev = item.change.newrev
connection_name = self.pipeline.source.connection.connection_name
return dict(project=item.change.project.name,
url=self.pipeline.source.getGitUrl(
item.change.project),
connection_name=connection_name,
merge_mode=item.change.project.merge_mode,
refspec=item.change.refspec,
branch=item.change.branch,
ref=item.current_build_set.ref,
number=number,
patchset=patchset,
oldrev=oldrev,
newrev=newrev,
)
def prepareRef(self, item):
# Returns True if the ref is ready, false otherwise
build_set = item.current_build_set
if build_set.merge_state == build_set.COMPLETE:
return True
if build_set.merge_state == build_set.PENDING:
return False
build_set.merge_state = build_set.PENDING
ref = build_set.ref
if hasattr(item.change, 'refspec') and not ref:
self.log.debug("Preparing ref for: %s" % item.change)
item.current_build_set.setConfiguration()
dependent_items = self.getDependentItems(item)
dependent_items.reverse()
all_items = dependent_items + [item]
merger_items = map(self._makeMergerItem, all_items)
self.sched.merger.mergeChanges(merger_items,
item.current_build_set,
self.pipeline.precedence)
else:
self.log.debug("Preparing update repo for: %s" % item.change)
url = self.pipeline.source.getGitUrl(item.change.project)
self.sched.merger.updateRepo(item.change.project.name,
url, build_set,
self.pipeline.precedence)
return False
def _launchJobs(self, item, jobs):
self.log.debug("Launching jobs for change %s" % item.change)
dependent_items = self.getDependentItems(item)
for job in jobs:
self.log.debug("Found job %s for change %s" % (job, item.change))
try:
build = self.sched.launcher.launch(job, item,
self.pipeline,
dependent_items)
self.log.debug("Adding build %s of job %s to item %s" %
(build, job, item))
item.addBuild(build)
except:
self.log.exception("Exception while launching job %s "
"for change %s:" % (job, item.change))
def launchJobs(self, item):
# TODO(jeblair): This should return a value indicating a job
# was launched. Appears to be a longstanding bug.
jobs = self.pipeline.findJobsToRun(item)
if jobs:
self._launchJobs(item, jobs)
def cancelJobs(self, item, prime=True):
self.log.debug("Cancel jobs for change %s" % item.change)
canceled = False
old_build_set = item.current_build_set
if prime and item.current_build_set.ref:
item.resetAllBuilds()
for build in old_build_set.getBuilds():
try:
self.sched.launcher.cancel(build)
except:
self.log.exception("Exception while canceling build %s "
"for change %s" % (build, item.change))
build.result = 'CANCELED'
canceled = True
self.updateBuildDescriptions(old_build_set)
for item_behind in item.items_behind:
self.log.debug("Canceling jobs for change %s, behind change %s" %
(item_behind.change, item.change))
if self.cancelJobs(item_behind, prime=prime):
canceled = True
return canceled
def _processOneItem(self, item, nnfi):
changed = False
item_ahead = item.item_ahead
if item_ahead and (not item_ahead.live):
item_ahead = None
change_queue = item.queue
failing_reasons = [] # Reasons this item is failing
if self.checkForChangesNeededBy(item.change, change_queue) is not True:
# It's not okay to enqueue this change, we should remove it.
self.log.info("Dequeuing change %s because "
"it can no longer merge" % item.change)
self.cancelJobs(item)
self.dequeueItem(item)
self.pipeline.setDequeuedNeedingChange(item)
if item.live:
try:
self.reportItem(item)
except exceptions.MergeFailure:
pass
return (True, nnfi)
dep_items = self.getFailingDependentItems(item)
actionable = change_queue.isActionable(item)
item.active = actionable
ready = False
if dep_items:
failing_reasons.append('a needed change is failing')
self.cancelJobs(item, prime=False)
else:
item_ahead_merged = False
if (item_ahead and item_ahead.change.is_merged):
item_ahead_merged = True
if (item_ahead != nnfi and not item_ahead_merged):
# Our current base is different than what we expected,
# and it's not because our current base merged. Something
# ahead must have failed.
self.log.info("Resetting builds for change %s because the "
"item ahead, %s, is not the nearest non-failing "
"item, %s" % (item.change, item_ahead, nnfi))
change_queue.moveItem(item, nnfi)
changed = True
self.cancelJobs(item)
if actionable:
ready = self.prepareRef(item)
if item.current_build_set.unable_to_merge:
failing_reasons.append("it has a merge conflict")
ready = False
if actionable and ready and self.launchJobs(item):
changed = True
if self.pipeline.didAnyJobFail(item):
failing_reasons.append("at least one job failed")
if (not item.live) and (not item.items_behind):
failing_reasons.append("is a non-live item with no items behind")
self.dequeueItem(item)
changed = True
if ((not item_ahead) and self.pipeline.areAllJobsComplete(item)
and item.live):
try:
self.reportItem(item)
except exceptions.MergeFailure:
failing_reasons.append("it did not merge")
for item_behind in item.items_behind:
self.log.info("Resetting builds for change %s because the "
"item ahead, %s, failed to merge" %
(item_behind.change, item))
self.cancelJobs(item_behind)
self.dequeueItem(item)
changed = True
elif not failing_reasons and item.live:
nnfi = item
item.current_build_set.failing_reasons = failing_reasons
if failing_reasons:
self.log.debug("%s is a failing item because %s" %
(item, failing_reasons))
return (changed, nnfi)
def processQueue(self):
# Do whatever needs to be done for each change in the queue
self.log.debug("Starting queue processor: %s" % self.pipeline.name)
changed = False
for queue in self.pipeline.queues:
queue_changed = False
nnfi = None # Nearest non-failing item
for item in queue.queue[:]:
item_changed, nnfi = self._processOneItem(
item, nnfi)
if item_changed:
queue_changed = True
self.reportStats(item)
if queue_changed:
changed = True
status = ''
for item in queue.queue:
status += item.formatStatus()
if status:
self.log.debug("Queue %s status is now:\n %s" %
(queue.name, status))
self.log.debug("Finished queue processor: %s (changed: %s)" %
(self.pipeline.name, changed))
return changed
def updateBuildDescriptions(self, build_set):
for build in build_set.getBuilds():
desc = self.formatDescription(build)
self.sched.launcher.setBuildDescription(build, desc)
if build_set.previous_build_set:
for build in build_set.previous_build_set.getBuilds():
desc = self.formatDescription(build)
self.sched.launcher.setBuildDescription(build, desc)
def onBuildStarted(self, build):
self.log.debug("Build %s started" % build)
return True
def onBuildCompleted(self, build):
self.log.debug("Build %s completed" % build)
item = build.build_set.item
self.pipeline.setResult(item, build)
self.log.debug("Item %s status is now:\n %s" %
(item, item.formatStatus()))
return True
def onMergeCompleted(self, event):
build_set = event.build_set
item = build_set.item
build_set.merge_state = build_set.COMPLETE
build_set.zuul_url = event.zuul_url
if event.merged:
build_set.commit = event.commit
elif event.updated:
if not isinstance(item, NullChange):
build_set.commit = item.change.newrev
if not build_set.commit:
self.log.info("Unable to merge change %s" % item.change)
self.pipeline.setUnableToMerge(item)
def reportItem(self, item):
if not item.reported:
# _reportItem() returns True if it failed to report.
item.reported = not self._reportItem(item)
if self.changes_merge:
succeeded = self.pipeline.didAllJobsSucceed(item)
merged = item.reported
if merged:
merged = self.pipeline.source.isMerged(item.change,
item.change.branch)
self.log.info("Reported change %s status: all-succeeded: %s, "
"merged: %s" % (item.change, succeeded, merged))
change_queue = item.queue
if not (succeeded and merged):
self.log.debug("Reported change %s failed tests or failed "
"to merge" % (item.change))
change_queue.decreaseWindowSize()
self.log.debug("%s window size decreased to %s" %
(change_queue, change_queue.window))
raise exceptions.MergeFailure(
"Change %s failed to merge" % item.change)
else:
change_queue.increaseWindowSize()
self.log.debug("%s window size increased to %s" %
(change_queue, change_queue.window))
for trigger in self.sched.triggers.values():
trigger.onChangeMerged(item.change, self.pipeline.source)
def _reportItem(self, item):
self.log.debug("Reporting change %s" % item.change)
ret = True # Means error as returned by trigger.report
if not self.pipeline.getJobs(item):
# We don't send empty reports with +1,
# and the same for -1's (merge failures or transient errors)
# as they cannot be followed by +1's
self.log.debug("No jobs for change %s" % item.change)
actions = []
elif self.pipeline.didAllJobsSucceed(item):
self.log.debug("success %s" % (self.pipeline.success_actions))
actions = self.pipeline.success_actions
item.setReportedResult('SUCCESS')
self.pipeline._consecutive_failures = 0
elif not self.pipeline.didMergerSucceed(item):
actions = self.pipeline.merge_failure_actions
item.setReportedResult('MERGER_FAILURE')
else:
actions = self.pipeline.failure_actions
item.setReportedResult('FAILURE')
self.pipeline._consecutive_failures += 1
if self.pipeline._disabled:
actions = self.pipeline.disabled_actions
# Check here if we should disable so that we only use the disabled
# reporters /after/ the last disable_at failure is still reported as
# normal.
if (self.pipeline.disable_at and not self.pipeline._disabled and
self.pipeline._consecutive_failures >= self.pipeline.disable_at):
self.pipeline._disabled = True
if actions:
try:
self.log.info("Reporting item %s, actions: %s" %
(item, actions))
ret = self.sendReport(actions, self.pipeline.source, item)
if ret:
self.log.error("Reporting item %s received: %s" %
(item, ret))
except:
self.log.exception("Exception while reporting:")
item.setReportedResult('ERROR')
self.updateBuildDescriptions(item.current_build_set)
return ret
def formatDescription(self, build):
concurrent_changes = ''
concurrent_builds = ''
other_builds = ''
for change in build.build_set.other_changes:
concurrent_changes += '<li><a href="{change.url}">\
{change.number},{change.patchset}</a></li>'.format(
change=change)
change = build.build_set.item.change
for build in build.build_set.getBuilds():
if build.url:
concurrent_builds += """\
<li>
<a href="{build.url}">
{build.job.name} #{build.number}</a>: {build.result}
</li>
""".format(build=build)
else:
concurrent_builds += """\
<li>
{build.job.name}: {build.result}
</li>""".format(build=build)
if build.build_set.previous_build_set:
other_build = build.build_set.previous_build_set.getBuild(
build.job.name)
if other_build:
other_builds += """\
<li>
Preceded by: <a href="{build.url}">
{build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)
if build.build_set.next_build_set:
other_build = build.build_set.next_build_set.getBuild(
build.job.name)
if other_build:
other_builds += """\
<li>
Succeeded by: <a href="{build.url}">
{build.job.name} #{build.number}</a>
</li>
""".format(build=other_build)
result = build.build_set.result
if hasattr(change, 'number'):
ret = """\
<p>
Triggered by change:
<a href="{change.url}">{change.number},{change.patchset}</a><br/>
Branch: <b>{change.branch}</b><br/>
Pipeline: <b>{self.pipeline.name}</b>
</p>"""
elif hasattr(change, 'ref'):
ret = """\
<p>
Triggered by reference:
{change.ref}<br/>
Old revision: <b>{change.oldrev}</b><br/>
New revision: <b>{change.newrev}</b><br/>
Pipeline: <b>{self.pipeline.name}</b>
</p>"""
else:
ret = ""
if concurrent_changes:
ret += """\
<p>
Other changes tested concurrently with this change:
<ul>{concurrent_changes}</ul>
</p>
"""
if concurrent_builds:
ret += """\
<p>
All builds for this change set:
<ul>{concurrent_builds}</ul>
</p>
"""
if other_builds:
ret += """\
<p>
Other build sets for this change:
<ul>{other_builds}</ul>
</p>
"""
if result:
ret += """\
<p>
Reported result: <b>{result}</b>
</p>
"""
ret = ret.format(**locals())
return ret
def reportStats(self, item):
if not statsd:
return
try:
            # Update the gauge on enqueue and dequeue, but timers only
            # when dequeuing.
if item.dequeue_time:
dt = int((item.dequeue_time - item.enqueue_time) * 1000)
else:
dt = None
items = len(self.pipeline.getAllItems())
# stats.timers.zuul.pipeline.NAME.resident_time
# stats_counts.zuul.pipeline.NAME.total_changes
# stats.gauges.zuul.pipeline.NAME.current_changes
key = 'zuul.pipeline.%s' % self.pipeline.name
statsd.gauge(key + '.current_changes', items)
if dt:
statsd.timing(key + '.resident_time', dt)
statsd.incr(key + '.total_changes')
# stats.timers.zuul.pipeline.NAME.ORG.PROJECT.resident_time
# stats_counts.zuul.pipeline.NAME.ORG.PROJECT.total_changes
project_name = item.change.project.name.replace('/', '.')
key += '.%s' % project_name
if dt:
statsd.timing(key + '.resident_time', dt)
statsd.incr(key + '.total_changes')
        except Exception:
self.log.exception("Exception reporting pipeline stats")

zuul/manager/dependent.py (new file)

@ -0,0 +1,198 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from zuul import model
from zuul.manager import BasePipelineManager, StaticChangeQueueContextManager
class DependentPipelineManager(BasePipelineManager):
log = logging.getLogger("zuul.DependentPipelineManager")
changes_merge = True
def __init__(self, *args, **kwargs):
super(DependentPipelineManager, self).__init__(*args, **kwargs)
def _postConfig(self, layout):
super(DependentPipelineManager, self)._postConfig(layout)
self.buildChangeQueues()
def buildChangeQueues(self):
self.log.debug("Building shared change queues")
change_queues = []
for project in self.pipeline.getProjects():
change_queue = model.ChangeQueue(
self.pipeline,
window=self.pipeline.window,
window_floor=self.pipeline.window_floor,
window_increase_type=self.pipeline.window_increase_type,
window_increase_factor=self.pipeline.window_increase_factor,
window_decrease_type=self.pipeline.window_decrease_type,
window_decrease_factor=self.pipeline.window_decrease_factor)
change_queue.addProject(project)
change_queues.append(change_queue)
self.log.debug("Created queue: %s" % change_queue)
        # Iterate over all queues trying to combine them, and keep doing
        # so until they cannot be combined further.
last_change_queues = change_queues
while True:
new_change_queues = self.combineChangeQueues(last_change_queues)
if len(last_change_queues) == len(new_change_queues):
break
last_change_queues = new_change_queues
self.log.info(" Shared change queues:")
for queue in new_change_queues:
self.pipeline.addQueue(queue)
self.log.info(" %s containing %s" % (
queue, queue.generated_name))
def combineChangeQueues(self, change_queues):
self.log.debug("Combining shared queues")
new_change_queues = []
for a in change_queues:
merged_a = False
for b in new_change_queues:
if not a.getJobs().isdisjoint(b.getJobs()):
self.log.debug("Merging queue %s into %s" % (a, b))
b.mergeChangeQueue(a)
merged_a = True
break # this breaks out of 'for b' and continues 'for a'
if not merged_a:
self.log.debug("Keeping queue %s" % (a))
new_change_queues.append(a)
return new_change_queues
def getChangeQueue(self, change, existing=None):
if existing:
return StaticChangeQueueContextManager(existing)
return StaticChangeQueueContextManager(
self.pipeline.getQueue(change.project))
def isChangeReadyToBeEnqueued(self, change):
if not self.pipeline.source.canMerge(change,
self.getSubmitAllowNeeds()):
self.log.debug("Change %s can not merge, ignoring" % change)
return False
return True
def enqueueChangesBehind(self, change, quiet, ignore_requirements,
change_queue):
to_enqueue = []
self.log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
self.log.debug(" Changeish does not support dependencies")
return
for other_change in change.needed_by_changes:
with self.getChangeQueue(other_change) as other_change_queue:
if other_change_queue != change_queue:
self.log.debug(" Change %s in project %s can not be "
"enqueued in the target queue %s" %
(other_change, other_change.project,
change_queue))
continue
if self.pipeline.source.canMerge(other_change,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
(other_change, change))
to_enqueue.append(other_change)
if not to_enqueue:
self.log.debug(" No changes need %s" % change)
for other_change in to_enqueue:
self.addChange(other_change, quiet=quiet,
ignore_requirements=ignore_requirements,
change_queue=change_queue)
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
change_queue):
ret = self.checkForChangesNeededBy(change, change_queue)
if ret in [True, False]:
return ret
self.log.debug(" Changes %s must be merged ahead of %s" %
(ret, change))
for needed_change in ret:
r = self.addChange(needed_change, quiet=quiet,
ignore_requirements=ignore_requirements,
change_queue=change_queue)
if not r:
return False
return True
def checkForChangesNeededBy(self, change, change_queue):
self.log.debug("Checking for changes needed by %s:" % change)
        # Return True if it is okay to proceed enqueuing this change,
        # False if the change should not be enqueued.
if not hasattr(change, 'needs_changes'):
self.log.debug(" Changeish does not support dependencies")
return True
if not change.needs_changes:
self.log.debug(" No changes needed")
return True
changes_needed = []
# Ignore supplied change_queue
with self.getChangeQueue(change) as change_queue:
for needed_change in change.needs_changes:
self.log.debug(" Change %s needs change %s:" % (
change, needed_change))
if needed_change.is_merged:
self.log.debug(" Needed change is merged")
continue
with self.getChangeQueue(needed_change) as needed_change_queue:
if needed_change_queue != change_queue:
self.log.debug(" Change %s in project %s does not "
"share a change queue with %s "
"in project %s" %
(needed_change, needed_change.project,
change, change.project))
return False
if not needed_change.is_current_patchset:
self.log.debug(" Needed change is not the "
"current patchset")
return False
if self.isChangeAlreadyInQueue(needed_change, change_queue):
self.log.debug(" Needed change is already ahead "
"in the queue")
continue
if self.pipeline.source.canMerge(needed_change,
self.getSubmitAllowNeeds()):
self.log.debug(" Change %s is needed" % needed_change)
if needed_change not in changes_needed:
changes_needed.append(needed_change)
continue
# The needed change can't be merged.
self.log.debug(" Change %s is needed but can not be merged" %
needed_change)
return False
if changes_needed:
return changes_needed
return True
def getFailingDependentItems(self, item):
if not hasattr(item.change, 'needs_changes'):
return None
if not item.change.needs_changes:
return None
failing_items = set()
for needed_change in item.change.needs_changes:
needed_item = self.getItemForChange(needed_change)
if not needed_item:
continue
if needed_item.current_build_set.failing_reasons:
failing_items.add(needed_item)
if failing_items:
return failing_items
return None
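The combining loop in buildChangeQueues() reaches a fixed point: each pass either merges at least two queues or leaves the count unchanged, at which point it stops. A toy sketch of the same idea, using plain sets of job names in place of ChangeQueue objects (all names invented):

    # Queues sharing any job are merged until no further merge is possible.
    def combine(queues):
        new_queues = []
        for a in queues:
            for b in new_queues:
                if not a.isdisjoint(b):  # a shared job joins the queues
                    b |= a
                    break
            else:
                new_queues.append(set(a))
        return new_queues

    queues = [{'job-a'}, {'job-a', 'job-b'}, {'job-c'}]
    while True:
        combined = combine(queues)
        if len(combined) == len(queues):
            break
        queues = combined
    # queues is now [{'job-a', 'job-b'}, {'job-c'}]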

zuul/manager/independent.py (new file)

@ -0,0 +1,95 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from zuul import model
from zuul.manager import BasePipelineManager, DynamicChangeQueueContextManager
class IndependentPipelineManager(BasePipelineManager):
log = logging.getLogger("zuul.IndependentPipelineManager")
changes_merge = False
def _postConfig(self, layout):
super(IndependentPipelineManager, self)._postConfig(layout)
def getChangeQueue(self, change, existing=None):
# creates a new change queue for every change
if existing:
return DynamicChangeQueueContextManager(existing)
if change.project not in self.pipeline.getProjects():
self.pipeline.addProject(change.project)
change_queue = model.ChangeQueue(self.pipeline)
change_queue.addProject(change.project)
self.pipeline.addQueue(change_queue)
self.log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)
def enqueueChangesAhead(self, change, quiet, ignore_requirements,
change_queue):
ret = self.checkForChangesNeededBy(change, change_queue)
if ret in [True, False]:
return ret
self.log.debug(" Changes %s must be merged ahead of %s" %
(ret, change))
for needed_change in ret:
# This differs from the dependent pipeline by enqueuing
# changes ahead as "not live", that is, not intended to
# have jobs run. Also, pipeline requirements are always
# ignored (which is safe because the changes are not
# live).
r = self.addChange(needed_change, quiet=True,
ignore_requirements=True,
live=False, change_queue=change_queue)
if not r:
return False
return True
def checkForChangesNeededBy(self, change, change_queue):
if self.pipeline.ignore_dependencies:
return True
self.log.debug("Checking for changes needed by %s:" % change)
        # Return True if it is okay to proceed enqueuing this change,
        # False if the change should not be enqueued.
if not hasattr(change, 'needs_changes'):
self.log.debug(" Changeish does not support dependencies")
return True
if not change.needs_changes:
self.log.debug(" No changes needed")
return True
changes_needed = []
for needed_change in change.needs_changes:
self.log.debug(" Change %s needs change %s:" % (
change, needed_change))
if needed_change.is_merged:
self.log.debug(" Needed change is merged")
continue
if self.isChangeAlreadyInQueue(needed_change, change_queue):
self.log.debug(" Needed change is already ahead in the queue")
continue
self.log.debug(" Change %s is needed" % needed_change)
if needed_change not in changes_needed:
changes_needed.append(needed_change)
continue
            # This differs from the dependent pipeline check in not
            # verifying that the dependent change is mergeable.
if changes_needed:
return changes_needed
return True
def dequeueItem(self, item):
super(IndependentPipelineManager, self).dequeueItem(item)
# An independent pipeline manager dynamically removes empty
# queues
if not item.queue.queue:
self.pipeline.removeQueue(item.queue)
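Because getChangeQueue() above creates a queue for every change, dequeueItem() must garbage-collect the queue once it drains. A minimal model of that lifecycle, with stand-in classes rather than the real pipeline API:

    # One queue per change; the queue disappears when its last item leaves.
    class TinyQueue:
        def __init__(self):
            self.items = []

    class TinyIndependentPipeline:
        def __init__(self):
            self.queues = []

        def enqueue(self, change):
            q = TinyQueue()            # dynamically created queue
            q.items.append(change)
            self.queues.append(q)
            return q

        def dequeue(self, q, change):
            q.items.remove(change)
            if not q.items:            # dynamically removed when empty
                self.queues.remove(q)

    p = TinyIndependentPipeline()
    q = p.enqueue('change-1')
    p.dequeue(q, 'change-1')
    assert p.queues == []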

zuul/merger/merger.py

@ -209,7 +209,7 @@ class Merger(object):
        self.username = username

    def _makeSSHWrappers(self, working_root, connections):
-        for connection_name, connection in connections.items():
+        for connection_name, connection in connections.connections.items():
            sshkey = connection.connection_config.get('sshkey')
            if sshkey:
                self._makeSSHWrapper(sshkey, working_root, connection_name)

zuul/model.py

@ -67,8 +67,9 @@ def normalizeCategory(name):
class Pipeline(object):
    """A top-level pipeline such as check, gate, post, etc."""
-    def __init__(self, name):
+    def __init__(self, name, layout):
        self.name = name
+        self.layout = layout
        self.description = None
        self.failure_message = None
        self.merge_failure_message = None
@ -81,6 +82,7 @@ class Pipeline(object):
        self.queues = []
        self.precedence = PRECEDENCE_NORMAL
        self.source = None
+        self.triggers = []
        self.start_actions = []
        self.success_actions = []
        self.failure_actions = []
@ -96,6 +98,16 @@ class Pipeline(object):
        self.window_decrease_type = None
        self.window_decrease_factor = None

+    @property
+    def actions(self):
+        return (
+            self.start_actions +
+            self.success_actions +
+            self.failure_actions +
+            self.merge_failure_actions +
+            self.disabled_actions
+        )

    def __repr__(self):
        return '<Pipeline %s>' % self.name
@ -136,11 +148,6 @@ class Pipeline(object):
    def _findJobsToRun(self, job_trees, item):
        torun = []
-        if item.item_ahead:
-            # Only run jobs if any 'hold' jobs on the change ahead
-            # have completed successfully.
-            if self.isHoldingFollowingChanges(item.item_ahead):
-                return []
        for tree in job_trees:
            job = tree.job
            result = None
@ -163,7 +170,7 @@ class Pipeline(object):
    def findJobsToRun(self, item):
        if not item.live:
            return []
-        tree = self.getJobTree(item.change.project)
+        tree = item.job_tree
        if not tree:
            return []
        return self._findJobsToRun(tree.job_trees, item)
@ -324,24 +331,15 @@ class ChangeQueue(object):
    def addProject(self, project):
        if project not in self.projects:
            self.projects.append(project)
-            self._jobs |= set(self.pipeline.getJobTree(project).getJobs())
            names = [x.name for x in self.projects]
            names.sort()
            self.generated_name = ', '.join(names)
-            for job in self._jobs:
-                if job.queue_name:
-                    if (self.assigned_name and
-                            job.queue_name != self.assigned_name):
-                        raise Exception("More than one name assigned to "
-                                        "change queue: %s != %s" %
-                                        (self.assigned_name, job.queue_name))
-                    self.assigned_name = job.queue_name
            self.name = self.assigned_name or self.generated_name

    def enqueueChange(self, change):
        item = QueueItem(self, change)
+        item.freezeJobTree()
        self.enqueueItem(item)
        item.enqueue_time = time.time()
        return item
@ -431,51 +429,88 @@ class Project(object):
        return '<Project %s>' % (self.name)

+class Inheritable(object):
+    def __init__(self, parent=None):
+        self.parent = parent
+
+    def __getattribute__(self, name):
+        parent = object.__getattribute__(self, 'parent')
+        try:
+            return object.__getattribute__(self, name)
+        except AttributeError:
+            if parent:
+                return getattr(parent, name)
+            raise
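The fallback behavior of Inheritable is easiest to see in isolation; the attribute name and values below are invented:

    # Lookups fall back to the parent until the child sets its own value.
    base = Inheritable()
    base.timeout = 30

    child = Inheritable(parent=base)
    assert child.timeout == 30    # resolved from the parent
    child.timeout = 60
    assert child.timeout == 60    # local value now shadows the parent
    assert base.timeout == 30     # the parent is untouched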
class Job(object):
+    attributes = dict(
+        timeout=None,
+        # variables={},
+        # nodes=[],
+        # auth={},
+        workspace=None,
+        pre_run=None,
+        post_run=None,
+        voting=None,
+        failure_message=None,
+        success_message=None,
+        failure_url=None,
+        success_url=None,
+        # Matchers.  These are separate so they can be individually
+        # overridden.
+        branch_matcher=None,
+        file_matcher=None,
+        irrelevant_file_matcher=None,  # skip-if
+        swift=None,  # TODOv3(jeblair): move to auth
+        parameter_function=None,  # TODOv3(jeblair): remove
+        success_pattern=None,  # TODOv3(jeblair): remove
+    )

    def __init__(self, name):
        self.name = name
-        self.queue_name = None
-        self.failure_message = None
-        self.success_message = None
-        self.failure_pattern = None
-        self.success_pattern = None
-        self.parameter_function = None
-        self.hold_following_changes = False
-        self.voting = True
-        self.branches = []
-        self._branches = []
-        self.files = []
-        self._files = []
-        self.skip_if_matcher = None
-        self.swift = {}
-        self.parent = None
+        for k, v in self.attributes.items():
+            setattr(self, k, v)

+    def __eq__(self, other):
+        # Compare the name and all inheritable attributes to determine
+        # whether two jobs with the same name are identically
+        # configured.  Useful upon reconfiguration.
+        if not isinstance(other, Job):
+            return False
+        if self.name != other.name:
+            return False
+        for k, v in self.attributes.items():
+            if getattr(self, k) != getattr(other, k):
+                return False
+        return True

    def __str__(self):
        return self.name

    def __repr__(self):
-        return '<Job %s>' % (self.name)
+        return '<Job %s>' % (self.name,)
+    def inheritFrom(self, other):
+        """Copy the inheritable attributes which have been set on the other
+        job to this job."""
+        if not isinstance(other, Job):
+            raise Exception("Job unable to inherit from %s" % (other,))
+        for k, v in self.attributes.items():
+            if getattr(other, k) != v:
+                setattr(self, k, getattr(other, k))
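Since inheritFrom() copies only the attributes the other job has changed from the class-level defaults, variants can be applied in order and each overrides just what it sets; the job names and values below are invented:

    # Later variants layer over earlier ones without clobbering
    # attributes they leave at the default.
    job = Job('py27-test')

    reference = Job('py27-test')
    reference.timeout = 30
    reference.voting = False
    job.inheritFrom(reference)

    variant = Job('py27-test')
    variant.timeout = 60          # the only attribute this variant sets
    job.inheritFrom(variant)

    assert job.timeout == 60      # overridden by the variant
    assert job.voting is False    # still the reference value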
    def changeMatches(self, change):
-        matches_branch = False
-        for branch in self.branches:
-            if hasattr(change, 'branch') and branch.match(change.branch):
-                matches_branch = True
-            if hasattr(change, 'ref') and branch.match(change.ref):
-                matches_branch = True
-        if self.branches and not matches_branch:
+        if self.branch_matcher and not self.branch_matcher.matches(change):
            return False

-        matches_file = False
-        for f in self.files:
-            if hasattr(change, 'files'):
-                for cf in change.files:
-                    if f.match(cf):
-                        matches_file = True
-        if self.files and not matches_file:
+        if self.file_matcher and not self.file_matcher.matches(change):
            return False

-        if self.skip_if_matcher and self.skip_if_matcher.matches(change):
+        # NB: This is a negative match.
+        if (self.irrelevant_file_matcher and
+            self.irrelevant_file_matcher.matches(change)):
            return False

        return True
@ -648,6 +683,7 @@ class QueueItem(object):
        self.reported = False
        self.active = False  # Whether an item is within an active window
        self.live = True  # Whether an item is intended to be processed at all
+        self.job_tree = None

    def __repr__(self):
        if self.pipeline:
@ -675,6 +711,43 @@ class QueueItem(object):
    def setReportedResult(self, result):
        self.current_build_set.result = result

+    def _createJobTree(self, job_trees, parent):
+        for tree in job_trees:
+            job = tree.job
+            if not job.changeMatches(self.change):
+                continue
+            frozen_job = Job(job.name)
+            frozen_tree = JobTree(frozen_job)
+            inherited = set()
+            for variant in self.pipeline.layout.getJobs(job.name):
+                if variant.changeMatches(self.change):
+                    if variant not in inherited:
+                        frozen_job.inheritFrom(variant)
+                        inherited.add(variant)
+            if job not in inherited:
+                # Only update from the job in the tree if it is
+                # unique, otherwise we might unset an attribute we
+                # have overloaded.
+                frozen_job.inheritFrom(job)
+            parent.job_trees.append(frozen_tree)
+            self._createJobTree(tree.job_trees, frozen_tree)

+    def createJobTree(self):
+        project_tree = self.pipeline.getJobTree(self.change.project)
+        ret = JobTree(None)
+        self._createJobTree(project_tree.job_trees, ret)
+        return ret

+    def freezeJobTree(self):
+        """Find or create actual matching jobs for this item's change and
+        store the resulting job tree."""
+        self.job_tree = self.createJobTree()

+    def getJobs(self):
+        if not self.live or not self.job_tree:
+            return []
+        return self.job_tree.getJobs()
    def formatJSON(self):
        changeish = self.change
        ret = {}
@ -1287,14 +1360,30 @@ class Layout(object):
    def __init__(self):
        self.projects = {}
        self.pipelines = OrderedDict()
+        # This is a dictionary of name -> [jobs].  The first element
+        # of the list is the first job added with that name.  It is
+        # the reference definition for a given job.  Subsequent
+        # elements are aspects of that job with different matchers
+        # that override some attribute of the job.  These aspects all
+        # inherit from the reference definition.
        self.jobs = {}

    def getJob(self, name):
        if name in self.jobs:
-            return self.jobs[name]
-        job = Job(name)
-        self.jobs[name] = job
-        return job
+            return self.jobs[name][0]
+        return None

+    def getJobs(self, name):
+        return self.jobs.get(name, [])

+    def addJob(self, job):
+        if job.name in self.jobs:
+            self.jobs[job.name].append(job)
+        else:
+            self.jobs[job.name] = [job]

+    def addPipeline(self, pipeline):
+        self.pipelines[pipeline.name] = pipeline
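The resulting contract is that getJob() returns the reference definition while getJobs() returns the reference plus its variants in the order they were added; for example (job names invented):

    # The first definition added under a name is the reference; later
    # additions are matcher-scoped variants of it.
    layout = Layout()
    reference = Job('unit-test')
    stable_variant = Job('unit-test')   # e.g. carries a branch matcher
    layout.addJob(reference)
    layout.addJob(stable_variant)

    assert layout.getJob('unit-test') is reference
    assert layout.getJobs('unit-test') == [reference, stable_variant]
    assert layout.getJob('no-such-job') is None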
class Tenant(object):

(file diff suppressed because it is too large)

zuul/source/__init__.py

@ -31,7 +31,6 @@ class BaseSource(object):
        self.source_config = source_config
        self.sched = sched
        self.connection = connection
-        self.projects = {}

    @abc.abstractmethod
    def getRefSha(self, project, ref):

zuul/source/gerrit.py

@ -16,7 +16,7 @@ import logging
import re
import time
from zuul import exceptions
-from zuul.model import Change, Ref, NullChange, Project
+from zuul.model import Change, Ref, NullChange
from zuul.source import BaseSource
@ -127,9 +127,7 @@ class GerritSource(BaseSource):
        pass

    def getProject(self, name):
-        if name not in self.projects:
-            self.projects[name] = Project(name)
-        return self.projects[name]
+        return self.connection.getProject(name)

    def getChange(self, event):
        if event.change_number: