Add optional support for circular dependencies

Allow Zuul to process circular dependencies between changes. Gating of
circular dependencies must be explicitly enabled on a per tenant or
project basis.

In case Zuul detects a dependency cycle, it will make sure that every
change also includes all other changes that are part of the cycle. However,
each change will still be a normal item in the queue with its own jobs.
When it comes to reporting, all items in the cycle are treated as one
unit that determines the success/failure of those changes.

Changes with cross-repo circular dependencies are required to share the
same change queue.

Depends-On: https://review.opendev.org/#/c/643309/
Change-Id: Ic121b2d8d057a7dc4448ae70045853347f265c6c
This commit is contained in:
Simon Westphahl 2019-09-03 10:10:16 +02:00 committed by Tobias Henkel
parent 8673a6ca6f
commit 5161347efd
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
23 changed files with 2150 additions and 74 deletions

View File

@ -42,3 +42,26 @@ Here is an example ``queue`` configuration.
means that all projects that should be gated must have aligned branch means that all projects that should be gated must have aligned branch
names when using per branch queues. Otherwise changes that belong names when using per branch queues. Otherwise changes that belong
together end up in different queues. together end up in different queues.
.. attr:: allow-circular-dependencies
:default: false
Defines whether Zuul is allowed to process circular dependencies between
changes for this queue. All projects that are part of a dependency cycle
must share the same change queue.
In case Zuul detects a dependency cycle it will make sure that every
change also includes all other changes that are part of the cycle.
However each change will still be a normal item in the queue with its own
jobs.
Reporting of success will be postponed until all items in the cycle
have succeeded. In case of a failure in any of those items, the whole
cycle will be dequeued.
An error message will be posted to all items of the cycle in case some
items fail to report (e.g. merge failure when some items were already
merged). In this case the target branch(es) might be in a broken state.
In general, circular dependencies are considered to be an antipattern but
can't be avoided in certain cases.

View File

@ -0,0 +1,187 @@
- queue:
name: integrated
allow-circular-dependencies: true
- pipeline:
name: check
manager: independent
trigger:
gerrit:
- event: patchset-created
github:
- event: pull_request
action:
- opened
- changed
- reopened
- edited
success:
gerrit:
Verified: 1
github:
status: success
failure:
gerrit:
Verified: -1
github:
status: failure
- pipeline:
name: check-unused
manager: independent
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
Verified: 1
github:
status: success
failure:
gerrit:
Verified: -1
github:
status: failure
- pipeline:
name: gate
manager: dependent
success-message: Build succeeded (gate).
require:
gerrit:
approval:
- Approved: 1
github:
label: approved
trigger:
gerrit:
- event: comment-added
approval:
- Approved: 1
github:
- event: pull_request
action: edited
- event: pull_request
action: labeled
label: approved
success:
gerrit:
Verified: 2
submit: true
github:
merge: true
failure:
gerrit:
Verified: -2
github: {}
start:
gerrit:
Verified: 0
github: {}
precedence: high
- pipeline:
name: gate-unused
manager: dependent
success-message: Build succeeded (gate).
trigger:
gerrit:
- event: comment-added
approval:
- Approved: 1
success:
gerrit:
Verified: 2
submit: true
github:
merge: true
failure:
gerrit:
Verified: -2
github: {}
start:
gerrit:
Verified: 0
github: {}
precedence: high
- job:
name: base
parent: null
run: playbooks/run.yaml
required-projects:
- common-config
- org/project
- org/project1
- org/project2
- job:
name: common-config-job
- job:
name: project-job
- job:
name: project1-job
- job:
name: project2-job
- job:
name: project3-job
- project:
name: common-config
queue: integrated
check:
jobs:
- common-config-job
gate:
jobs:
- common-config-job
- project:
name: ^.*/project
queue: integrated
check:
jobs:
- project-job
gate:
jobs:
- project-job
- project:
name: ^.*/project1
queue: integrated
check:
jobs:
- project1-job
- project-vars-job
gate:
jobs:
- project1-job
- project-vars-job
- project-template:
name: project2-template
queue: integrated
check:
jobs:
- project2-job
gate:
jobs:
- project2-job
- project:
name: ^.*/project2
templates:
- project2-template
- project:
name: ^.*/project3
check:
jobs:
- project3-job
gate:
jobs:
- project3-job

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1,4 @@
- job:
name: project-vars-job
vars:
test_var: fail

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1 @@
test

View File

@ -0,0 +1,25 @@
- tenant:
name: tenant-one
exclude-unprotected-branches: true
source:
gerrit:
config-projects:
- common-config
untrusted-projects:
- org/project
- org/project1
- org/project2
- org/project3
github:
untrusted-projects:
- gh/project
- gh/project1
- gh/project2
- gh/project3
- tenant:
name: tenant-two
source:
gerrit:
untrusted-projects:
- org/project4

File diff suppressed because it is too large Load Diff

View File

@ -1408,6 +1408,7 @@ class QueueParser:
def getSchema(self): def getSchema(self):
queue = {vs.Required('name'): str, queue = {vs.Required('name'): str,
'per-branch': bool, 'per-branch': bool,
'allow-circular-dependencies': bool,
'_source_context': model.SourceContext, '_source_context': model.SourceContext,
'_start_mark': ZuulMark, '_start_mark': ZuulMark,
} }
@ -1415,7 +1416,11 @@ class QueueParser:
def fromYaml(self, conf): def fromYaml(self, conf):
self.schema(conf) self.schema(conf)
queue = model.Queue(conf['name'], conf.get('per-branch', False)) queue = model.Queue(
conf['name'],
conf.get('per-branch', False),
conf.get('allow-circular-dependencies', False),
)
queue.source_context = conf.get('_source_context') queue.source_context = conf.get('_source_context')
queue.start_mark = conf.get('_start_mark') queue.start_mark = conf.get('_start_mark')
queue.freeze() queue.freeze()
@ -1519,6 +1524,7 @@ class TenantParser(object):
'exclude-unprotected-branches': bool, 'exclude-unprotected-branches': bool,
'extra-config-paths': to_list(str), 'extra-config-paths': to_list(str),
'load-branch': str, 'load-branch': str,
'allow-circular-dependencies': bool,
}} }}
project = vs.Any(str, project_dict) project = vs.Any(str, project_dict)
@ -1559,6 +1565,7 @@ class TenantParser(object):
'allowed-reporters': to_list(str), 'allowed-reporters': to_list(str),
'allowed-labels': to_list(str), 'allowed-labels': to_list(str),
'disallowed-labels': to_list(str), 'disallowed-labels': to_list(str),
'allow-circular-dependencies': bool,
'default-parent': str, 'default-parent': str,
'default-ansible-version': vs.Any(str, float), 'default-ansible-version': vs.Any(str, float),
'admin-rules': to_list(str), 'admin-rules': to_list(str),

60
zuul/lib/tarjan.py Normal file
View File

@ -0,0 +1,60 @@
# Algorithm from http://www.logarithmic.net/pfh/blog/01208083168
# License: public domain
# Authors: Dr. Paul Harrison / Dries Verdegem
def strongly_connected_components(graph):
    """
    Find the strongly connected components (SCCs) of a directed graph.

    Tarjan's Algorithm (named for its discoverer, Robert Tarjan) is a graph
    theory algorithm for finding the strongly connected components of a graph.

    Based on:
    http://en.wikipedia.org/wiki/Tarjan%27s_strongly_connected_components_algorithm

    :param graph: Mapping of node -> iterable of successor nodes. A node
        that only shows up as a successor (i.e. has no key of its own) is
        treated as having no successors. Nodes must be hashable.
    :returns: A list of tuples, one tuple per strongly connected
        component, in the order in which the components were completed.
        Single nodes that are not part of any cycle are returned as
        one-element tuples.

    NOTE: the traversal is recursive, so the depth of the longest
    dependency chain is bounded by the interpreter's recursion limit.
    """
    index_counter = [0]
    stack = []
    # Mirrors the content of `stack` so that the "is this node on the
    # stack?" check is a constant-time set lookup instead of a list scan.
    on_stack = set()
    lowlinks = {}
    index = {}
    result = []

    def strongconnect(node):
        # set the depth index for this node to the smallest unused index
        index[node] = index_counter[0]
        lowlinks[node] = index_counter[0]
        index_counter[0] += 1
        stack.append(node)
        on_stack.add(node)

        # Consider successors of `node`
        try:
            successors = graph[node]
        except KeyError:
            successors = []
        for successor in successors:
            if successor not in lowlinks:
                # Successor has not yet been visited; recurse on it
                strongconnect(successor)
                lowlinks[node] = min(lowlinks[node], lowlinks[successor])
            elif successor in on_stack:
                # the successor is in the stack and hence in the current
                # strongly connected component (SCC)
                lowlinks[node] = min(lowlinks[node], index[successor])

        # If 'node' is a root node, pop the stack and generate an SCC
        if lowlinks[node] == index[node]:
            connected_component = []
            while True:
                successor = stack.pop()
                on_stack.discard(successor)
                connected_component.append(successor)
                if successor == node:
                    break
            # storing the result
            result.append(tuple(connected_component))

    for node in graph:
        if node not in lowlinks:
            strongconnect(node)

    return result

View File

@ -9,6 +9,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import collections
import logging import logging
import textwrap import textwrap
import urllib import urllib
@ -18,6 +19,7 @@ from zuul import exceptions
from zuul import model from zuul import model
from zuul.lib.dependson import find_dependency_headers from zuul.lib.dependson import find_dependency_headers
from zuul.lib.logutil import get_annotated_logger from zuul.lib.logutil import get_annotated_logger
from zuul.lib.tarjan import strongly_connected_components
class DynamicChangeQueueContextManager(object): class DynamicChangeQueueContextManager(object):
@ -212,21 +214,28 @@ class PipelineManager(metaclass=ABCMeta):
return True return True
def enqueueChangesAhead(self, change, event, quiet, ignore_requirements, def enqueueChangesAhead(self, change, event, quiet, ignore_requirements,
change_queue, history=None): change_queue, history=None, dependency_graph=None):
return True return True
def enqueueChangesBehind(self, change, event, quiet, ignore_requirements, def enqueueChangesBehind(self, change, event, quiet, ignore_requirements,
change_queue): change_queue, history=None,
dependency_graph=None):
return True return True
def checkForChangesNeededBy(self, change, change_queue, event): def checkForChangesNeededBy(self, change, change_queue, event,
dependency_graph=None):
return True return True
def getFailingDependentItems(self, item): def getFailingDependentItems(self, item):
return None return None
def getItemForChange(self, change): def getItemForChange(self, change, change_queue=None):
for item in self.pipeline.getAllItems(): if change_queue is not None:
items = change_queue.queue
else:
items = self.pipeline.getAllItems()
for item in items:
if item.change.equals(change): if item.change.equals(change):
return item return item
return None return None
@ -318,10 +327,18 @@ class PipelineManager(metaclass=ABCMeta):
def addChange(self, change, event, quiet=False, enqueue_time=None, def addChange(self, change, event, quiet=False, enqueue_time=None,
ignore_requirements=False, live=True, ignore_requirements=False, live=True,
change_queue=None, history=None): change_queue=None, history=None, dependency_graph=None):
log = get_annotated_logger(self.log, event) log = get_annotated_logger(self.log, event)
log.debug("Considering adding change %s" % change) log.debug("Considering adding change %s" % change)
history = history if history is not None else []
log.debug("History: %s", history)
# Ensure the dependency graph is created when the first change is
# processed to allow cycle detection with the Tarjan algorithm
dependency_graph = dependency_graph or collections.OrderedDict()
log.debug("Dependency graph: %s", dependency_graph)
# If we are adding a live change, check if it's a live item # If we are adding a live change, check if it's a live item
# anywhere in the pipeline. Otherwise, we will perform the # anywhere in the pipeline. Otherwise, we will perform the
# duplicate check below on the specific change_queue. # duplicate check below on the specific change_queue.
@ -354,30 +371,47 @@ class PipelineManager(metaclass=ABCMeta):
(change, change.project)) (change, change.project))
return False return False
if history and change in history:
log.debug("Dependency cycle detected for "
"change %s in project %s" % (
change, change.project))
item = model.QueueItem(self, change, event)
item.warning("Dependency cycle detected")
actions = self.pipeline.failure_actions
item.setReportedResult('FAILURE')
self.sendReport(actions, item)
return False
if not self.enqueueChangesAhead(change, event, quiet, if not self.enqueueChangesAhead(change, event, quiet,
ignore_requirements, ignore_requirements,
change_queue, history=history): change_queue, history=history,
dependency_graph=dependency_graph):
self.dequeueIncompleteCycle(change, dependency_graph, event,
change_queue)
log.debug("Failed to enqueue changes ahead of %s" % change) log.debug("Failed to enqueue changes ahead of %s" % change)
return False return False
log.debug("History after enqueuing changes ahead: %s", history)
if self.isChangeAlreadyInQueue(change, change_queue): if self.isChangeAlreadyInQueue(change, change_queue):
log.debug("Change %s is already in queue, ignoring" % change) log.debug("Change %s is already in queue, ignoring" % change)
return True return True
cycle = None
if hasattr(change, "needs_changes"):
cycle = self.cycleForChange(change, dependency_graph, event)
if cycle and not self.canProcessCycle(change.project):
log.info("Dequeing change %s since at least one project "
"does not allow circular dependencies", change)
actions = self.pipeline.failure_actions
ci = model.QueueItem(self, cycle[-1], event)
ci.warning("Dependency cycle detected")
ci.setReportedResult('FAILURE')
# Only report the cycle if the project is in the current
# pipeline. Otherwise the change could be spammed by
# reports from unrelated pipelines.
if self.pipeline.tenant.layout.getProjectPipelineConfig(
ci
):
self.sendReport(actions, ci)
return False
log.info("Adding change %s to queue %s in %s" % log.info("Adding change %s to queue %s in %s" %
(change, change_queue, self.pipeline)) (change, change_queue, self.pipeline))
item = change_queue.enqueueChange(change, event) item = change_queue.enqueueChange(change, event)
self.updateBundle(item, change_queue, cycle)
if enqueue_time: if enqueue_time:
item.enqueue_time = enqueue_time item.enqueue_time = enqueue_time
item.live = live item.live = live
@ -386,8 +420,18 @@ class PipelineManager(metaclass=ABCMeta):
if item.live and not item.reported_enqueue: if item.live and not item.reported_enqueue:
self.reportEnqueue(item) self.reportEnqueue(item)
item.reported_enqueue = True item.reported_enqueue = True
self.enqueueChangesBehind(change, event, quiet,
ignore_requirements, change_queue) # Items in a dependency cycle are expected to be enqueued after
# each other. To prevent non-cycle items from being enqueued
# between items of the same cycle, we skip that step when a cycle
# was found.
if not cycle:
self.enqueueChangesBehind(change, event, quiet,
ignore_requirements, change_queue,
history, dependency_graph)
else:
self.log.debug("Skip enqueueing changes behind because of "
"dependency cycle")
zuul_driver = self.sched.connections.drivers['zuul'] zuul_driver = self.sched.connections.drivers['zuul']
tenant = self.pipeline.tenant tenant = self.pipeline.tenant
zuul_driver.onChangeEnqueued( zuul_driver.onChangeEnqueued(
@ -395,6 +439,86 @@ class PipelineManager(metaclass=ABCMeta):
self.dequeueSupercededItems(item) self.dequeueSupercededItems(item)
return True return True
def cycleForChange(self, change, dependency_graph, event):
    """Return the dependency cycle that contains ``change``, if any.

    Runs Tarjan's algorithm over ``dependency_graph`` and only considers
    strongly connected components with more than one member to be
    cycles.

    :param change: The change to look up.
    :param dependency_graph: Mapping handed to
        ``strongly_connected_components``.
    :param event: Event used to annotate the logger.
    :returns: The first component that contains ``change`` or ``None``
        when the change is not part of any cycle.
    """
    log = get_annotated_logger(self.log, event)
    log.debug("Running Tarjan's algorithm on current dependencies: %s",
              dependency_graph)

    # Components with a single member are not cycles, drop them.
    cycles = []
    for component in strongly_connected_components(dependency_graph):
        if len(component) > 1:
            cycles.append(component)
    log.debug("Strongly connected components (cyles): %s", cycles)

    for component in cycles:
        if change not in component:
            continue
        log.debug("Dependency cycle detected for "
                  "change %s in project %s",
                  change, change.project)
        # Change can not be part of multiple cycles, so we can return
        return component
    return None
def canProcessCycle(self, project):
    """Tell whether circular dependencies may be processed for a project.

    The queue name is taken from the project's configs in the tenant
    layout: the first truthy project level queue name wins, otherwise
    the first truthy queue name of this pipeline's project-pipeline
    config is used.

    :param project: Object providing ``canonical_name``.
    :returns: ``False`` when no queue name could be determined or the
        queue is unknown to the layout, otherwise the queue's
        ``allow_circular_dependencies`` value.
    """
    tenant_layout = self.pipeline.tenant.layout
    name_from_project = None
    name_from_pipeline = None

    configs = tenant_layout.getAllProjectConfigs(project.canonical_name)
    for config in configs:
        if not name_from_project:
            name_from_project = config.queue_name

        ppc = config.pipelines.get(self.pipeline.name)
        # TODO(simonw): Remove the pipeline queue name after deprecation
        if ppc is not None and not name_from_pipeline:
            name_from_pipeline = ppc.queue_name

    # Note: we currently support queue name per pipeline and per
    # project while project has precedence.
    effective_name = name_from_project or name_from_pipeline
    if effective_name is None:
        return False

    queue = tenant_layout.queues.get(effective_name)
    if queue is None:
        return False
    return queue.allow_circular_dependencies
def updateBundle(self, item, change_queue, cycle):
    """Attach ``item`` to the bundle of its dependency cycle.

    Does nothing when ``cycle`` is empty or ``None``. Otherwise the item
    gets a new ``model.Bundle`` which is replaced by the bundle of the
    first other cycle change that already has an item in
    ``change_queue``. Finally the item is added to the bundle.

    :param item: The queue item that was just enqueued.
    :param change_queue: Queue searched for already enqueued cycle items.
    :param cycle: Iterable of changes forming the cycle (or falsy).
    """
    if not cycle:
        return

    log = get_annotated_logger(self.log, item.event)
    # Start out with a fresh bundle; it is swapped below in case another
    # item of this cycle is already enqueued.
    item.bundle = model.Bundle()

    for other_change in cycle:
        if other_change is item.change:
            continue
        enqueued = self.getItemForChange(other_change, change_queue)
        if enqueued:
            # Use a common bundle for the cycle
            item.bundle = enqueued.bundle
            break

    log.info("Adding cycle item %s to bundle %s", item, item.bundle)
    item.bundle.add_item(item)
def dequeueIncompleteCycle(self, change, dependency_graph, event,
                           change_queue):
    """Dequeue all already enqueued items of the cycle around ``change``.

    The cycle is determined via ``cycleForChange``; when there is none,
    nothing is dequeued.

    :param change: Change whose cycle should be removed.
    :param dependency_graph: Graph passed on to ``cycleForChange``.
    :param event: Event used to annotate the logger.
    :param change_queue: Queue in which the cycle items are looked up.
    """
    log = get_annotated_logger(self.log, event)
    cycle = self.cycleForChange(change, dependency_graph, event) or []

    # Collect every item first and only dequeue afterwards so that the
    # lookups are not affected by the removals.
    to_dequeue = []
    for cycle_change in cycle:
        found = self.getItemForChange(cycle_change, change_queue)
        if found is not None:
            to_dequeue.append(found)

    if to_dequeue:
        log.info("Dequeuing incomplete cycle items: %s",
                 to_dequeue)

    for found in to_dequeue:
        self.dequeueItem(found)
def dequeueItem(self, item): def dequeueItem(self, item):
log = get_annotated_logger(self.log, item.event) log = get_annotated_logger(self.log, item.event)
log.debug("Removing change %s from queue", item.change) log.debug("Removing change %s from queue", item.change)
@ -416,6 +540,16 @@ class PipelineManager(metaclass=ABCMeta):
self.dequeueItem(item) self.dequeueItem(item)
self.reportStats(item) self.reportStats(item)
if item.bundle is None:
return
log.debug("Dequeueing items in bundle %s", item.bundle)
bundle_iter = (i for i in item.bundle.items if i is not item)
for bundle_item in bundle_iter:
self.cancelJobs(bundle_item)
self.dequeueItem(bundle_item)
self.reportStats(bundle_item)
def dequeueSupercededItems(self, item): def dequeueSupercededItems(self, item):
for other_name in self.pipeline.supercedes: for other_name in self.pipeline.supercedes:
other_pipeline = self.pipeline.tenant.layout.pipelines.get( other_pipeline = self.pipeline.tenant.layout.pipelines.get(
@ -525,7 +659,11 @@ class PipelineManager(metaclass=ABCMeta):
old_build_set = item.current_build_set old_build_set = item.current_build_set
jobs_to_cancel = item.getJobs() jobs_to_cancel = item.getJobs()
if prime and item.current_build_set.ref: # Don't reset builds for a failing bundle when it has already started
# reporting, to keep available build results. Those items will be
# reported immediately afterwards during queue processing.
if (prime and item.current_build_set.ref and not
item.didBundleStartReporting()):
item.resetAllBuilds() item.resetAllBuilds()
for job in jobs_to_cancel: for job in jobs_to_cancel:
@ -677,13 +815,6 @@ class PipelineManager(metaclass=ABCMeta):
item.setConfigError("Unknown configuration error") item.setConfigError("Unknown configuration error")
return None return None
def _queueUpdatesConfig(self, item):
while item:
if item.change.updatesConfig(item.pipeline.tenant):
return True
item = item.item_ahead
return False
def getLayout(self, item): def getLayout(self, item):
if item.item_ahead: if item.item_ahead:
fallback_layout = item.item_ahead.layout fallback_layout = item.item_ahead.layout
@ -692,8 +823,21 @@ class PipelineManager(metaclass=ABCMeta):
return None return None
else: else:
fallback_layout = item.pipeline.tenant.layout fallback_layout = item.pipeline.tenant.layout
if not item.change.updatesConfig(item.pipeline.tenant):
# Current change does not update layout, use its parent. # If the current change does not update the layout, use its parent.
# If the bundle doesn't update the config or the bundle updates the
# config but the current change's project is not part of the tenant
# (e.g. when dealing w/ cross-tenant cycles), use the parent layout.
if not (
item.change.updatesConfig(item.pipeline.tenant) or
(
item.bundle
and item.bundle.updatesConfig(item.pipeline.tenant)
and item.pipeline.tenant.getProject(
item.change.project.canonical_name
)[1] is not None
)
):
return fallback_layout return fallback_layout
# Else this item updates the config, # Else this item updates the config,
# ask the merger for the result. # ask the merger for the result.
@ -717,10 +861,12 @@ class PipelineManager(metaclass=ABCMeta):
# change that is tested. # change that is tested.
tenant = item.pipeline.tenant tenant = item.pipeline.tenant
items = list(item.items_ahead) + [item] items = list(item.items_ahead) + [item]
projects = [ if item.bundle:
items.extend(item.bundle.items)
projects = {
item.change.project for item in items item.change.project for item in items
if tenant.getProject(item.change.project.canonical_name)[1] if tenant.getProject(item.change.project.canonical_name)[1]
] }
if all(tenant.getExcludeUnprotectedBranches(project) if all(tenant.getExcludeUnprotectedBranches(project)
for project in projects): for project in projects):
branches = set() branches = set()
@ -792,8 +938,15 @@ class PipelineManager(metaclass=ABCMeta):
ready = False ready = False
# If this change alters config or is live, schedule merge and # If this change alters config or is live, schedule merge and
# build a layout. # build a layout.
# If we are dealing w/ a bundle and the bundle updates config we also
# have to merge since a config change in any of the bundle's items
# applies to all items. This is, unless the current item is not part
# of this tenant (e.g. cross-tenant cycle).
if build_set.merge_state == build_set.NEW: if build_set.merge_state == build_set.NEW:
if item.live or item.change.updatesConfig(tenant): if item.live or item.change.updatesConfig(tenant) or (
item.bundle and
item.bundle.updatesConfig(tenant) and tpc is not None
):
ready = self.scheduleMerge( ready = self.scheduleMerge(
item, item,
files=(['zuul.yaml', '.zuul.yaml'] + files=(['zuul.yaml', '.zuul.yaml'] +
@ -864,7 +1017,10 @@ class PipelineManager(metaclass=ABCMeta):
"it can no longer merge" % item.change) "it can no longer merge" % item.change)
self.cancelJobs(item) self.cancelJobs(item)
self.dequeueItem(item) self.dequeueItem(item)
item.setDequeuedNeedingChange() if item.isBundleFailing():
item.setDequeuedBundleFailing()
else:
item.setDequeuedNeedingChange()
if item.live: if item.live:
try: try:
self.reportItem(item) self.reportItem(item)
@ -911,6 +1067,12 @@ class PipelineManager(metaclass=ABCMeta):
failing_reasons.append("it has an invalid configuration") failing_reasons.append("it has an invalid configuration")
if ready and self.provisionNodes(item): if ready and self.provisionNodes(item):
changed = True changed = True
if ready and item.bundle and item.didBundleFinish():
# Since the bundle finished we need to check if any item
# can report. If that's the case we need to process the
# queue again.
changed = changed or any(
i.item_ahead is None for i in item.bundle.items)
if ready and self.executeJobs(item): if ready and self.executeJobs(item):
changed = True changed = True
@ -920,7 +1082,15 @@ class PipelineManager(metaclass=ABCMeta):
failing_reasons.append("is a non-live item with no items behind") failing_reasons.append("is a non-live item with no items behind")
self.dequeueItem(item) self.dequeueItem(item)
changed = dequeued = True changed = dequeued = True
if ((not item_ahead) and item.areAllJobsComplete() and item.live):
can_report = not item_ahead and item.areAllJobsComplete() and item.live
if can_report and item.bundle:
can_report = can_report and (
item.isBundleFailing() or item.didBundleFinish()
)
item.bundle.started_reporting = can_report
if can_report:
try: try:
self.reportItem(item) self.reportItem(item)
except exceptions.MergeFailure: except exceptions.MergeFailure:
@ -930,6 +1100,9 @@ class PipelineManager(metaclass=ABCMeta):
"item ahead, %s, failed to merge" % "item ahead, %s, failed to merge" %
(item_behind.change, item)) (item_behind.change, item))
self.cancelJobs(item_behind) self.cancelJobs(item_behind)
if item.bundle and not item.isBundleFailing():
item.bundle.failed_reporting = True
self.reportProcessedBundleItems(item)
self.dequeueItem(item) self.dequeueItem(item)
changed = dequeued = True changed = dequeued = True
elif not failing_reasons and item.live: elif not failing_reasons and item.live:
@ -946,6 +1119,19 @@ class PipelineManager(metaclass=ABCMeta):
node_request, priority) node_request, priority)
return (changed, nnfi) return (changed, nnfi)
def reportProcessedBundleItems(self, item):
    """Report failure to already reported bundle items.

    In case we encounter e.g. a merge failure when we already successfully
    reported some items, we need to go back and report again.

    :param item: Queue item whose bundle is inspected; every bundle item
        flagged as ``reported`` gets the result ``FAILURE`` set and is
        reported with the pipeline's failure actions.
    """
    failure_actions = self.pipeline.failure_actions
    # Take a snapshot of the reported items before sending any report.
    already_reported = list(filter(lambda i: i.reported, item.bundle.items))
    for bundle_item in already_reported:
        bundle_item.setReportedResult('FAILURE')
        self.sendReport(failure_actions, bundle_item)
def processQueue(self): def processQueue(self):
# Do whatever needs to be done for each change in the queue # Do whatever needs to be done for each change in the queue
self.log.debug("Starting queue processor: %s" % self.pipeline.name) self.log.debug("Starting queue processor: %s" % self.pipeline.name)
@ -1117,7 +1303,7 @@ class PipelineManager(metaclass=ABCMeta):
# _reportItem() returns True if it failed to report. # _reportItem() returns True if it failed to report.
item.reported = not self._reportItem(item) item.reported = not self._reportItem(item)
if self.changes_merge: if self.changes_merge:
succeeded = item.didAllJobsSucceed() succeeded = item.didAllJobsSucceed() and not item.isBundleFailing()
merged = item.reported merged = item.reported
source = item.change.project.source source = item.change.project.source
if merged: if merged:
@ -1181,9 +1367,11 @@ class PipelineManager(metaclass=ABCMeta):
actions = self.pipeline.merge_failure_actions actions = self.pipeline.merge_failure_actions
item.setReportedResult('CONFIG_ERROR') item.setReportedResult('CONFIG_ERROR')
elif item.didMergerFail(): elif item.didMergerFail():
log.debug("Merger failure")
actions = self.pipeline.merge_failure_actions actions = self.pipeline.merge_failure_actions
item.setReportedResult('MERGER_FAILURE') item.setReportedResult('MERGER_FAILURE')
elif item.wasDequeuedNeedingChange(): elif item.wasDequeuedNeedingChange():
log.debug("Dequeued needing change")
actions = self.pipeline.failure_actions actions = self.pipeline.failure_actions
item.setReportedResult('FAILURE') item.setReportedResult('FAILURE')
elif not item.getJobs(): elif not item.getJobs():
@ -1191,7 +1379,13 @@ class PipelineManager(metaclass=ABCMeta):
log.debug("No jobs for change %s", item.change) log.debug("No jobs for change %s", item.change)
actions = self.pipeline.no_jobs_actions actions = self.pipeline.no_jobs_actions
item.setReportedResult('NO_JOBS') item.setReportedResult('NO_JOBS')
elif item.didAllJobsSucceed(): elif item.isBundleFailing():
log.debug("Bundle is failing")
actions = self.pipeline.failure_actions
item.setReportedResult("FAILURE")
if not item.didAllJobsSucceed():
self.pipeline._consecutive_failures += 1
elif item.didAllJobsSucceed() and not item.isBundleFailing():
log.debug("success %s", self.pipeline.success_actions) log.debug("success %s", self.pipeline.success_actions)
actions = self.pipeline.success_actions actions = self.pipeline.success_actions
item.setReportedResult('SUCCESS') item.setReportedResult('SUCCESS')

View File

@ -55,8 +55,10 @@ class DependentPipelineManager(SharedQueuePipelineManager):
return True return True
def enqueueChangesBehind(self, change, event, quiet, ignore_requirements, def enqueueChangesBehind(self, change, event, quiet, ignore_requirements,
change_queue): change_queue, history=None,
dependency_graph=None):
log = get_annotated_logger(self.log, event) log = get_annotated_logger(self.log, event)
history = history if history is not None else []
log.debug("Checking for changes needing %s:" % change) log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'): if not hasattr(change, 'needed_by_changes'):
@ -84,7 +86,17 @@ class DependentPipelineManager(SharedQueuePipelineManager):
log.debug(" Following changes: %s", needed_by_changes) log.debug(" Following changes: %s", needed_by_changes)
to_enqueue = [] to_enqueue = []
change_dependencies = dependency_graph.get(change, [])
for other_change in needed_by_changes: for other_change in needed_by_changes:
if other_change in change_dependencies:
# Only consider the change if it is not part of a cycle, as
# cycle changes will otherwise be partially enqueued without
# any error handling
self.log.debug(
" Skipping change %s due to dependency cycle"
)
continue
with self.getChangeQueue(other_change, with self.getChangeQueue(other_change,
event) as other_change_queue: event) as other_change_queue:
if other_change_queue != change_queue: if other_change_queue != change_queue:
@ -106,32 +118,41 @@ class DependentPipelineManager(SharedQueuePipelineManager):
for other_change in to_enqueue: for other_change in to_enqueue:
self.addChange(other_change, event, quiet=quiet, self.addChange(other_change, event, quiet=quiet,
ignore_requirements=ignore_requirements, ignore_requirements=ignore_requirements,
change_queue=change_queue) change_queue=change_queue, history=history,
dependency_graph=dependency_graph)
def enqueueChangesAhead(self, change, event, quiet, ignore_requirements, def enqueueChangesAhead(self, change, event, quiet, ignore_requirements,
change_queue, history=None): change_queue, history=None, dependency_graph=None):
log = get_annotated_logger(self.log, event) log = get_annotated_logger(self.log, event)
history = history if history is not None else []
if hasattr(change, 'number'): if hasattr(change, 'number'):
history = history or [] history.append(change)
history = history + [change]
else: else:
# Don't enqueue dependencies ahead of a non-change ref. # Don't enqueue dependencies ahead of a non-change ref.
return True return True
ret = self.checkForChangesNeededBy(change, change_queue, event) ret = self.checkForChangesNeededBy(change, change_queue, event,
dependency_graph=dependency_graph)
if ret in [True, False]: if ret in [True, False]:
return ret return ret
log.debug(" Changes %s must be merged ahead of %s", ret, change) log.debug(" Changes %s must be merged ahead of %s", ret, change)
for needed_change in ret: for needed_change in ret:
r = self.addChange(needed_change, event, quiet=quiet, # If the change is already in the history, but the change also has
ignore_requirements=ignore_requirements, # a git level dependency, we need to enqueue it before the current
change_queue=change_queue, history=history) # change.
if not r: if (needed_change not in history or
return False needed_change in change.git_needs_changes):
r = self.addChange(needed_change, event, quiet=quiet,
ignore_requirements=ignore_requirements,
change_queue=change_queue, history=history,
dependency_graph=dependency_graph)
if not r:
return False
return True return True
def checkForChangesNeededBy(self, change, change_queue, event): def checkForChangesNeededBy(self, change, change_queue, event,
dependency_graph=None):
log = get_annotated_logger(self.log, event) log = get_annotated_logger(self.log, event)
# Return true if okay to proceed enqueing this change, # Return true if okay to proceed enqueing this change,
@ -155,6 +176,13 @@ class DependentPipelineManager(SharedQueuePipelineManager):
if needed_change.is_merged: if needed_change.is_merged:
log.debug(" Needed change is merged") log.debug(" Needed change is merged")
continue continue
if dependency_graph is not None:
log.debug(" Adding change %s to dependency graph for "
"change %s", needed_change, change)
node = dependency_graph.setdefault(change, [])
node.append(needed_change)
with self.getChangeQueue(needed_change, with self.getChangeQueue(needed_change,
event) as needed_change_queue: event) as needed_change_queue:
if needed_change_queue != change_queue: if needed_change_queue != change_queue:
@ -197,6 +225,9 @@ class DependentPipelineManager(SharedQueuePipelineManager):
continue continue
if needed_item.current_build_set.failing_reasons: if needed_item.current_build_set.failing_reasons:
failing_items.add(needed_item) failing_items.add(needed_item)
if item.isBundleFailing():
failing_items.update(item.bundle.items)
failing_items.remove(item)
if failing_items: if failing_items:
return failing_items return failing_items
return None return None

View File

@ -37,17 +37,18 @@ class IndependentPipelineManager(PipelineManager):
return DynamicChangeQueueContextManager(change_queue) return DynamicChangeQueueContextManager(change_queue)
def enqueueChangesAhead(self, change, event, quiet, ignore_requirements, def enqueueChangesAhead(self, change, event, quiet, ignore_requirements,
change_queue, history=None): change_queue, history=None, dependency_graph=None):
log = get_annotated_logger(self.log, event) log = get_annotated_logger(self.log, event)
history = history if history is not None else []
if hasattr(change, 'number'): if hasattr(change, 'number'):
history = history or [] history.append(change)
history = history + [change]
else: else:
# Don't enqueue dependencies ahead of a non-change ref. # Don't enqueue dependencies ahead of a non-change ref.
return True return True
ret = self.checkForChangesNeededBy(change, change_queue, event) ret = self.checkForChangesNeededBy(change, change_queue, event,
dependency_graph=dependency_graph)
if ret in [True, False]: if ret in [True, False]:
return ret return ret
log.debug(" Changes %s must be merged ahead of %s" % (ret, change)) log.debug(" Changes %s must be merged ahead of %s" % (ret, change))
@ -57,15 +58,17 @@ class IndependentPipelineManager(PipelineManager):
# have jobs run. Also, pipeline requirements are always # have jobs run. Also, pipeline requirements are always
# ignored (which is safe because the changes are not # ignored (which is safe because the changes are not
# live). # live).
r = self.addChange(needed_change, event, quiet=True, if needed_change not in history:
ignore_requirements=True, r = self.addChange(needed_change, event, quiet=True,
live=False, change_queue=change_queue, ignore_requirements=True, live=False,
history=history) change_queue=change_queue, history=history,
if not r: dependency_graph=dependency_graph)
return False if not r:
return False
return True return True
def checkForChangesNeededBy(self, change, change_queue, event): def checkForChangesNeededBy(self, change, change_queue, event,
dependency_graph=None):
log = get_annotated_logger(self.log, event) log = get_annotated_logger(self.log, event)
if self.pipeline.ignore_dependencies: if self.pipeline.ignore_dependencies:
@ -89,6 +92,13 @@ class IndependentPipelineManager(PipelineManager):
if needed_change.is_merged: if needed_change.is_merged:
log.debug(" Needed change is merged") log.debug(" Needed change is merged")
continue continue
if dependency_graph is not None:
log.debug(" Adding change %s to dependency graph for "
"change %s", needed_change, change)
node = dependency_graph.setdefault(change, [])
node.append(needed_change)
if self.isChangeAlreadyInQueue(needed_change, change_queue): if self.isChangeAlreadyInQueue(needed_change, change_queue):
log.debug(" Needed change is already ahead in the queue") log.debug(" Needed change is already ahead in the queue")
continue continue

View File

@ -499,10 +499,15 @@ class ChangeQueue(object):
return True return True
def isActionable(self, item):
    """Return whether ``item`` lies inside the active window of the queue.

    A falsy ``self.window`` means there is no window and every item is
    actionable.  Otherwise the window is widened by one slot for each
    queued item that belongs to a bundle and has all of its jobs
    complete, and ``item`` is actionable if it is within that many
    leading entries of ``self.queue``.
    """
    if not self.window:
        return True
    # Ignore done items waiting for bundle dependencies to finish
    num_waiting_items = sum(
        1 for i in self.queue
        if i.bundle and i.areAllJobsComplete()
    )
    window = self.window + num_waiting_items
    return item in self.queue[:window]
def increaseWindowSize(self): def increaseWindowSize(self):
if self.window: if self.window:
@ -2083,12 +2088,15 @@ class BuildSet(object):
if not self.uuid: if not self.uuid:
self.uuid = uuid4().hex self.uuid = uuid4().hex
if self.dependent_changes is None: if self.dependent_changes is None:
items = [self.item] items = []
next_item = self.item.item_ahead if self.item.bundle:
while next_item: items.extend(reversed(self.item.bundle.items))
items.append(next_item) else:
next_item = next_item.item_ahead items.append(self.item)
items.extend(i for i in self.item.items_ahead if i not in items)
items.reverse() items.reverse()
self.dependent_changes = [i.change.toDict() for i in items] self.dependent_changes = [i.change.toDict() for i in items]
self.merger_items = [i.makeMergerItem() for i in items] self.merger_items = [i.makeMergerItem() for i in items]
@ -2222,6 +2230,11 @@ class QueueItem(object):
# by reporters throughout the lifecycle # by reporters throughout the lifecycle
self.dynamic_state = defaultdict(dict) self.dynamic_state = defaultdict(dict)
# A bundle holds other queue items that have to be successful
# for the current queue item to succeed
self.bundle = None
self.dequeued_bundle_failing = False
def annotateLogger(self, logger):
    """Return an annotated logger with the trigger event"""
    # Delegates to get_annotated_logger with this item's event.
    return get_annotated_logger(logger, self.event)
@ -2394,6 +2407,31 @@ class QueueItem(object):
return True return True
return False return False
def isBundleFailing(self):
    """Return whether the bundle this item belongs to is failing.

    An item without a bundle is never failing.  Otherwise the bundle is
    failing when its ``failed_reporting`` flag is set, or when any
    considered bundle item has a failed job or a merger failure.
    """
    if not self.bundle:
        return False
    # We are only checking other items that share the same change
    # queue, since we don't need to wait for changes in other change
    # queues.
    return self.bundle.failed_reporting or any(
        i.hasAnyJobFailed() or i.didMergerFail()
        for i in self.bundle.items
        if i.live and i.queue == self.queue)
def didBundleFinish(self):
    """Return whether every considered item of the bundle is done.

    An item without a bundle counts as finished.  Otherwise all live
    bundle items in this item's queue must have all jobs complete.
    """
    if not self.bundle:
        return True
    # We are only checking other items that share the same change
    # queue, since we don't need to wait for changes in other change
    # queues.
    return all(i.areAllJobsComplete() for i in self.bundle.items if
               i.live and i.queue == self.queue)
def didBundleStartReporting(self):
    """Return the bundle's ``started_reporting`` flag.

    Items that are not part of a bundle always yield ``False``.
    """
    if not self.bundle:
        return False
    return self.bundle.started_reporting
def didMergerFail(self):
    """Return the ``unable_to_merge`` flag of the current build set."""
    return self.current_build_set.unable_to_merge
@ -2408,6 +2446,21 @@ class QueueItem(object):
includes_untrusted = False includes_untrusted = False
tenant = self.pipeline.tenant tenant = self.pipeline.tenant
item = self item = self
if item.bundle:
# Check all items in the bundle for config updates
for bundle_item in item.bundle.items:
if bundle_item.change.updatesConfig(tenant):
trusted, project = tenant.getProject(
bundle_item.change.project.canonical_name)
if trusted:
includes_trusted = True
else:
includes_untrusted = True
if includes_trusted and includes_untrusted:
# We're done early
return includes_trusted, includes_untrusted
while item: while item:
if item.change.updatesConfig(tenant): if item.change.updatesConfig(tenant):
(trusted, project) = tenant.getProject( (trusted, project) = tenant.getProject(
@ -2781,6 +2834,10 @@ class QueueItem(object):
self.dequeued_needing_change = True self.dequeued_needing_change = True
self._setAllJobsSkipped() self._setAllJobsSkipped()
def setDequeuedBundleFailing(self):
    """Flag the item as dequeued because its bundle is failing.

    Sets ``dequeued_bundle_failing`` and then calls
    ``_setMissingJobsSkipped``.
    """
    self.dequeued_bundle_failing = True
    self._setMissingJobsSkipped()
def setUnableToMerge(self):
    """Record a merge failure on the current build set.

    Sets ``unable_to_merge`` on ``current_build_set`` and then calls
    ``_setAllJobsSkipped``.
    """
    self.current_build_set.unable_to_merge = True
    self._setAllJobsSkipped()
@ -2799,6 +2856,15 @@ class QueueItem(object):
fakebuild.result = 'SKIPPED' fakebuild.result = 'SKIPPED'
self.addBuild(fakebuild) self.addBuild(fakebuild)
def _setMissingJobsSkipped(self):
    """Add a ``SKIPPED`` placeholder build for every job without a build.

    Jobs whose name is already present in ``current_build_set.builds``
    are left untouched.
    """
    for job in self.getJobs():
        if job.name in self.current_build_set.builds:
            # We already have a build for this job
            continue
        fakebuild = Build(job, None)
        fakebuild.result = 'SKIPPED'
        self.addBuild(fakebuild)
def getNodePriority(self):
    """Return the node priority the pipeline manager computes for this item."""
    return self.pipeline.manager.getNodePriority(self)
@ -3135,6 +3201,25 @@ class QueueItem(object):
return False return False
class Bundle:
    """Identifies a collection of changes that must be treated as one unit."""

    def __init__(self):
        # Queue items that belong to this bundle, in insertion order.
        self.items = []
        # Reporting state flags; both start out unset.
        self.started_reporting = False
        self.failed_reporting = False

    def __repr__(self):
        # Close the angle bracket so the repr is balanced.
        return '<Bundle 0x{:x} {}>'.format(id(self), self.items)

    def add_item(self, item):
        """Append ``item`` to the bundle unless it is already a member."""
        if item not in self.items:
            self.items.append(item)

    def updatesConfig(self, tenant):
        """Return whether any member's change updates the config of ``tenant``."""
        return any(i.change.updatesConfig(tenant) for i in self.items)
class Ref(object): class Ref(object):
"""An existing state of a Project.""" """An existing state of a Project."""
@ -3305,8 +3390,9 @@ class Change(Branch):
@property
def needs_changes(self):
    """Concatenate the git, compat and commit dependency lists.

    ``commit_needs_changes`` may be falsy (for example ``None``); it is
    then treated as an empty list.
    """
    commit_needs_changes = self.commit_needs_changes or []
    return (self.git_needs_changes + self.compat_needs_changes +
            commit_needs_changes)
@property @property
def needed_by_changes(self): def needed_by_changes(self):
@ -3585,6 +3671,7 @@ class ProjectMetadata(object):
def __init__(self):
    # All metadata starts unset.
    self.merge_mode = None
    self.default_branch = None
    self.queue_name = None
# TODO(ianw) : this would clearly be better if it recorded the # TODO(ianw) : this would clearly be better if it recorded the
@ -4047,6 +4134,11 @@ class Layout(object):
if (md.default_branch is None and if (md.default_branch is None and
project_config.default_branch is not None): project_config.default_branch is not None):
md.default_branch = project_config.default_branch md.default_branch = project_config.default_branch
if (
md.queue_name is None
and project_config.queue_name is not None
):
md.queue_name = project_config.queue_name
def getProjectConfigs(self, name):
    """Return the project configs stored under ``name``.

    A fresh empty list is returned when ``name`` is unknown.
    """
    return self.project_configs.get(name, [])
@ -4363,10 +4455,12 @@ class Semaphore(ConfigObject):
class Queue(ConfigObject):
    """Configuration object describing a named queue.

    :param name: the queue name.
    :param per_branch: stored as ``per_branch``; defaults to ``False``.
    :param allow_circular_dependencies: whether circular dependencies
        between changes may be processed for this queue; defaults to
        ``False``.
    """

    def __init__(self, name, per_branch=False,
                 allow_circular_dependencies=False):
        super().__init__()
        self.name = name
        self.per_branch = per_branch
        self.allow_circular_dependencies = allow_circular_dependencies

    def __ne__(self, other):
        return not self.__eq__(other)

    def __eq__(self, other):
        # Only another Queue can be equal; all three settings must match.
        if not isinstance(other, Queue):
            return False
        return (
            self.name == other.name and
            self.per_branch == other.per_branch and
            self.allow_circular_dependencies ==
            other.allow_circular_dependencies
        )
class SemaphoreHandler(object): class SemaphoreHandler(object):

View File

@ -177,6 +177,12 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
def _formatItemReportFailure(self, item, with_jobs=True): def _formatItemReportFailure(self, item, with_jobs=True):
if item.dequeued_needing_change: if item.dequeued_needing_change:
msg = 'This change depends on a change that failed to merge.\n' msg = 'This change depends on a change that failed to merge.\n'
elif item.isBundleFailing():
msg = 'This change is part of a bundle that failed.\n'
if with_jobs:
msg = '{}\n\n{}'.format(msg, self._formatItemReportJobs(item))
msg = "{}\n\n{}".format(
msg, self._formatItemReportOtherBundleItems(item))
elif item.didMergerFail(): elif item.didMergerFail():
msg = item.pipeline.merge_failure_message msg = item.pipeline.merge_failure_message
elif item.getConfigErrors(): elif item.getConfigErrors():
@ -215,6 +221,10 @@ class BaseReporter(object, metaclass=abc.ABCMeta):
msg += '\n\n' + self._formatItemReportJobs(item) msg += '\n\n' + self._formatItemReportJobs(item)
return msg return msg
def _formatItemReportOtherBundleItems(self, item):
    """Format a "Related changes" section for a report.

    Lists the ``url`` of every entry in ``item.change.needs_changes``,
    one per line, leaving out the item's own change.
    """
    return "Related changes:\n{}".format("\n".join(
        c.url for c in item.change.needs_changes if c is not item.change))
def _getItemReportJobsFields(self, item): def _getItemReportJobsFields(self, item):
# Extract the report elements from an item # Extract the report elements from an item
config = self.connection.sched.config config = self.connection.sched.config

View File

@ -1794,7 +1794,8 @@ class Scheduler(threading.Thread):
if nodeset: if nodeset:
self.nodepool.returnNodeSet( self.nodepool.returnNodeSet(
nodeset, build=build, zuul_event_id=item.event) nodeset, build=build, zuul_event_id=item.event)
build.result = 'CANCELED' if build.result is None:
build.result = 'CANCELED'
else: else:
nodeset = buildset.getJobNodeSet(job_name) nodeset = buildset.getJobNodeSet(job_name)
if nodeset: if nodeset: