Merge "Add a more dynamic/useful logging listener"

This commit is contained in:
Jenkins
2014-10-18 19:37:58 +00:00
committed by Gerrit Code Review
5 changed files with 390 additions and 87 deletions

View File

@@ -158,6 +158,8 @@ Printing and logging listeners
.. autoclass:: taskflow.listeners.logging.LoggingListener
.. autoclass:: taskflow.listeners.logging.DynamicLoggingListener
.. autoclass:: taskflow.listeners.printing.PrintingListener
Timing listener

View File

@@ -17,20 +17,38 @@
from __future__ import absolute_import
import logging
import sys
from taskflow.listeners import base
from taskflow import states
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
if sys.version_info[0:2] == (2, 6):
_PY26 = True
else:
_PY26 = False
# Fixes python 2.6, which was missing the isEnabledFor method when a
# logger adapter is being used/provided; this will no longer be needed
# when we can just support python 2.7+ (which fixed the lack of this
# method on adapters).
def _isEnabledFor(logger, level):
if _PY26 and isinstance(logger, logging.LoggerAdapter):
return logger.logger.isEnabledFor(level)
return logger.isEnabledFor(level)
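For illustration, a minimal sketch of how this compatibility helper behaves (the logger name and the adapter's extra context below are arbitrary assumptions, not part of this module):
import logging

_example_log = logging.getLogger("example")
_example_log.setLevel(logging.INFO)
_adapted = logging.LoggerAdapter(_example_log, {"request_id": "req-1"})

# On python 2.6 this delegates to the wrapped logger (which does provide
# isEnabledFor); on 2.7+ the adapter itself is queried directly.
assert _isEnabledFor(_adapted, logging.INFO)
assert not _isEnabledFor(_adapted, logging.DEBUG)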
class LoggingListener(base.LoggingBase):
"""Listener that logs notifications it receives.
It listens for task and flow notifications and writes those notifications
to a provided logger, or logger of its module
(``taskflow.listeners.logging``) if none is provided. The log level
can also be configured, ``logging.DEBUG`` is used by default when none
is provided.
"""
def __init__(self, engine,
task_listen_for=(misc.Notifier.ANY,),
@@ -40,10 +58,115 @@ class LoggingListener(base.LoggingBase):
super(LoggingListener, self).__init__(engine,
task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for)
if not log:
self._logger = LOG
else:
self._logger = log
self._level = level
def _log(self, message, *args, **kwargs):
self._logger.log(self._level, message, *args, **kwargs)
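A minimal usage sketch for the listener above (the flow, task, and logger names are illustrative assumptions; the engine is loaded the same way the unit tests added below do):
import logging

import taskflow.engines
from taskflow.listeners import logging as logging_listeners
from taskflow.patterns import linear_flow as lf
from taskflow import task


class NoopTask(task.Task):
    def execute(self):
        pass


flow = lf.Flow("demo")
flow.add(NoopTask("demo-1"))
engine = taskflow.engines.load(flow)
demo_log = logging.getLogger("demo")

# Every task/flow state transition is written to demo_log at logging.INFO.
with logging_listeners.LoggingListener(engine, log=demo_log,
                                       level=logging.INFO):
    engine.run()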
class DynamicLoggingListener(base.ListenerBase):
"""Listener that logs notifications it receives.
It listens for task and flow notifications and writes those notifications
to a provided logger, or logger of its module
(``taskflow.listeners.logging``) if none is provided. The log level
can only be *slightly* configured: ``logging.DEBUG`` or ``logging.WARNING``
(unless overridden via a constructor parameter) will be selected
automatically based on the execution state and results produced.
The following flow states cause ``logging.WARNING`` (or provided
level) to be used:
* ``states.FAILURE``
* ``states.REVERTED``
The following task states cause ``logging.WARNING`` (or provided level)
to be used:
* ``states.FAILURE``
* ``states.RETRYING``
* ``states.REVERTING``
When a task produces a :py:class:`~taskflow.utils.misc.Failure` object as
its result (typically this happens when a task raises an exception), this
will **always** switch the logger to use ``logging.WARNING`` (or the
provided failure level); if the failure object contains an ``exc_info``
tuple, it will also be logged to provide a meaningful traceback.
"""
def __init__(self, engine,
task_listen_for=(misc.Notifier.ANY,),
flow_listen_for=(misc.Notifier.ANY,),
log=None, failure_level=logging.WARNING,
level=logging.DEBUG):
super(DynamicLoggingListener, self).__init__(
engine,
task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for)
self._failure_level = failure_level
self._level = level
if not log:
self._logger = LOG
else:
self._logger = log
def _flow_receiver(self, state, details):
# Gets called on flow state changes.
level = self._level
if state in (states.FAILURE, states.REVERTED):
level = self._failure_level
self._logger.log(level, "Flow '%s' (%s) transitioned into state '%s'"
" from state '%s'", details['flow_name'],
details['flow_uuid'], state, details.get('old_state'))
def _task_receiver(self, state, details):
# Gets called on task state changes.
if 'result' in details and state in base.FINISH_STATES:
# If the task failed, it's useful to show the exception traceback
# and any other available exception information.
result = details.get('result')
if isinstance(result, misc.Failure):
if result.exc_info:
exc_info = result.exc_info
manual_tb = ''
else:
# When a remote failure occurs (or somehow the failure
# object lost its traceback), we will not have a valid
# exc_info that can be used but we *should* have a string
# version that we can use instead...
exc_info = None
manual_tb = "\n%s" % result.pformat(traceback=True)
self._logger.log(self._failure_level,
"Task '%s' (%s) transitioned into state"
" '%s'%s", details['task_name'],
details['task_uuid'], state, manual_tb,
exc_info=exc_info)
else:
# Otherwise, depending on the enabled logging level/state we
# will show or hide results that the task may have produced
# during execution.
level = self._level
if state == states.FAILURE:
level = self._failure_level
if (_isEnabledFor(self._logger, self._level)
or state == states.FAILURE):
self._logger.log(level, "Task '%s' (%s) transitioned into"
" state '%s' with result '%s'",
details['task_name'],
details['task_uuid'], state,
result)
else:
self._logger.log(level, "Task '%s' (%s) transitioned into"
" state '%s'", details['task_name'],
details['task_uuid'], state)
else:
level = self._level
if state in (states.REVERTING, states.RETRYING):
level = self._failure_level
self._logger.log(level, "Task '%s' (%s) transitioned into state"
" '%s'", details['task_name'],
details['task_uuid'], state)
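A similar sketch for the dynamic listener above, this time with a task that fails: routine transitions are logged at the default logging.DEBUG level, while the failure-related transitions (including the captured traceback) are logged at the failure level, overridden here to logging.ERROR. The failing task is an illustrative assumption:
import logging

import taskflow.engines
from taskflow.listeners import logging as logging_listeners
from taskflow.patterns import linear_flow as lf
from taskflow import task


class FailingTask(task.Task):
    def execute(self):
        raise RuntimeError("Woot!")


flow = lf.Flow("demo")
flow.add(FailingTask("demo-1"))
engine = taskflow.engines.load(flow)
demo_log = logging.getLogger("demo")

listener = logging_listeners.DynamicLoggingListener(
    engine, log=demo_log, failure_level=logging.ERROR)
try:
    with listener:
        engine.run()
except RuntimeError:
    # The task's exception propagates out of engine.run(); by now the
    # listener has already logged the failure (with traceback) at ERROR.
    pass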

View File

@@ -14,6 +14,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import logging
import fixtures
from oslotest import base
from oslotest import mockpatch
@@ -236,3 +239,69 @@ class MockTestCase(TestCase):
def resetMasterMock(self):
self.master_mock.reset_mock()
class CapturingLoggingHandler(logging.Handler):
"""A handler that saves record contents for post-test analysis."""
def __init__(self, level=logging.DEBUG):
# It seems we need to use the old style of base class calling; we can
# remove this old style when we only support py3.x.
logging.Handler.__init__(self, level=level)
self._records = []
@property
def counts(self):
"""Returns a dictionary with the number of records at each level."""
self.acquire()
try:
captured = collections.defaultdict(int)
for r in self._records:
captured[r.levelno] += 1
return captured
finally:
self.release()
@property
def messages(self):
"""Returns a dictionary with list of record messages at each level."""
self.acquire()
try:
captured = collections.defaultdict(list)
for r in self._records:
captured[r.levelno].append(r.getMessage())
return captured
finally:
self.release()
@property
def exc_infos(self):
"""Returns a list of all the record exc_info tuples captured."""
self.acquire()
try:
captured = []
for r in self._records:
if r.exc_info:
captured.append(r.exc_info)
return captured
finally:
self.release()
def emit(self, record):
self.acquire()
try:
self._records.append(record)
finally:
self.release()
def reset(self):
"""Resets *all* internally captured state."""
self.acquire()
try:
self._records = []
finally:
self.release()
def close(self):
logging.Handler.close(self)
self.reset()
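A small sketch of how this handler can be used on its own to inspect what some code under test logged (the logger name here is an arbitrary assumption):
import logging

probe_log = logging.getLogger("probe")
probe_log.propagate = False
probe_log.setLevel(logging.DEBUG)
handler = CapturingLoggingHandler(level=logging.DEBUG)
probe_log.addHandler(handler)

probe_log.debug("hello %s", "world")
probe_log.warning("uh oh")

assert handler.counts[logging.DEBUG] == 1
assert handler.counts[logging.WARNING] == 1
assert "hello world" in handler.messages[logging.DEBUG]
assert handler.exc_infos == []

handler.reset()
assert handler.counts[logging.DEBUG] == 0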

View File

@@ -1,81 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import time
import taskflow.engines
from taskflow import exceptions as exc
from taskflow.listeners import timing
from taskflow.patterns import linear_flow as lf
from taskflow.persistence.backends import impl_memory
from taskflow import task
from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils as t_utils
from taskflow.utils import persistence_utils as p_utils
class SleepyTask(task.Task):
def __init__(self, name, sleep_for=0.0):
super(SleepyTask, self).__init__(name=name)
self._sleep_for = float(sleep_for)
def execute(self):
if self._sleep_for <= 0:
return
else:
time.sleep(self._sleep_for)
class TestDuration(test.TestCase):
def make_engine(self, flow, flow_detail, backend):
e = taskflow.engines.load(flow,
flow_detail=flow_detail,
backend=backend)
e.compile()
return e
def test_duration(self):
with contextlib.closing(impl_memory.MemoryBackend({})) as be:
flow = lf.Flow("test")
flow.add(SleepyTask("test-1", sleep_for=0.1))
(lb, fd) = p_utils.temporary_flow_detail(be)
e = self.make_engine(flow, fd, be)
with timing.TimingListener(e):
e.run()
t_uuid = e.storage.get_atom_uuid("test-1")
td = fd.find(t_uuid)
self.assertIsNotNone(td)
self.assertIsNotNone(td.meta)
self.assertIn('duration', td.meta)
self.assertGreaterEqual(0.1, td.meta['duration'])
@mock.patch.object(timing.LOG, 'warn')
def test_record_ending_exception(self, mocked_warn):
with contextlib.closing(impl_memory.MemoryBackend({})) as be:
flow = lf.Flow("test")
flow.add(t_utils.TaskNoRequiresNoReturns("test-1"))
(lb, fd) = p_utils.temporary_flow_detail(be)
e = self.make_engine(flow, fd, be)
timing_listener = timing.TimingListener(e)
with mock.patch.object(timing_listener._engine.storage,
'update_atom_metadata') as mocked_uam:
mocked_uam.side_effect = exc.StorageFailure('Woot!')
with timing_listener:
e.run()
mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'test-1',
exc_info=True)

View File

@@ -0,0 +1,190 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging
import time
import taskflow.engines
from taskflow import exceptions as exc
from taskflow.listeners import logging as logging_listeners
from taskflow.listeners import timing
from taskflow.patterns import linear_flow as lf
from taskflow.persistence.backends import impl_memory
from taskflow import task
from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils as test_utils
from taskflow.utils import persistence_utils
from taskflow.utils import reflection
_LOG_LEVELS = frozenset([
logging.CRITICAL,
logging.DEBUG,
logging.ERROR,
logging.INFO,
logging.NOTSET,
logging.WARNING,
])
class SleepyTask(task.Task):
def __init__(self, name, sleep_for=0.0):
super(SleepyTask, self).__init__(name=name)
self._sleep_for = float(sleep_for)
def execute(self):
if self._sleep_for <= 0:
return
else:
time.sleep(self._sleep_for)
class EngineMakerMixin(object):
def _make_engine(self, flow, flow_detail=None, backend=None):
e = taskflow.engines.load(flow,
flow_detail=flow_detail,
backend=backend)
e.compile()
e.prepare()
return e
class TestTimingListener(test.TestCase, EngineMakerMixin):
def test_duration(self):
with contextlib.closing(impl_memory.MemoryBackend()) as be:
flow = lf.Flow("test")
flow.add(SleepyTask("test-1", sleep_for=0.1))
(lb, fd) = persistence_utils.temporary_flow_detail(be)
e = self._make_engine(flow, fd, be)
with timing.TimingListener(e):
e.run()
t_uuid = e.storage.get_atom_uuid("test-1")
td = fd.find(t_uuid)
self.assertIsNotNone(td)
self.assertIsNotNone(td.meta)
self.assertIn('duration', td.meta)
self.assertGreaterEqual(td.meta['duration'], 0.1)
@mock.patch.object(timing.LOG, 'warn')
def test_record_ending_exception(self, mocked_warn):
with contextlib.closing(impl_memory.MemoryBackend()) as be:
flow = lf.Flow("test")
flow.add(test_utils.TaskNoRequiresNoReturns("test-1"))
(lb, fd) = persistence_utils.temporary_flow_detail(be)
e = self._make_engine(flow, fd, be)
timing_listener = timing.TimingListener(e)
with mock.patch.object(timing_listener._engine.storage,
'update_atom_metadata') as mocked_uam:
mocked_uam.side_effect = exc.StorageFailure('Woot!')
with timing_listener:
e.run()
mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'test-1',
exc_info=True)
class TestLoggingListeners(test.TestCase, EngineMakerMixin):
def _make_logger(self, level=logging.DEBUG):
log = logging.getLogger(
reflection.get_callable_name(self._get_test_method()))
log.propagate = False
for handler in reversed(log.handlers):
log.removeHandler(handler)
handler = test.CapturingLoggingHandler(level=level)
log.addHandler(handler)
log.setLevel(level)
self.addCleanup(handler.reset)
self.addCleanup(log.removeHandler, handler)
return (log, handler)
def test_basic(self):
flow = lf.Flow("test")
flow.add(test_utils.TaskNoRequiresNoReturns("test-1"))
e = self._make_engine(flow)
log, handler = self._make_logger()
with logging_listeners.LoggingListener(e, log=log):
e.run()
self.assertGreater(handler.counts[logging.DEBUG], 0)
for levelno in _LOG_LEVELS - set([logging.DEBUG]):
self.assertEqual(0, handler.counts[levelno])
self.assertEqual([], handler.exc_infos)
def test_basic_customized(self):
flow = lf.Flow("test")
flow.add(test_utils.TaskNoRequiresNoReturns("test-1"))
e = self._make_engine(flow)
log, handler = self._make_logger()
listener = logging_listeners.LoggingListener(
e, log=log, level=logging.INFO)
with listener:
e.run()
self.assertGreater(handler.counts[logging.INFO], 0)
for levelno in _LOG_LEVELS - set([logging.INFO]):
self.assertEqual(0, handler.counts[levelno])
self.assertEqual([], handler.exc_infos)
def test_basic_failure(self):
flow = lf.Flow("test")
flow.add(test_utils.TaskWithFailure("test-1"))
e = self._make_engine(flow)
log, handler = self._make_logger()
with logging_listeners.LoggingListener(e, log=log):
self.assertRaises(RuntimeError, e.run)
self.assertGreater(handler.counts[logging.DEBUG], 0)
for levelno in _LOG_LEVELS - set([logging.DEBUG]):
self.assertEqual(0, handler.counts[levelno])
self.assertEqual(1, len(handler.exc_infos))
def test_dynamic(self):
flow = lf.Flow("test")
flow.add(test_utils.TaskNoRequiresNoReturns("test-1"))
e = self._make_engine(flow)
log, handler = self._make_logger()
with logging_listeners.DynamicLoggingListener(e, log=log):
e.run()
self.assertGreater(handler.counts[logging.DEBUG], 0)
for levelno in _LOG_LEVELS - set([logging.DEBUG]):
self.assertEqual(0, handler.counts[levelno])
self.assertEqual([], handler.exc_infos)
def test_dynamic_failure(self):
flow = lf.Flow("test")
flow.add(test_utils.TaskWithFailure("test-1"))
e = self._make_engine(flow)
log, handler = self._make_logger()
with logging_listeners.DynamicLoggingListener(e, log=log):
self.assertRaises(RuntimeError, e.run)
self.assertGreater(handler.counts[logging.WARNING], 0)
self.assertGreater(handler.counts[logging.DEBUG], 0)
self.assertEqual(1, len(handler.exc_infos))
for levelno in _LOG_LEVELS - set([logging.DEBUG, logging.WARNING]):
self.assertEqual(0, handler.counts[levelno])
def test_dynamic_failure_customized_level(self):
flow = lf.Flow("test")
flow.add(test_utils.TaskWithFailure("test-1"))
e = self._make_engine(flow)
log, handler = self._make_logger()
listener = logging_listeners.DynamicLoggingListener(
e, log=log, failure_level=logging.ERROR)
with listener:
self.assertRaises(RuntimeError, e.run)
self.assertGreater(handler.counts[logging.ERROR], 0)
self.assertGreater(handler.counts[logging.DEBUG], 0)
self.assertEqual(1, len(handler.exc_infos))
for levelno in _LOG_LEVELS - set([logging.DEBUG, logging.ERROR]):
self.assertEqual(0, handler.counts[levelno])