diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst index 755f7b13..0fbd7a93 100644 --- a/doc/source/notifications.rst +++ b/doc/source/notifications.rst @@ -158,6 +158,8 @@ Printing and logging listeners .. autoclass:: taskflow.listeners.logging.LoggingListener +.. autoclass:: taskflow.listeners.logging.DynamicLoggingListener + .. autoclass:: taskflow.listeners.printing.PrintingListener Timing listener diff --git a/taskflow/listeners/logging.py b/taskflow/listeners/logging.py index 71bf83f5..175bc6fa 100644 --- a/taskflow/listeners/logging.py +++ b/taskflow/listeners/logging.py @@ -17,20 +17,38 @@ from __future__ import absolute_import import logging +import sys from taskflow.listeners import base +from taskflow import states from taskflow.utils import misc LOG = logging.getLogger(__name__) +if sys.version_info[0:2] == (2, 6): + _PY26 = True +else: + _PY26 = False + + +# Fixes this for python 2.6 which was missing the is enabled for method +# when a logger adapter is being used/provided, this will no longer be needed +# when we can just support python 2.7+ (which fixed the lack of this method +# on adapters). +def _isEnabledFor(logger, level): + if _PY26 and isinstance(logger, logging.LoggerAdapter): + return logger.logger.isEnabledFor(level) + return logger.isEnabledFor(level) + class LoggingListener(base.LoggingBase): """Listener that logs notifications it receives. - It listens for task and flow notifications and writes those - notifications to provided logger, or logger of its module - (``taskflow.listeners.logging``) if none provided. Log level - can also be configured, ``logging.DEBUG`` is used by default. + It listens for task and flow notifications and writes those notifications + to a provided logger, or logger of its module + (``taskflow.listeners.logging``) if none is provided. The log level + can also be configured, ``logging.DEBUG`` is used by default when none + is provided. """ def __init__(self, engine, task_listen_for=(misc.Notifier.ANY,), @@ -40,10 +58,115 @@ class LoggingListener(base.LoggingBase): super(LoggingListener, self).__init__(engine, task_listen_for=task_listen_for, flow_listen_for=flow_listen_for) - self._logger = log - if not self._logger: + if not log: self._logger = LOG + else: + self._logger = log self._level = level def _log(self, message, *args, **kwargs): self._logger.log(self._level, message, *args, **kwargs) + + +class DynamicLoggingListener(base.ListenerBase): + """Listener that logs notifications it receives. + + It listens for task and flow notifications and writes those notifications + to a provided logger, or logger of its module + (``taskflow.listeners.logging``) if none is provided. The log level + can *slightly* be configured and ``logging.DEBUG`` or ``logging.WARNING`` + (unless overriden via a constructor parameter) will be selected + automatically based on the execution state and results produced. + + The following flow states cause ``logging.WARNING`` (or provided + level) to be used: + + * ``states.FAILURE`` + * ``states.REVERTED`` + + The following task states cause ``logging.WARNING`` (or provided level) + to be used: + + * ``states.FAILURE`` + * ``states.RETRYING`` + * ``states.REVERTING`` + + When a task produces a :py:class:`~taskflow.utils.misc.Failure` object as + its result (typically this happens when a task raises an exception) this + will **always** switch the logger to use ``logging.WARNING`` (if the + failure object contains a ``exc_info`` tuple this will also be logged to + provide a meaningful traceback). + """ + + def __init__(self, engine, + task_listen_for=(misc.Notifier.ANY,), + flow_listen_for=(misc.Notifier.ANY,), + log=None, failure_level=logging.WARNING, + level=logging.DEBUG): + super(DynamicLoggingListener, self).__init__( + engine, + task_listen_for=task_listen_for, + flow_listen_for=flow_listen_for) + self._failure_level = failure_level + self._level = level + if not log: + self._logger = LOG + else: + self._logger = log + + def _flow_receiver(self, state, details): + # Gets called on flow state changes. + level = self._level + if state in (states.FAILURE, states.REVERTED): + level = self._failure_level + self._logger.log(level, "Flow '%s' (%s) transitioned into state '%s'" + " from state '%s'", details['flow_name'], + details['flow_uuid'], state, details.get('old_state')) + + def _task_receiver(self, state, details): + # Gets called on task state changes. + if 'result' in details and state in base.FINISH_STATES: + # If the task failed, it's useful to show the exception traceback + # and any other available exception information. + result = details.get('result') + if isinstance(result, misc.Failure): + if result.exc_info: + exc_info = result.exc_info + manual_tb = '' + else: + # When a remote failure occurs (or somehow the failure + # object lost its traceback), we will not have a valid + # exc_info that can be used but we *should* have a string + # version that we can use instead... + exc_info = None + manual_tb = "\n%s" % result.pformat(traceback=True) + self._logger.log(self._failure_level, + "Task '%s' (%s) transitioned into state" + " '%s'%s", details['task_name'], + details['task_uuid'], state, manual_tb, + exc_info=exc_info) + else: + # Otherwise, depending on the enabled logging level/state we + # will show or hide results that the task may have produced + # during execution. + level = self._level + if state == states.FAILURE: + level = self._failure_level + if (_isEnabledFor(self._logger, self._level) + or state == states.FAILURE): + self._logger.log(level, "Task '%s' (%s) transitioned into" + " state '%s' with result '%s'", + details['task_name'], + details['task_uuid'], state, + result) + else: + self._logger.log(level, "Task '%s' (%s) transitioned into" + " state '%s'", details['task_name'], + details['task_uuid'], state) + else: + level = self._level + if state in (states.REVERTING, states.RETRYING): + level = self._failure_level + self._logger.log(level, "Task '%s' (%s) transitioned into state" + " '%s'", details['task_name'], + details['task_uuid'], state) diff --git a/taskflow/test.py b/taskflow/test.py index f894b3c3..32c0b0fc 100644 --- a/taskflow/test.py +++ b/taskflow/test.py @@ -14,6 +14,9 @@ # License for the specific language governing permissions and limitations # under the License. +import collections +import logging + import fixtures from oslotest import base from oslotest import mockpatch @@ -236,3 +239,69 @@ class MockTestCase(TestCase): def resetMasterMock(self): self.master_mock.reset_mock() + + +class CapturingLoggingHandler(logging.Handler): + """A handler that saves record contents for post-test analysis.""" + + def __init__(self, level=logging.DEBUG): + # It seems needed to use the old style of base class calling, we + # can remove this old style when we only support py3.x + logging.Handler.__init__(self, level=level) + self._records = [] + + @property + def counts(self): + """Returns a dictionary with the number of records at each level.""" + self.acquire() + try: + captured = collections.defaultdict(int) + for r in self._records: + captured[r.levelno] += 1 + return captured + finally: + self.release() + + @property + def messages(self): + """Returns a dictionary with list of record messages at each level.""" + self.acquire() + try: + captured = collections.defaultdict(list) + for r in self._records: + captured[r.levelno].append(r.getMessage()) + return captured + finally: + self.release() + + @property + def exc_infos(self): + """Returns a list of all the record exc_info tuples captured.""" + self.acquire() + try: + captured = [] + for r in self._records: + if r.exc_info: + captured.append(r.exc_info) + return captured + finally: + self.release() + + def emit(self, record): + self.acquire() + try: + self._records.append(record) + finally: + self.release() + + def reset(self): + """Resets *all* internally captured state.""" + self.acquire() + try: + self._records = [] + finally: + self.release() + + def close(self): + logging.Handler.close(self) + self.reset() diff --git a/taskflow/tests/unit/test_duration.py b/taskflow/tests/unit/test_duration.py deleted file mode 100644 index 47389724..00000000 --- a/taskflow/tests/unit/test_duration.py +++ /dev/null @@ -1,81 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import contextlib -import time - -import taskflow.engines -from taskflow import exceptions as exc -from taskflow.listeners import timing -from taskflow.patterns import linear_flow as lf -from taskflow.persistence.backends import impl_memory -from taskflow import task -from taskflow import test -from taskflow.test import mock -from taskflow.tests import utils as t_utils -from taskflow.utils import persistence_utils as p_utils - - -class SleepyTask(task.Task): - def __init__(self, name, sleep_for=0.0): - super(SleepyTask, self).__init__(name=name) - self._sleep_for = float(sleep_for) - - def execute(self): - if self._sleep_for <= 0: - return - else: - time.sleep(self._sleep_for) - - -class TestDuration(test.TestCase): - def make_engine(self, flow, flow_detail, backend): - e = taskflow.engines.load(flow, - flow_detail=flow_detail, - backend=backend) - e.compile() - return e - - def test_duration(self): - with contextlib.closing(impl_memory.MemoryBackend({})) as be: - flow = lf.Flow("test") - flow.add(SleepyTask("test-1", sleep_for=0.1)) - (lb, fd) = p_utils.temporary_flow_detail(be) - e = self.make_engine(flow, fd, be) - with timing.TimingListener(e): - e.run() - t_uuid = e.storage.get_atom_uuid("test-1") - td = fd.find(t_uuid) - self.assertIsNotNone(td) - self.assertIsNotNone(td.meta) - self.assertIn('duration', td.meta) - self.assertGreaterEqual(0.1, td.meta['duration']) - - @mock.patch.object(timing.LOG, 'warn') - def test_record_ending_exception(self, mocked_warn): - with contextlib.closing(impl_memory.MemoryBackend({})) as be: - flow = lf.Flow("test") - flow.add(t_utils.TaskNoRequiresNoReturns("test-1")) - (lb, fd) = p_utils.temporary_flow_detail(be) - e = self.make_engine(flow, fd, be) - timing_listener = timing.TimingListener(e) - with mock.patch.object(timing_listener._engine.storage, - 'update_atom_metadata') as mocked_uam: - mocked_uam.side_effect = exc.StorageFailure('Woot!') - with timing_listener: - e.run() - mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'test-1', - exc_info=True) diff --git a/taskflow/tests/unit/test_listeners.py b/taskflow/tests/unit/test_listeners.py new file mode 100644 index 00000000..6ba97d6d --- /dev/null +++ b/taskflow/tests/unit/test_listeners.py @@ -0,0 +1,190 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import logging +import time + +import taskflow.engines +from taskflow import exceptions as exc +from taskflow.listeners import logging as logging_listeners +from taskflow.listeners import timing +from taskflow.patterns import linear_flow as lf +from taskflow.persistence.backends import impl_memory +from taskflow import task +from taskflow import test +from taskflow.test import mock +from taskflow.tests import utils as test_utils +from taskflow.utils import persistence_utils +from taskflow.utils import reflection + + +_LOG_LEVELS = frozenset([ + logging.CRITICAL, + logging.DEBUG, + logging.ERROR, + logging.INFO, + logging.NOTSET, + logging.WARNING, +]) + + +class SleepyTask(task.Task): + def __init__(self, name, sleep_for=0.0): + super(SleepyTask, self).__init__(name=name) + self._sleep_for = float(sleep_for) + + def execute(self): + if self._sleep_for <= 0: + return + else: + time.sleep(self._sleep_for) + + +class EngineMakerMixin(object): + def _make_engine(self, flow, flow_detail=None, backend=None): + e = taskflow.engines.load(flow, + flow_detail=flow_detail, + backend=backend) + e.compile() + e.prepare() + return e + + +class TestTimingListener(test.TestCase, EngineMakerMixin): + def test_duration(self): + with contextlib.closing(impl_memory.MemoryBackend()) as be: + flow = lf.Flow("test") + flow.add(SleepyTask("test-1", sleep_for=0.1)) + (lb, fd) = persistence_utils.temporary_flow_detail(be) + e = self._make_engine(flow, fd, be) + with timing.TimingListener(e): + e.run() + t_uuid = e.storage.get_atom_uuid("test-1") + td = fd.find(t_uuid) + self.assertIsNotNone(td) + self.assertIsNotNone(td.meta) + self.assertIn('duration', td.meta) + self.assertGreaterEqual(0.1, td.meta['duration']) + + @mock.patch.object(timing.LOG, 'warn') + def test_record_ending_exception(self, mocked_warn): + with contextlib.closing(impl_memory.MemoryBackend()) as be: + flow = lf.Flow("test") + flow.add(test_utils.TaskNoRequiresNoReturns("test-1")) + (lb, fd) = persistence_utils.temporary_flow_detail(be) + e = self._make_engine(flow, fd, be) + timing_listener = timing.TimingListener(e) + with mock.patch.object(timing_listener._engine.storage, + 'update_atom_metadata') as mocked_uam: + mocked_uam.side_effect = exc.StorageFailure('Woot!') + with timing_listener: + e.run() + mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'test-1', + exc_info=True) + + +class TestLoggingListeners(test.TestCase, EngineMakerMixin): + def _make_logger(self, level=logging.DEBUG): + log = logging.getLogger( + reflection.get_callable_name(self._get_test_method())) + log.propagate = False + for handler in reversed(log.handlers): + log.removeHandler(handler) + handler = test.CapturingLoggingHandler(level=level) + log.addHandler(handler) + log.setLevel(level) + self.addCleanup(handler.reset) + self.addCleanup(log.removeHandler, handler) + return (log, handler) + + def test_basic(self): + flow = lf.Flow("test") + flow.add(test_utils.TaskNoRequiresNoReturns("test-1")) + e = self._make_engine(flow) + log, handler = self._make_logger() + with logging_listeners.LoggingListener(e, log=log): + e.run() + self.assertGreater(0, handler.counts[logging.DEBUG]) + for levelno in _LOG_LEVELS - set([logging.DEBUG]): + self.assertEqual(0, handler.counts[levelno]) + self.assertEqual([], handler.exc_infos) + + def test_basic_customized(self): + flow = lf.Flow("test") + flow.add(test_utils.TaskNoRequiresNoReturns("test-1")) + e = self._make_engine(flow) + log, handler = self._make_logger() + listener = logging_listeners.LoggingListener( + e, log=log, level=logging.INFO) + with listener: + e.run() + self.assertGreater(0, handler.counts[logging.INFO]) + for levelno in _LOG_LEVELS - set([logging.INFO]): + self.assertEqual(0, handler.counts[levelno]) + self.assertEqual([], handler.exc_infos) + + def test_basic_failure(self): + flow = lf.Flow("test") + flow.add(test_utils.TaskWithFailure("test-1")) + e = self._make_engine(flow) + log, handler = self._make_logger() + with logging_listeners.LoggingListener(e, log=log): + self.assertRaises(RuntimeError, e.run) + self.assertGreater(0, handler.counts[logging.DEBUG]) + for levelno in _LOG_LEVELS - set([logging.DEBUG]): + self.assertEqual(0, handler.counts[levelno]) + self.assertEqual(1, len(handler.exc_infos)) + + def test_dynamic(self): + flow = lf.Flow("test") + flow.add(test_utils.TaskNoRequiresNoReturns("test-1")) + e = self._make_engine(flow) + log, handler = self._make_logger() + with logging_listeners.DynamicLoggingListener(e, log=log): + e.run() + self.assertGreater(0, handler.counts[logging.DEBUG]) + for levelno in _LOG_LEVELS - set([logging.DEBUG]): + self.assertEqual(0, handler.counts[levelno]) + self.assertEqual([], handler.exc_infos) + + def test_dynamic_failure(self): + flow = lf.Flow("test") + flow.add(test_utils.TaskWithFailure("test-1")) + e = self._make_engine(flow) + log, handler = self._make_logger() + with logging_listeners.DynamicLoggingListener(e, log=log): + self.assertRaises(RuntimeError, e.run) + self.assertGreater(0, handler.counts[logging.WARNING]) + self.assertGreater(0, handler.counts[logging.DEBUG]) + self.assertEqual(1, len(handler.exc_infos)) + for levelno in _LOG_LEVELS - set([logging.DEBUG, logging.WARNING]): + self.assertEqual(0, handler.counts[levelno]) + + def test_dynamic_failure_customized_level(self): + flow = lf.Flow("test") + flow.add(test_utils.TaskWithFailure("test-1")) + e = self._make_engine(flow) + log, handler = self._make_logger() + listener = logging_listeners.DynamicLoggingListener( + e, log=log, failure_level=logging.ERROR) + with listener: + self.assertRaises(RuntimeError, e.run) + self.assertGreater(0, handler.counts[logging.ERROR]) + self.assertGreater(0, handler.counts[logging.DEBUG]) + self.assertEqual(1, len(handler.exc_infos)) + for levelno in _LOG_LEVELS - set([logging.DEBUG, logging.ERROR]): + self.assertEqual(0, handler.counts[levelno])