Merge "Extend and improve failure logging"
This commit is contained in:
commit
19dace9548
|
@ -180,6 +180,11 @@ Capturing listener
|
|||
|
||||
.. autoclass:: taskflow.listeners.capturing.CaptureListener
|
||||
|
||||
Formatters
|
||||
----------
|
||||
|
||||
.. automodule:: taskflow.listeners.formatters
|
||||
|
||||
Hierarchy
|
||||
=========
|
||||
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import functools
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import states
|
||||
from taskflow.types import tree
|
||||
from taskflow.utils import misc
|
||||
|
||||
|
||||
def _cached_get(cache, cache_key, atom_name, fetch_func, *args, **kwargs):
|
||||
"""Tries to get a previously saved value or fetches it and caches it."""
|
||||
value, value_found = None, False
|
||||
try:
|
||||
value, value_found = cache[cache_key][atom_name]
|
||||
except KeyError:
|
||||
try:
|
||||
value = fetch_func(*args, **kwargs)
|
||||
value_found = True
|
||||
except (exc.StorageFailure, exc.NotFound):
|
||||
pass
|
||||
cache[cache_key][atom_name] = value, value_found
|
||||
return value, value_found
|
||||
|
||||
|
||||
def _fetch_predecessor_tree(graph, atom):
    """Creates a tree of predecessors, rooted at given atom."""
    root = tree.Node(atom)
    seen = set()
    pending = [(root, atom)]
    # Depth-first expansion of every predecessor path leading to 'atom'.
    while pending:
        parent_node, graph_node = pending.pop()
        for predecessor in graph.predecessors_iter(graph_node):
            seen.add(predecessor)
            child_node = tree.Node(predecessor)
            parent_node.add(child_node)
            pending.append((child_node, predecessor))
    # NOTE(review): 'seen' is only used for counting uniques; a predecessor
    # reachable via multiple paths appears once per path in the tree
    # (assumes the graph is acyclic — confirm for execution graphs).
    return len(seen), root
|
||||
|
||||
|
||||
class FailureFormatter(object):
    """Formats a failure and connects it to associated atoms & engine."""

    # Maps an atom intention onto a (tree builder, description) pair that is
    # used when expanding the graph neighborhood of the failing atom.
    _BUILDERS = {
        states.EXECUTE: (_fetch_predecessor_tree, 'predecessors'),
    }

    def __init__(self, engine, hide_inputs_outputs_of=()):
        self._hide_inputs_outputs_of = hide_inputs_outputs_of
        self._engine = engine

    def _format_node(self, storage, cache, node):
        """Formats a single tree node (atom) into a string version."""
        atom = node.item
        atom_name = atom.name
        attrs = {}
        intention, found = _cached_get(cache, 'intentions', atom_name,
                                       storage.get_atom_intention,
                                       atom_name)
        if found:
            attrs['intention'] = intention
        state, found = _cached_get(cache, 'states', atom_name,
                                   storage.get_atom_state, atom_name)
        if found:
            attrs['state'] = state
        if atom_name not in self._hide_inputs_outputs_of:
            # Inputs/outputs are only included when the atom was not
            # explicitly requested to have them hidden.
            requires, found = _cached_get(cache, 'requires', atom_name,
                                          # When the cache does not contain
                                          # an entry for this atom this will
                                          # be called with the rest of these
                                          # arguments to populate the cache.
                                          storage.fetch_mapped_args,
                                          atom.rebind,
                                          atom_name=atom_name,
                                          optional_args=atom.optional)
            if found:
                attrs['requires'] = requires
            provides, found = _cached_get(cache, 'provides', atom_name,
                                          storage.get_execute_result,
                                          atom_name)
            if found:
                attrs['provides'] = provides
        if attrs:
            return "Atom '%s' %s" % (atom_name, attrs)
        return "Atom '%s'" % (atom_name)

    def format(self, fail, atom_matcher):
        """Returns a (exc_info, details) tuple about the failure.

        The ``exc_info`` tuple should be a standard three element
        (exctype, value, traceback) tuple that will be used for further
        logging. A non-empty string is typically returned for ``details``; it
        should contain any string info about the failure (with any specific
        details the ``exc_info`` may not have/contain).
        """
        buff = misc.StringIO()
        storage = self._engine.storage
        compilation = self._engine.compilation
        if fail.exc_info is None:
            # Remote failures will not have a 'exc_info' tuple, so just use
            # the captured traceback that was captured by the creator when it
            # failed...
            buff.write_nl(fail.pformat(traceback=True))
        if storage is None or compilation is None:
            # Somehow we got called before prepared and/or compiled; ok
            # that's weird, skip doing the rest...
            return (fail.exc_info, buff.getvalue())
        graph = compilation.execution_graph
        atom_node = compilation.hierarchy.find_first_match(atom_matcher)
        atom, priors, atom_intention = None, 0, None
        if atom_node is not None:
            atom = atom_node.item
            atom_intention = storage.get_atom_intention(atom.name)
            priors = sum(in_degree
                         for (_node, in_degree)
                         in graph.in_degree_iter([atom]))
        if atom is not None and priors and atom_intention in self._BUILDERS:
            # Cache as much as we can, since the path of various atoms
            # may cause the same atom to be seen repeatedly depending on
            # the graph structure...
            cache = dict((key, {})
                         for key in ('intentions', 'provides',
                                     'requires', 'states'))
            builder, kind = self._BUILDERS[atom_intention]
            count, rooted_tree = builder(graph, atom)
            buff.write_nl('%s %s (most recent atoms first):' % (count, kind))
            stringify = functools.partial(self._format_node, storage, cache)
            last_child = rooted_tree.child_count()
            for position, child in enumerate(rooted_tree, 1):
                rendered = child.pformat(stringify_node=stringify,
                                         starting_prefix=" ")
                if position == last_child:
                    # The final entry gets no trailing line separator.
                    buff.write(rendered)
                else:
                    buff.write_nl(rendered)
        return (fail.exc_info, buff.getvalue())
|
|
@ -18,9 +18,11 @@ from __future__ import absolute_import
|
|||
|
||||
import os
|
||||
|
||||
from taskflow import formatters
|
||||
from taskflow.listeners import base
|
||||
from taskflow import logging
|
||||
from taskflow import states
|
||||
from taskflow import task
|
||||
from taskflow.types import failure
|
||||
from taskflow.utils import misc
|
||||
|
||||
|
@ -56,6 +58,16 @@ class LoggingListener(base.DumpingListener):
|
|||
self._logger.log(self._level, message, *args, **kwargs)
|
||||
|
||||
|
||||
def _make_matcher(task_name):
|
||||
"""Returns a function that matches a node with task item with same name."""
|
||||
|
||||
def _task_matcher(node):
|
||||
item = node.item
|
||||
return isinstance(item, task.BaseTask) and item.name == task_name
|
||||
|
||||
return _task_matcher
|
||||
|
||||
|
||||
class DynamicLoggingListener(base.Listener):
|
||||
"""Listener that logs notifications it receives.
|
||||
|
||||
|
@ -99,7 +111,7 @@ class DynamicLoggingListener(base.Listener):
|
|||
flow_listen_for=base.DEFAULT_LISTEN_FOR,
|
||||
retry_listen_for=base.DEFAULT_LISTEN_FOR,
|
||||
log=None, failure_level=logging.WARNING,
|
||||
level=logging.DEBUG):
|
||||
level=logging.DEBUG, hide_inputs_outputs_of=()):
|
||||
super(DynamicLoggingListener, self).__init__(
|
||||
engine, task_listen_for=task_listen_for,
|
||||
flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for)
|
||||
|
@ -115,33 +127,10 @@ class DynamicLoggingListener(base.Listener):
|
|||
states.FAILURE: self._failure_level,
|
||||
states.REVERTED: self._failure_level,
|
||||
}
|
||||
self._hide_inputs_outputs_of = frozenset(hide_inputs_outputs_of)
|
||||
self._logger = misc.pick_first_not_none(log, self._LOGGER, LOG)
|
||||
|
||||
@staticmethod
|
||||
def _format_failure(fail):
|
||||
"""Returns a (exc_info, exc_details) tuple about the failure.
|
||||
|
||||
The ``exc_info`` tuple should be a standard three element
|
||||
(exctype, value, traceback) tuple that will be used for further
|
||||
logging. If a non-empty string is returned for ``exc_details`` it
|
||||
should contain any string info about the failure (with any specific
|
||||
details the ``exc_info`` may not have/contain). If the ``exc_info``
|
||||
tuple is returned as ``None`` then it will cause the logging
|
||||
system to avoid outputting any traceback information (read
|
||||
the python documentation on the logger interaction with ``exc_info``
|
||||
to learn more).
|
||||
"""
|
||||
if fail.exc_info:
|
||||
exc_info = fail.exc_info
|
||||
exc_details = ''
|
||||
else:
|
||||
# When a remote failure occurs (or somehow the failure
|
||||
# object lost its traceback), we will not have a valid
|
||||
# exc_info that can be used but we *should* have a string
|
||||
# version that we can use instead...
|
||||
exc_info = None
|
||||
exc_details = "%s%s" % (os.linesep, fail.pformat(traceback=True))
|
||||
return (exc_info, exc_details)
|
||||
self._fail_formatter = formatters.FailureFormatter(
|
||||
self._engine, hide_inputs_outputs_of=self._hide_inputs_outputs_of)
|
||||
|
||||
def _flow_receiver(self, state, details):
|
||||
"""Gets called on flow state changes."""
|
||||
|
@ -152,39 +141,49 @@ class DynamicLoggingListener(base.Listener):
|
|||
|
||||
def _task_receiver(self, state, details):
|
||||
"""Gets called on task state changes."""
|
||||
task_name = details['task_name']
|
||||
task_uuid = details['task_uuid']
|
||||
if 'result' in details and state in base.FINISH_STATES:
|
||||
# If the task failed, it's useful to show the exception traceback
|
||||
# and any other available exception information.
|
||||
result = details.get('result')
|
||||
if isinstance(result, failure.Failure):
|
||||
exc_info, exc_details = self._format_failure(result)
|
||||
self._logger.log(self._failure_level,
|
||||
"Task '%s' (%s) transitioned into state"
|
||||
" '%s' from state '%s'%s",
|
||||
details['task_name'], details['task_uuid'],
|
||||
state, details['old_state'], exc_details,
|
||||
exc_info=exc_info)
|
||||
exc_info, fail_details = self._fail_formatter.format(
|
||||
result, _make_matcher(task_name))
|
||||
if fail_details:
|
||||
self._logger.log(self._failure_level,
|
||||
"Task '%s' (%s) transitioned into state"
|
||||
" '%s' from state '%s'%s%s",
|
||||
task_name, task_uuid, state,
|
||||
details['old_state'], os.linesep,
|
||||
fail_details, exc_info=exc_info)
|
||||
else:
|
||||
self._logger.log(self._failure_level,
|
||||
"Task '%s' (%s) transitioned into state"
|
||||
" '%s' from state '%s'", task_name,
|
||||
task_uuid, state, details['old_state'],
|
||||
exc_info=exc_info)
|
||||
else:
|
||||
# Otherwise, depending on the enabled logging level/state we
|
||||
# will show or hide results that the task may have produced
|
||||
# during execution.
|
||||
level = self._task_log_levels.get(state, self._level)
|
||||
if (self._logger.isEnabledFor(self._level)
|
||||
or state in self._FAILURE_STATES):
|
||||
show_result = (self._logger.isEnabledFor(self._level)
|
||||
or state == states.FAILURE)
|
||||
if show_result and \
|
||||
task_name not in self._hide_inputs_outputs_of:
|
||||
self._logger.log(level, "Task '%s' (%s) transitioned into"
|
||||
" state '%s' from state '%s' with"
|
||||
" result '%s'", details['task_name'],
|
||||
details['task_uuid'], state,
|
||||
details['old_state'], result)
|
||||
" result '%s'", task_name, task_uuid,
|
||||
state, details['old_state'], result)
|
||||
else:
|
||||
self._logger.log(level, "Task '%s' (%s) transitioned into"
|
||||
" state '%s' from state '%s'",
|
||||
details['task_name'],
|
||||
details['task_uuid'], state,
|
||||
task_name, task_uuid, state,
|
||||
details['old_state'])
|
||||
else:
|
||||
# Just a intermediary state, carry on!
|
||||
level = self._task_log_levels.get(state, self._level)
|
||||
self._logger.log(level, "Task '%s' (%s) transitioned into state"
|
||||
" '%s' from state '%s'", details['task_name'],
|
||||
details['task_uuid'], state, details['old_state'])
|
||||
" '%s' from state '%s'", task_name, task_uuid,
|
||||
state, details['old_state'])
|
||||
|
|
|
@ -60,6 +60,14 @@ class StrEnum(str, enum.Enum):
|
|||
return super(StrEnum, cls).__new__(cls, *args, **kwargs)
|
||||
|
||||
|
||||
class StringIO(six.StringIO):
    """String buffer with some small additions."""

    def write_nl(self, value, linesep=os.linesep):
        # Emit the value followed by a (platform specific) line separator.
        for piece in (value, linesep):
            self.write(piece)
|
||||
|
||||
|
||||
def match_type(obj, matchers):
|
||||
"""Matches a given object using the given matchers list/iterable.
|
||||
|
||||
|
|
Loading…
Reference in New Issue