Annotate pipeline processing with event id

The pipeline processing is very complex, and it is helpful to be able to
track events as they move through the pipeline.

Change-Id: I455cdcf34128a2c067c10425736bf1d195df6fa8
This commit is contained in:
Tobias Henkel 2019-05-12 10:58:29 +02:00
parent 4e5352fef0
commit 5bcee83090
No known key found for this signature in database
GPG Key ID: 03750DEC158E5FA2
4 changed files with 133 additions and 109 deletions

View File

@ -17,6 +17,7 @@ import urllib
from zuul import exceptions
from zuul import model
from zuul.lib.dependson import find_dependency_headers
from zuul.lib.logutil import get_annotated_logger
class DynamicChangeQueueContextManager(object):
@ -107,23 +108,24 @@ class PipelineManager(object):
return allow_needs
def eventMatches(self, event, change):
log = get_annotated_logger(self.log, event)
if event.forced_pipeline:
if event.forced_pipeline == self.pipeline.name:
self.log.debug("Event %s for change %s was directly assigned "
"to pipeline %s" % (event, change, self))
log.debug("Event %s for change %s was directly assigned "
"to pipeline %s" % (event, change, self))
return True
else:
return False
for ef in self.event_filters:
match_result = ef.matches(event, change)
if match_result:
self.log.debug("Event %s for change %s matched %s "
"in pipeline %s" % (event, change, ef, self))
log.debug("Event %s for change %s matched %s "
"in pipeline %s" % (event, change, ef, self))
return True
else:
self.log.debug("Event %s for change %s does not match %s "
"in pipeline %s because %s" % (
event, change, ef, self, str(match_result)))
log.debug("Event %s for change %s does not match %s "
"in pipeline %s because %s" % (
event, change, ef, self, str(match_result)))
return False
def getNodePriority(self, item):
@ -187,7 +189,7 @@ class PipelineManager(object):
change_queue):
return True
def checkForChangesNeededBy(self, change, change_queue):
def checkForChangesNeededBy(self, change, change_queue, event):
return True
def getFailingDependentItems(self, item):
@ -225,7 +227,8 @@ class PipelineManager(object):
self.removeItem(item)
def reEnqueueItem(self, item, last_head, old_item_ahead, item_ahead_valid):
with self.getChangeQueue(item.change, last_head.queue) as change_queue:
with self.getChangeQueue(item.change, item.event,
last_head.queue) as change_queue:
if change_queue:
self.log.debug("Re-enqueing change %s in queue %s" %
(item.change, change_queue))
@ -272,55 +275,53 @@ class PipelineManager(object):
def addChange(self, change, event, quiet=False, enqueue_time=None,
ignore_requirements=False, live=True,
change_queue=None, history=None):
self.log.debug("Considering adding change %s" % change)
log = get_annotated_logger(self.log, event)
log.debug("Considering adding change %s" % change)
# If we are adding a live change, check if it's a live item
# anywhere in the pipeline. Otherwise, we will perform the
# duplicate check below on the specific change_queue.
if live and self.isChangeAlreadyInPipeline(change):
self.log.debug("Change %s is already in pipeline, "
"ignoring" % change)
log.debug("Change %s is already in pipeline, ignoring" % change)
return True
if not ignore_requirements:
for f in self.ref_filters:
if f.connection_name != change.project.connection_name:
self.log.debug("Filter %s skipped for change %s due "
"to mismatched connections" % (f, change))
log.debug("Filter %s skipped for change %s due "
"to mismatched connections" % (f, change))
continue
match_result = f.matches(change)
if not match_result:
self.log.debug("Change %s does not match pipeline "
"requirement %s because %s" % (
change, f, str(match_result)))
log.debug("Change %s does not match pipeline "
"requirement %s because %s" % (
change, f, str(match_result)))
return False
if not self.isChangeReadyToBeEnqueued(change):
self.log.debug("Change %s is not ready to be enqueued, ignoring" %
change)
log.debug("Change %s is not ready to be enqueued, ignoring" %
change)
return False
with self.getChangeQueue(change, change_queue) as change_queue:
with self.getChangeQueue(change, event, change_queue) as change_queue:
if not change_queue:
self.log.debug("Unable to find change queue for "
"change %s in project %s" %
(change, change.project))
log.debug("Unable to find change queue for "
"change %s in project %s" %
(change, change.project))
return False
if not self.enqueueChangesAhead(change, event, quiet,
ignore_requirements,
change_queue, history=history):
self.log.debug("Failed to enqueue changes "
"ahead of %s" % change)
log.debug("Failed to enqueue changes ahead of %s" % change)
return False
if self.isChangeAlreadyInQueue(change, change_queue):
self.log.debug("Change %s is already in queue, "
"ignoring" % change)
log.debug("Change %s is already in queue, ignoring" % change)
return True
self.log.info("Adding change %s to queue %s in %s" %
(change, change_queue, self.pipeline))
log.info("Adding change %s to queue %s in %s" %
(change, change_queue, self.pipeline))
item = change_queue.enqueueChange(change, event)
if enqueue_time:
item.enqueue_time = enqueue_time
@ -348,14 +349,16 @@ class PipelineManager(object):
self.dequeueItem(item)
self.reportStats(item)
def updateCommitDependencies(self, change, change_queue):
def updateCommitDependencies(self, change, change_queue, event):
log = get_annotated_logger(self.log, event)
# Search for Depends-On headers and find appropriate changes
self.log.debug(" Updating commit dependencies for %s", change)
log.debug(" Updating commit dependencies for %s", change)
change.refresh_deps = False
dependencies = []
seen = set()
for match in find_dependency_headers(change.message):
self.log.debug(" Found Depends-On header: %s", match)
log.debug(" Found Depends-On header: %s", match)
if match in seen:
continue
seen.add(match)
@ -367,10 +370,10 @@ class PipelineManager(object):
url.hostname)
if not source:
continue
self.log.debug(" Found source: %s", source)
log.debug(" Found source: %s", source)
dep = source.getChangeByURL(match)
if dep and (not dep.is_merged) and dep not in dependencies:
self.log.debug(" Adding dependency: %s", dep)
log.debug(" Adding dependency: %s", dep)
dependencies.append(dep)
change.commit_needs_changes = dependencies
@ -612,8 +615,9 @@ class PipelineManager(object):
return self._loadDynamicLayout(item)
def scheduleMerge(self, item, files=None, dirs=None):
self.log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" %
(item, files, dirs))
log = get_annotated_logger(self.log, item.event)
log.debug("Scheduling merge for item %s (files: %s, dirs: %s)" %
(item, files, dirs))
build_set = item.current_build_set
build_set.merge_state = build_set.PENDING
if isinstance(item.change, model.Change):
@ -627,7 +631,8 @@ class PipelineManager(object):
return False
def scheduleFilesChanges(self, item):
self.log.debug("Scheduling fileschanged for item %s", item)
log = get_annotated_logger(self.log, item.event)
log.debug("Scheduling fileschanged for item %s", item)
build_set = item.current_build_set
build_set.files_state = build_set.PENDING
@ -660,6 +665,7 @@ class PipelineManager(object):
return ready
def prepareJobs(self, item):
log = get_annotated_logger(self.log, item.event)
# This only runs once the item is in the pipeline's action window
# Returns True if the item is ready, false otherwise
if not item.live:
@ -674,18 +680,18 @@ class PipelineManager(object):
if not item.job_graph:
try:
self.log.debug("Freezing job graph for %s" % (item,))
log.debug("Freezing job graph for %s" % (item,))
item.freezeJobGraph()
except Exception as e:
# TODOv3(jeblair): nicify this exception as it will be reported
self.log.exception("Error freezing job graph for %s" %
(item,))
log.exception("Error freezing job graph for %s" % (item,))
item.setConfigError("Unable to freeze job graph: %s" %
(str(e)))
return False
return True
def _processOneItem(self, item, nnfi):
log = get_annotated_logger(self.log, item.event)
changed = False
ready = False
dequeued = False
@ -696,10 +702,11 @@ class PipelineManager(object):
item_ahead = None
change_queue = item.queue
if self.checkForChangesNeededBy(item.change, change_queue) is not True:
if self.checkForChangesNeededBy(item.change, change_queue,
item.event) is not True:
# It's not okay to enqueue this change, we should remove it.
self.log.info("Dequeuing change %s because "
"it can no longer merge" % item.change)
log.info("Dequeuing change %s because "
"it can no longer merge" % item.change)
self.cancelJobs(item)
self.dequeueItem(item)
item.setDequeuedNeedingChange()
@ -727,9 +734,9 @@ class PipelineManager(object):
# Our current base is different than what we expected,
# and it's not because our current base merged. Something
# ahead must have failed.
self.log.info("Resetting builds for change %s because the "
"item ahead, %s, is not the nearest non-failing "
"item, %s" % (item.change, item_ahead, nnfi))
log.info("Resetting builds for change %s because the "
"item ahead, %s, is not the nearest non-failing "
"item, %s" % (item.change, item_ahead, nnfi))
change_queue.moveItem(item, nnfi)
changed = True
self.cancelJobs(item)
@ -764,9 +771,9 @@ class PipelineManager(object):
except exceptions.MergeFailure:
failing_reasons.append("it did not merge")
for item_behind in item.items_behind:
self.log.info("Resetting builds for change %s because the "
"item ahead, %s, failed to merge" %
(item_behind.change, item))
log.info("Resetting builds for change %s because the "
"item ahead, %s, failed to merge" %
(item_behind.change, item))
self.cancelJobs(item_behind)
self.dequeueItem(item)
changed = dequeued = True
@ -774,8 +781,8 @@ class PipelineManager(object):
nnfi = item
item.current_build_set.failing_reasons = failing_reasons
if failing_reasons:
self.log.debug("%s is a failing item because %s" %
(item, failing_reasons))
log.debug("%s is a failing item because %s" %
(item, failing_reasons))
if item.live and not dequeued and self.sched.use_relative_priority:
priority = item.getNodePriority()
for node_request in item.current_build_set.node_requests.values():

View File

@ -11,6 +11,7 @@
# under the License.
from zuul import model
from zuul.lib.logutil import get_annotated_logger
from zuul.manager import PipelineManager, StaticChangeQueueContextManager
from zuul.manager import DynamicChangeQueueContextManager
@ -74,7 +75,9 @@ class DependentPipelineManager(PipelineManager):
self.log.debug("Added project %s to queue: %s" %
(project, change_queue))
def getChangeQueue(self, change, existing=None):
def getChangeQueue(self, change, event, existing=None):
log = get_annotated_logger(self.log, event)
if existing:
return StaticChangeQueueContextManager(existing)
queue = self.pipeline.getQueue(change.project)
@ -86,11 +89,11 @@ class DependentPipelineManager(PipelineManager):
change_queue = model.ChangeQueue(self.pipeline, dynamic=True)
change_queue.addProject(change.project)
self.pipeline.addQueue(change_queue)
self.log.debug("Dynamically created queue %s", change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)
def getNodePriority(self, item):
with self.getChangeQueue(item.change) as change_queue:
with self.getChangeQueue(item.change, item.event) as change_queue:
items = change_queue.queue
return items.index(item)
@ -103,9 +106,11 @@ class DependentPipelineManager(PipelineManager):
def enqueueChangesBehind(self, change, event, quiet, ignore_requirements,
change_queue):
self.log.debug("Checking for changes needing %s:" % change)
log = get_annotated_logger(self.log, event)
log.debug("Checking for changes needing %s:" % change)
if not hasattr(change, 'needed_by_changes'):
self.log.debug(" %s does not support dependencies" % type(change))
log.debug(" %s does not support dependencies" % type(change))
return
# for project in change_queue, project.source get changes, then dedup.
@ -116,7 +121,7 @@ class DependentPipelineManager(PipelineManager):
seen = set(change.needed_by_changes)
needed_by_changes = change.needed_by_changes[:]
for source in sources:
self.log.debug(" Checking source: %s", source)
log.debug(" Checking source: %s", source)
for c in source.getChangesDependingOn(change,
change_queue.projects,
self.pipeline.tenant):
@ -124,25 +129,26 @@ class DependentPipelineManager(PipelineManager):
seen.add(c)
needed_by_changes.append(c)
self.log.debug(" Following changes: %s", needed_by_changes)
log.debug(" Following changes: %s", needed_by_changes)
to_enqueue = []
for other_change in needed_by_changes:
with self.getChangeQueue(other_change) as other_change_queue:
with self.getChangeQueue(other_change,
event) as other_change_queue:
if other_change_queue != change_queue:
self.log.debug(" Change %s in project %s can not be "
"enqueued in the target queue %s" %
(other_change, other_change.project,
change_queue))
log.debug(" Change %s in project %s can not be "
"enqueued in the target queue %s" %
(other_change, other_change.project,
change_queue))
continue
source = other_change.project.source
if source.canMerge(other_change, self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" %
(other_change, change))
log.debug(" Change %s needs %s and is ready to merge",
other_change, change)
to_enqueue.append(other_change)
if not to_enqueue:
self.log.debug(" No changes need %s" % change)
log.debug(" No changes need %s" % change)
for other_change in to_enqueue:
self.addChange(other_change, event, quiet=quiet,
@ -151,9 +157,11 @@ class DependentPipelineManager(PipelineManager):
def enqueueChangesAhead(self, change, event, quiet, ignore_requirements,
change_queue, history=None):
log = get_annotated_logger(self.log, event)
if history and change in history:
# detected dependency cycle
self.log.warn("Dependency cycle detected")
log.warn("Dependency cycle detected")
return False
if hasattr(change, 'number'):
history = history or []
@ -162,11 +170,10 @@ class DependentPipelineManager(PipelineManager):
# Don't enqueue dependencies ahead of a non-change ref.
return True
ret = self.checkForChangesNeededBy(change, change_queue)
ret = self.checkForChangesNeededBy(change, change_queue, event)
if ret in [True, False]:
return ret
self.log.debug(" Changes %s must be merged ahead of %s" %
(ret, change))
log.debug(" Changes %s must be merged ahead of %s", ret, change)
for needed_change in ret:
r = self.addChange(needed_change, event, quiet=quiet,
ignore_requirements=ignore_requirements,
@ -175,53 +182,54 @@ class DependentPipelineManager(PipelineManager):
return False
return True
def checkForChangesNeededBy(self, change, change_queue):
def checkForChangesNeededBy(self, change, change_queue, event):
log = get_annotated_logger(self.log, event)
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
self.log.debug("Checking for changes needed by %s:" % change)
log.debug("Checking for changes needed by %s:" % change)
if (hasattr(change, 'commit_needs_changes') and
(change.refresh_deps or change.commit_needs_changes is None)):
self.updateCommitDependencies(change, change_queue)
self.updateCommitDependencies(change, change_queue, event)
if not hasattr(change, 'needs_changes'):
self.log.debug(" %s does not support dependencies" % type(change))
log.debug(" %s does not support dependencies", type(change))
return True
if not change.needs_changes:
self.log.debug(" No changes needed")
log.debug(" No changes needed")
return True
changes_needed = []
# Ignore supplied change_queue
with self.getChangeQueue(change) as change_queue:
with self.getChangeQueue(change, event) as change_queue:
for needed_change in change.needs_changes:
self.log.debug(" Change %s needs change %s:" % (
log.debug(" Change %s needs change %s:" % (
change, needed_change))
if needed_change.is_merged:
self.log.debug(" Needed change is merged")
log.debug(" Needed change is merged")
continue
with self.getChangeQueue(needed_change) as needed_change_queue:
with self.getChangeQueue(needed_change,
event) as needed_change_queue:
if needed_change_queue != change_queue:
self.log.debug(" Change %s in project %s does not "
"share a change queue with %s "
"in project %s" %
(needed_change, needed_change.project,
change, change.project))
log.debug(" Change %s in project %s does not "
"share a change queue with %s "
"in project %s",
needed_change, needed_change.project,
change, change.project)
return False
if not needed_change.is_current_patchset:
self.log.debug(" Needed change is not the "
"current patchset")
log.debug(" Needed change is not the current patchset")
return False
if self.isChangeAlreadyInQueue(needed_change, change_queue):
self.log.debug(" Needed change is already ahead "
"in the queue")
log.debug(" Needed change is already ahead in the queue")
continue
if needed_change.project.source.canMerge(
needed_change, self.getSubmitAllowNeeds()):
self.log.debug(" Change %s is needed" % needed_change)
log.debug(" Change %s is needed", needed_change)
if needed_change not in changes_needed:
changes_needed.append(needed_change)
continue
# The needed change can't be merged.
self.log.debug(" Change %s is needed but can not be merged" %
needed_change)
log.debug(" Change %s is needed but can not be merged",
needed_change)
return False
if changes_needed:
return changes_needed

View File

@ -11,6 +11,7 @@
# under the License.
from zuul import model
from zuul.lib.logutil import get_annotated_logger
from zuul.manager import PipelineManager, DynamicChangeQueueContextManager
@ -22,7 +23,9 @@ class IndependentPipelineManager(PipelineManager):
def _postConfig(self, layout):
super(IndependentPipelineManager, self)._postConfig(layout)
def getChangeQueue(self, change, existing=None):
def getChangeQueue(self, change, event, existing=None):
log = get_annotated_logger(self.log, event)
# We ignore any shared change queues on the pipeline and
# instead create a new change queue for every change.
if existing:
@ -30,14 +33,16 @@ class IndependentPipelineManager(PipelineManager):
change_queue = model.ChangeQueue(self.pipeline)
change_queue.addProject(change.project)
self.pipeline.addQueue(change_queue)
self.log.debug("Dynamically created queue %s", change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)
def enqueueChangesAhead(self, change, event, quiet, ignore_requirements,
change_queue, history=None):
log = get_annotated_logger(self.log, event)
if history and change in history:
# detected dependency cycle
self.log.warn("Dependency cycle detected")
log.warn("Dependency cycle detected")
return False
if hasattr(change, 'number'):
history = history or []
@ -46,11 +51,10 @@ class IndependentPipelineManager(PipelineManager):
# Don't enqueue dependencies ahead of a non-change ref.
return True
ret = self.checkForChangesNeededBy(change, change_queue)
ret = self.checkForChangesNeededBy(change, change_queue, event)
if ret in [True, False]:
return ret
self.log.debug(" Changes %s must be merged ahead of %s" %
(ret, change))
log.debug(" Changes %s must be merged ahead of %s" % (ret, change))
for needed_change in ret:
# This differs from the dependent pipeline by enqueuing
# changes ahead as "not live", that is, not intended to
@ -65,32 +69,34 @@ class IndependentPipelineManager(PipelineManager):
return False
return True
def checkForChangesNeededBy(self, change, change_queue):
def checkForChangesNeededBy(self, change, change_queue, event):
log = get_annotated_logger(self.log, event)
if self.pipeline.ignore_dependencies:
return True
self.log.debug("Checking for changes needed by %s:" % change)
log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued.
if (hasattr(change, 'commit_needs_changes') and
(change.refresh_deps or change.commit_needs_changes is None)):
self.updateCommitDependencies(change, None)
self.updateCommitDependencies(change, None, event)
if not hasattr(change, 'needs_changes'):
self.log.debug(" %s does not support dependencies" % type(change))
log.debug(" %s does not support dependencies" % type(change))
return True
if not change.needs_changes:
self.log.debug(" No changes needed")
log.debug(" No changes needed")
return True
changes_needed = []
for needed_change in change.needs_changes:
self.log.debug(" Change %s needs change %s:" % (
log.debug(" Change %s needs change %s:" % (
change, needed_change))
if needed_change.is_merged:
self.log.debug(" Needed change is merged")
log.debug(" Needed change is merged")
continue
if self.isChangeAlreadyInQueue(needed_change, change_queue):
self.log.debug(" Needed change is already ahead in the queue")
log.debug(" Needed change is already ahead in the queue")
continue
self.log.debug(" Change %s is needed" % needed_change)
log.debug(" Change %s is needed" % needed_change)
if needed_change not in changes_needed:
changes_needed.append(needed_change)
continue

View File

@ -11,6 +11,7 @@
# under the License.
from zuul import model
from zuul.lib.logutil import get_annotated_logger
from zuul.manager import PipelineManager, DynamicChangeQueueContextManager
@ -19,7 +20,9 @@ class SupercedentPipelineManager(PipelineManager):
changes_merge = False
def getChangeQueue(self, change, existing=None):
def getChangeQueue(self, change, event, existing=None):
log = get_annotated_logger(self.log, event)
# creates a new change queue for every project-ref
# combination.
if existing:
@ -33,7 +36,7 @@ class SupercedentPipelineManager(PipelineManager):
hasattr(queue.queue[-1].change, 'branch') and
queue.queue[-1].change.branch == change.branch) or
queue.queue[-1].change.ref == change.ref)):
self.log.debug("Found existing queue %s", queue)
log.debug("Found existing queue %s", queue)
return DynamicChangeQueueContextManager(queue)
change_queue = model.ChangeQueue(
self.pipeline,
@ -43,7 +46,7 @@ class SupercedentPipelineManager(PipelineManager):
window_decrease_type='none')
change_queue.addProject(change.project)
self.pipeline.addQueue(change_queue)
self.log.debug("Dynamically created queue %s", change_queue)
log.debug("Dynamically created queue %s", change_queue)
return DynamicChangeQueueContextManager(change_queue)
def _pruneQueues(self):