diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst index a8924b6c..0e419e91 100644 --- a/doc/source/notifications.rst +++ b/doc/source/notifications.rst @@ -180,6 +180,11 @@ Capturing listener .. autoclass:: taskflow.listeners.capturing.CaptureListener +Formatters +---------- + +.. automodule:: taskflow.listeners.formatters + Hierarchy ========= diff --git a/taskflow/formatters.py b/taskflow/formatters.py new file mode 100644 index 00000000..d36082b2 --- /dev/null +++ b/taskflow/formatters.py @@ -0,0 +1,160 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import functools + +from taskflow import exceptions as exc +from taskflow import states +from taskflow.types import tree +from taskflow.utils import misc + + +def _cached_get(cache, cache_key, atom_name, fetch_func, *args, **kwargs): + """Tries to get a previously saved value or fetches it and caches it.""" + value, value_found = None, False + try: + value, value_found = cache[cache_key][atom_name] + except KeyError: + try: + value = fetch_func(*args, **kwargs) + value_found = True + except (exc.StorageFailure, exc.NotFound): + pass + cache[cache_key][atom_name] = value, value_found + return value, value_found + + +def _fetch_predecessor_tree(graph, atom): + """Creates a tree of predecessors, rooted at given atom.""" + root = tree.Node(atom) + stack = [(root, atom)] + seen = set() + while stack: + parent, node = stack.pop() + for pred_node in graph.predecessors_iter(node): + child = tree.Node(pred_node) + parent.add(child) + stack.append((child, pred_node)) + seen.add(pred_node) + return len(seen), root + + +class FailureFormatter(object): + """Formats a failure and connects it to associated atoms & engine.""" + + _BUILDERS = { + states.EXECUTE: (_fetch_predecessor_tree, 'predecessors'), + } + + def __init__(self, engine, hide_inputs_outputs_of=()): + self._hide_inputs_outputs_of = hide_inputs_outputs_of + self._engine = engine + + def _format_node(self, storage, cache, node): + """Formats a single tree node (atom) into a string version.""" + atom = node.item + atom_name = atom.name + atom_attrs = {} + intention, intention_found = _cached_get(cache, 'intentions', + atom_name, + storage.get_atom_intention, + atom_name) + if intention_found: + atom_attrs['intention'] = intention + state, state_found = _cached_get(cache, 'states', atom_name, + storage.get_atom_state, atom_name) + if state_found: + atom_attrs['state'] = state + if atom_name not in self._hide_inputs_outputs_of: + requires, requires_found = _cached_get(cache, 'requires', + atom_name, + # When the cache does not + # exist for this atom this + # will be called with the + # rest of these arguments + # used to populate the + # cache. 
+ storage.fetch_mapped_args, + atom.rebind, + atom_name=atom_name, + optional_args=atom.optional) + if requires_found: + atom_attrs['requires'] = requires + provides, provides_found = _cached_get(cache, 'provides', + atom_name, + storage.get_execute_result, + atom_name) + if provides_found: + atom_attrs['provides'] = provides + if atom_attrs: + return "Atom '%s' %s" % (atom_name, atom_attrs) + else: + return "Atom '%s'" % (atom_name) + + def format(self, fail, atom_matcher): + """Returns a (exc_info, details) tuple about the failure. + + The ``exc_info`` tuple should be a standard three element + (exctype, value, traceback) tuple that will be used for further + logging. A non-empty string is typically returned for ``details``; it + should contain any string info about the failure (with any specific + details the ``exc_info`` may not have/contain). + """ + buff = misc.StringIO() + storage = self._engine.storage + compilation = self._engine.compilation + if fail.exc_info is None: + # Remote failures will not have a 'exc_info' tuple, so just use + # the captured traceback that was captured by the creator when it + # failed... + buff.write_nl(fail.pformat(traceback=True)) + if storage is None or compilation is None: + # Somehow we got called before prepared and/or compiled; ok + # that's weird, skip doing the rest... + return (fail.exc_info, buff.getvalue()) + hierarchy = compilation.hierarchy + graph = compilation.execution_graph + atom_node = hierarchy.find_first_match(atom_matcher) + atom = None + priors = 0 + atom_intention = None + if atom_node is not None: + atom = atom_node.item + atom_intention = storage.get_atom_intention(atom.name) + priors = sum(c for (_n, c) in graph.in_degree_iter([atom])) + if atom is not None and priors and atom_intention in self._BUILDERS: + # Cache as much as we can, since the path of various atoms + # may cause the same atom to be seen repeatedly depending on + # the graph structure... + cache = { + 'intentions': {}, + 'provides': {}, + 'requires': {}, + 'states': {}, + } + builder, kind = self._BUILDERS[atom_intention] + count, rooted_tree = builder(graph, atom) + buff.write_nl('%s %s (most recent atoms first):' % (count, kind)) + formatter = functools.partial(self._format_node, storage, cache) + child_count = rooted_tree.child_count() + for i, child in enumerate(rooted_tree, 1): + if i == child_count: + buff.write(child.pformat(stringify_node=formatter, + starting_prefix=" ")) + else: + buff.write_nl(child.pformat(stringify_node=formatter, + starting_prefix=" ")) + return (fail.exc_info, buff.getvalue()) diff --git a/taskflow/listeners/logging.py b/taskflow/listeners/logging.py index 37fd58ac..219f6ac8 100644 --- a/taskflow/listeners/logging.py +++ b/taskflow/listeners/logging.py @@ -18,9 +18,11 @@ from __future__ import absolute_import import os +from taskflow import formatters from taskflow.listeners import base from taskflow import logging from taskflow import states +from taskflow import task from taskflow.types import failure from taskflow.utils import misc @@ -56,6 +58,16 @@ class LoggingListener(base.DumpingListener): self._logger.log(self._level, message, *args, **kwargs) +def _make_matcher(task_name): + """Returns a function that matches a node with task item with same name.""" + + def _task_matcher(node): + item = node.item + return isinstance(item, task.BaseTask) and item.name == task_name + + return _task_matcher + + class DynamicLoggingListener(base.Listener): """Listener that logs notifications it receives. 
@@ -99,7 +111,7 @@ class DynamicLoggingListener(base.Listener): flow_listen_for=base.DEFAULT_LISTEN_FOR, retry_listen_for=base.DEFAULT_LISTEN_FOR, log=None, failure_level=logging.WARNING, - level=logging.DEBUG): + level=logging.DEBUG, hide_inputs_outputs_of=()): super(DynamicLoggingListener, self).__init__( engine, task_listen_for=task_listen_for, flow_listen_for=flow_listen_for, retry_listen_for=retry_listen_for) @@ -115,33 +127,10 @@ class DynamicLoggingListener(base.Listener): states.FAILURE: self._failure_level, states.REVERTED: self._failure_level, } + self._hide_inputs_outputs_of = frozenset(hide_inputs_outputs_of) self._logger = misc.pick_first_not_none(log, self._LOGGER, LOG) - - @staticmethod - def _format_failure(fail): - """Returns a (exc_info, exc_details) tuple about the failure. - - The ``exc_info`` tuple should be a standard three element - (exctype, value, traceback) tuple that will be used for further - logging. If a non-empty string is returned for ``exc_details`` it - should contain any string info about the failure (with any specific - details the ``exc_info`` may not have/contain). If the ``exc_info`` - tuple is returned as ``None`` then it will cause the logging - system to avoid outputting any traceback information (read - the python documentation on the logger interaction with ``exc_info`` - to learn more). - """ - if fail.exc_info: - exc_info = fail.exc_info - exc_details = '' - else: - # When a remote failure occurs (or somehow the failure - # object lost its traceback), we will not have a valid - # exc_info that can be used but we *should* have a string - # version that we can use instead... - exc_info = None - exc_details = "%s%s" % (os.linesep, fail.pformat(traceback=True)) - return (exc_info, exc_details) + self._fail_formatter = formatters.FailureFormatter( + self._engine, hide_inputs_outputs_of=self._hide_inputs_outputs_of) def _flow_receiver(self, state, details): """Gets called on flow state changes.""" @@ -152,39 +141,49 @@ class DynamicLoggingListener(base.Listener): def _task_receiver(self, state, details): """Gets called on task state changes.""" + task_name = details['task_name'] + task_uuid = details['task_uuid'] if 'result' in details and state in base.FINISH_STATES: # If the task failed, it's useful to show the exception traceback # and any other available exception information. result = details.get('result') if isinstance(result, failure.Failure): - exc_info, exc_details = self._format_failure(result) - self._logger.log(self._failure_level, - "Task '%s' (%s) transitioned into state" - " '%s' from state '%s'%s", - details['task_name'], details['task_uuid'], - state, details['old_state'], exc_details, - exc_info=exc_info) + exc_info, fail_details = self._fail_formatter.format( + result, _make_matcher(task_name)) + if fail_details: + self._logger.log(self._failure_level, + "Task '%s' (%s) transitioned into state" + " '%s' from state '%s'%s%s", + task_name, task_uuid, state, + details['old_state'], os.linesep, + fail_details, exc_info=exc_info) + else: + self._logger.log(self._failure_level, + "Task '%s' (%s) transitioned into state" + " '%s' from state '%s'", task_name, + task_uuid, state, details['old_state'], + exc_info=exc_info) else: # Otherwise, depending on the enabled logging level/state we # will show or hide results that the task may have produced # during execution. 
level = self._task_log_levels.get(state, self._level) - if (self._logger.isEnabledFor(self._level) - or state in self._FAILURE_STATES): + show_result = (self._logger.isEnabledFor(self._level) + or state == states.FAILURE) + if show_result and \ + task_name not in self._hide_inputs_outputs_of: self._logger.log(level, "Task '%s' (%s) transitioned into" " state '%s' from state '%s' with" - " result '%s'", details['task_name'], - details['task_uuid'], state, - details['old_state'], result) + " result '%s'", task_name, task_uuid, + state, details['old_state'], result) else: self._logger.log(level, "Task '%s' (%s) transitioned into" " state '%s' from state '%s'", - details['task_name'], - details['task_uuid'], state, + task_name, task_uuid, state, details['old_state']) else: # Just a intermediary state, carry on! level = self._task_log_levels.get(state, self._level) self._logger.log(level, "Task '%s' (%s) transitioned into state" - " '%s' from state '%s'", details['task_name'], - details['task_uuid'], state, details['old_state']) + " '%s' from state '%s'", task_name, task_uuid, + state, details['old_state']) diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index bd2e6e38..aa89aa81 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -60,6 +60,14 @@ class StrEnum(str, enum.Enum): return super(StrEnum, cls).__new__(cls, *args, **kwargs) +class StringIO(six.StringIO): + """String buffer with some small additions.""" + + def write_nl(self, value, linesep=os.linesep): + self.write(value) + self.write(linesep) + + def match_type(obj, matchers): """Matches a given object using the given matchers list/iterable.
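
A minimal usage sketch of the new ``hide_inputs_outputs_of`` option added above
(the flow and task names below are illustrative only, not part of this change);
it assumes the listener is used as a context manager around ``engine.run()``,
as with the existing taskflow listeners:

    import logging

    import taskflow.engines
    from taskflow.listeners import logging as logging_listeners
    from taskflow.patterns import linear_flow
    from taskflow import task

    logging.basicConfig(level=logging.DEBUG)


    class ProduceSecret(task.Task):
        # Produces a value whose contents we do not want logged.
        default_provides = 'secret'

        def execute(self):
            return "do-not-log-me"


    class Explode(task.Task):
        # Consumes the secret and fails, which triggers failure formatting.
        def execute(self, secret):
            raise RuntimeError("boom")


    flow = linear_flow.Flow('example')
    flow.add(ProduceSecret('produce_secret'), Explode('explode'))
    engine = taskflow.engines.load(flow)

    # Inputs/outputs of 'produce_secret' are omitted both from the normal
    # per-task result logging and from the failure details built by the
    # new FailureFormatter.
    with logging_listeners.DynamicLoggingListener(
            engine, hide_inputs_outputs_of=['produce_secret']):
        try:
            engine.run()
        except Exception:
            # The original task failure is re-raised after reverting.
            pass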