# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import abc
from collections import OrderedDict, defaultdict, UserDict
import copy
import json
import logging
import os
import re2
import struct
import time
from uuid import uuid4
import urllib.parse
import textwrap
import types
import itertools
import yaml

import jsonpath_rw

from zuul import change_matcher
from zuul.lib.config import get_default
from zuul.lib.result_data import get_artifacts_from_result_data
from zuul.lib.logutil import get_annotated_logger
from zuul.lib.capabilities import capabilities_registry

MERGER_MERGE = 1          # "git merge"
MERGER_MERGE_RESOLVE = 2  # "git merge -s resolve"
MERGER_CHERRY_PICK = 3    # "git cherry-pick"
MERGER_SQUASH_MERGE = 4   # "git merge --squash"

MERGER_MAP = {
    'merge': MERGER_MERGE,
    'merge-resolve': MERGER_MERGE_RESOLVE,
    'cherry-pick': MERGER_CHERRY_PICK,
    'squash-merge': MERGER_SQUASH_MERGE,
}

PRECEDENCE_NORMAL = 0
PRECEDENCE_LOW = 1
PRECEDENCE_HIGH = 2

PRECEDENCE_MAP = {
    None: PRECEDENCE_NORMAL,
    'low': PRECEDENCE_LOW,
    'normal': PRECEDENCE_NORMAL,
    'high': PRECEDENCE_HIGH,
}

PRIORITY_MAP = {
    PRECEDENCE_NORMAL: 200,
    PRECEDENCE_LOW: 300,
    PRECEDENCE_HIGH: 100,
}

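# Illustrative sketch (comment only): a pipeline's configured
# precedence string resolves to a node request priority in two steps:
#
#     PRIORITY_MAP[PRECEDENCE_MAP['high']]   # -> 100
#     PRIORITY_MAP[PRECEDENCE_MAP[None]]     # -> 200 (the default)
#
# Lower numbers are assumed here to be served first by nodepool.
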
# Request states
STATE_REQUESTED = 'requested'
STATE_FULFILLED = 'fulfilled'
STATE_FAILED = 'failed'
REQUEST_STATES = set([STATE_REQUESTED,
                      STATE_FULFILLED,
                      STATE_FAILED])

# Node states
STATE_BUILDING = 'building'
STATE_TESTING = 'testing'
STATE_READY = 'ready'
STATE_IN_USE = 'in-use'
STATE_USED = 'used'
STATE_HOLD = 'hold'
STATE_DELETING = 'deleting'
NODE_STATES = set([STATE_BUILDING,
                   STATE_TESTING,
                   STATE_READY,
                   STATE_IN_USE,
                   STATE_USED,
                   STATE_HOLD,
                   STATE_DELETING])


class ConfigurationErrorKey(object):
    """A class which attempts to uniquely identify configuration errors
    based on their file location.  It's not perfect, but it's usually
    sufficient to determine whether we should show an error to a user.
    """

    def __init__(self, context, mark, error_text):
        self.context = context
        self.mark = mark
        self.error_text = error_text
        elements = []
        if context:
            elements.extend([
                context.project.canonical_name,
                context.branch,
                context.path,
            ])
        else:
            elements.extend([None, None, None])
        if mark:
            elements.extend([
                mark.line,
                mark.snippet,
            ])
        else:
            elements.extend([None, None])
        elements.append(error_text)
        self._hash = hash('|'.join([str(x) for x in elements]))

    def __hash__(self):
        return self._hash

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, ConfigurationErrorKey):
            return False
        return (self.context == other.context and
                self.mark == other.mark and
                self.error_text == other.error_text)


class ConfigurationError(object):

    """A configuration error"""
    def __init__(self, context, mark, error, short_error=None):
        self.error = str(error)
        self.short_error = short_error
        self.key = ConfigurationErrorKey(context, mark, self.error)


class LoadingErrors(object):
    """An accumulator of configuration errors attached to a layout object
    """
    def __init__(self):
        self.errors = []
        self.error_keys = set()

    def addError(self, context, mark, error, short_error=None):
        e = ConfigurationError(context, mark, error, short_error)
        self.errors.append(e)
        self.error_keys.add(e.key)

    def __getitem__(self, index):
        return self.errors[index]

    def __len__(self):
        return len(self.errors)


class NoMatchingParentError(Exception):
    """A job referenced a parent, but that parent had no variants which
    matched the current change."""
    pass


class TemplateNotFoundError(Exception):
    """A project referenced a template that does not exist."""
    pass


class RequirementsError(Exception):
    """A job's requirements were not met."""
    pass


class Attributes(object):
    """A class to hold attributes for string formatting."""

    def __init__(self, **kw):
        self.__dict__ = kw


class Freezable(object):
    """A mix-in class so that an object can be made immutable"""

    def __init__(self):
        super(Freezable, self).__setattr__('_frozen', False)

    def freeze(self):
        """Make this object immutable"""
        def _freezelist(l):
            for i, v in enumerate(l):
                if isinstance(v, Freezable):
                    if not v._frozen:
                        v.freeze()
                elif isinstance(v, dict):
                    l[i] = _freezedict(v)
                elif isinstance(v, list):
                    l[i] = _freezelist(v)
            return tuple(l)

        def _freezedict(d):
            for k, v in list(d.items()):
                if isinstance(v, Freezable):
                    if not v._frozen:
                        v.freeze()
                elif isinstance(v, dict):
                    d[k] = _freezedict(v)
                elif isinstance(v, list):
                    d[k] = _freezelist(v)
            return types.MappingProxyType(d)

        _freezedict(self.__dict__)
        # Ignore the return value from _freezedict because __dict__
        # can't be a mappingproxy.
        self._frozen = True

    def __setattr__(self, name, value):
        if self._frozen:
            raise Exception("Unable to modify frozen object %s" %
                            (repr(self),))
        super(Freezable, self).__setattr__(name, value)


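# Illustrative sketch (comment only): freezing recursively replaces
# mutable containers with immutable ones and then locks the object:
#
#     f = Freezable()
#     f.data = {'a': [1, 2]}
#     f.freeze()
#     type(f.data)      # -> mappingproxy; f.data['a'] is now a tuple
#     f.data = {}       # raises "Unable to modify frozen object ..."
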
class ConfigObject(Freezable):
    def __init__(self):
        super().__init__()
        self.source_context = None
        self.start_mark = None


class Pipeline(object):
    """A configuration that ties together triggers, reporters and managers

    Trigger
        A description of which events should be processed

    Manager
        Responsible for enqueuing and dequeuing Changes

    Reporter
        Communicates success and failure results somewhere
    """
    STATE_NORMAL = 'normal'
    STATE_ERROR = 'error'

    def __init__(self, name, tenant):
        self.name = name
        # Note that pipelines are not portable across tenants (new
        # pipeline objects must be made when a tenant is
        # reconfigured).  A pipeline requires a tenant in order to
        # reach the currently active layout for that tenant.
        self.tenant = tenant
        self.source_context = None
        self.start_mark = None
        self.description = None
        self.failure_message = None
        self.merge_failure_message = None
        self.success_message = None
        self.footer_message = None
        self.enqueue_message = None
        self.start_message = None
        self.dequeue_message = None
        self.post_review = False
        self.dequeue_on_new_patchset = True
        self.ignore_dependencies = False
        self.manager = None
        self.queues = []
        self.relative_priority_queues = {}
        self.precedence = PRECEDENCE_NORMAL
        self.supercedes = []
        self.triggers = []
        self.enqueue_actions = []
        self.start_actions = []
        self.success_actions = []
        self.failure_actions = []
        self.merge_failure_actions = []
        self.no_jobs_actions = []
        self.disabled_actions = []
        self.dequeue_actions = []
        self.disable_at = None
        self._consecutive_failures = 0
        self._disabled = False
        self.window = None
        self.window_floor = None
        self.window_increase_type = None
        self.window_increase_factor = None
        self.window_decrease_type = None
        self.window_decrease_factor = None
        self.state = self.STATE_NORMAL

    @property
    def actions(self):
        return (
            self.enqueue_actions +
            self.start_actions +
            self.success_actions +
            self.failure_actions +
            self.merge_failure_actions +
            self.no_jobs_actions +
            self.disabled_actions +
            self.dequeue_actions
        )

    def __repr__(self):
        return '<Pipeline %s>' % self.name

    def getSafeAttributes(self):
        return Attributes(name=self.name)

    def validateReferences(self, layout):
        # Verify that references to other objects in the layout are
        # valid.

        for pipeline in self.supercedes:
            if not layout.pipelines.get(pipeline):
                raise Exception(
                    'The pipeline "{this}" supercedes an unknown pipeline '
                    '{other}.'.format(
                        this=self.name,
                        other=pipeline))

    def setManager(self, manager):
        self.manager = manager

    def addQueue(self, queue):
        self.queues.append(queue)

    def getQueue(self, project, branch):
        # Queues might be branch specific so match with branch
        for queue in self.queues:
            if queue.matches(project, branch):
                return queue
        return None

    def getRelativePriorityQueue(self, project):
        for queue in self.relative_priority_queues.values():
            if project in queue:
                return queue
        return [project]

    def removeQueue(self, queue):
        if queue in self.queues:
            self.queues.remove(queue)

    def getChangesInQueue(self):
        changes = []
        for shared_queue in self.queues:
            changes.extend([x.change for x in shared_queue.queue])
        return changes

    def getAllItems(self):
        items = []
        for shared_queue in self.queues:
            items.extend(shared_queue.queue)
        return items

    def formatStatusJSON(self, websocket_url=None):
        j_pipeline = dict(name=self.name,
                          description=self.description,
                          state=self.state)
        j_queues = []
        j_pipeline['change_queues'] = j_queues
        for queue in self.queues:
            j_queue = dict(name=queue.name)
            j_queues.append(j_queue)
            j_queue['heads'] = []
            j_queue['window'] = queue.window

            j_changes = []
            for e in queue.queue:
                if not e.item_ahead:
                    if j_changes:
                        j_queue['heads'].append(j_changes)
                    j_changes = []
                j_changes.append(e.formatJSON(websocket_url))
                if (len(j_changes) > 1 and
                        (j_changes[-2]['remaining_time'] is not None) and
                        (j_changes[-1]['remaining_time'] is not None)):
                    j_changes[-1]['remaining_time'] = max(
                        j_changes[-2]['remaining_time'],
                        j_changes[-1]['remaining_time'])
            if j_changes:
                j_queue['heads'].append(j_changes)
        return j_pipeline


class ChangeQueue(object):
    """A ChangeQueue contains Changes to be processed for related projects.

    A Pipeline with a DependentPipelineManager has multiple parallel
    ChangeQueues shared by different projects.  For instance, there may be a
    ChangeQueue shared by interrelated projects foo and bar, and a second
    queue for independent project baz.

    A Pipeline with an IndependentPipelineManager puts every Change into its
    own ChangeQueue.

    The ChangeQueue Window is inspired by TCP windows and controls how many
    Changes in a given ChangeQueue will be considered active and ready to
    be processed.  If a Change succeeds, the Window is increased by
    `window_increase_factor`.  If a Change fails, the Window is decreased by
    `window_decrease_factor`.

    A ChangeQueue may be a dynamically created queue, which may be removed
    from a DependentPipelineManager once empty.
    """
    def __init__(self, pipeline, window=0, window_floor=1,
                 window_increase_type='linear', window_increase_factor=1,
                 window_decrease_type='exponential', window_decrease_factor=2,
                 name=None, dynamic=False):
        self.pipeline = pipeline
        if name:
            self.name = name
        else:
            self.name = ''
        self.project_branches = []
        self._jobs = set()
        self.queue = []
        self.window = window
        self.window_floor = window_floor
        self.window_increase_type = window_increase_type
        self.window_increase_factor = window_increase_factor
        self.window_decrease_type = window_decrease_type
        self.window_decrease_factor = window_decrease_factor
        self.dynamic = dynamic

    def __repr__(self):
        return '<ChangeQueue %s: %s>' % (self.pipeline.name, self.name)

    def getJobs(self):
        return self._jobs

    def addProject(self, project, branch):
        """
        Adds a project-branch combination to the queue.

        The queue will match exactly this combination.  If the caller
        doesn't care about branches it can supply None (but must supply
        None as well when matching).
        """
        project_branch = (project, branch)
        if project_branch not in self.project_branches:
            self.project_branches.append(project_branch)

        if not self.name:
            self.name = project.name

    def matches(self, project, branch):
        return (project, branch) in self.project_branches

    def enqueueChange(self, change, event):
        item = QueueItem(self, change, event)
        self.enqueueItem(item)
        item.enqueue_time = time.time()
        return item

    def enqueueItem(self, item):
        item.pipeline = self.pipeline
        item.queue = self
        if self.queue:
            item.item_ahead = self.queue[-1]
            item.item_ahead.items_behind.append(item)
        self.queue.append(item)

    def dequeueItem(self, item):
        if item in self.queue:
            self.queue.remove(item)
        if item.item_ahead:
            item.item_ahead.items_behind.remove(item)
        for item_behind in item.items_behind:
            if item.item_ahead:
                item.item_ahead.items_behind.append(item_behind)
            item_behind.item_ahead = item.item_ahead
        item.item_ahead = None
        item.items_behind = []
        item.dequeue_time = time.time()

    def moveItem(self, item, item_ahead):
        if item.item_ahead == item_ahead:
            return False
        # Remove from current location
        if item.item_ahead:
            item.item_ahead.items_behind.remove(item)
        for item_behind in item.items_behind:
            if item.item_ahead:
                item.item_ahead.items_behind.append(item_behind)
            item_behind.item_ahead = item.item_ahead
        # Add to new location
        item.item_ahead = item_ahead
        item.items_behind = []
        if item.item_ahead:
            item.item_ahead.items_behind.append(item)
        return True

    def isActionable(self, item):
        if not self.window:
            return True
        # Ignore done items waiting for bundle dependencies to finish
        num_waiting_items = len([
            i for i in self.queue
            if i.bundle and i.areAllJobsComplete()
        ])
        window = self.window + num_waiting_items
        return item in self.queue[:window]

    def increaseWindowSize(self):
        if self.window:
            if self.window_increase_type == 'linear':
                self.window += self.window_increase_factor
            elif self.window_increase_type == 'exponential':
                self.window *= self.window_increase_factor

    def decreaseWindowSize(self):
        if self.window:
            if self.window_decrease_type == 'linear':
                self.window = max(
                    self.window_floor,
                    self.window - self.window_decrease_factor)
            elif self.window_decrease_type == 'exponential':
                self.window = max(
                    self.window_floor,
                    int(self.window / self.window_decrease_factor))


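# Illustrative sketch (comment only): with the defaults above
# (window_floor=1, linear increase by 1, exponential decrease by 2),
# the window behaves like TCP's AIMD congestion control:
#
#     q = ChangeQueue(pipeline, window=20)   # 'pipeline' is assumed
#     q.increaseWindowSize()                 # 20 -> 21 (change succeeded)
#     q.decreaseWindowSize()                 # 21 -> 10 (change failed)
#     q.decreaseWindowSize()                 # 10 -> 5, never below the floor
#
# A window of 0 disables the limit entirely (see isActionable).
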
class Project(object):
    """A Project represents a git repository such as openstack/nova."""

    # NOTE: Projects should only be instantiated via a Source object
    # so that they are associated with and cached by their Connection.
    # This makes a Project instance a unique identifier for a given
    # project from a given source.

    def __init__(self, name, source, foreign=False):
        self.name = name
        self.source = source
        self.connection_name = source.connection.connection_name
        self.canonical_hostname = source.canonical_hostname
        self.canonical_name = source.canonical_hostname + '/' + name
        self.private_secrets_key = None
        self.public_secrets_key = None
        self.private_ssh_key = None
        self.public_ssh_key = None
        # Foreign projects are those referenced in dependencies of
        # layout projects; this should matter when deciding whether
        # to enqueue their changes.
        # TODOv3 (jeblair): re-add support for foreign projects if needed
        self.foreign = foreign

    def __str__(self):
        return self.name

    def __repr__(self):
        return '<Project %s>' % (self.name)

    def getSafeAttributes(self):
        return Attributes(name=self.name)

    def toDict(self):
        d = {}
        d['name'] = self.name
        d['connection_name'] = self.connection_name
        d['canonical_name'] = self.canonical_name
        return d


class Node(ConfigObject):
    """A single node for use by a job.

    This may represent a request for a node, or an actual node
    provided by Nodepool.
    """

    def __init__(self, name, label):
        super(Node, self).__init__()
        self.name = name
        self.label = label
        self.id = None
        self.lock = None
        self.hold_job = None
        self.comment = None
        # Attributes from Nodepool
        self._state = 'unknown'
        self.state_time = time.time()
        self.host_id = None
        self.interface_ip = None
        self.public_ipv4 = None
        self.private_ipv4 = None
        self.public_ipv6 = None
        self.connection_port = 22
        self.connection_type = None
        self._keys = []
        self.az = None
        self.provider = None
        self.region = None
        self.username = None
        self.hold_expiration = None
        self.resources = None

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        if value not in NODE_STATES:
            raise TypeError("'%s' is not a valid state" % value)
        self._state = value
        self.state_time = time.time()

    def __repr__(self):
        return '<Node %s %s:%s>' % (self.id, self.name, self.label)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, Node):
            return False
        return (self.name == other.name and
                self.label == other.label and
                self.id == other.id)

    def toDict(self, internal_attributes=False):
        d = {}
        d['state'] = self.state
        d['hold_job'] = self.hold_job
        d['comment'] = self.comment
        for k in self._keys:
            d[k] = getattr(self, k)
        if internal_attributes:
            # These attributes are only useful for the rpc serialization
            d['name'] = self.name[0]
            d['aliases'] = self.name[1:]
            d['label'] = self.label
        return d

    def updateFromDict(self, data):
        self._state = data['state']
        keys = []
        for k, v in data.items():
            if k == 'state':
                continue
            keys.append(k)
            setattr(self, k, v)
        self._keys = keys


class Group(ConfigObject):
    """A logical group of nodes for use by a job.

    A Group is a named set of node names that will be provided to
    jobs in the inventory to describe logical units where some subset of
    tasks run.
    """

    def __init__(self, name, nodes):
        super(Group, self).__init__()
        self.name = name
        self.nodes = nodes

    def __repr__(self):
        return '<Group %s %s>' % (self.name, str(self.nodes))

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, Group):
            return False
        return (self.name == other.name and
                self.nodes == other.nodes)

    def toDict(self):
        return {
            'name': self.name,
            'nodes': self.nodes
        }


class NodeSet(ConfigObject):
    """A set of nodes.

    In configuration, NodeSets are attributes of Jobs indicating that
    a Job requires nodes matching this description.

    They may appear as top-level configuration objects and be named,
    or they may appear anonymously in in-line job definitions.
    """

    def __init__(self, name=None):
        super(NodeSet, self).__init__()
        self.name = name or ''
        self.nodes = OrderedDict()
        self.groups = OrderedDict()

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, NodeSet):
            return False
        return (self.name == other.name and
                self.nodes == other.nodes)

    def toDict(self):
        d = {}
        d['name'] = self.name
        d['nodes'] = []
        for node in self.nodes.values():
            d['nodes'].append(node.toDict(internal_attributes=True))
        d['groups'] = []
        for group in self.groups.values():
            d['groups'].append(group.toDict())
        return d

    def copy(self):
        n = NodeSet(self.name)
        for name, node in self.nodes.items():
            n.addNode(Node(node.name, node.label))
        for name, group in self.groups.items():
            n.addGroup(Group(group.name, group.nodes[:]))
        return n

    def addNode(self, node):
        for name in node.name:
            if name in self.nodes:
                raise Exception("Duplicate node in %s" % (self,))
        self.nodes[tuple(node.name)] = node

    def getNodes(self):
        return list(self.nodes.values())

    def addGroup(self, group):
        if group.name in self.groups:
            raise Exception("Duplicate group in %s" % (self,))
        self.groups[group.name] = group

    def getGroups(self):
        return list(self.groups.values())

    def __repr__(self):
        if self.name:
            name = self.name + ' '
        else:
            name = ''
        return '<NodeSet %s%s>' % (name, list(self.nodes.values()))

    def __len__(self):
        return len(self.nodes)


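# Illustrative sketch (comment only): node names are lists so that a
# node may carry aliases; the first entry is the primary name (see
# Node.toDict).  The labels below are assumptions, not part of this
# module:
#
#     ns = NodeSet('two-node')
#     ns.addNode(Node(['controller'], 'ubuntu-focal'))
#     ns.addNode(Node(['compute'], 'ubuntu-focal'))
#     ns.addGroup(Group('switch', ['controller']))
#     len(ns)   # -> 2
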
class NodeRequest(object):
    """A request for a set of nodes."""

    def __init__(self, requestor, build_set, job, nodeset, relative_priority,
                 event=None):
        self.requestor = requestor
        self.build_set = build_set
        self.job = job
        self.nodeset = nodeset
        self._state = STATE_REQUESTED
        self.requested_time = time.time()
        self.state_time = time.time()
        self.created_time = None
        self.stat = None
        self.uid = uuid4().hex
        self.relative_priority = relative_priority
        self.provider = self._getPausedParentProvider()
        self.id = None
        self._zk_data = {}  # Data that we read back from ZK
        if event is not None:
            self.event_id = event.zuul_event_id
        else:
            self.event_id = None
        # Zuul internal flags (not stored in ZK so they are not
        # overwritten).
        self.failed = False
        self.canceled = False

    def _getPausedParent(self):
        if self.build_set:
            job_graph = self.build_set.item.job_graph
            if job_graph:
                for parent in job_graph.getParentJobsRecursively(
                        self.job.name):
                    build = self.build_set.getBuild(parent.name)
                    if build.paused:
                        return build
        return None

    def _getPausedParentProvider(self):
        build = self._getPausedParent()
        if build:
            nodeset = self.build_set.getJobNodeSet(build.job.name)
            if nodeset and nodeset.nodes:
                return list(nodeset.nodes.values())[0].provider
        return None

    @property
    def priority(self):
        precedence_adjustment = 0
        if self.build_set:
            precedence = self.build_set.item.pipeline.precedence
            if self._getPausedParent():
                precedence_adjustment = -1
        else:
            precedence = PRECEDENCE_NORMAL
        initial_precedence = PRIORITY_MAP[precedence]
        return max(0, initial_precedence + precedence_adjustment)

    @property
    def fulfilled(self):
        return (self._state == STATE_FULFILLED) and not self.failed

    @property
    def state(self):
        return self._state

    @state.setter
    def state(self, value):
        if value not in REQUEST_STATES:
            raise TypeError("'%s' is not a valid state" % value)
        self._state = value
        self.state_time = time.time()

    def __repr__(self):
        return '<NodeRequest %s %s>' % (self.id, self.nodeset)

    def toDict(self):
        # Start with any previously read data
        d = self._zk_data.copy()
        nodes = [n.label for n in self.nodeset.getNodes()]
        # These are immutable once set
        d.setdefault('node_types', nodes)
        d.setdefault('requestor', self.requestor)
        d.setdefault('created_time', self.created_time)
        d.setdefault('provider', self.provider)
        # We might change these
        d['state'] = self.state
        d['state_time'] = self.state_time
        d['relative_priority'] = self.relative_priority
        d['event_id'] = self.event_id
        return d

    def updateFromDict(self, data):
        self._zk_data = data
        self._state = data['state']
        self.state_time = data['state_time']
        self.relative_priority = data.get('relative_priority', 0)


class Secret(ConfigObject):
    """A collection of private data.

    In configuration, Secrets are collections of private data in
    key-value pair format.  They are defined as top-level
    configuration objects and then referenced by Jobs.

    """

    def __init__(self, name, source_context):
        super(Secret, self).__init__()
        self.name = name
        self.source_context = source_context
        # The secret data may or may not be encrypted.  This attribute
        # is named 'secret_data' to make it easy to search for and
        # spot where it is directly used.
        self.secret_data = {}

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, Secret):
            return False
        return (self.name == other.name and
                self.source_context == other.source_context and
                self.secret_data == other.secret_data)

    def areDataEqual(self, other):
        return (self.secret_data == other.secret_data)

    def __repr__(self):
        return '<Secret %s>' % (self.name,)

    def _decrypt(self, private_key, secret_data):
        # recursive function to decrypt data
        if hasattr(secret_data, 'decrypt'):
            return secret_data.decrypt(private_key)

        if isinstance(secret_data, (dict, types.MappingProxyType)):
            decrypted_secret_data = {}
            for k, v in secret_data.items():
                decrypted_secret_data[k] = self._decrypt(private_key, v)
            return decrypted_secret_data

        if isinstance(secret_data, (list, tuple)):
            decrypted_secret_data = []
            for v in secret_data:
                decrypted_secret_data.append(self._decrypt(private_key, v))
            return decrypted_secret_data

        return secret_data

    def decrypt(self, private_key):
        """Return a copy of this secret with any encrypted data decrypted.
        Note that the original remains encrypted."""

        r = Secret(self.name, self.source_context)
        r.secret_data = self._decrypt(private_key, self.secret_data)
        return r


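# Illustrative sketch (comment only): _decrypt() walks nested dicts,
# lists and tuples, decrypting any leaf that exposes a .decrypt()
# method and passing plain values through unchanged.  'encrypted_blob'
# and 'private_key' below are assumed placeholders:
#
#     s = Secret('db-creds', source_context)
#     s.secret_data = {'user': 'zuul', 'password': encrypted_blob}
#     clear = s.decrypt(private_key)    # returns a decrypted copy;
#     s.secret_data['password']         # ...the original stays encrypted
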
class SecretUse(ConfigObject):
    """A use of a secret in a Job"""

    def __init__(self, name, alias):
        super(SecretUse, self).__init__()
        self.name = name
        self.alias = alias
        self.pass_to_parent = False


class ProjectContext(ConfigObject):

    def __init__(self, project):
        super().__init__()
        self.project = project
        self.branch = None
        self.path = None

    def __str__(self):
        return self.project.name

    def toDict(self):
        return dict(
            project=self.project.name,
        )


class SourceContext(ConfigObject):
    """A reference to the branch of a project in configuration.

    Jobs and playbooks reference this to keep track of where they
    originate."""

    def __init__(self, project, branch, path, trusted):
        super(SourceContext, self).__init__()
        self.project = project
        self.branch = branch
        self.path = path
        self.trusted = trusted
        self.implied_branch_matchers = None
        self.implied_branches = None

    def __str__(self):
        return '%s/%s@%s' % (self.project, self.path, self.branch)

    def __repr__(self):
        return '<SourceContext %s trusted:%s>' % (str(self),
                                                  self.trusted)

    def __deepcopy__(self, memo):
        return self.copy()

    def copy(self):
        return self.__class__(self.project, self.branch, self.path,
                              self.trusted)

    def isSameProject(self, other):
        if not isinstance(other, SourceContext):
            return False
        return (self.project == other.project and
                self.trusted == other.trusted)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, SourceContext):
            return False
        return (self.project == other.project and
                self.branch == other.branch and
                self.path == other.path and
                self.trusted == other.trusted)

    def toDict(self):
        return dict(
            project=self.project.name,
            branch=self.branch,
            path=self.path,
        )


class PlaybookContext(ConfigObject):
    """A reference to a playbook in the context of a project.

    Jobs refer to objects of this class for their main, pre, and post
    playbooks so that we can keep track of which repos and security
    contexts are needed in order to run them.

    We also keep a list of roles so that playbooks only run with the
    roles which were defined at the point the playbook was defined.

    """

    def __init__(self, source_context, path, roles, secrets):
        super(PlaybookContext, self).__init__()
        self.source_context = source_context
        self.path = path
        self.roles = roles
        self.secrets = secrets
        self.decrypted_secrets = ()

    def __repr__(self):
        return '<PlaybookContext %s %s>' % (self.source_context,
                                            self.path)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        if not isinstance(other, PlaybookContext):
            return False
        return (self.source_context == other.source_context and
                self.path == other.path and
                self.roles == other.roles and
                self.secrets == other.secrets)

    def copy(self):
        r = PlaybookContext(self.source_context,
                            self.path,
                            self.roles,
                            self.secrets)
        return r

    def validateReferences(self, layout):
        # Verify that references to other objects in the layout are
        # valid.
        for secret_use in self.secrets:
            secret = layout.secrets.get(secret_use.name)
            if secret is None:
                raise Exception(
                    'The secret "{name}" was not found.'.format(
                        name=secret_use.name))
            if secret_use.alias in ('zuul', 'nodepool'):
                raise Exception('Secrets named "zuul" or "nodepool" '
                                'are not allowed.')
            if not secret.source_context.isSameProject(self.source_context):
                raise Exception(
                    "Unable to use secret {name}.  Secrets must be "
                    "defined in the same project in which they "
                    "are used".format(
                        name=secret_use.name))
            # Decrypt a copy of the secret to verify it can be done
            secret.decrypt(self.source_context.project.private_secrets_key)

    def freezeSecrets(self, layout):
        secrets = []
        for secret_use in self.secrets:
            secret = layout.secrets.get(secret_use.name)
            decrypted_secret = secret.decrypt(
                self.source_context.project.private_secrets_key)
            decrypted_secret.name = secret_use.alias
            secrets.append(decrypted_secret)
        self.decrypted_secrets = tuple(secrets)

    def addSecrets(self, decrypted_secrets):
        current_names = set([s.name for s in self.decrypted_secrets])
        new_secrets = [s for s in decrypted_secrets
                       if s.name not in current_names]
        self.decrypted_secrets = self.decrypted_secrets + tuple(new_secrets)

    def toDict(self, redact_secrets=True):
        # Render to a dict to use in passing json to the executor
        secrets = {}
        for secret in self.decrypted_secrets:
            if redact_secrets:
                secrets[secret.name] = 'REDACTED'
            else:
                secrets[secret.name] = secret.secret_data
        return dict(
            connection=self.source_context.project.connection_name,
            project=self.source_context.project.name,
            branch=self.source_context.branch,
            trusted=self.source_context.trusted,
            roles=[r.toDict() for r in self.roles],
            secrets=secrets,
            path=self.path)

    def toSchemaDict(self):
        # Render to a dict to use in REST api
        d = {
            'path': self.path,
            'roles': list(map(lambda x: x.toDict(), self.roles)),
            'secrets': [{'name': secret.name, 'alias': secret.alias}
                        for secret in self.secrets],
        }
        if self.source_context:
            d['source_context'] = self.source_context.toDict()
        else:
            d['source_context'] = None
        return d


class Role(ConfigObject, metaclass=abc.ABCMeta):
    """A reference to an ansible role."""

    def __init__(self, target_name):
        super(Role, self).__init__()
        self.target_name = target_name

    @abc.abstractmethod
    def __repr__(self):
        pass

    def __ne__(self, other):
        return not self.__eq__(other)

    @abc.abstractmethod
    def __eq__(self, other):
        if not isinstance(other, Role):
            return False
        return (self.target_name == other.target_name)

    @abc.abstractmethod
    def toDict(self):
        # Render to a dict to use in passing json to the executor
        return dict(target_name=self.target_name)


class ZuulRole(Role):
    """A reference to an ansible role in a Zuul project."""

    def __init__(self, target_name, project_canonical_name, implicit=False):
        super(ZuulRole, self).__init__(target_name)
        self.project_canonical_name = project_canonical_name
        self.implicit = implicit

    def __repr__(self):
        return '<ZuulRole %s %s>' % (self.project_canonical_name,
                                     self.target_name)

    __hash__ = object.__hash__

    def __eq__(self, other):
        if not isinstance(other, ZuulRole):
            return False
        # Implicit is not consulted for equality so that we can handle
        # implicit to explicit conversions.
        return (super(ZuulRole, self).__eq__(other) and
                self.project_canonical_name == other.project_canonical_name)

    def toDict(self):
        # Render to a dict to use in passing json to the executor
        d = super(ZuulRole, self).toDict()
        d['type'] = 'zuul'
        d['project_canonical_name'] = self.project_canonical_name
        d['implicit'] = self.implicit
        return d


class Job(ConfigObject):

    """A Job represents the definition of actions to perform.

    A Job is an abstract configuration concept.  It describes what,
    where, and under what circumstances something should be run
    (contrast this with Build which is a concrete single execution of
    a Job).

    NB: Do not modify attributes of this class, set them directly
    (e.g., "job.run = ..." rather than "job.run.append(...)").
    """

    BASE_JOB_MARKER = object()

    def __init__(self, name):
        super(Job, self).__init__()
        # These attributes may override even the final form of a job
        # in the context of a project-pipeline.  They cannot affect
        # the execution of the job, but only whether the job is run
        # and how it is reported.
        self.context_attributes = dict(
            voting=True,
            hold_following_changes=False,
            failure_message=None,
            success_message=None,
            failure_url=None,
            success_url=None,
            branch_matcher=None,
            _branches=(),
            file_matcher=None,
            _files=(),
            irrelevant_file_matcher=None,  # skip-if
            _irrelevant_files=(),
            match_on_config_updates=True,
            tags=frozenset(),
            provides=frozenset(),
            requires=frozenset(),
            dependencies=frozenset(),
            ignore_allowed_projects=None,  # internal, but inherited
                                           # in the usual manner
        )

        # These attributes affect how the job is actually run and more
        # care must be taken when overriding them.  If a job is
        # declared "final", these may not be overridden in a
        # project-pipeline.
        self.execution_attributes = dict(
            parent=None,
            timeout=None,
            post_timeout=None,
            variables={},
            extra_variables={},
            host_variables={},
            group_variables={},
            nodeset=NodeSet(),
            workspace=None,
            pre_run=(),
            post_run=(),
            cleanup_run=(),
            run=(),
            ansible_version=None,
            semaphore=None,
            attempts=3,
            final=False,
            abstract=False,
            intermediate=False,
            protected=None,
            roles=(),
            required_projects={},
            allowed_projects=None,
            override_branch=None,
            override_checkout=None,
            post_review=None,
        )

        # These are generally internal attributes which are not
        # accessible via configuration.
        self.other_attributes = dict(
            name=None,
            source_context=None,
            start_mark=None,
            inheritance_path=(),
            parent_data=None,
            artifact_data=None,
            description=None,
            variant_description=None,
            protected_origin=None,
            secrets=(),  # secrets aren't inheritable
            queued=False,
            waiting_status=None,  # Text description of why it's waiting
        )

        self.attributes = {}
        self.attributes.update(self.context_attributes)
        self.attributes.update(self.execution_attributes)
        self.attributes.update(self.other_attributes)

        self.name = name

    @property
    def combined_variables(self):
        """
        Combines the data that has been returned by parent jobs with the
        job variables where job variables have priority over parent data.
        """
        return Job._deepUpdate(self.parent_data or {}, self.variables)

    def toDict(self, tenant):
        '''
        Convert a Job object's attributes to a dictionary.
        '''
        d = {}
        d['name'] = self.name
        d['branches'] = self._branches
        d['override_checkout'] = self.override_checkout
        d['files'] = self._files
        d['irrelevant_files'] = self._irrelevant_files
        d['variant_description'] = self.variant_description
        if self.source_context:
            d['source_context'] = self.source_context.toDict()
        else:
            d['source_context'] = None
        d['description'] = self.description
        d['required_projects'] = []
        for project in self.required_projects.values():
            d['required_projects'].append(project.toDict())
        if self.semaphore:
            # For now just leave the semaphore name here until we really need
            # more information in zuul-web about this
            d['semaphore'] = self.semaphore.name
        else:
            d['semaphore'] = None
        d['variables'] = self.variables
        d['extra_variables'] = self.extra_variables
        d['host_variables'] = self.host_variables
        d['group_variables'] = self.group_variables
        d['final'] = self.final
        d['abstract'] = self.abstract
        d['intermediate'] = self.intermediate
        d['protected'] = self.protected
        d['voting'] = self.voting
        d['timeout'] = self.timeout
        d['tags'] = list(self.tags)
        d['provides'] = list(self.provides)
        d['requires'] = list(self.requires)
        d['dependencies'] = list(map(lambda x: x.toDict(), self.dependencies))
        d['attempts'] = self.attempts
        d['roles'] = list(map(lambda x: x.toDict(), self.roles))
        d['run'] = list(map(lambda x: x.toSchemaDict(), self.run))
        d['pre_run'] = list(map(lambda x: x.toSchemaDict(), self.pre_run))
        d['post_run'] = list(map(lambda x: x.toSchemaDict(), self.post_run))
        d['cleanup_run'] = list(map(lambda x: x.toSchemaDict(),
                                    self.cleanup_run))
        d['post_review'] = self.post_review
        d['match_on_config_updates'] = self.match_on_config_updates
        if self.isBase():
            d['parent'] = None
        elif self.parent:
            d['parent'] = self.parent
        else:
            d['parent'] = tenant.default_base_job
        if isinstance(self.nodeset, str):
            ns = tenant.layout.nodesets.get(self.nodeset)
        else:
            ns = self.nodeset
        if ns:
            d['nodeset'] = ns.toDict()
        if self.ansible_version:
            d['ansible_version'] = self.ansible_version
        else:
            d['ansible_version'] = None
        return d

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        # Compare the name and all inheritable attributes to determine
        # whether two jobs with the same name are identically
        # configured.  Useful upon reconfiguration.
        if not isinstance(other, Job):
            return False
        if self.name != other.name:
            return False
        for k, v in self.attributes.items():
            if getattr(self, k) != getattr(other, k):
                return False
        return True

    __hash__ = object.__hash__

    def __str__(self):
        return self.name

    def __repr__(self):
        ln = 0
        if self.start_mark:
            ln = self.start_mark.line + 1
        return '<Job %s branches: %s source: %s#%s>' % (
            self.name,
            self.branch_matcher,
            self.source_context,
            ln)

    def __getattr__(self, name):
        v = self.__dict__.get(name)
        if v is None:
            return self.attributes[name]
        return v

    def _get(self, name):
        return self.__dict__.get(name)

    def getSafeAttributes(self):
        return Attributes(name=self.name)

    def isBase(self):
        return self.parent is self.BASE_JOB_MARKER

    def setBase(self, layout):
        self.inheritance_path = self.inheritance_path + (repr(self),)
        if self._get('run') is not None:
            self.run = self.freezePlaybooks(self.run, layout)
        if self._get('pre_run') is not None:
            self.pre_run = self.freezePlaybooks(self.pre_run, layout)
        if self._get('post_run') is not None:
            self.post_run = self.freezePlaybooks(self.post_run, layout)
        if self._get('cleanup_run') is not None:
            self.cleanup_run = self.freezePlaybooks(self.cleanup_run, layout)

    def getNodeSet(self, layout):
        if isinstance(self.nodeset, str):
            # This references an existing named nodeset in the layout.
            ns = layout.nodesets.get(self.nodeset)
            if ns is None:
                raise Exception(
                    'The nodeset "{nodeset}" was not found.'.format(
                        nodeset=self.nodeset))
            return ns
        return self.nodeset

    def validateReferences(self, layout):
        # Verify that references to other objects in the layout are
        # valid.
        if not self.isBase() and self.parent:
            layout.getJob(self.parent)

        ns = self.getNodeSet(layout)
        if layout.tenant.max_nodes_per_job != -1 and \
                len(ns) > layout.tenant.max_nodes_per_job:
            raise Exception(
                'The job "{job}" exceeds tenant '
                'max-nodes-per-job {maxnodes}.'.format(
                    job=self.name,
                    maxnodes=layout.tenant.max_nodes_per_job))

        for pb in self.pre_run + self.run + self.post_run + self.cleanup_run:
            pb.validateReferences(layout)

    def addRoles(self, roles):
        newroles = []
        # Start with a copy of the existing roles, but if any of them
        # are implicit roles which are identified as explicit in the
        # new roles list, replace them with the explicit version.
        changed = False
        for existing_role in self.roles:
            if existing_role in roles:
                new_role = roles[roles.index(existing_role)]
            else:
                new_role = None
            if (new_role and
                isinstance(new_role, ZuulRole) and
                isinstance(existing_role, ZuulRole) and
                    existing_role.implicit and not new_role.implicit):
                newroles.append(new_role)
                changed = True
            else:
                newroles.append(existing_role)
        # Now add the new roles.
        for role in reversed(roles):
            if role not in newroles:
                newroles.insert(0, role)
                changed = True
        if changed:
            self.roles = tuple(newroles)

    def getBranches(self):
        # Return the raw branch list that matches this job
        return self._branches

    def setBranchMatcher(self, branches, implied=False):
        # Set the branch matcher to match any of the supplied branches
        self._branches = branches
        matchers = []
        if implied:
            matcher_class = change_matcher.ImpliedBranchMatcher
        else:
            matcher_class = change_matcher.BranchMatcher
        for branch in branches:
            matchers.append(matcher_class(branch))
        self.branch_matcher = change_matcher.MatchAny(matchers)

    def setFileMatcher(self, files):
        # Set the file matcher to match any of the change files
        self._files = files
        matchers = []
        for fn in files:
            matchers.append(change_matcher.FileMatcher(fn))
        self.file_matcher = change_matcher.MatchAnyFiles(matchers)

    def setIrrelevantFileMatcher(self, irrelevant_files):
        # Set the irrelevant file matcher to match any of the change files
        self._irrelevant_files = irrelevant_files
        matchers = []
        for fn in irrelevant_files:
            matchers.append(change_matcher.FileMatcher(fn))
        self.irrelevant_file_matcher = change_matcher.MatchAllFiles(matchers)

    def updateVariables(self, other_vars, other_extra_vars, other_host_vars,
                        other_group_vars):
        if other_vars is not None:
            self.variables = Job._deepUpdate(self.variables, other_vars)
        if other_extra_vars is not None:
            self.extra_variables = Job._deepUpdate(
                self.extra_variables, other_extra_vars)
        if other_host_vars is not None:
            self.host_variables = Job._deepUpdate(
                self.host_variables, other_host_vars)
        if other_group_vars is not None:
            self.group_variables = Job._deepUpdate(
                self.group_variables, other_group_vars)

    def updateParentData(self, other_build):
        # Update variables, but give the new values priority.  If more than
        # one parent job returns the same variable, the value from the later
        # job in the job graph will take precedence.
        other_vars = other_build.result_data
        v = self.parent_data or {}
        v = Job._deepUpdate(v, other_vars)
        # To avoid running afoul of checks that jobs don't set zuul
        # variables, remove them from parent data here.
        if 'zuul' in v:
            del v['zuul']
        self.parent_data = v

        artifact_data = self.artifact_data or []
        artifacts = get_artifacts_from_result_data(other_vars)
        for a in artifacts:
            # The change here may be any ref type (tag, change, etc)
            ref = other_build.build_set.item.change
            a.update({'project': ref.project.name,
                      'job': other_build.job.name})
            # The change is a Branch
            if hasattr(ref, 'branch'):
                a.update({'branch': ref.branch})
                if hasattr(ref, 'number') and hasattr(ref, 'patchset'):
                    a.update({'change': str(ref.number),
                              'patchset': ref.patchset})
            # Otherwise, we have a plain ref
            else:
                a.update({'ref': ref.ref,
                          'oldrev': ref.oldrev,
                          'newrev': ref.newrev})
                if hasattr(ref, 'tag'):
                    a.update({'tag': ref.tag})
            if a not in artifact_data:
                artifact_data.append(a)
        if artifact_data:
            self.updateArtifactData(artifact_data)

    def updateArtifactData(self, artifact_data):
        self.artifact_data = artifact_data

    def updateProjectVariables(self, project_vars):
        # Merge project/template variables directly into the job
        # variables.  Job variables override project variables.
        self.variables = Job._deepUpdate(project_vars, self.variables)

    def updateProjects(self, other_projects):
        required_projects = self.required_projects.copy()
        required_projects.update(other_projects)
        self.required_projects = required_projects

    @staticmethod
    def _deepUpdate(a, b):
        # Merge nested dictionaries if possible, otherwise, overwrite
        # the value in 'a' with the value in 'b'.

        ret = {}
        for k, av in a.items():
            if k not in b:
                ret[k] = av
        for k, bv in b.items():
            av = a.get(k)
            if (isinstance(av, (dict, types.MappingProxyType)) and
                    isinstance(bv, (dict, types.MappingProxyType))):
                ret[k] = Job._deepUpdate(av, bv)
            else:
                ret[k] = bv
        return ret

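    # Illustrative sketch (comment only): nested dicts are merged with
    # 'b' winning on conflicts; any non-dict value is simply replaced:
    #
    #     Job._deepUpdate({'a': {'x': 1, 'y': 2}, 'c': 3},
    #                     {'a': {'y': 9}, 'c': {'d': 4}})
    #     # -> {'a': {'x': 1, 'y': 9}, 'c': {'d': 4}}
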
    def copy(self):
        job = Job(self.name)
        for k in self.attributes:
            v = self._get(k)
            if v is not None:
                # If this is a config object, it's frozen, so it's
                # safe to shallow copy.
                setattr(job, k, v)
        return job

    def freezePlaybooks(self, pblist, layout):
        """Take a list of playbooks, and return a copy of it updated with this
        job's roles.

        """

        ret = []
        for old_pb in pblist:
            pb = old_pb.copy()
            pb.roles = self.roles
            pb.freezeSecrets(layout)
            ret.append(pb)
        return tuple(ret)

    def applyVariant(self, other, layout):
        """Copy the attributes which have been set on the other job to this
        job."""
        if not isinstance(other, Job):
            raise Exception("Job unable to inherit from %s" % (other,))

        for k in self.execution_attributes:
            if (other._get(k) is not None and
                    k not in set(['final', 'abstract', 'protected',
                                  'intermediate'])):
                if self.final:
                    raise Exception("Unable to modify final job %s attribute "
                                    "%s=%s with variant %s" % (
                                        repr(self), k, other._get(k),
                                        repr(other)))
                if self.protected_origin:
                    # This is a protected job; check the origin of the
                    # job definition.
                    this_origin = self.protected_origin
                    other_origin = other.source_context.project.canonical_name
                    if this_origin != other_origin:
                        raise Exception("Job %s which is defined in %s is "
                                        "protected and cannot be inherited "
                                        "from other projects."
                                        % (repr(self), this_origin))
                if k not in set(['pre_run', 'run', 'post_run', 'cleanup_run',
                                 'roles', 'variables', 'extra_variables',
                                 'host_variables', 'group_variables',
                                 'required_projects', 'allowed_projects']):
                    setattr(self, k, other._get(k))

        # Don't set final above so that we don't trip an error halfway
        # through assignment.
        if other.final != self.attributes['final']:
            self.final = other.final

        # Abstract may not be reset by a variant, it may only be
        # cleared by inheriting.
        if other.name != self.name:
            self.abstract = other.abstract
        elif other.abstract:
            self.abstract = True

        # An intermediate job may only be inherited by an abstract
        # job.  Note intermediate jobs must also be abstract; that
        # has been enforced during config reading.  Similar to
        # abstract, it is cleared by inheriting.
        if self.intermediate and not other.abstract:
            raise Exception("Intermediate job %s may only be inherited "
                            "by another abstract job" %
                            (repr(self)))
        if other.name != self.name:
            self.intermediate = other.intermediate
        elif other.intermediate:
            self.intermediate = True

        # Protected may only be set to true
        if other.protected is not None:
            # Don't allow the protected flag to be reset
            if not other.protected and self.protected_origin:
                raise Exception("Unable to reset protected attribute of job"
                                " %s by job %s" % (
                                    repr(self), repr(other)))
            if not self.protected_origin:
                self.protected_origin = \
                    other.source_context.project.canonical_name

        # We must update roles before any playbook contexts
        if other._get('roles') is not None:
            self.addRoles(other.roles)

        # Freeze the nodeset
        self.nodeset = self.getNodeSet(layout)

        # Pass secrets to parents
        secrets_for_parents = [s for s in other.secrets if s.pass_to_parent]
        if secrets_for_parents:
            decrypted_secrets = []
            for secret_use in secrets_for_parents:
                secret = layout.secrets.get(secret_use.name)
                if secret is None:
                    raise Exception("Secret %s not found" % (secret_use.name,))
                decrypted_secret = secret.decrypt(
                    other.source_context.project.private_secrets_key)
                decrypted_secret.name = secret_use.alias
                decrypted_secrets.append(decrypted_secret)
            # Add the secrets to any existing playbooks.  If any of
            # them are in an untrusted project, then we've just given
            # a secret to a playbook which can run in dynamic config,
            # therefore it's no longer safe to run this job
            # pre-review.  The only way pass-to-parent can work with
            # a pre-review pipeline is if all playbooks are in the
            # trusted context.
            for pb in itertools.chain(
                    self.pre_run, self.run, self.post_run, self.cleanup_run):
                pb.addSecrets(decrypted_secrets)
                if not pb.source_context.trusted:
                    self.post_review = True

        if other._get('run') is not None:
            other_run = self.freezePlaybooks(other.run, layout)
            self.run = other_run
        if other._get('pre_run') is not None:
            other_pre_run = self.freezePlaybooks(other.pre_run, layout)
            self.pre_run = self.pre_run + other_pre_run
        if other._get('post_run') is not None:
            other_post_run = self.freezePlaybooks(other.post_run, layout)
            self.post_run = other_post_run + self.post_run
        if other._get('cleanup_run') is not None:
            other_cleanup_run = self.freezePlaybooks(other.cleanup_run, layout)
            self.cleanup_run = other_cleanup_run + self.cleanup_run
        self.updateVariables(other.variables, other.extra_variables,
                             other.host_variables, other.group_variables)
        if other._get('required_projects') is not None:
            self.updateProjects(other.required_projects)
        if (other._get('allowed_projects') is not None and
                self._get('allowed_projects') is not None):
            self.allowed_projects = frozenset(
                self.allowed_projects.intersection(
                    other.allowed_projects))
        elif other._get('allowed_projects') is not None:
            self.allowed_projects = other.allowed_projects

        for k in self.context_attributes:
            if (other._get(k) is not None and
                    k not in set(['tags', 'requires', 'provides'])):
                setattr(self, k, other._get(k))

        for k in ('tags', 'requires', 'provides'):
            if other._get(k) is not None:
                setattr(self, k, getattr(self, k).union(other._get(k)))

        self.inheritance_path = self.inheritance_path + (repr(other),)

    def changeMatchesBranch(self, change, override_branch=None):
        if override_branch is None:
            branch_change = change
        else:
            # If an override branch is supplied, create a very basic
            # change (a Ref) and set its branch to the override
            # branch.
            branch_change = Ref(change.project)
            branch_change.ref = override_branch

        if self.branch_matcher and not self.branch_matcher.matches(
                branch_change):
            return False
        return True

    def changeMatchesFiles(self, change):
        if self.file_matcher and not self.file_matcher.matches(change):
            return False

        # NB: This is a negative match.
        if (self.irrelevant_file_matcher and
                self.irrelevant_file_matcher.matches(change)):
            return False

        return True


class JobProject(ConfigObject):
    """ A reference to a project from a job. """

    def __init__(self, project_name, override_branch=None,
                 override_checkout=None):
        super(JobProject, self).__init__()
        self.project_name = project_name
        self.override_branch = override_branch
        self.override_checkout = override_checkout

    def toDict(self):
        d = dict()
        d['project_name'] = self.project_name
        d['override_branch'] = self.override_branch
        d['override_checkout'] = self.override_checkout
        return d


class JobSemaphore(ConfigObject):
    """ A reference to a semaphore from a job. """

    def __init__(self, semaphore_name, resources_first=False):
        super().__init__()
        self.name = semaphore_name
        self.resources_first = resources_first

    def toDict(self):
        d = dict()
        d['name'] = self.name
        d['resources_first'] = self.resources_first
        return d


class JobList(ConfigObject):
    """ A list of jobs in a project's pipeline. """

    def __init__(self):
        super(JobList, self).__init__()
        self.jobs = OrderedDict()  # job.name -> [job, ...]

    def addJob(self, job):
        if job.name in self.jobs:
            self.jobs[job.name].append(job)
        else:
            self.jobs[job.name] = [job]

    def inheritFrom(self, other):
        for jobname, jobs in other.jobs.items():
            joblist = self.jobs.setdefault(jobname, [])
            for job in jobs:
                if job not in joblist:
                    joblist.append(job)


class JobDependency(ConfigObject):
    """ A reference to another job in the project-pipeline-config. """
    def __init__(self, name, soft=False):
        super(JobDependency, self).__init__()
        self.name = name
        self.soft = soft

    def toDict(self):
        return {'name': self.name,
                'soft': self.soft}


class JobGraph(object):
    """ A JobGraph represents the dependency graph between Jobs. """

    def __init__(self):
        self.jobs = OrderedDict()  # job_name -> Job
        # dependent_job_name -> dict(parent_job_name -> soft)
        self._dependencies = {}

    def __repr__(self):
        return '<JobGraph %s>' % (self.jobs)

    def addJob(self, job):
        # A graph must be created after the job list is frozen,
        # therefore we should only get one job with the same name.
        if job.name in self.jobs:
            raise Exception("Job %s already added" % (job.name,))
        self.jobs[job.name] = job
        # Append the dependency information
        self._dependencies.setdefault(job.name, {})
        try:
            for dependency in job.dependencies:
                # Make sure a circular dependency is never created
                ancestor_jobs = self._getParentJobNamesRecursively(
                    dependency.name, soft=True)
                ancestor_jobs.add(dependency.name)
                if any((job.name == anc_job) for anc_job in ancestor_jobs):
                    raise Exception("Dependency cycle detected in job %s" %
                                    (job.name,))
                self._dependencies[job.name][dependency.name] = \
                    dependency.soft
        except Exception:
            del self.jobs[job.name]
            del self._dependencies[job.name]
            raise

    def getJobs(self):
        return list(self.jobs.values())  # Report in the order of layout cfg

    def getDirectDependentJobs(self, parent_job, skip_soft=False):
        ret = set()
        for dependent_name, parents in self._dependencies.items():
            part = (parent_job in parents
                    and (not skip_soft or not parents[parent_job]))
            if part:
                ret.add(dependent_name)
        return ret

    def getDependentJobsRecursively(self, parent_job, skip_soft=False):
        all_dependent_jobs = set()
        jobs_to_iterate = set([parent_job])
        while len(jobs_to_iterate) > 0:
            current_job = jobs_to_iterate.pop()
            current_dependent_jobs = self.getDirectDependentJobs(current_job,
                                                                 skip_soft)
            new_dependent_jobs = current_dependent_jobs - all_dependent_jobs
            jobs_to_iterate |= new_dependent_jobs
            all_dependent_jobs |= new_dependent_jobs
        return [self.jobs[name] for name in all_dependent_jobs]

    def getParentJobsRecursively(self, dependent_job, layout=None,
                                 skip_soft=False):
        return [self.jobs[name] for name in
                self._getParentJobNamesRecursively(dependent_job,
                                                   layout=layout,
                                                   skip_soft=skip_soft)]

    def _getParentJobNamesRecursively(self, dependent_job, soft=False,
                                      layout=None, skip_soft=False):
        all_parent_jobs = set()
        jobs_to_iterate = set([(dependent_job, False)])
        while len(jobs_to_iterate) > 0:
            (current_job, current_soft) = jobs_to_iterate.pop()
            current_parent_jobs = self._dependencies.get(current_job)
            if current_parent_jobs is None:
                if soft or current_soft:
                    if layout:
                        # If the caller supplied a layout, verify that
                        # the job exists to provide a helpful error
                        # message.  Called for exception side effect:
                        layout.getJob(current_job)
                    current_parent_jobs = {}
                else:
                    raise Exception("Job %s depends on %s which was not run." %
                                    (dependent_job, current_job))
            elif dependent_job != current_job:
                all_parent_jobs.add(current_job)
            # Filter out soft parents only after the missing-job handling
            # above; otherwise an unknown job would raise AttributeError
            # here instead of the helpful exception.
            if skip_soft:
                current_parent_jobs = {
                    d: s for d, s in current_parent_jobs.items() if not s}
            new_parent_jobs = set(current_parent_jobs.keys()) - all_parent_jobs
            for j in new_parent_jobs:
                jobs_to_iterate.add((j, current_parent_jobs[j]))
        return all_parent_jobs

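# Usage sketch (hypothetical frozen jobs; not executed as part of this
# module): build the graph in layout order, then walk it in either
# direction.
#
#   graph = JobGraph()
#   graph.addJob(build_job)                 # no dependencies
#   graph.addJob(test_job)                  # depends on 'build'
#   graph.getDirectDependentJobs('build')   # -> {'test'}
#   graph.getParentJobsRecursively('test')  # -> [build_job]
#
# addJob() rejects a second job with the same name, and rolls back its
# bookkeeping if a dependency cycle is detected.
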
class Build(object):
    """A Build is an instance of a single execution of a Job.

    While a Job describes what to run, a Build describes an actual
    execution of that Job.  Each build is associated with exactly one
    Job (related builds are grouped together in a BuildSet).
    """

    def __init__(self, job, build_set, uuid, zuul_event_id=None):
        self.job = job
        self.build_set = build_set
        self.uuid = uuid
        self.url = None
        self.result = None
        self.result_data = {}
        self.error_detail = None
        self.execute_time = time.time()
        self.start_time = None
        self.end_time = None
        self.estimated_time = None
        self.canceled = False
        self.paused = False
        self.retry = False
        self.held = False
        self.parameters = {}
        self.worker = Worker()
        self.node_labels = []
        self.node_name = None
        self.nodeset = None
        self.zuul_event_id = zuul_event_id

    def __repr__(self):
        return ('<Build %s of %s voting:%s on %s>' %
                (self.uuid, self.job.name, self.job.voting, self.worker))

    @property
    def failed(self):
        if self.result and self.result not in ['SUCCESS', 'SKIPPED']:
            return True
        return False

    @property
    def pipeline(self):
        return self.build_set.item.pipeline

    @property
    def log_url(self):
        log_url = self.result_data.get('zuul', {}).get('log_url')
        if log_url and log_url[-1] != '/':
            log_url = log_url + '/'
        return log_url

    def getSafeAttributes(self):
        return Attributes(uuid=self.uuid,
                          result=self.result,
                          error_detail=self.error_detail,
                          result_data=self.result_data)

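# Property semantics at a glance (illustrative values): ``failed`` is only
# True once a non-successful result is recorded, and ``log_url`` always
# carries a trailing slash so paths can be appended safely.
#
#   build.result = None       # -> build.failed is False (still running)
#   build.result = 'SKIPPED'  # -> build.failed is False
#   build.result = 'FAILURE'  # -> build.failed is True
#   build.result_data = {'zuul': {'log_url': 'https://logs/1'}}
#                             # -> build.log_url == 'https://logs/1/'
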
class Worker(object):
    """Information about the specific worker executing a Build."""

    def __init__(self):
        self.name = "Unknown"
        self.hostname = None
        self.log_port = None

    def updateFromData(self, data):
        """Update worker information if contained in the WORK_DATA response."""
        self.name = data.get('worker_name', self.name)
        self.hostname = data.get('worker_hostname', self.hostname)
        self.log_port = data.get('worker_log_port', self.log_port)

    def __repr__(self):
        return '<Worker %s>' % self.name

class RepoFiles(object):
    """RepoFiles holds config-file content for per-project job config.

    When Zuul asks a merger to prepare a future multiple-repo state
    and collect Zuul configuration files so that we can dynamically
    load our configuration, this class provides cached access to that
    data for use by the Change which updated the config files and any
    changes that follow it in a ChangeQueue.

    It is attached to a BuildSet since the content of Zuul
    configuration files can change with each new BuildSet.
    """

    def __init__(self):
        self.connections = {}

    def __repr__(self):
        return '<RepoFiles %s>' % self.connections

    def setFiles(self, items):
        self.hostnames = {}
        for item in items:
            connection = self.connections.setdefault(
                item['connection'], {})
            project = connection.setdefault(item['project'], {})
            branch = project.setdefault(item['branch'], {})
            branch.update(item['files'])

    def getFile(self, connection_name, project_name, branch, fn):
        host = self.connections.get(connection_name, {})
        return host.get(project_name, {}).get(branch, {}).get(fn)

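# Lookup sketch (hypothetical merger output; connection/project names are
# examples): file content is nested by connection -> project -> branch ->
# path, and missing keys fall through to None.
#
#   rf = RepoFiles()
#   rf.setFiles([{'connection': 'gerrit', 'project': 'org/repo',
#                 'branch': 'master',
#                 'files': {'.zuul.yaml': '- job: ...'}}])
#   rf.getFile('gerrit', 'org/repo', 'master', '.zuul.yaml')
#   # -> '- job: ...'
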
class BuildSet(object):
    """A collection of Builds for one specific potential future repository
    state.

    When Zuul executes Builds for a change, it creates a Build to
    represent each execution of each job and a BuildSet to keep track
    of all the Builds running for that Change.  When Zuul re-executes
    Builds for a Change with a different configuration, all of the
    running Builds in the BuildSet for that change are aborted, and a
    new BuildSet is created to hold the Builds for the Jobs being
    run with the new configuration.

    A BuildSet also holds the UUID used to produce the Zuul Ref that
    builders check out.
    """
    # Merge states:
    NEW = 1
    PENDING = 2
    COMPLETE = 3

    states_map = {
        NEW: 'NEW',
        PENDING: 'PENDING',
        COMPLETE: 'COMPLETE',
    }

    def __init__(self, item):
        self.item = item
        self.builds = {}
        self.retry_builds = {}
        self.result = None
        self.uuid = None
        self.commit = None
        self.dependent_changes = None
        self.merger_items = None
        self.unable_to_merge = False
        self.config_errors = []  # list of ConfigurationErrors
        self.failing_reasons = []
        self.debug_messages = []
        self.warning_messages = []
        self.merge_state = self.NEW
        self.nodesets = {}  # job -> nodeset
        self.node_requests = {}  # job -> reqs
        self.files = RepoFiles()
        self.repo_state = {}
        self.tries = {}
        if item.change.files is not None:
            self.files_state = self.COMPLETE
        else:
            self.files_state = self.NEW

    @property
    def ref(self):
        # NOTE(jamielennox): The concept of the buildset ref is to be
        # removed and a buildset UUID identifier made available instead.
        # Currently the ref is checked to see if the BuildSet has been
        # configured.
        return 'Z' + self.uuid if self.uuid else None

    def __repr__(self):
        return '<BuildSet item: %s #builds: %s merge state: %s>' % (
            self.item,
            len(self.builds),
            self.getStateName(self.merge_state))

    def setConfiguration(self):
        # The change isn't enqueued until after it's created, so we
        # don't know what the other changes ahead will be until jobs
        # start.
        if not self.uuid:
            self.uuid = uuid4().hex
        if self.dependent_changes is None:
            items = []
            if self.item.bundle:
                items.extend(reversed(self.item.bundle.items))
            else:
                items.append(self.item)

            items.extend(i for i in self.item.items_ahead if i not in items)
            items.reverse()

            self.dependent_changes = [i.change.toDict() for i in items]
            self.merger_items = [i.makeMergerItem() for i in items]

    def getStateName(self, state_num):
        return self.states_map.get(
            state_num, 'UNKNOWN (%s)' % state_num)

    def addBuild(self, build):
        self.builds[build.job.name] = build
        if build.job.name not in self.tries:
            self.tries[build.job.name] = 1

    def addRetryBuild(self, build):
        self.retry_builds.setdefault(build.job.name, []).append(build)

    def removeBuild(self, build):
        if build.job.name not in self.builds:
            return
        self.tries[build.job.name] += 1
        del self.builds[build.job.name]

    def getBuild(self, job_name):
        return self.builds.get(job_name)

    def getBuilds(self):
        keys = list(self.builds.keys())
        keys.sort()
        return [self.builds.get(x) for x in keys]

    def getRetryBuildsForJob(self, job_name):
        return self.retry_builds.get(job_name, [])

    def getJobNodeSet(self, job_name: str) -> NodeSet:
        # Return None if not provisioned; empty NodeSet if no nodes
        # required
        return self.nodesets.get(job_name)

    def removeJobNodeSet(self, job_name: str):
        if job_name not in self.nodesets:
            raise Exception("No job nodeset for %s" % (job_name,))
        del self.nodesets[job_name]

    def setJobNodeRequest(self, job_name: str, req: NodeRequest):
        if job_name in self.node_requests:
            raise Exception("Prior node request for %s" % (job_name,))
        self.node_requests[job_name] = req

    def getJobNodeRequest(self, job_name: str) -> NodeRequest:
        return self.node_requests.get(job_name)

    def removeJobNodeRequest(self, job_name: str):
        if job_name in self.node_requests:
            del self.node_requests[job_name]

    def jobNodeRequestComplete(self, job_name: str, nodeset: NodeSet):
        if job_name in self.nodesets:
            # A completed request must not overwrite an existing nodeset.
            raise Exception("Prior nodeset for %s" % (job_name,))
        self.nodesets[job_name] = nodeset
        del self.node_requests[job_name]

    def getTries(self, job_name):
        return self.tries.get(job_name, 0)

    def getMergeMode(self):
        # We may be called before this build set has a shadow layout
        # (ie, we are called to perform the merge to create that
        # layout).  It's possible that the change we are merging will
        # update the merge-mode for the project, but there's not much
        # we can do about that here.  Instead, do the best we can by
        # using the nearest shadow layout to determine the merge mode,
        # or if that fails, the current live layout, or if that fails,
        # use the default: merge-resolve.
        item = self.item
        layout = None
        while item:
            layout = item.layout
            if layout:
                break
            item = item.item_ahead
        if not layout:
            layout = self.item.pipeline.tenant.layout
        if layout:
            project = self.item.change.project
            project_metadata = layout.getProjectMetadata(
                project.canonical_name)
            if project_metadata:
                return project_metadata.merge_mode
        return MERGER_MERGE_RESOLVE

    def getSafeAttributes(self):
        return Attributes(uuid=self.uuid)

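# Retry accounting sketch (hypothetical build objects for a job named
# 'unit'): the first addBuild() for a job primes its tries counter to 1,
# and each removeBuild() (a retry) increments it, so getTries() reports
# the current attempt number.
#
#   bs.addBuild(build_a)        # bs.getTries('unit') == 1
#   bs.removeBuild(build_a)     # bs.getTries('unit') == 2
#   bs.addBuild(build_a_retry)  # still attempt 2
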
class QueueItem(object):
    """Represents the position of a Change in a ChangeQueue.

    All Changes are enqueued into ChangeQueue in a QueueItem.  The QueueItem
    holds the current `BuildSet` as well as all previous `BuildSets` that
    were produced for this `QueueItem`.
    """

    def __init__(self, queue, change, event):
        log = logging.getLogger("zuul.QueueItem")
        self.log = get_annotated_logger(log, event)
        self.uuid = uuid4().hex
        self.pipeline = queue.pipeline
        self.queue = queue
        self.change = change  # a ref
        self.dequeued_needing_change = False
        self.current_build_set = BuildSet(self)
        self.item_ahead = None
        self.items_behind = []
        self.enqueue_time = None
        self.report_time = None
        self.dequeue_time = None
        self.reported = False
        self.reported_enqueue = False
        self.reported_start = False
        self.quiet = False
        self.active = False  # Whether an item is within an active window
        self.live = True  # Whether an item is intended to be processed at all
        self.layout = None
        self.project_pipeline_config = None
        self.job_graph = None
        self._old_job_graph = None  # Cached job graph of previous layout
        self._cached_sql_results = {}
        self.event = event  # The trigger event that led to this queue item

        # Additional container for connection-specific information to be
        # used by reporters throughout the lifecycle
        self.dynamic_state = defaultdict(dict)

        # A bundle holds other queue items that have to be successful
        # for the current queue item to succeed
        self.bundle = None
        self.dequeued_bundle_failing = False

    def annotateLogger(self, logger):
        """Return an annotated logger with the trigger event"""
        return get_annotated_logger(logger, self.event)

    def __repr__(self):
        if self.pipeline:
            pipeline = self.pipeline.name
        else:
            pipeline = None
        return '<QueueItem %s for %s in %s>' % (
            self.uuid, self.change, pipeline)

    def resetAllBuilds(self):
        self.current_build_set = BuildSet(self)
        self.layout = None
        self.project_pipeline_config = None
        self.job_graph = None
        self._old_job_graph = None

    def addBuild(self, build):
        self.current_build_set.addBuild(build)

    def addRetryBuild(self, build):
        self.current_build_set.addRetryBuild(build)

    def removeBuild(self, build):
        self.current_build_set.removeBuild(build)

    def setReportedResult(self, result):
        self.report_time = time.time()
        self.current_build_set.result = result

    def debug(self, msg, indent=0):
        if (not self.project_pipeline_config or
                not self.project_pipeline_config.debug):
            return
        if indent:
            indent = ' ' * indent
        else:
            indent = ''
        self.current_build_set.debug_messages.append(indent + msg)

    def warning(self, msg):
        self.current_build_set.warning_messages.append(msg)
        self.log.info(msg)

    def freezeJobGraph(self, skip_file_matcher=False):
        """Find or create actual matching jobs for this item's change and
        store the resulting job tree."""

        ppc = self.layout.getProjectPipelineConfig(self)
        try:
            # Conditionally set self.ppc so that the debug method can
            # consult it as we resolve the jobs.
            self.project_pipeline_config = ppc
            if ppc:
                for msg in ppc.debug_messages:
                    self.debug(msg)
            job_graph = self.layout.createJobGraph(
                self, ppc, skip_file_matcher)
            for job in job_graph.getJobs():
                # Ensure that each job's dependencies are fully
                # accessible.  This will raise an exception if not.
                job_graph.getParentJobsRecursively(job.name, self.layout)
            self.job_graph = job_graph
        except Exception:
            self.project_pipeline_config = None
            self.job_graph = None
            self._old_job_graph = None
            raise

    def hasJobGraph(self):
        """Returns True if the item has a job graph."""
        return self.job_graph is not None

    def getJobs(self):
        if not self.live or not self.job_graph:
            return []
        return self.job_graph.getJobs()

    def getJob(self, name):
        if not self.job_graph:
            return None
        return self.job_graph.jobs.get(name)

    @property
    def items_ahead(self):
        item_ahead = self.item_ahead
        while item_ahead:
            yield item_ahead
            item_ahead = item_ahead.item_ahead

    def getNonLiveItemsAhead(self):
        items = [item for item in self.items_ahead if not item.live]
        return reversed(items)

    def haveAllJobsStarted(self):
        if not self.hasJobGraph():
            return False
        for job in self.getJobs():
            build = self.current_build_set.getBuild(job.name)
            if not build or not build.start_time:
                return False
        return True

    def areAllJobsComplete(self):
        if (self.current_build_set.config_errors or
                self.current_build_set.unable_to_merge):
            return True
        if not self.hasJobGraph():
            return False
        for job in self.getJobs():
            build = self.current_build_set.getBuild(job.name)
            if not build or not build.result:
                return False
        return True

    def didAllJobsSucceed(self):
        """Check if all jobs have completed with status SUCCESS.

        Return True if all voting jobs have completed with status
        SUCCESS.  Non-voting jobs are ignored.  Skipped jobs are
        ignored, but skipping all jobs returns a failure.  Incomplete
        builds are considered a failure, hence this is unlikely to be
        useful unless all builds are complete.
        """
        if not self.hasJobGraph():
            return False

        all_jobs_skipped = True
        for job in self.getJobs():
            build = self.current_build_set.getBuild(job.name)
            if build:
                # If the build ran, record whether or not it was skipped
                # and return False if the build was voting and has an
                # unsuccessful return value
                if build.result != 'SKIPPED':
                    all_jobs_skipped = False
                if job.voting and build.result not in ['SUCCESS', 'SKIPPED']:
                    return False
            elif job.voting:
                # If the build failed to run and was voting, that is an
                # unsuccessful build.  But we don't count against it if
                # not voting.
                return False

        # NOTE(pabelanger): We shouldn't be able to skip all jobs.
        if all_jobs_skipped:
            return False

        return True

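# Decision summary for didAllJobsSucceed() (illustrative cases, derived
# from the logic above):
#
#   voting job with FAILURE        -> False
#   voting job with missing build  -> False
#   non-voting job with FAILURE    -> ignored
#   every job SKIPPED              -> False (skipping all jobs is a failure)
#   otherwise                      -> True
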
    def hasAnyJobFailed(self):
        """Check if any jobs have finished with a non-success result.

        Return True if any job in the job graph has returned with a
        status not equal to SUCCESS or SKIPPED, else return False.
        Non-voting and in-flight jobs are ignored.
        """
        if not self.hasJobGraph():
            return False
        for job in self.getJobs():
            if not job.voting:
                continue
            build = self.current_build_set.getBuild(job.name)
            if (build and build.result and
                    build.result not in ['SUCCESS', 'SKIPPED']):
                return True
        return False

    def isBundleFailing(self):
        if self.bundle:
            # We are only checking other items that share the same change
            # queue, since we don't need to wait for changes in other
            # change queues.
            return self.bundle.failed_reporting or any(
                i.hasAnyJobFailed() or i.didMergerFail()
                for i in self.bundle.items
                if i.live and i.queue == self.queue)
        return False

    def didBundleFinish(self):
        if self.bundle:
            # We are only checking other items that share the same change
            # queue, since we don't need to wait for changes in other
            # change queues.
            return all(i.areAllJobsComplete() for i in self.bundle.items if
                       i.live and i.queue == self.queue)
        return True

    def didBundleStartReporting(self):
        if self.bundle:
            return self.bundle.started_reporting
        return False

    def cannotMergeBundle(self):
        if self.bundle:
            return self.bundle.cannot_merge
        return False

    def didMergerFail(self):
        return self.current_build_set.unable_to_merge

    def getConfigErrors(self):
        return self.current_build_set.config_errors

    def wasDequeuedNeedingChange(self):
        return self.dequeued_needing_change

    def includesConfigUpdates(self):
        includes_trusted = False
        includes_untrusted = False
        tenant = self.pipeline.tenant
        item = self

        if item.bundle:
            # Check all items in the bundle for config updates
            for bundle_item in item.bundle.items:
                if bundle_item.change.updatesConfig(tenant):
                    (trusted, project) = tenant.getProject(
                        bundle_item.change.project.canonical_name)
                    if trusted:
                        includes_trusted = True
                    else:
                        includes_untrusted = True
                if includes_trusted and includes_untrusted:
                    # We're done early
                    return (includes_trusted, includes_untrusted)

        while item:
            if item.change.updatesConfig(tenant):
                (trusted, project) = tenant.getProject(
                    item.change.project.canonical_name)
                if trusted:
                    includes_trusted = True
                else:
                    includes_untrusted = True
            if includes_trusted and includes_untrusted:
                # We're done early
                return (includes_trusted, includes_untrusted)
            item = item.item_ahead
        return (includes_trusted, includes_untrusted)

    def isHoldingFollowingChanges(self):
        if not self.live:
            return False
        if not self.hasJobGraph():
            return False
        for job in self.getJobs():
            if not job.hold_following_changes:
                continue
            build = self.current_build_set.getBuild(job.name)
            if not build:
                return True
            if build.result != 'SUCCESS':
                return True

        if not self.item_ahead:
            return False
        return self.item_ahead.isHoldingFollowingChanges()

    def _getRequirementsResultFromSQL(self, job):
        # This either returns data or raises an exception
        requirements = job.requires
        self.log.debug("Checking DB for requirements")
        requirements_tuple = tuple(sorted(requirements))
        if requirements_tuple not in self._cached_sql_results:
            conn = self.pipeline.manager.sched.connections.getSqlConnection()
            if conn:
                builds = conn.getBuilds(
                    tenant=self.pipeline.tenant.name,
                    project=self.change.project.name,
                    pipeline=self.pipeline.name,
                    change=self.change.number,
                    branch=self.change.branch,
                    patchset=self.change.patchset,
                    provides=requirements_tuple)
            else:
                builds = []
            # Just look at the most recent buildset.
            # TODO: query for a buildset instead of filtering.
            builds = [b for b in builds
                      if b.buildset.uuid == builds[0].buildset.uuid]
            self._cached_sql_results[requirements_tuple] = builds

        builds = self._cached_sql_results[requirements_tuple]
        data = []
        if not builds:
            self.log.debug("No artifacts matching requirements found in DB")
            return data

        for build in builds:
            if build.result != 'SUCCESS':
                provides = [x.name for x in build.provides]
                requirement = list(requirements.intersection(set(provides)))
                raise RequirementsError(
                    'Job %s requires artifact(s) %s provided by build %s '
                    '(triggered by change %s on project %s), but that build '
                    'failed with result "%s"' % (
                        job.name, ', '.join(requirement), build.uuid,
                        build.buildset.change, build.buildset.project,
                        build.result))
            else:
                for a in build.artifacts:
                    artifact = {'name': a.name,
                                'url': a.url,
                                'project': build.buildset.project,
                                'change': str(build.buildset.change),
                                'patchset': build.buildset.patchset,
                                'job': build.job_name}
                    if a.meta:
                        artifact['metadata'] = json.loads(a.meta)
                    data.append(artifact)
        self.log.debug("Found artifacts in DB: %s", repr(data))
        return data

    def providesRequirements(self, job, data, recurse=True):
        # Mutates data and returns true/false if requirements
        # satisfied.
        requirements = job.requires
        if not requirements:
            return True
        if not self.live:
            self.log.debug("Checking whether non-live item %s provides %s",
                           self, requirements)
            # Look for this item in other queues in the pipeline.
            item = None
            found = False
            for item in self.pipeline.getAllItems():
                if item.live and item.change == self.change:
                    found = True
                    break
            if found:
                if not item.providesRequirements(job, data,
                                                 recurse=False):
                    return False
            else:
                # Look for this item in the SQL DB.
                data += self._getRequirementsResultFromSQL(job)
        if self.hasJobGraph():
            for _job in self.getJobs():
                if _job.provides.intersection(requirements):
                    build = self.current_build_set.getBuild(_job.name)
                    if not build:
                        return False
                    if build.result and build.result != 'SUCCESS':
                        return False
                    if not build.result and not build.paused:
                        return False
                    artifacts = get_artifacts_from_result_data(
                        build.result_data,
                        logger=self.log)
                    for a in artifacts:
                        a.update({'project': self.change.project.name,
                                  'change': self.change.number,
                                  'patchset': self.change.patchset,
                                  'job': build.job.name})
                    self.log.debug("Found live artifacts: %s",
                                   repr(artifacts))
                    data += artifacts
        if not self.item_ahead:
            return True
        if not recurse:
            return True
        return self.item_ahead.providesRequirements(job, data)

    def jobRequirementsReady(self, job):
        if not self.item_ahead:
            return True
        try:
            data = []
            ret = self.item_ahead.providesRequirements(job, data)
            data.reverse()
            job.updateArtifactData(data)
        except RequirementsError as e:
            self.warning(str(e))
            fakebuild = Build(job, self.current_build_set, None)
            fakebuild.result = 'FAILURE'
            self.addBuild(fakebuild)
            self.setResult(fakebuild)
            ret = True
        return ret

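# Flow sketch for artifact requirements (a hypothetical job with
# requires={'docker-image'}; not part of the module): jobRequirementsReady()
# walks the items ahead, collecting artifact dicts into ``data`` from live
# builds or the SQL database, and converts a RequirementsError into a fake
# FAILURE build so the item reports instead of waiting forever.
#
#   item.jobRequirementsReady(deploy_job)
#   # -> True once every provider build has succeeded or paused; the
#   #    reversed data passed to job.updateArtifactData() is oldest-first.
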