eca4620efa
When using a branch and pull model on a shared repository there are usually one or more protected branches which are gated and a dynamic number of temporary personal/feature branches which are the source for the pull requests. These temporary branches while ungated can potentially include broken zuul config and therefore break the global tenant wide configuration. In order to deal with this model add support for excluding unprotected branches. This can be configured on tenant level and overridden per project. Change-Id: I8a45fd41539a3c964a84142f04c1644585c0fdcf
2635 lines
89 KiB
Python
2635 lines
89 KiB
Python
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import abc
|
|
from collections import OrderedDict
|
|
import copy
|
|
import logging
|
|
import os
|
|
import struct
|
|
import time
|
|
from uuid import uuid4
|
|
import urllib.parse
|
|
|
|
MERGER_MERGE = 1 # "git merge"
|
|
MERGER_MERGE_RESOLVE = 2 # "git merge -s resolve"
|
|
MERGER_CHERRY_PICK = 3 # "git cherry-pick"
|
|
|
|
MERGER_MAP = {
|
|
'merge': MERGER_MERGE,
|
|
'merge-resolve': MERGER_MERGE_RESOLVE,
|
|
'cherry-pick': MERGER_CHERRY_PICK,
|
|
}
|
|
|
|
PRECEDENCE_NORMAL = 0
|
|
PRECEDENCE_LOW = 1
|
|
PRECEDENCE_HIGH = 2
|
|
|
|
PRECEDENCE_MAP = {
|
|
None: PRECEDENCE_NORMAL,
|
|
'low': PRECEDENCE_LOW,
|
|
'normal': PRECEDENCE_NORMAL,
|
|
'high': PRECEDENCE_HIGH,
|
|
}
|
|
|
|
# Request states
|
|
STATE_REQUESTED = 'requested'
|
|
STATE_PENDING = 'pending'
|
|
STATE_FULFILLED = 'fulfilled'
|
|
STATE_FAILED = 'failed'
|
|
REQUEST_STATES = set([STATE_REQUESTED,
|
|
STATE_PENDING,
|
|
STATE_FULFILLED,
|
|
STATE_FAILED])
|
|
|
|
# Node states
|
|
STATE_BUILDING = 'building'
|
|
STATE_TESTING = 'testing'
|
|
STATE_READY = 'ready'
|
|
STATE_IN_USE = 'in-use'
|
|
STATE_USED = 'used'
|
|
STATE_HOLD = 'hold'
|
|
STATE_DELETING = 'deleting'
|
|
NODE_STATES = set([STATE_BUILDING,
|
|
STATE_TESTING,
|
|
STATE_READY,
|
|
STATE_IN_USE,
|
|
STATE_USED,
|
|
STATE_HOLD,
|
|
STATE_DELETING])
|
|
|
|
|
|
class Attributes(object):
|
|
"""A class to hold attributes for string formatting."""
|
|
|
|
def __init__(self, **kw):
|
|
setattr(self, '__dict__', kw)
|
|
|
|
|
|
class Pipeline(object):
|
|
"""A configuration that ties together triggers, reporters and managers
|
|
|
|
Trigger
|
|
A description of which events should be processed
|
|
|
|
Manager
|
|
Responsible for enqueing and dequeing Changes
|
|
|
|
Reporter
|
|
Communicates success and failure results somewhere
|
|
"""
|
|
def __init__(self, name, layout):
|
|
self.name = name
|
|
self.layout = layout
|
|
self.description = None
|
|
self.failure_message = None
|
|
self.merge_failure_message = None
|
|
self.success_message = None
|
|
self.footer_message = None
|
|
self.start_message = None
|
|
self.allow_secrets = False
|
|
self.dequeue_on_new_patchset = True
|
|
self.ignore_dependencies = False
|
|
self.manager = None
|
|
self.queues = []
|
|
self.precedence = PRECEDENCE_NORMAL
|
|
self.triggers = []
|
|
self.start_actions = []
|
|
self.success_actions = []
|
|
self.failure_actions = []
|
|
self.merge_failure_actions = []
|
|
self.disabled_actions = []
|
|
self.disable_at = None
|
|
self._consecutive_failures = 0
|
|
self._disabled = False
|
|
self.window = None
|
|
self.window_floor = None
|
|
self.window_increase_type = None
|
|
self.window_increase_factor = None
|
|
self.window_decrease_type = None
|
|
self.window_decrease_factor = None
|
|
|
|
@property
|
|
def actions(self):
|
|
return (
|
|
self.start_actions +
|
|
self.success_actions +
|
|
self.failure_actions +
|
|
self.merge_failure_actions +
|
|
self.disabled_actions
|
|
)
|
|
|
|
def __repr__(self):
|
|
return '<Pipeline %s>' % self.name
|
|
|
|
def getSafeAttributes(self):
|
|
return Attributes(name=self.name)
|
|
|
|
def setManager(self, manager):
|
|
self.manager = manager
|
|
|
|
def addQueue(self, queue):
|
|
self.queues.append(queue)
|
|
|
|
def getQueue(self, project):
|
|
for queue in self.queues:
|
|
if project in queue.projects:
|
|
return queue
|
|
return None
|
|
|
|
def removeQueue(self, queue):
|
|
if queue in self.queues:
|
|
self.queues.remove(queue)
|
|
|
|
def getChangesInQueue(self):
|
|
changes = []
|
|
for shared_queue in self.queues:
|
|
changes.extend([x.change for x in shared_queue.queue])
|
|
return changes
|
|
|
|
def getAllItems(self):
|
|
items = []
|
|
for shared_queue in self.queues:
|
|
items.extend(shared_queue.queue)
|
|
return items
|
|
|
|
def formatStatusJSON(self, websocket_url=None):
|
|
j_pipeline = dict(name=self.name,
|
|
description=self.description)
|
|
j_queues = []
|
|
j_pipeline['change_queues'] = j_queues
|
|
for queue in self.queues:
|
|
j_queue = dict(name=queue.name)
|
|
j_queues.append(j_queue)
|
|
j_queue['heads'] = []
|
|
j_queue['window'] = queue.window
|
|
|
|
j_changes = []
|
|
for e in queue.queue:
|
|
if not e.item_ahead:
|
|
if j_changes:
|
|
j_queue['heads'].append(j_changes)
|
|
j_changes = []
|
|
j_changes.append(e.formatJSON(websocket_url))
|
|
if (len(j_changes) > 1 and
|
|
(j_changes[-2]['remaining_time'] is not None) and
|
|
(j_changes[-1]['remaining_time'] is not None)):
|
|
j_changes[-1]['remaining_time'] = max(
|
|
j_changes[-2]['remaining_time'],
|
|
j_changes[-1]['remaining_time'])
|
|
if j_changes:
|
|
j_queue['heads'].append(j_changes)
|
|
return j_pipeline
|
|
|
|
|
|
class ChangeQueue(object):
|
|
"""A ChangeQueue contains Changes to be processed related projects.
|
|
|
|
A Pipeline with a DependentPipelineManager has multiple parallel
|
|
ChangeQueues shared by different projects. For instance, there may a
|
|
ChangeQueue shared by interrelated projects foo and bar, and a second queue
|
|
for independent project baz.
|
|
|
|
A Pipeline with an IndependentPipelineManager puts every Change into its
|
|
own ChangeQueue
|
|
|
|
The ChangeQueue Window is inspired by TCP windows and controlls how many
|
|
Changes in a given ChangeQueue will be considered active and ready to
|
|
be processed. If a Change succeeds, the Window is increased by
|
|
`window_increase_factor`. If a Change fails, the Window is decreased by
|
|
`window_decrease_factor`.
|
|
|
|
A ChangeQueue may be a dynamically created queue, which may be removed
|
|
from a DependentPipelineManager once empty.
|
|
"""
|
|
def __init__(self, pipeline, window=0, window_floor=1,
|
|
window_increase_type='linear', window_increase_factor=1,
|
|
window_decrease_type='exponential', window_decrease_factor=2,
|
|
name=None, dynamic=False):
|
|
self.pipeline = pipeline
|
|
if name:
|
|
self.name = name
|
|
else:
|
|
self.name = ''
|
|
self.projects = []
|
|
self._jobs = set()
|
|
self.queue = []
|
|
self.window = window
|
|
self.window_floor = window_floor
|
|
self.window_increase_type = window_increase_type
|
|
self.window_increase_factor = window_increase_factor
|
|
self.window_decrease_type = window_decrease_type
|
|
self.window_decrease_factor = window_decrease_factor
|
|
self.dynamic = dynamic
|
|
|
|
def __repr__(self):
|
|
return '<ChangeQueue %s: %s>' % (self.pipeline.name, self.name)
|
|
|
|
def getJobs(self):
|
|
return self._jobs
|
|
|
|
def addProject(self, project):
|
|
if project not in self.projects:
|
|
self.projects.append(project)
|
|
|
|
if not self.name:
|
|
self.name = project.name
|
|
|
|
def enqueueChange(self, change):
|
|
item = QueueItem(self, change)
|
|
self.enqueueItem(item)
|
|
item.enqueue_time = time.time()
|
|
return item
|
|
|
|
def enqueueItem(self, item):
|
|
item.pipeline = self.pipeline
|
|
item.queue = self
|
|
if self.queue:
|
|
item.item_ahead = self.queue[-1]
|
|
item.item_ahead.items_behind.append(item)
|
|
self.queue.append(item)
|
|
|
|
def dequeueItem(self, item):
|
|
if item in self.queue:
|
|
self.queue.remove(item)
|
|
if item.item_ahead:
|
|
item.item_ahead.items_behind.remove(item)
|
|
for item_behind in item.items_behind:
|
|
if item.item_ahead:
|
|
item.item_ahead.items_behind.append(item_behind)
|
|
item_behind.item_ahead = item.item_ahead
|
|
item.item_ahead = None
|
|
item.items_behind = []
|
|
item.dequeue_time = time.time()
|
|
|
|
def moveItem(self, item, item_ahead):
|
|
if item.item_ahead == item_ahead:
|
|
return False
|
|
# Remove from current location
|
|
if item.item_ahead:
|
|
item.item_ahead.items_behind.remove(item)
|
|
for item_behind in item.items_behind:
|
|
if item.item_ahead:
|
|
item.item_ahead.items_behind.append(item_behind)
|
|
item_behind.item_ahead = item.item_ahead
|
|
# Add to new location
|
|
item.item_ahead = item_ahead
|
|
item.items_behind = []
|
|
if item.item_ahead:
|
|
item.item_ahead.items_behind.append(item)
|
|
return True
|
|
|
|
def isActionable(self, item):
|
|
if self.window:
|
|
return item in self.queue[:self.window]
|
|
else:
|
|
return True
|
|
|
|
def increaseWindowSize(self):
|
|
if self.window:
|
|
if self.window_increase_type == 'linear':
|
|
self.window += self.window_increase_factor
|
|
elif self.window_increase_type == 'exponential':
|
|
self.window *= self.window_increase_factor
|
|
|
|
def decreaseWindowSize(self):
|
|
if self.window:
|
|
if self.window_decrease_type == 'linear':
|
|
self.window = max(
|
|
self.window_floor,
|
|
self.window - self.window_decrease_factor)
|
|
elif self.window_decrease_type == 'exponential':
|
|
self.window = max(
|
|
self.window_floor,
|
|
int(self.window / self.window_decrease_factor))
|
|
|
|
|
|
class Project(object):
|
|
"""A Project represents a git repository such as openstack/nova."""
|
|
|
|
# NOTE: Projects should only be instantiated via a Source object
|
|
# so that they are associated with and cached by their Connection.
|
|
# This makes a Project instance a unique identifier for a given
|
|
# project from a given source.
|
|
|
|
def __init__(self, name, source, foreign=False):
|
|
self.name = name
|
|
self.source = source
|
|
self.connection_name = source.connection.connection_name
|
|
self.canonical_hostname = source.canonical_hostname
|
|
self.canonical_name = source.canonical_hostname + '/' + name
|
|
# foreign projects are those referenced in dependencies
|
|
# of layout projects, this should matter
|
|
# when deciding whether to enqueue their changes
|
|
# TODOv3 (jeblair): re-add support for foreign projects if needed
|
|
self.foreign = foreign
|
|
self.unparsed_config = None
|
|
self.unparsed_branch_config = {} # branch -> UnparsedTenantConfig
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
def __repr__(self):
|
|
return '<Project %s>' % (self.name)
|
|
|
|
|
|
class Node(object):
|
|
"""A single node for use by a job.
|
|
|
|
This may represent a request for a node, or an actual node
|
|
provided by Nodepool.
|
|
"""
|
|
|
|
def __init__(self, name, label):
|
|
self.name = name
|
|
self.label = label
|
|
self.id = None
|
|
self.lock = None
|
|
self.hold_job = None
|
|
self.comment = None
|
|
# Attributes from Nodepool
|
|
self._state = 'unknown'
|
|
self.state_time = time.time()
|
|
self.interface_ip = None
|
|
self.public_ipv4 = None
|
|
self.private_ipv4 = None
|
|
self.public_ipv6 = None
|
|
self.ssh_port = 22
|
|
self._keys = []
|
|
self.az = None
|
|
self.provider = None
|
|
self.region = None
|
|
|
|
@property
|
|
def state(self):
|
|
return self._state
|
|
|
|
@state.setter
|
|
def state(self, value):
|
|
if value not in NODE_STATES:
|
|
raise TypeError("'%s' is not a valid state" % value)
|
|
self._state = value
|
|
self.state_time = time.time()
|
|
|
|
def __repr__(self):
|
|
return '<Node %s %s:%s>' % (self.id, self.name, self.label)
|
|
|
|
def __ne__(self, other):
|
|
return not self.__eq__(other)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, Node):
|
|
return False
|
|
return (self.name == other.name and
|
|
self.label == other.label and
|
|
self.id == other.id)
|
|
|
|
def toDict(self):
|
|
d = {}
|
|
d['state'] = self.state
|
|
d['hold_job'] = self.hold_job
|
|
d['comment'] = self.comment
|
|
for k in self._keys:
|
|
d[k] = getattr(self, k)
|
|
return d
|
|
|
|
def updateFromDict(self, data):
|
|
self._state = data['state']
|
|
keys = []
|
|
for k, v in data.items():
|
|
if k == 'state':
|
|
continue
|
|
keys.append(k)
|
|
setattr(self, k, v)
|
|
self._keys = keys
|
|
|
|
|
|
class Group(object):
|
|
"""A logical group of nodes for use by a job.
|
|
|
|
A Group is a named set of node names that will be provided to
|
|
jobs in the inventory to describe logical units where some subset of tasks
|
|
run.
|
|
"""
|
|
|
|
def __init__(self, name, nodes):
|
|
self.name = name
|
|
self.nodes = nodes
|
|
|
|
def __repr__(self):
|
|
return '<Group %s %s>' % (self.name, str(self.nodes))
|
|
|
|
def __ne__(self, other):
|
|
return not self.__eq__(other)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, Group):
|
|
return False
|
|
return (self.name == other.name and
|
|
self.nodes == other.nodes)
|
|
|
|
def toDict(self):
|
|
return {
|
|
'name': self.name,
|
|
'nodes': self.nodes
|
|
}
|
|
|
|
|
|
class NodeSet(object):
|
|
"""A set of nodes.
|
|
|
|
In configuration, NodeSets are attributes of Jobs indicating that
|
|
a Job requires nodes matching this description.
|
|
|
|
They may appear as top-level configuration objects and be named,
|
|
or they may appears anonymously in in-line job definitions.
|
|
"""
|
|
|
|
def __init__(self, name=None):
|
|
self.name = name or ''
|
|
self.nodes = OrderedDict()
|
|
self.groups = OrderedDict()
|
|
|
|
def __ne__(self, other):
|
|
return not self.__eq__(other)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, NodeSet):
|
|
return False
|
|
return (self.name == other.name and
|
|
self.nodes == other.nodes)
|
|
|
|
def copy(self):
|
|
n = NodeSet(self.name)
|
|
for name, node in self.nodes.items():
|
|
n.addNode(Node(node.name, node.label))
|
|
for name, group in self.groups.items():
|
|
n.addGroup(Group(group.name, group.nodes[:]))
|
|
return n
|
|
|
|
def addNode(self, node):
|
|
if node.name in self.nodes:
|
|
raise Exception("Duplicate node in %s" % (self,))
|
|
self.nodes[node.name] = node
|
|
|
|
def getNodes(self):
|
|
return list(self.nodes.values())
|
|
|
|
def addGroup(self, group):
|
|
if group.name in self.groups:
|
|
raise Exception("Duplicate group in %s" % (self,))
|
|
self.groups[group.name] = group
|
|
|
|
def getGroups(self):
|
|
return list(self.groups.values())
|
|
|
|
def __repr__(self):
|
|
if self.name:
|
|
name = self.name + ' '
|
|
else:
|
|
name = ''
|
|
return '<NodeSet %s%s%s>' % (name, self.nodes, self.groups)
|
|
|
|
def __len__(self):
|
|
return len(self.nodes)
|
|
|
|
|
|
class NodeRequest(object):
|
|
"""A request for a set of nodes."""
|
|
|
|
def __init__(self, requestor, build_set, job, nodeset):
|
|
self.requestor = requestor
|
|
self.build_set = build_set
|
|
self.job = job
|
|
self.nodeset = nodeset
|
|
self._state = STATE_REQUESTED
|
|
self.state_time = time.time()
|
|
self.stat = None
|
|
self.uid = uuid4().hex
|
|
self.id = None
|
|
# Zuul internal flags (not stored in ZK so they are not
|
|
# overwritten).
|
|
self.failed = False
|
|
self.canceled = False
|
|
|
|
@property
|
|
def fulfilled(self):
|
|
return (self._state == STATE_FULFILLED) and not self.failed
|
|
|
|
@property
|
|
def state(self):
|
|
return self._state
|
|
|
|
@state.setter
|
|
def state(self, value):
|
|
if value not in REQUEST_STATES:
|
|
raise TypeError("'%s' is not a valid state" % value)
|
|
self._state = value
|
|
self.state_time = time.time()
|
|
|
|
def __repr__(self):
|
|
return '<NodeRequest %s %s>' % (self.id, self.nodeset)
|
|
|
|
def toDict(self):
|
|
d = {}
|
|
nodes = [n.label for n in self.nodeset.getNodes()]
|
|
d['node_types'] = nodes
|
|
d['requestor'] = self.requestor
|
|
d['state'] = self.state
|
|
d['state_time'] = self.state_time
|
|
return d
|
|
|
|
def updateFromDict(self, data):
|
|
self._state = data['state']
|
|
self.state_time = data['state_time']
|
|
|
|
|
|
class Secret(object):
|
|
"""A collection of private data.
|
|
|
|
In configuration, Secrets are collections of private data in
|
|
key-value pair format. They are defined as top-level
|
|
configuration objects and then referenced by Jobs.
|
|
|
|
"""
|
|
|
|
def __init__(self, name, source_context):
|
|
self.name = name
|
|
self.source_context = source_context
|
|
# The secret data may or may not be encrypted. This attribute
|
|
# is named 'secret_data' to make it easy to search for and
|
|
# spot where it is directly used.
|
|
self.secret_data = {}
|
|
|
|
def __ne__(self, other):
|
|
return not self.__eq__(other)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, Secret):
|
|
return False
|
|
return (self.name == other.name and
|
|
self.source_context == other.source_context and
|
|
self.secret_data == other.secret_data)
|
|
|
|
def __repr__(self):
|
|
return '<Secret %s>' % (self.name,)
|
|
|
|
def decrypt(self, private_key):
|
|
"""Return a copy of this secret with any encrypted data decrypted.
|
|
Note that the original remains encrypted."""
|
|
|
|
r = copy.deepcopy(self)
|
|
decrypted_secret_data = {}
|
|
for k, v in r.secret_data.items():
|
|
if hasattr(v, 'decrypt'):
|
|
decrypted_secret_data[k] = v.decrypt(private_key)
|
|
else:
|
|
decrypted_secret_data[k] = v
|
|
r.secret_data = decrypted_secret_data
|
|
return r
|
|
|
|
|
|
class SourceContext(object):
|
|
"""A reference to the branch of a project in configuration.
|
|
|
|
Jobs and playbooks reference this to keep track of where they
|
|
originate."""
|
|
|
|
def __init__(self, project, branch, path, trusted):
|
|
self.project = project
|
|
self.branch = branch
|
|
self.path = path
|
|
self.trusted = trusted
|
|
|
|
def __str__(self):
|
|
return '%s/%s@%s' % (self.project, self.path, self.branch)
|
|
|
|
def __repr__(self):
|
|
return '<SourceContext %s trusted:%s>' % (str(self),
|
|
self.trusted)
|
|
|
|
def __deepcopy__(self, memo):
|
|
return self.copy()
|
|
|
|
def copy(self):
|
|
return self.__class__(self.project, self.branch, self.path,
|
|
self.trusted)
|
|
|
|
def __ne__(self, other):
|
|
return not self.__eq__(other)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, SourceContext):
|
|
return False
|
|
return (self.project == other.project and
|
|
self.branch == other.branch and
|
|
self.path == other.path and
|
|
self.trusted == other.trusted)
|
|
|
|
|
|
class PlaybookContext(object):
|
|
|
|
"""A reference to a playbook in the context of a project.
|
|
|
|
Jobs refer to objects of this class for their main, pre, and post
|
|
playbooks so that we can keep track of which repos and security
|
|
contexts are needed in order to run them.
|
|
|
|
We also keep a list of roles so that playbooks only run with the
|
|
roles which were defined at the point the playbook was defined.
|
|
|
|
"""
|
|
|
|
def __init__(self, source_context, path, roles):
|
|
self.source_context = source_context
|
|
self.path = path
|
|
self.roles = roles
|
|
|
|
def __repr__(self):
|
|
return '<PlaybookContext %s %s>' % (self.source_context,
|
|
self.path)
|
|
|
|
def __ne__(self, other):
|
|
return not self.__eq__(other)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, PlaybookContext):
|
|
return False
|
|
return (self.source_context == other.source_context and
|
|
self.path == other.path and
|
|
self.roles == other.roles)
|
|
|
|
def toDict(self):
|
|
# Render to a dict to use in passing json to the executor
|
|
return dict(
|
|
connection=self.source_context.project.connection_name,
|
|
project=self.source_context.project.name,
|
|
branch=self.source_context.branch,
|
|
trusted=self.source_context.trusted,
|
|
roles=[r.toDict() for r in self.roles],
|
|
path=self.path)
|
|
|
|
|
|
class Role(object, metaclass=abc.ABCMeta):
|
|
"""A reference to an ansible role."""
|
|
|
|
def __init__(self, target_name):
|
|
self.target_name = target_name
|
|
|
|
@abc.abstractmethod
|
|
def __repr__(self):
|
|
pass
|
|
|
|
def __ne__(self, other):
|
|
return not self.__eq__(other)
|
|
|
|
@abc.abstractmethod
|
|
def __eq__(self, other):
|
|
if not isinstance(other, Role):
|
|
return False
|
|
return (self.target_name == other.target_name)
|
|
|
|
@abc.abstractmethod
|
|
def toDict(self):
|
|
# Render to a dict to use in passing json to the executor
|
|
return dict(target_name=self.target_name)
|
|
|
|
|
|
class ZuulRole(Role):
|
|
"""A reference to an ansible role in a Zuul project."""
|
|
|
|
def __init__(self, target_name, connection_name, project_name,
|
|
implicit=False):
|
|
super(ZuulRole, self).__init__(target_name)
|
|
self.connection_name = connection_name
|
|
self.project_name = project_name
|
|
self.implicit = implicit
|
|
|
|
def __repr__(self):
|
|
return '<ZuulRole %s %s>' % (self.project_name, self.target_name)
|
|
|
|
__hash__ = object.__hash__
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, ZuulRole):
|
|
return False
|
|
# Implicit is not consulted for equality so that we can handle
|
|
# implicit to explicit conversions.
|
|
return (super(ZuulRole, self).__eq__(other) and
|
|
self.connection_name == other.connection_name and
|
|
self.project_name == other.project_name)
|
|
|
|
def toDict(self):
|
|
# Render to a dict to use in passing json to the executor
|
|
d = super(ZuulRole, self).toDict()
|
|
d['type'] = 'zuul'
|
|
d['connection'] = self.connection_name
|
|
d['project'] = self.project_name
|
|
d['implicit'] = self.implicit
|
|
return d
|
|
|
|
|
|
class AuthContext(object):
|
|
"""The authentication information for a job.
|
|
|
|
Authentication information (both the actual data and metadata such
|
|
as whether it should be inherited) for a job is grouped together
|
|
in this object.
|
|
"""
|
|
|
|
def __init__(self, inherit=False):
|
|
self.inherit = inherit
|
|
self.secrets = []
|
|
|
|
def __ne__(self, other):
|
|
return not self.__eq__(other)
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, AuthContext):
|
|
return False
|
|
return (self.inherit == other.inherit and
|
|
self.secrets == other.secrets)
|
|
|
|
|
|
class Job(object):
|
|
|
|
"""A Job represents the defintion of actions to perform.
|
|
|
|
A Job is an abstract configuration concept. It describes what,
|
|
where, and under what circumstances something should be run
|
|
(contrast this with Build which is a concrete single execution of
|
|
a Job).
|
|
|
|
NB: Do not modify attributes of this class, set them directly
|
|
(e.g., "job.run = ..." rather than "job.run.append(...)").
|
|
"""
|
|
|
|
def __init__(self, name):
|
|
# These attributes may override even the final form of a job
|
|
# in the context of a project-pipeline. They can not affect
|
|
# the execution of the job, but only whether the job is run
|
|
# and how it is reported.
|
|
self.context_attributes = dict(
|
|
voting=True,
|
|
hold_following_changes=False,
|
|
failure_message=None,
|
|
success_message=None,
|
|
failure_url=None,
|
|
success_url=None,
|
|
# Matchers. These are separate so they can be individually
|
|
# overidden.
|
|
branch_matcher=None,
|
|
file_matcher=None,
|
|
irrelevant_file_matcher=None, # skip-if
|
|
tags=frozenset(),
|
|
dependencies=frozenset(),
|
|
)
|
|
|
|
# These attributes affect how the job is actually run and more
|
|
# care must be taken when overriding them. If a job is
|
|
# declared "final", these may not be overriden in a
|
|
# project-pipeline.
|
|
self.execution_attributes = dict(
|
|
timeout=None,
|
|
variables={},
|
|
nodeset=NodeSet(),
|
|
auth=None,
|
|
workspace=None,
|
|
pre_run=(),
|
|
post_run=(),
|
|
run=(),
|
|
implied_run=(),
|
|
semaphore=None,
|
|
attempts=3,
|
|
final=False,
|
|
roles=(),
|
|
required_projects={},
|
|
allowed_projects=None,
|
|
override_branch=None,
|
|
)
|
|
|
|
# These are generally internal attributes which are not
|
|
# accessible via configuration.
|
|
self.other_attributes = dict(
|
|
name=None,
|
|
source_context=None,
|
|
inheritance_path=(),
|
|
)
|
|
|
|
self.inheritable_attributes = {}
|
|
self.inheritable_attributes.update(self.context_attributes)
|
|
self.inheritable_attributes.update(self.execution_attributes)
|
|
self.attributes = {}
|
|
self.attributes.update(self.inheritable_attributes)
|
|
self.attributes.update(self.other_attributes)
|
|
|
|
self.name = name
|
|
|
|
def __ne__(self, other):
|
|
return not self.__eq__(other)
|
|
|
|
def __eq__(self, other):
|
|
# Compare the name and all inheritable attributes to determine
|
|
# whether two jobs with the same name are identically
|
|
# configured. Useful upon reconfiguration.
|
|
if not isinstance(other, Job):
|
|
return False
|
|
if self.name != other.name:
|
|
return False
|
|
for k, v in self.attributes.items():
|
|
if getattr(self, k) != getattr(other, k):
|
|
return False
|
|
return True
|
|
|
|
__hash__ = object.__hash__
|
|
|
|
def __str__(self):
|
|
return self.name
|
|
|
|
def __repr__(self):
|
|
return '<Job %s branches: %s source: %s>' % (self.name,
|
|
self.branch_matcher,
|
|
self.source_context)
|
|
|
|
def __getattr__(self, name):
|
|
v = self.__dict__.get(name)
|
|
if v is None:
|
|
return copy.deepcopy(self.attributes[name])
|
|
return v
|
|
|
|
def _get(self, name):
|
|
return self.__dict__.get(name)
|
|
|
|
def getSafeAttributes(self):
|
|
return Attributes(name=self.name)
|
|
|
|
def setRun(self):
|
|
if not self.run:
|
|
self.run = self.implied_run
|
|
|
|
def addRoles(self, roles):
|
|
newroles = []
|
|
# Start with a copy of the existing roles, but if any of them
|
|
# are implicit roles which are identified as explicit in the
|
|
# new roles list, replace them with the explicit version.
|
|
changed = False
|
|
for existing_role in self.roles:
|
|
if existing_role in roles:
|
|
new_role = roles[roles.index(existing_role)]
|
|
else:
|
|
new_role = None
|
|
if (new_role and
|
|
isinstance(new_role, ZuulRole) and
|
|
isinstance(existing_role, ZuulRole) and
|
|
existing_role.implicit and not new_role.implicit):
|
|
newroles.append(new_role)
|
|
changed = True
|
|
else:
|
|
newroles.append(existing_role)
|
|
# Now add the new roles.
|
|
for role in reversed(roles):
|
|
if role not in newroles:
|
|
newroles.insert(0, role)
|
|
changed = True
|
|
if changed:
|
|
self.roles = tuple(newroles)
|
|
|
|
def updateVariables(self, other_vars):
|
|
v = self.variables
|
|
Job._deepUpdate(v, other_vars)
|
|
self.variables = v
|
|
|
|
def updateProjects(self, other_projects):
|
|
required_projects = self.required_projects
|
|
Job._deepUpdate(required_projects, other_projects)
|
|
self.required_projects = required_projects
|
|
|
|
@staticmethod
|
|
def _deepUpdate(a, b):
|
|
# Merge nested dictionaries if possible, otherwise, overwrite
|
|
# the value in 'a' with the value in 'b'.
|
|
for k, bv in b.items():
|
|
av = a.get(k)
|
|
if isinstance(av, dict) and isinstance(bv, dict):
|
|
Job._deepUpdate(av, bv)
|
|
else:
|
|
a[k] = bv
|
|
|
|
def inheritFrom(self, other):
|
|
"""Copy the inheritable attributes which have been set on the other
|
|
job to this job."""
|
|
if not isinstance(other, Job):
|
|
raise Exception("Job unable to inherit from %s" % (other,))
|
|
|
|
do_not_inherit = set()
|
|
if other.auth and not other.auth.inherit:
|
|
do_not_inherit.add('auth')
|
|
|
|
# copy all attributes
|
|
for k in self.inheritable_attributes:
|
|
if (other._get(k) is not None and k not in do_not_inherit):
|
|
setattr(self, k, copy.deepcopy(getattr(other, k)))
|
|
|
|
msg = 'inherit from %s' % (repr(other),)
|
|
self.inheritance_path = other.inheritance_path + (msg,)
|
|
|
|
def copy(self):
|
|
job = Job(self.name)
|
|
for k in self.attributes:
|
|
if self._get(k) is not None:
|
|
setattr(job, k, copy.deepcopy(self._get(k)))
|
|
return job
|
|
|
|
def applyVariant(self, other):
|
|
"""Copy the attributes which have been set on the other job to this
|
|
job."""
|
|
|
|
if not isinstance(other, Job):
|
|
raise Exception("Job unable to inherit from %s" % (other,))
|
|
|
|
for k in self.execution_attributes:
|
|
if (other._get(k) is not None and
|
|
k not in set(['final'])):
|
|
if self.final:
|
|
raise Exception("Unable to modify final job %s attribute "
|
|
"%s=%s with variant %s" % (
|
|
repr(self), k, other._get(k),
|
|
repr(other)))
|
|
if k not in set(['pre_run', 'post_run', 'roles', 'variables',
|
|
'required_projects']):
|
|
setattr(self, k, copy.deepcopy(other._get(k)))
|
|
|
|
# Don't set final above so that we don't trip an error halfway
|
|
# through assignment.
|
|
if other.final != self.attributes['final']:
|
|
self.final = other.final
|
|
|
|
if other._get('pre_run') is not None:
|
|
self.pre_run = self.pre_run + other.pre_run
|
|
if other._get('post_run') is not None:
|
|
self.post_run = other.post_run + self.post_run
|
|
if other._get('roles') is not None:
|
|
self.addRoles(other.roles)
|
|
if other._get('variables') is not None:
|
|
self.updateVariables(other.variables)
|
|
if other._get('required_projects') is not None:
|
|
self.updateProjects(other.required_projects)
|
|
|
|
for k in self.context_attributes:
|
|
if (other._get(k) is not None and
|
|
k not in set(['tags'])):
|
|
setattr(self, k, copy.deepcopy(other._get(k)))
|
|
|
|
if other._get('tags') is not None:
|
|
self.tags = self.tags.union(other.tags)
|
|
|
|
msg = 'apply variant %s' % (repr(other),)
|
|
self.inheritance_path = self.inheritance_path + (msg,)
|
|
|
|
def changeMatches(self, change):
|
|
if self.branch_matcher and not self.branch_matcher.matches(change):
|
|
return False
|
|
|
|
if self.file_matcher and not self.file_matcher.matches(change):
|
|
return False
|
|
|
|
# NB: This is a negative match.
|
|
if (self.irrelevant_file_matcher and
|
|
self.irrelevant_file_matcher.matches(change)):
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
class JobProject(object):
|
|
""" A reference to a project from a job. """
|
|
|
|
def __init__(self, project_name, override_branch=None):
|
|
self.project_name = project_name
|
|
self.override_branch = override_branch
|
|
|
|
|
|
class JobList(object):
|
|
""" A list of jobs in a project's pipeline. """
|
|
|
|
def __init__(self):
|
|
self.jobs = OrderedDict() # job.name -> [job, ...]
|
|
|
|
def addJob(self, job):
|
|
if job.name in self.jobs:
|
|
self.jobs[job.name].append(job)
|
|
else:
|
|
self.jobs[job.name] = [job]
|
|
|
|
def inheritFrom(self, other):
|
|
for jobname, jobs in other.jobs.items():
|
|
if jobname in self.jobs:
|
|
self.jobs[jobname].extend(jobs)
|
|
else:
|
|
self.jobs[jobname] = jobs
|
|
|
|
|
|
class JobGraph(object):
|
|
""" A JobGraph represents the dependency graph between Job."""
|
|
|
|
def __init__(self):
|
|
self.jobs = OrderedDict() # job_name -> Job
|
|
self._dependencies = {} # dependent_job_name -> set(parent_job_names)
|
|
|
|
def __repr__(self):
|
|
return '<JobGraph %s>' % (self.jobs)
|
|
|
|
def addJob(self, job):
|
|
# A graph must be created after the job list is frozen,
|
|
# therefore we should only get one job with the same name.
|
|
if job.name in self.jobs:
|
|
raise Exception("Job %s already added" % (job.name,))
|
|
self.jobs[job.name] = job
|
|
# Append the dependency information
|
|
self._dependencies.setdefault(job.name, set())
|
|
try:
|
|
for dependency in job.dependencies:
|
|
# Make sure a circular dependency is never created
|
|
ancestor_jobs = self._getParentJobNamesRecursively(
|
|
dependency, soft=True)
|
|
ancestor_jobs.add(dependency)
|
|
if any((job.name == anc_job) for anc_job in ancestor_jobs):
|
|
raise Exception("Dependency cycle detected in job %s" %
|
|
(job.name,))
|
|
self._dependencies[job.name].add(dependency)
|
|
except Exception:
|
|
del self.jobs[job.name]
|
|
del self._dependencies[job.name]
|
|
raise
|
|
|
|
def getJobs(self):
|
|
return list(self.jobs.values()) # Report in the order of layout cfg
|
|
|
|
def _getDirectDependentJobs(self, parent_job):
|
|
ret = set()
|
|
for dependent_name, parent_names in self._dependencies.items():
|
|
if parent_job in parent_names:
|
|
ret.add(dependent_name)
|
|
return ret
|
|
|
|
def getDependentJobsRecursively(self, parent_job):
|
|
all_dependent_jobs = set()
|
|
jobs_to_iterate = set([parent_job])
|
|
while len(jobs_to_iterate) > 0:
|
|
current_job = jobs_to_iterate.pop()
|
|
current_dependent_jobs = self._getDirectDependentJobs(current_job)
|
|
new_dependent_jobs = current_dependent_jobs - all_dependent_jobs
|
|
jobs_to_iterate |= new_dependent_jobs
|
|
all_dependent_jobs |= new_dependent_jobs
|
|
return [self.jobs[name] for name in all_dependent_jobs]
|
|
|
|
def getParentJobsRecursively(self, dependent_job):
|
|
return [self.jobs[name] for name in
|
|
self._getParentJobNamesRecursively(dependent_job)]
|
|
|
|
def _getParentJobNamesRecursively(self, dependent_job, soft=False):
|
|
all_parent_jobs = set()
|
|
jobs_to_iterate = set([dependent_job])
|
|
while len(jobs_to_iterate) > 0:
|
|
current_job = jobs_to_iterate.pop()
|
|
current_parent_jobs = self._dependencies.get(current_job)
|
|
if current_parent_jobs is None:
|
|
if soft:
|
|
current_parent_jobs = set()
|
|
else:
|
|
raise Exception("Dependent job %s not found: " %
|
|
(dependent_job,))
|
|
new_parent_jobs = current_parent_jobs - all_parent_jobs
|
|
jobs_to_iterate |= new_parent_jobs
|
|
all_parent_jobs |= new_parent_jobs
|
|
return all_parent_jobs
|
|
|
|
|
|
class Build(object):
|
|
"""A Build is an instance of a single execution of a Job.
|
|
|
|
While a Job describes what to run, a Build describes an actual
|
|
execution of that Job. Each build is associated with exactly one
|
|
Job (related builds are grouped together in a BuildSet).
|
|
"""
|
|
|
|
def __init__(self, job, uuid):
|
|
self.job = job
|
|
self.uuid = uuid
|
|
self.url = None
|
|
self.result = None
|
|
self.result_data = {}
|
|
self.error_detail = None
|
|
self.build_set = None
|
|
self.execute_time = time.time()
|
|
self.start_time = None
|
|
self.end_time = None
|
|
self.estimated_time = None
|
|
self.pipeline = None
|
|
self.canceled = False
|
|
self.retry = False
|
|
self.parameters = {}
|
|
self.worker = Worker()
|
|
self.node_labels = []
|
|
self.node_name = None
|
|
|
|
def __repr__(self):
|
|
return ('<Build %s of %s on %s>' %
|
|
(self.uuid, self.job.name, self.worker))
|
|
|
|
def getSafeAttributes(self):
|
|
return Attributes(uuid=self.uuid,
|
|
result=self.result,
|
|
error_detail=self.error_detail,
|
|
result_data=self.result_data)
|
|
|
|
|
|
class Worker(object):
|
|
"""Information about the specific worker executing a Build."""
|
|
def __init__(self):
|
|
self.name = "Unknown"
|
|
self.hostname = None
|
|
self.log_port = None
|
|
|
|
def updateFromData(self, data):
|
|
"""Update worker information if contained in the WORK_DATA response."""
|
|
self.name = data.get('worker_name', self.name)
|
|
self.hostname = data.get('worker_hostname', self.hostname)
|
|
self.log_port = data.get('worker_log_port', self.log_port)
|
|
|
|
def __repr__(self):
|
|
return '<Worker %s>' % self.name
|
|
|
|
|
|
class RepoFiles(object):
|
|
"""RepoFiles holds config-file content for per-project job config.
|
|
|
|
When Zuul asks a merger to prepare a future multiple-repo state
|
|
and collect Zuul configuration files so that we can dynamically
|
|
load our configuration, this class provides cached access to that
|
|
data for use by the Change which updated the config files and any
|
|
changes that follow it in a ChangeQueue.
|
|
|
|
It is attached to a BuildSet since the content of Zuul
|
|
configuration files can change with each new BuildSet.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.connections = {}
|
|
|
|
def __repr__(self):
|
|
return '<RepoFiles %s>' % self.connections
|
|
|
|
def setFiles(self, items):
|
|
self.hostnames = {}
|
|
for item in items:
|
|
connection = self.connections.setdefault(
|
|
item['connection'], {})
|
|
project = connection.setdefault(item['project'], {})
|
|
branch = project.setdefault(item['branch'], {})
|
|
branch.update(item['files'])
|
|
|
|
def getFile(self, connection_name, project_name, branch, fn):
|
|
host = self.connections.get(connection_name, {})
|
|
return host.get(project_name, {}).get(branch, {}).get(fn)
|
|
|
|
|
|
class BuildSet(object):
|
|
"""A collection of Builds for one specific potential future repository
|
|
state.
|
|
|
|
When Zuul executes Builds for a change, it creates a Build to
|
|
represent each execution of each job and a BuildSet to keep track
|
|
of all the Builds running for that Change. When Zuul re-executes
|
|
Builds for a Change with a different configuration, all of the
|
|
running Builds in the BuildSet for that change are aborted, and a
|
|
new BuildSet is created to hold the Builds for the Jobs being
|
|
run with the new configuration.
|
|
|
|
A BuildSet also holds the UUID used to produce the Zuul Ref that
|
|
builders check out.
|
|
|
|
"""
|
|
# Merge states:
|
|
NEW = 1
|
|
PENDING = 2
|
|
COMPLETE = 3
|
|
|
|
states_map = {
|
|
1: 'NEW',
|
|
2: 'PENDING',
|
|
3: 'COMPLETE',
|
|
}
|
|
|
|
def __init__(self, item):
|
|
self.item = item
|
|
self.builds = {}
|
|
self.result = None
|
|
self.next_build_set = None
|
|
self.previous_build_set = None
|
|
self.uuid = None
|
|
self.commit = None
|
|
self.dependent_items = None
|
|
self.merger_items = None
|
|
self.unable_to_merge = False
|
|
self.config_error = None # None or an error message string.
|
|
self.failing_reasons = []
|
|
self.merge_state = self.NEW
|
|
self.nodesets = {} # job -> nodeset
|
|
self.node_requests = {} # job -> reqs
|
|
self.files = RepoFiles()
|
|
self.repo_state = {}
|
|
self.layout = None
|
|
self.tries = {}
|
|
|
|
@property
|
|
def ref(self):
|
|
# NOTE(jamielennox): The concept of buildset ref is to be removed and a
|
|
# buildset UUID identifier available instead. Currently the ref is
|
|
# checked to see if the BuildSet has been configured.
|
|
return 'Z' + self.uuid if self.uuid else None
|
|
|
|
def __repr__(self):
|
|
return '<BuildSet item: %s #builds: %s merge state: %s>' % (
|
|
self.item,
|
|
len(self.builds),
|
|
self.getStateName(self.merge_state))
|
|
|
|
def setConfiguration(self):
|
|
# The change isn't enqueued until after it's created
|
|
# so we don't know what the other changes ahead will be
|
|
# until jobs start.
|
|
if self.dependent_items is None:
|
|
items = []
|
|
next_item = self.item.item_ahead
|
|
while next_item:
|
|
items.append(next_item)
|
|
next_item = next_item.item_ahead
|
|
self.dependent_items = items
|
|
if not self.uuid:
|
|
self.uuid = uuid4().hex
|
|
if self.merger_items is None:
|
|
items = [self.item] + self.dependent_items
|
|
items.reverse()
|
|
self.merger_items = [i.makeMergerItem() for i in items]
|
|
|
|
def getStateName(self, state_num):
|
|
return self.states_map.get(
|
|
state_num, 'UNKNOWN (%s)' % state_num)
|
|
|
|
def addBuild(self, build):
|
|
self.builds[build.job.name] = build
|
|
if build.job.name not in self.tries:
|
|
self.tries[build.job.name] = 1
|
|
build.build_set = self
|
|
|
|
def removeBuild(self, build):
|
|
self.tries[build.job.name] += 1
|
|
del self.builds[build.job.name]
|
|
|
|
def getBuild(self, job_name):
|
|
return self.builds.get(job_name)
|
|
|
|
def getBuilds(self):
|
|
keys = list(self.builds.keys())
|
|
keys.sort()
|
|
return [self.builds.get(x) for x in keys]
|
|
|
|
def getJobNodeSet(self, job_name):
|
|
# Return None if not provisioned; empty NodeSet if no nodes
|
|
# required
|
|
return self.nodesets.get(job_name)
|
|
|
|
def removeJobNodeSet(self, job_name):
|
|
if job_name not in self.nodesets:
|
|
raise Exception("No job set for %s" % (job_name))
|
|
del self.nodesets[job_name]
|
|
|
|
def setJobNodeRequest(self, job_name, req):
|
|
if job_name in self.node_requests:
|
|
raise Exception("Prior node request for %s" % (job_name))
|
|
self.node_requests[job_name] = req
|
|
|
|
def getJobNodeRequest(self, job_name):
|
|
return self.node_requests.get(job_name)
|
|
|
|
def jobNodeRequestComplete(self, job_name, req, nodeset):
|
|
if job_name in self.nodesets:
|
|
raise Exception("Prior node request for %s" % (job_name))
|
|
self.nodesets[job_name] = nodeset
|
|
del self.node_requests[job_name]
|
|
|
|
def getTries(self, job_name):
|
|
return self.tries.get(job_name, 0)
|
|
|
|
def getMergeMode(self):
|
|
# We may be called before this build set has a shadow layout
|
|
# (ie, we are called to perform the merge to create that
|
|
# layout). It's possible that the change we are merging will
|
|
# update the merge-mode for the project, but there's not much
|
|
# we can do about that here. Instead, do the best we can by
|
|
# using the nearest shadow layout to determine the merge mode,
|
|
# or if that fails, the current live layout, or if that fails,
|
|
# use the default: merge-resolve.
|
|
item = self.item
|
|
layout = None
|
|
while item:
|
|
layout = item.current_build_set.layout
|
|
if layout:
|
|
break
|
|
item = item.item_ahead
|
|
if not layout:
|
|
layout = self.item.pipeline.layout
|
|
if layout:
|
|
project = self.item.change.project
|
|
project_config = layout.project_configs.get(
|
|
project.canonical_name)
|
|
if project_config:
|
|
return project_config.merge_mode
|
|
return MERGER_MERGE_RESOLVE
|
|
|
|
def getSafeAttributes(self):
|
|
return Attributes(uuid=self.uuid)
|
|
|
|
|
|
class QueueItem(object):
|
|
"""Represents the position of a Change in a ChangeQueue.
|
|
|
|
All Changes are enqueued into ChangeQueue in a QueueItem. The QueueItem
|
|
holds the current `BuildSet` as well as all previous `BuildSets` that were
|
|
produced for this `QueueItem`.
|
|
"""
|
|
log = logging.getLogger("zuul.QueueItem")
|
|
|
|
def __init__(self, queue, change):
|
|
self.pipeline = queue.pipeline
|
|
self.queue = queue
|
|
self.change = change # a ref
|
|
self.build_sets = []
|
|
self.dequeued_needing_change = False
|
|
self.current_build_set = BuildSet(self)
|
|
self.build_sets.append(self.current_build_set)
|
|
self.item_ahead = None
|
|
self.items_behind = []
|
|
self.enqueue_time = None
|
|
self.dequeue_time = None
|
|
self.reported = False
|
|
self.reported_start = False
|
|
self.quiet = False
|
|
self.active = False # Whether an item is within an active window
|
|
self.live = True # Whether an item is intended to be processed at all
|
|
self.job_graph = None
|
|
|
|
def __repr__(self):
|
|
if self.pipeline:
|
|
pipeline = self.pipeline.name
|
|
else:
|
|
pipeline = None
|
|
return '<QueueItem 0x%x for %s in %s>' % (
|
|
id(self), self.change, pipeline)
|
|
|
|
def resetAllBuilds(self):
|
|
old = self.current_build_set
|
|
self.current_build_set.result = 'CANCELED'
|
|
self.current_build_set = BuildSet(self)
|
|
old.next_build_set = self.current_build_set
|
|
self.current_build_set.previous_build_set = old
|
|
self.build_sets.append(self.current_build_set)
|
|
|
|
def addBuild(self, build):
|
|
self.current_build_set.addBuild(build)
|
|
build.pipeline = self.pipeline
|
|
|
|
def removeBuild(self, build):
|
|
self.current_build_set.removeBuild(build)
|
|
|
|
def setReportedResult(self, result):
|
|
self.current_build_set.result = result
|
|
|
|
def freezeJobGraph(self):
|
|
"""Find or create actual matching jobs for this item's change and
|
|
store the resulting job tree."""
|
|
layout = self.current_build_set.layout
|
|
job_graph = layout.createJobGraph(self)
|
|
for job in job_graph.getJobs():
|
|
# Ensure that each jobs's dependencies are fully
|
|
# accessible. This will raise an exception if not.
|
|
job_graph.getParentJobsRecursively(job.name)
|
|
self.job_graph = job_graph
|
|
|
|
def hasJobGraph(self):
|
|
"""Returns True if the item has a job graph."""
|
|
return self.job_graph is not None
|
|
|
|
def getJobs(self):
|
|
if not self.live or not self.job_graph:
|
|
return []
|
|
return self.job_graph.getJobs()
|
|
|
|
def getJob(self, name):
|
|
if not self.job_graph:
|
|
return None
|
|
return self.job_graph.jobs.get(name)
|
|
|
|
def haveAllJobsStarted(self):
|
|
if not self.hasJobGraph():
|
|
return False
|
|
for job in self.getJobs():
|
|
build = self.current_build_set.getBuild(job.name)
|
|
if not build or not build.start_time:
|
|
return False
|
|
return True
|
|
|
|
def areAllJobsComplete(self):
|
|
if (self.current_build_set.config_error or
|
|
self.current_build_set.unable_to_merge):
|
|
return True
|
|
if not self.hasJobGraph():
|
|
return False
|
|
for job in self.getJobs():
|
|
build = self.current_build_set.getBuild(job.name)
|
|
if not build or not build.result:
|
|
return False
|
|
return True
|
|
|
|
def didAllJobsSucceed(self):
|
|
if not self.hasJobGraph():
|
|
return False
|
|
for job in self.getJobs():
|
|
if not job.voting:
|
|
continue
|
|
build = self.current_build_set.getBuild(job.name)
|
|
if not build:
|
|
return False
|
|
if build.result != 'SUCCESS':
|
|
return False
|
|
return True
|
|
|
|
def didAnyJobFail(self):
|
|
if not self.hasJobGraph():
|
|
return False
|
|
for job in self.getJobs():
|
|
if not job.voting:
|
|
continue
|
|
build = self.current_build_set.getBuild(job.name)
|
|
if build and build.result and (build.result != 'SUCCESS'):
|
|
return True
|
|
return False
|
|
|
|
def didMergerFail(self):
|
|
return self.current_build_set.unable_to_merge
|
|
|
|
def getConfigError(self):
|
|
return self.current_build_set.config_error
|
|
|
|
def wasDequeuedNeedingChange(self):
|
|
return self.dequeued_needing_change
|
|
|
|
def isHoldingFollowingChanges(self):
|
|
if not self.live:
|
|
return False
|
|
if not self.hasJobGraph():
|
|
return False
|
|
for job in self.getJobs():
|
|
if not job.hold_following_changes:
|
|
continue
|
|
build = self.current_build_set.getBuild(job.name)
|
|
if not build:
|
|
return True
|
|
if build.result != 'SUCCESS':
|
|
return True
|
|
|
|
if not self.item_ahead:
|
|
return False
|
|
return self.item_ahead.isHoldingFollowingChanges()
|
|
|
|
def findJobsToRun(self, semaphore_handler):
|
|
torun = []
|
|
if not self.live:
|
|
return []
|
|
if not self.job_graph:
|
|
return []
|
|
if self.item_ahead:
|
|
# Only run jobs if any 'hold' jobs on the change ahead
|
|
# have completed successfully.
|
|
if self.item_ahead.isHoldingFollowingChanges():
|
|
return []
|
|
|
|
successful_job_names = set()
|
|
jobs_not_started = set()
|
|
for job in self.job_graph.getJobs():
|
|
build = self.current_build_set.getBuild(job.name)
|
|
if build:
|
|
if build.result == 'SUCCESS':
|
|
successful_job_names.add(job.name)
|
|
else:
|
|
jobs_not_started.add(job)
|
|
|
|
# Attempt to request nodes for jobs in the order jobs appear
|
|
# in configuration.
|
|
for job in self.job_graph.getJobs():
|
|
if job not in jobs_not_started:
|
|
continue
|
|
all_parent_jobs_successful = True
|
|
for parent_job in self.job_graph.getParentJobsRecursively(
|
|
job.name):
|
|
if parent_job.name not in successful_job_names:
|
|
all_parent_jobs_successful = False
|
|
break
|
|
if all_parent_jobs_successful:
|
|
nodeset = self.current_build_set.getJobNodeSet(job.name)
|
|
if nodeset is None:
|
|
# The nodes for this job are not ready, skip
|
|
# it for now.
|
|
continue
|
|
if semaphore_handler.acquire(self, job):
|
|
# If this job needs a semaphore, either acquire it or
|
|
# make sure that we have it before running the job.
|
|
torun.append(job)
|
|
return torun
|
|
|
|
def findJobsToRequest(self):
|
|
build_set = self.current_build_set
|
|
toreq = []
|
|
if not self.live:
|
|
return []
|
|
if not self.job_graph:
|
|
return []
|
|
if self.item_ahead:
|
|
if self.item_ahead.isHoldingFollowingChanges():
|
|
return []
|
|
|
|
successful_job_names = set()
|
|
jobs_not_requested = set()
|
|
for job in self.job_graph.getJobs():
|
|
build = build_set.getBuild(job.name)
|
|
if build and build.result == 'SUCCESS':
|
|
successful_job_names.add(job.name)
|
|
else:
|
|
nodeset = build_set.getJobNodeSet(job.name)
|
|
if nodeset is None:
|
|
req = build_set.getJobNodeRequest(job.name)
|
|
if req is None:
|
|
jobs_not_requested.add(job)
|
|
|
|
# Attempt to request nodes for jobs in the order jobs appear
|
|
# in configuration.
|
|
for job in self.job_graph.getJobs():
|
|
if job not in jobs_not_requested:
|
|
continue
|
|
all_parent_jobs_successful = True
|
|
for parent_job in self.job_graph.getParentJobsRecursively(
|
|
job.name):
|
|
if parent_job.name not in successful_job_names:
|
|
all_parent_jobs_successful = False
|
|
break
|
|
if all_parent_jobs_successful:
|
|
toreq.append(job)
|
|
return toreq
|
|
|
|
def setResult(self, build):
|
|
if build.retry:
|
|
self.removeBuild(build)
|
|
elif build.result != 'SUCCESS':
|
|
for job in self.job_graph.getDependentJobsRecursively(
|
|
build.job.name):
|
|
fakebuild = Build(job, None)
|
|
fakebuild.result = 'SKIPPED'
|
|
self.addBuild(fakebuild)
|
|
|
|
def setNodeRequestFailure(self, job):
|
|
fakebuild = Build(job, None)
|
|
self.addBuild(fakebuild)
|
|
fakebuild.result = 'NODE_FAILURE'
|
|
self.setResult(fakebuild)
|
|
|
|
def setDequeuedNeedingChange(self):
|
|
self.dequeued_needing_change = True
|
|
self._setAllJobsSkipped()
|
|
|
|
def setUnableToMerge(self):
|
|
self.current_build_set.unable_to_merge = True
|
|
self._setAllJobsSkipped()
|
|
|
|
def setConfigError(self, error):
|
|
self.current_build_set.config_error = error
|
|
self._setAllJobsSkipped()
|
|
|
|
def _setAllJobsSkipped(self):
|
|
for job in self.getJobs():
|
|
fakebuild = Build(job, None)
|
|
fakebuild.result = 'SKIPPED'
|
|
self.addBuild(fakebuild)
|
|
|
|
def formatUrlPattern(self, url_pattern, job=None, build=None):
|
|
url = None
|
|
# Produce safe versions of objects which may be useful in
|
|
# result formatting, but don't allow users to crawl through
|
|
# the entire data structure where they might be able to access
|
|
# secrets, etc.
|
|
safe_change = self.change.getSafeAttributes()
|
|
safe_pipeline = self.pipeline.getSafeAttributes()
|
|
safe_tenant = self.pipeline.layout.tenant.getSafeAttributes()
|
|
safe_buildset = self.current_build_set.getSafeAttributes()
|
|
safe_job = job.getSafeAttributes() if job else {}
|
|
safe_build = build.getSafeAttributes() if build else {}
|
|
try:
|
|
url = url_pattern.format(change=safe_change,
|
|
pipeline=safe_pipeline,
|
|
tenant=safe_tenant,
|
|
buildset=safe_buildset,
|
|
job=safe_job,
|
|
build=safe_build)
|
|
except KeyError as e:
|
|
self.log.error("Error while formatting url for job %s: unknown "
|
|
"key %s in pattern %s"
|
|
% (job, e.args[0], url_pattern))
|
|
except AttributeError as e:
|
|
self.log.error("Error while formatting url for job %s: unknown "
|
|
"attribute %s in pattern %s"
|
|
% (job, e.args[0], url_pattern))
|
|
except Exception:
|
|
self.log.exception("Error while formatting url for job %s with "
|
|
"pattern %s:" % (job, url_pattern))
|
|
|
|
return url
|
|
|
|
def formatJobResult(self, job):
|
|
build = self.current_build_set.getBuild(job.name)
|
|
result = build.result
|
|
pattern = None
|
|
if result == 'SUCCESS':
|
|
if job.success_message:
|
|
result = job.success_message
|
|
if job.success_url:
|
|
pattern = job.success_url
|
|
else:
|
|
if job.failure_message:
|
|
result = job.failure_message
|
|
if job.failure_url:
|
|
pattern = job.failure_url
|
|
url = None # The final URL
|
|
default_url = build.result_data.get('zuul', {}).get('log_url')
|
|
if pattern:
|
|
job_url = self.formatUrlPattern(pattern, job, build)
|
|
else:
|
|
job_url = None
|
|
try:
|
|
if job_url:
|
|
u = urllib.parse.urlparse(job_url)
|
|
if u.scheme:
|
|
# The job success or failure url is absolute, so it's
|
|
# our final url.
|
|
url = job_url
|
|
else:
|
|
# We have a relative job url. Combine it with our
|
|
# default url.
|
|
if default_url:
|
|
url = urllib.parse.urljoin(default_url, job_url)
|
|
except Exception:
|
|
self.log.exception("Error while parsing url for job %s:"
|
|
% (job,))
|
|
if not url:
|
|
url = default_url or build.url or job.name
|
|
return (result, url)
|
|
|
|
def formatJSON(self, websocket_url=None):
|
|
ret = {}
|
|
ret['active'] = self.active
|
|
ret['live'] = self.live
|
|
if hasattr(self.change, 'url') and self.change.url is not None:
|
|
ret['url'] = self.change.url
|
|
else:
|
|
ret['url'] = None
|
|
ret['id'] = self.change._id()
|
|
if self.item_ahead:
|
|
ret['item_ahead'] = self.item_ahead.change._id()
|
|
else:
|
|
ret['item_ahead'] = None
|
|
ret['items_behind'] = [i.change._id() for i in self.items_behind]
|
|
ret['failing_reasons'] = self.current_build_set.failing_reasons
|
|
ret['zuul_ref'] = self.current_build_set.ref
|
|
if self.change.project:
|
|
ret['project'] = self.change.project.name
|
|
else:
|
|
# For cross-project dependencies with the depends-on
|
|
# project not known to zuul, the project is None
|
|
# Set it to a static value
|
|
ret['project'] = "Unknown Project"
|
|
ret['enqueue_time'] = int(self.enqueue_time * 1000)
|
|
ret['jobs'] = []
|
|
if hasattr(self.change, 'owner'):
|
|
ret['owner'] = self.change.owner
|
|
else:
|
|
ret['owner'] = None
|
|
max_remaining = 0
|
|
for job in self.getJobs():
|
|
now = time.time()
|
|
build = self.current_build_set.getBuild(job.name)
|
|
elapsed = None
|
|
remaining = None
|
|
result = None
|
|
build_url = None
|
|
finger_url = None
|
|
report_url = None
|
|
worker = None
|
|
if build:
|
|
result = build.result
|
|
finger_url = build.url
|
|
# TODO(tobiash): add support for custom web root
|
|
urlformat = 'static/stream.html?' \
|
|
'uuid={build.uuid}&' \
|
|
'logfile=console.log'
|
|
if websocket_url:
|
|
urlformat += '&websocket_url={websocket_url}'
|
|
build_url = urlformat.format(
|
|
build=build, websocket_url=websocket_url)
|
|
(unused, report_url) = self.formatJobResult(job)
|
|
if build.start_time:
|
|
if build.end_time:
|
|
elapsed = int((build.end_time -
|
|
build.start_time) * 1000)
|
|
remaining = 0
|
|
else:
|
|
elapsed = int((now - build.start_time) * 1000)
|
|
if build.estimated_time:
|
|
remaining = max(
|
|
int(build.estimated_time * 1000) - elapsed,
|
|
0)
|
|
worker = {
|
|
'name': build.worker.name,
|
|
'hostname': build.worker.hostname,
|
|
}
|
|
if remaining and remaining > max_remaining:
|
|
max_remaining = remaining
|
|
|
|
ret['jobs'].append({
|
|
'name': job.name,
|
|
'dependencies': list(job.dependencies),
|
|
'elapsed_time': elapsed,
|
|
'remaining_time': remaining,
|
|
'url': build_url,
|
|
'finger_url': finger_url,
|
|
'report_url': report_url,
|
|
'result': result,
|
|
'voting': job.voting,
|
|
'uuid': build.uuid if build else None,
|
|
'execute_time': build.execute_time if build else None,
|
|
'start_time': build.start_time if build else None,
|
|
'end_time': build.end_time if build else None,
|
|
'estimated_time': build.estimated_time if build else None,
|
|
'pipeline': build.pipeline.name if build else None,
|
|
'canceled': build.canceled if build else None,
|
|
'retry': build.retry if build else None,
|
|
'node_labels': build.node_labels if build else [],
|
|
'node_name': build.node_name if build else None,
|
|
'worker': worker,
|
|
})
|
|
|
|
if self.haveAllJobsStarted():
|
|
ret['remaining_time'] = max_remaining
|
|
else:
|
|
ret['remaining_time'] = None
|
|
return ret
|
|
|
|
def formatStatus(self, indent=0, html=False):
|
|
indent_str = ' ' * indent
|
|
ret = ''
|
|
if html and getattr(self.change, 'url', None) is not None:
|
|
ret += '%sProject %s change <a href="%s">%s</a>\n' % (
|
|
indent_str,
|
|
self.change.project.name,
|
|
self.change.url,
|
|
self.change._id())
|
|
else:
|
|
ret += '%sProject %s change %s based on %s\n' % (
|
|
indent_str,
|
|
self.change.project.name,
|
|
self.change._id(),
|
|
self.item_ahead)
|
|
for job in self.getJobs():
|
|
build = self.current_build_set.getBuild(job.name)
|
|
if build:
|
|
result = build.result
|
|
else:
|
|
result = None
|
|
job_name = job.name
|
|
if not job.voting:
|
|
voting = ' (non-voting)'
|
|
else:
|
|
voting = ''
|
|
if html:
|
|
if build:
|
|
url = build.url
|
|
else:
|
|
url = None
|
|
if url is not None:
|
|
job_name = '<a href="%s">%s</a>' % (url, job_name)
|
|
ret += '%s %s: %s%s' % (indent_str, job_name, result, voting)
|
|
ret += '\n'
|
|
return ret
|
|
|
|
def makeMergerItem(self):
|
|
# Create a dictionary with all info about the item needed by
|
|
# the merger.
|
|
number = None
|
|
patchset = None
|
|
oldrev = None
|
|
newrev = None
|
|
branch = None
|
|
if hasattr(self.change, 'number'):
|
|
number = self.change.number
|
|
patchset = self.change.patchset
|
|
if hasattr(self.change, 'newrev'):
|
|
oldrev = self.change.oldrev
|
|
newrev = self.change.newrev
|
|
if hasattr(self.change, 'branch'):
|
|
branch = self.change.branch
|
|
|
|
source = self.change.project.source
|
|
connection_name = source.connection.connection_name
|
|
project = self.change.project
|
|
|
|
return dict(project=project.name,
|
|
connection=connection_name,
|
|
merge_mode=self.current_build_set.getMergeMode(),
|
|
ref=self.change.ref,
|
|
branch=branch,
|
|
buildset_uuid=self.current_build_set.uuid,
|
|
number=number,
|
|
patchset=patchset,
|
|
oldrev=oldrev,
|
|
newrev=newrev,
|
|
)
|
|
|
|
|
|
class Ref(object):
|
|
"""An existing state of a Project."""
|
|
|
|
def __init__(self, project):
|
|
self.project = project
|
|
self.ref = None
|
|
self.oldrev = None
|
|
self.newrev = None
|
|
self.files = []
|
|
|
|
def _id(self):
|
|
return self.newrev
|
|
|
|
def __repr__(self):
|
|
rep = None
|
|
if self.newrev == '0000000000000000000000000000000000000000':
|
|
rep = '<%s 0x%x deletes %s from %s' % (
|
|
type(self).__name__,
|
|
id(self), self.ref, self.oldrev)
|
|
elif self.oldrev == '0000000000000000000000000000000000000000':
|
|
rep = '<%s 0x%x creates %s on %s>' % (
|
|
type(self).__name__,
|
|
id(self), self.ref, self.newrev)
|
|
else:
|
|
# Catch all
|
|
rep = '<%s 0x%x %s updated %s..%s>' % (
|
|
type(self).__name__,
|
|
id(self), self.ref, self.oldrev, self.newrev)
|
|
return rep
|
|
|
|
def equals(self, other):
|
|
if (self.project == other.project
|
|
and self.ref == other.ref
|
|
and self.newrev == other.newrev):
|
|
return True
|
|
return False
|
|
|
|
def isUpdateOf(self, other):
|
|
return False
|
|
|
|
def filterJobs(self, jobs):
|
|
return filter(lambda job: job.changeMatches(self), jobs)
|
|
|
|
def getRelatedChanges(self):
|
|
return set()
|
|
|
|
def updatesConfig(self):
|
|
if 'zuul.yaml' in self.files or '.zuul.yaml' in self.files or \
|
|
[True for fn in self.files if fn.startswith("zuul.d/") or
|
|
fn.startswith(".zuul.d/")]:
|
|
return True
|
|
return False
|
|
|
|
def getSafeAttributes(self):
|
|
return Attributes(project=self.project,
|
|
ref=self.ref,
|
|
oldrev=self.oldrev,
|
|
newrev=self.newrev)
|
|
|
|
|
|
class Branch(Ref):
|
|
"""An existing branch state for a Project."""
|
|
def __init__(self, project):
|
|
super(Branch, self).__init__(project)
|
|
self.branch = None
|
|
|
|
|
|
class Tag(Ref):
|
|
"""An existing tag state for a Project."""
|
|
def __init__(self, project):
|
|
super(Tag, self).__init__(project)
|
|
self.tag = None
|
|
|
|
|
|
class Change(Branch):
|
|
"""A proposed new state for a Project."""
|
|
def __init__(self, project):
|
|
super(Change, self).__init__(project)
|
|
self.number = None
|
|
self.url = None
|
|
self.patchset = None
|
|
|
|
self.needs_changes = []
|
|
self.needed_by_changes = []
|
|
self.is_current_patchset = True
|
|
self.can_merge = False
|
|
self.is_merged = False
|
|
self.failed_to_merge = False
|
|
self.open = None
|
|
self.status = None
|
|
self.owner = None
|
|
|
|
self.source_event = None
|
|
|
|
def _id(self):
|
|
return '%s,%s' % (self.number, self.patchset)
|
|
|
|
def __repr__(self):
|
|
return '<Change 0x%x %s>' % (id(self), self._id())
|
|
|
|
def equals(self, other):
|
|
if self.number == other.number and self.patchset == other.patchset:
|
|
return True
|
|
return False
|
|
|
|
def isUpdateOf(self, other):
|
|
if ((hasattr(other, 'number') and self.number == other.number) and
|
|
(hasattr(other, 'patchset') and
|
|
self.patchset is not None and
|
|
other.patchset is not None and
|
|
int(self.patchset) > int(other.patchset))):
|
|
return True
|
|
return False
|
|
|
|
def getRelatedChanges(self):
|
|
related = set()
|
|
for c in self.needs_changes:
|
|
related.add(c)
|
|
for c in self.needed_by_changes:
|
|
related.add(c)
|
|
related.update(c.getRelatedChanges())
|
|
return related
|
|
|
|
def getSafeAttributes(self):
|
|
return Attributes(project=self.project,
|
|
number=self.number,
|
|
patchset=self.patchset)
|
|
|
|
|
|
class TriggerEvent(object):
|
|
"""Incoming event from an external system."""
|
|
def __init__(self):
|
|
# TODO(jeblair): further reduce this list
|
|
self.data = None
|
|
# common
|
|
self.type = None
|
|
self.branch_updated = False
|
|
self.ref = None
|
|
# For management events (eg: enqueue / promote)
|
|
self.tenant_name = None
|
|
self.project_hostname = None
|
|
self.project_name = None
|
|
self.trigger_name = None
|
|
# Representation of the user account that performed the event.
|
|
self.account = None
|
|
# patchset-created, comment-added, etc.
|
|
self.change_number = None
|
|
self.change_url = None
|
|
self.patch_number = None
|
|
self.branch = None
|
|
self.comment = None
|
|
self.state = None
|
|
# ref-updated
|
|
self.oldrev = None
|
|
self.newrev = None
|
|
# For events that arrive with a destination pipeline (eg, from
|
|
# an admin command, etc):
|
|
self.forced_pipeline = None
|
|
|
|
@property
|
|
def canonical_project_name(self):
|
|
return self.project_hostname + '/' + self.project_name
|
|
|
|
def isPatchsetCreated(self):
|
|
return False
|
|
|
|
def isChangeAbandoned(self):
|
|
return False
|
|
|
|
|
|
class BaseFilter(object):
|
|
"""Base Class for filtering which Changes and Events to process."""
|
|
pass
|
|
|
|
|
|
class EventFilter(BaseFilter):
|
|
"""Allows a Pipeline to only respond to certain events."""
|
|
def __init__(self, trigger):
|
|
super(EventFilter, self).__init__()
|
|
self.trigger = trigger
|
|
|
|
def matches(self, event, ref):
|
|
# TODO(jeblair): consider removing ref argument
|
|
return True
|
|
|
|
|
|
class RefFilter(BaseFilter):
|
|
"""Allows a Manager to only enqueue Changes that meet certain criteria."""
|
|
def __init__(self, connection_name):
|
|
super(RefFilter, self).__init__()
|
|
self.connection_name = connection_name
|
|
|
|
def matches(self, change):
|
|
return True
|
|
|
|
|
|
class ProjectPipelineConfig(object):
|
|
# Represents a project cofiguration in the context of a pipeline
|
|
def __init__(self):
|
|
self.job_list = JobList()
|
|
self.queue_name = None
|
|
self.merge_mode = None
|
|
|
|
|
|
class TenantProjectConfig(object):
|
|
"""A project in the context of a tenant.
|
|
|
|
A Project is globally unique in the system, however, when used in
|
|
a tenant, some metadata about the project local to the tenant is
|
|
stored in a TenantProjectConfig.
|
|
"""
|
|
|
|
def __init__(self, project):
|
|
self.project = project
|
|
self.load_classes = set()
|
|
self.shadow_projects = set()
|
|
|
|
# The tenant's default setting of exclude_unprotected_branches will
|
|
# be overridden by this one if not None.
|
|
self.exclude_unprotected_branches = None
|
|
|
|
|
|
class ProjectConfig(object):
|
|
# Represents a project cofiguration
|
|
def __init__(self, name):
|
|
self.name = name
|
|
self.merge_mode = None
|
|
self.default_branch = None
|
|
self.pipelines = {}
|
|
self.private_key_file = None
|
|
|
|
|
|
class UnparsedAbideConfig(object):
|
|
|
|
"""A collection of yaml lists that has not yet been parsed into objects.
|
|
|
|
An Abide is a collection of tenants.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.tenants = []
|
|
|
|
def extend(self, conf):
|
|
if isinstance(conf, UnparsedAbideConfig):
|
|
self.tenants.extend(conf.tenants)
|
|
return
|
|
|
|
if not isinstance(conf, list):
|
|
raise Exception("Configuration items must be in the form of "
|
|
"a list of dictionaries (when parsing %s)" %
|
|
(conf,))
|
|
for item in conf:
|
|
if not isinstance(item, dict):
|
|
raise Exception("Configuration items must be in the form of "
|
|
"a list of dictionaries (when parsing %s)" %
|
|
(conf,))
|
|
if len(item.keys()) > 1:
|
|
raise Exception("Configuration item dictionaries must have "
|
|
"a single key (when parsing %s)" %
|
|
(conf,))
|
|
key, value = list(item.items())[0]
|
|
if key == 'tenant':
|
|
self.tenants.append(value)
|
|
else:
|
|
raise Exception("Configuration item not recognized "
|
|
"(when parsing %s)" %
|
|
(conf,))
|
|
|
|
|
|
class UnparsedTenantConfig(object):
|
|
"""A collection of yaml lists that has not yet been parsed into objects."""
|
|
|
|
def __init__(self):
|
|
self.pipelines = []
|
|
self.jobs = []
|
|
self.project_templates = []
|
|
self.projects = {}
|
|
self.nodesets = []
|
|
self.secrets = []
|
|
self.semaphores = []
|
|
|
|
def copy(self):
|
|
r = UnparsedTenantConfig()
|
|
r.pipelines = copy.deepcopy(self.pipelines)
|
|
r.jobs = copy.deepcopy(self.jobs)
|
|
r.project_templates = copy.deepcopy(self.project_templates)
|
|
r.projects = copy.deepcopy(self.projects)
|
|
r.nodesets = copy.deepcopy(self.nodesets)
|
|
r.secrets = copy.deepcopy(self.secrets)
|
|
r.semaphores = copy.deepcopy(self.semaphores)
|
|
return r
|
|
|
|
def extend(self, conf):
|
|
if isinstance(conf, UnparsedTenantConfig):
|
|
self.pipelines.extend(conf.pipelines)
|
|
self.jobs.extend(conf.jobs)
|
|
self.project_templates.extend(conf.project_templates)
|
|
for k, v in conf.projects.items():
|
|
self.projects.setdefault(k, []).extend(v)
|
|
self.nodesets.extend(conf.nodesets)
|
|
self.secrets.extend(conf.secrets)
|
|
self.semaphores.extend(conf.semaphores)
|
|
return
|
|
|
|
if not isinstance(conf, list):
|
|
raise Exception("Configuration items must be in the form of "
|
|
"a list of dictionaries (when parsing %s)" %
|
|
(conf,))
|
|
|
|
for item in conf:
|
|
if not isinstance(item, dict):
|
|
raise Exception("Configuration items must be in the form of "
|
|
"a list of dictionaries (when parsing %s)" %
|
|
(conf,))
|
|
if len(item.keys()) > 1:
|
|
raise Exception("Configuration item dictionaries must have "
|
|
"a single key (when parsing %s)" %
|
|
(conf,))
|
|
key, value = list(item.items())[0]
|
|
if key == 'project':
|
|
name = value['name']
|
|
self.projects.setdefault(name, []).append(value)
|
|
elif key == 'job':
|
|
self.jobs.append(value)
|
|
elif key == 'project-template':
|
|
self.project_templates.append(value)
|
|
elif key == 'pipeline':
|
|
self.pipelines.append(value)
|
|
elif key == 'nodeset':
|
|
self.nodesets.append(value)
|
|
elif key == 'secret':
|
|
self.secrets.append(value)
|
|
elif key == 'semaphore':
|
|
self.semaphores.append(value)
|
|
else:
|
|
raise Exception("Configuration item `%s` not recognized "
|
|
"(when parsing %s)" %
|
|
(item, conf,))
|
|
|
|
|
|
class Layout(object):
|
|
"""Holds all of the Pipelines."""
|
|
|
|
def __init__(self, tenant):
|
|
self.tenant = tenant
|
|
self.project_configs = {}
|
|
self.project_templates = {}
|
|
self.pipelines = OrderedDict()
|
|
# This is a dictionary of name -> [jobs]. The first element
|
|
# of the list is the first job added with that name. It is
|
|
# the reference definition for a given job. Subsequent
|
|
# elements are aspects of that job with different matchers
|
|
# that override some attribute of the job. These aspects all
|
|
# inherit from the reference definition.
|
|
self.jobs = {'noop': [Job('noop')]}
|
|
self.nodesets = {}
|
|
self.secrets = {}
|
|
self.semaphores = {}
|
|
|
|
def getJob(self, name):
|
|
if name in self.jobs:
|
|
return self.jobs[name][0]
|
|
raise Exception("Job %s not defined" % (name,))
|
|
|
|
def getJobs(self, name):
|
|
return self.jobs.get(name, [])
|
|
|
|
def addJob(self, job):
|
|
# We can have multiple variants of a job all with the same
|
|
# name, but these variants must all be defined in the same repo.
|
|
prior_jobs = [j for j in self.getJobs(job.name) if
|
|
j.source_context.project !=
|
|
job.source_context.project]
|
|
# Unless the repo is permitted to shadow another. If so, and
|
|
# the job we are adding is from a repo that is permitted to
|
|
# shadow the one with the older jobs, skip adding this job.
|
|
job_project = job.source_context.project
|
|
job_tpc = self.tenant.project_configs[job_project.canonical_name]
|
|
skip_add = False
|
|
for prior_job in prior_jobs[:]:
|
|
prior_project = prior_job.source_context.project
|
|
if prior_project in job_tpc.shadow_projects:
|
|
prior_jobs.remove(prior_job)
|
|
skip_add = True
|
|
|
|
if prior_jobs:
|
|
raise Exception("Job %s in %s is not permitted to shadow "
|
|
"job %s in %s" % (
|
|
job,
|
|
job.source_context.project,
|
|
prior_jobs[0],
|
|
prior_jobs[0].source_context.project))
|
|
if skip_add:
|
|
return False
|
|
if job.name in self.jobs:
|
|
self.jobs[job.name].append(job)
|
|
else:
|
|
self.jobs[job.name] = [job]
|
|
return True
|
|
|
|
def addNodeSet(self, nodeset):
|
|
if nodeset.name in self.nodesets:
|
|
raise Exception("NodeSet %s already defined" % (nodeset.name,))
|
|
self.nodesets[nodeset.name] = nodeset
|
|
|
|
def addSecret(self, secret):
|
|
if secret.name in self.secrets:
|
|
raise Exception("Secret %s already defined" % (secret.name,))
|
|
self.secrets[secret.name] = secret
|
|
|
|
def addSemaphore(self, semaphore):
|
|
if semaphore.name in self.semaphores:
|
|
raise Exception("Semaphore %s already defined" % (semaphore.name,))
|
|
self.semaphores[semaphore.name] = semaphore
|
|
|
|
def addPipeline(self, pipeline):
|
|
self.pipelines[pipeline.name] = pipeline
|
|
|
|
def addProjectTemplate(self, project_template):
|
|
self.project_templates[project_template.name] = project_template
|
|
|
|
def addProjectConfig(self, project_config):
|
|
self.project_configs[project_config.name] = project_config
|
|
|
|
def _createJobGraph(self, item, job_list, job_graph):
|
|
change = item.change
|
|
pipeline = item.pipeline
|
|
for jobname in job_list.jobs:
|
|
# This is the final job we are constructing
|
|
frozen_job = None
|
|
# Whether the change matches any globally defined variant
|
|
matched = False
|
|
for variant in self.getJobs(jobname):
|
|
if variant.changeMatches(change):
|
|
if frozen_job is None:
|
|
frozen_job = variant.copy()
|
|
frozen_job.setRun()
|
|
else:
|
|
frozen_job.applyVariant(variant)
|
|
matched = True
|
|
if not matched:
|
|
# A change must match at least one defined job variant
|
|
# (that is to say that it must match more than just
|
|
# the job that is defined in the tree).
|
|
continue
|
|
# If the job does not allow auth inheritance, do not allow
|
|
# the project-pipeline variants to update its execution
|
|
# attributes.
|
|
if frozen_job.auth and not frozen_job.auth.inherit:
|
|
frozen_job.final = True
|
|
# Whether the change matches any of the project pipeline
|
|
# variants
|
|
matched = False
|
|
for variant in job_list.jobs[jobname]:
|
|
if variant.changeMatches(change):
|
|
frozen_job.applyVariant(variant)
|
|
matched = True
|
|
if not matched:
|
|
# A change must match at least one project pipeline
|
|
# job variant.
|
|
continue
|
|
if (frozen_job.allowed_projects and
|
|
change.project.name not in frozen_job.allowed_projects):
|
|
raise Exception("Project %s is not allowed to run job %s" %
|
|
(change.project.name, frozen_job.name))
|
|
if ((not pipeline.allow_secrets) and frozen_job.auth and
|
|
frozen_job.auth.secrets):
|
|
raise Exception("Pipeline %s does not allow jobs with "
|
|
"secrets (job %s)" % (
|
|
pipeline.name, frozen_job.name))
|
|
job_graph.addJob(frozen_job)
|
|
|
|
def createJobGraph(self, item):
|
|
project_config = self.project_configs.get(
|
|
item.change.project.canonical_name, None)
|
|
ret = JobGraph()
|
|
# NOTE(pabelanger): It is possible for a foreign project not to have a
|
|
# configured pipeline, if so return an empty JobGraph.
|
|
if project_config and item.pipeline.name in project_config.pipelines:
|
|
project_job_list = \
|
|
project_config.pipelines[item.pipeline.name].job_list
|
|
self._createJobGraph(item, project_job_list, ret)
|
|
return ret
|
|
|
|
def hasProject(self, project):
|
|
return project.canonical_name in self.project_configs
|
|
|
|
|
|
class Semaphore(object):
|
|
def __init__(self, name, max=1):
|
|
self.name = name
|
|
self.max = int(max)
|
|
|
|
|
|
class SemaphoreHandler(object):
|
|
log = logging.getLogger("zuul.SemaphoreHandler")
|
|
|
|
def __init__(self):
|
|
self.semaphores = {}
|
|
|
|
def acquire(self, item, job):
|
|
if not job.semaphore:
|
|
return True
|
|
|
|
semaphore_key = job.semaphore
|
|
|
|
m = self.semaphores.get(semaphore_key)
|
|
if not m:
|
|
# The semaphore is not held, acquire it
|
|
self._acquire(semaphore_key, item, job.name)
|
|
return True
|
|
if (item, job.name) in m:
|
|
# This item already holds the semaphore
|
|
return True
|
|
|
|
# semaphore is there, check max
|
|
if len(m) < self._max_count(item, job.semaphore):
|
|
self._acquire(semaphore_key, item, job.name)
|
|
return True
|
|
|
|
return False
|
|
|
|
def release(self, item, job):
|
|
if not job.semaphore:
|
|
return
|
|
|
|
semaphore_key = job.semaphore
|
|
|
|
m = self.semaphores.get(semaphore_key)
|
|
if not m:
|
|
# The semaphore is not held, nothing to do
|
|
self.log.error("Semaphore can not be released for %s "
|
|
"because the semaphore is not held" %
|
|
item)
|
|
return
|
|
if (item, job.name) in m:
|
|
# This item is a holder of the semaphore
|
|
self._release(semaphore_key, item, job.name)
|
|
return
|
|
self.log.error("Semaphore can not be released for %s "
|
|
"which does not hold it" % item)
|
|
|
|
def _acquire(self, semaphore_key, item, job_name):
|
|
self.log.debug("Semaphore acquire {semaphore}: job {job}, item {item}"
|
|
.format(semaphore=semaphore_key,
|
|
job=job_name,
|
|
item=item))
|
|
if semaphore_key not in self.semaphores:
|
|
self.semaphores[semaphore_key] = []
|
|
self.semaphores[semaphore_key].append((item, job_name))
|
|
|
|
def _release(self, semaphore_key, item, job_name):
|
|
self.log.debug("Semaphore release {semaphore}: job {job}, item {item}"
|
|
.format(semaphore=semaphore_key,
|
|
job=job_name,
|
|
item=item))
|
|
sem_item = (item, job_name)
|
|
if sem_item in self.semaphores[semaphore_key]:
|
|
self.semaphores[semaphore_key].remove(sem_item)
|
|
|
|
# cleanup if there is no user of the semaphore anymore
|
|
if len(self.semaphores[semaphore_key]) == 0:
|
|
del self.semaphores[semaphore_key]
|
|
|
|
@staticmethod
|
|
def _max_count(item, semaphore_name):
|
|
if not item.current_build_set.layout:
|
|
# This should not occur as the layout of the item must already be
|
|
# built when acquiring or releasing a semaphore for a job.
|
|
raise Exception("Item {} has no layout".format(item))
|
|
|
|
# find the right semaphore
|
|
default_semaphore = Semaphore(semaphore_name, 1)
|
|
semaphores = item.current_build_set.layout.semaphores
|
|
return semaphores.get(semaphore_name, default_semaphore).max
|
|
|
|
|
|
class Tenant(object):
|
|
def __init__(self, name):
|
|
self.name = name
|
|
self.max_nodes_per_job = 5
|
|
self.exclude_unprotected_branches = False
|
|
self.layout = None
|
|
# The unparsed configuration from the main zuul config for
|
|
# this tenant.
|
|
self.unparsed_config = None
|
|
# The list of projects from which we will read full
|
|
# configuration.
|
|
self.config_projects = []
|
|
# The unparsed config from those projects.
|
|
self.config_projects_config = None
|
|
# The list of projects from which we will read untrusted
|
|
# in-repo configuration.
|
|
self.untrusted_projects = []
|
|
# The unparsed config from those projects.
|
|
self.untrusted_projects_config = None
|
|
self.semaphore_handler = SemaphoreHandler()
|
|
# Metadata about projects for this tenant
|
|
# canonical project name -> TenantProjectConfig
|
|
self.project_configs = {}
|
|
|
|
# A mapping of project names to projects. project_name ->
|
|
# VALUE where VALUE is a further dictionary of
|
|
# canonical_hostname -> Project.
|
|
self.projects = {}
|
|
self.canonical_hostnames = set()
|
|
|
|
def _addProject(self, tpc):
|
|
"""Add a project to the project index
|
|
|
|
:arg TenantProjectConfig tpc: The TenantProjectConfig (with
|
|
associated project) to add.
|
|
|
|
"""
|
|
project = tpc.project
|
|
self.canonical_hostnames.add(project.canonical_hostname)
|
|
hostname_dict = self.projects.setdefault(project.name, {})
|
|
if project.canonical_hostname in hostname_dict:
|
|
raise Exception("Project %s is already in project index" %
|
|
(project,))
|
|
hostname_dict[project.canonical_hostname] = project
|
|
self.project_configs[project.canonical_name] = tpc
|
|
|
|
def getProject(self, name):
|
|
"""Return a project given its name.
|
|
|
|
:arg str name: The name of the project. It may be fully
|
|
qualified (E.g., "git.example.com/subpath/project") or may
|
|
contain only the project name name may be supplied (E.g.,
|
|
"subpath/project").
|
|
|
|
:returns: A tuple (trusted, project) or (None, None) if the
|
|
project is not found or ambiguous. The "trusted" boolean
|
|
indicates whether or not the project is trusted by this
|
|
tenant.
|
|
:rtype: (bool, Project)
|
|
|
|
"""
|
|
path = name.split('/', 1)
|
|
if path[0] in self.canonical_hostnames:
|
|
hostname = path[0]
|
|
project_name = path[1]
|
|
else:
|
|
hostname = None
|
|
project_name = name
|
|
hostname_dict = self.projects.get(project_name)
|
|
project = None
|
|
if hostname_dict:
|
|
if hostname:
|
|
project = hostname_dict.get(hostname)
|
|
else:
|
|
values = list(hostname_dict.values())
|
|
if len(values) == 1:
|
|
project = values[0]
|
|
else:
|
|
raise Exception("Project name '%s' is ambiguous, "
|
|
"please fully qualify the project "
|
|
"with a hostname" % (name,))
|
|
if project is None:
|
|
return (None, None)
|
|
if project in self.config_projects:
|
|
return (True, project)
|
|
if project in self.untrusted_projects:
|
|
return (False, project)
|
|
# This should never happen:
|
|
raise Exception("Project %s is neither trusted nor untrusted" %
|
|
(project,))
|
|
|
|
def addConfigProject(self, tpc):
|
|
self.config_projects.append(tpc.project)
|
|
self._addProject(tpc)
|
|
|
|
def addUntrustedProject(self, tpc):
|
|
self.untrusted_projects.append(tpc.project)
|
|
self._addProject(tpc)
|
|
|
|
def getSafeAttributes(self):
|
|
return Attributes(name=self.name)
|
|
|
|
|
|
class Abide(object):
|
|
def __init__(self):
|
|
self.tenants = OrderedDict()
|
|
|
|
|
|
class JobTimeData(object):
|
|
format = 'B10H10H10B'
|
|
version = 0
|
|
|
|
def __init__(self, path):
|
|
self.path = path
|
|
self.success_times = [0 for x in range(10)]
|
|
self.failure_times = [0 for x in range(10)]
|
|
self.results = [0 for x in range(10)]
|
|
|
|
def load(self):
|
|
if not os.path.exists(self.path):
|
|
return
|
|
with open(self.path, 'rb') as f:
|
|
data = struct.unpack(self.format, f.read())
|
|
version = data[0]
|
|
if version != self.version:
|
|
raise Exception("Unkown data version")
|
|
self.success_times = list(data[1:11])
|
|
self.failure_times = list(data[11:21])
|
|
self.results = list(data[21:32])
|
|
|
|
def save(self):
|
|
tmpfile = self.path + '.tmp'
|
|
data = [self.version]
|
|
data.extend(self.success_times)
|
|
data.extend(self.failure_times)
|
|
data.extend(self.results)
|
|
data = struct.pack(self.format, *data)
|
|
with open(tmpfile, 'wb') as f:
|
|
f.write(data)
|
|
os.rename(tmpfile, self.path)
|
|
|
|
def add(self, elapsed, result):
|
|
elapsed = int(elapsed)
|
|
if result == 'SUCCESS':
|
|
self.success_times.append(elapsed)
|
|
self.success_times.pop(0)
|
|
result = 0
|
|
else:
|
|
self.failure_times.append(elapsed)
|
|
self.failure_times.pop(0)
|
|
result = 1
|
|
self.results.append(result)
|
|
self.results.pop(0)
|
|
|
|
def getEstimatedTime(self):
|
|
times = [x for x in self.success_times if x]
|
|
if times:
|
|
return float(sum(times)) / len(times)
|
|
return 0.0
|
|
|
|
|
|
class TimeDataBase(object):
|
|
def __init__(self, root):
|
|
self.root = root
|
|
self.jobs = {}
|
|
|
|
def _getTD(self, name):
|
|
td = self.jobs.get(name)
|
|
if not td:
|
|
td = JobTimeData(os.path.join(self.root, name))
|
|
self.jobs[name] = td
|
|
td.load()
|
|
return td
|
|
|
|
def getEstimatedTime(self, name):
|
|
return self._getTD(name).getEstimatedTime()
|
|
|
|
def update(self, name, elapsed, result):
|
|
td = self._getTD(name)
|
|
td.add(elapsed, result)
|
|
td.save()
|