Add a more dynamic/useful logging listener

Both cinder and glance are starting to share the same logic
for there engine notification listener, so instead of having
them copy around that code it will be much nicer if taskflow
can just provide itself a more capable listener that both
can share and use directly.

This avoids users of taskflow having to understand more about
the internals of taskflow and its associated state then they
likely need to understand (which makes taskflow easier to use
and less work to integrate).

Relevant locations where this already exists:

- https://github.com/openstack/cinder/blob/master/cinder/flow_utils.py
- https://review.openstack.org/#/c/85211/

Change-Id: I98eeb180b31bd488ae0eadd730e1530d7bae1f1f
This commit is contained in:
Joshua Harlow
2014-09-17 16:58:25 -07:00
committed by Joshua Harlow
parent 5780a5d77e
commit cf1e468cf1
5 changed files with 390 additions and 87 deletions

View File

@@ -158,6 +158,8 @@ Printing and logging listeners
.. autoclass:: taskflow.listeners.logging.LoggingListener
.. autoclass:: taskflow.listeners.logging.DynamicLoggingListener
.. autoclass:: taskflow.listeners.printing.PrintingListener
Timing listener

View File

@@ -17,20 +17,38 @@
from __future__ import absolute_import
import logging
import sys
from taskflow.listeners import base
from taskflow import states
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
if sys.version_info[0:2] == (2, 6):
_PY26 = True
else:
_PY26 = False
# Fixes this for python 2.6 which was missing the is enabled for method
# when a logger adapter is being used/provided, this will no longer be needed
# when we can just support python 2.7+ (which fixed the lack of this method
# on adapters).
def _isEnabledFor(logger, level):
if _PY26 and isinstance(logger, logging.LoggerAdapter):
return logger.logger.isEnabledFor(level)
return logger.isEnabledFor(level)
class LoggingListener(base.LoggingBase):
"""Listener that logs notifications it receives.
It listens for task and flow notifications and writes those
notifications to provided logger, or logger of its module
(``taskflow.listeners.logging``) if none provided. Log level
can also be configured, ``logging.DEBUG`` is used by default.
It listens for task and flow notifications and writes those notifications
to a provided logger, or logger of its module
(``taskflow.listeners.logging``) if none is provided. The log level
can also be configured, ``logging.DEBUG`` is used by default when none
is provided.
"""
def __init__(self, engine,
task_listen_for=(misc.Notifier.ANY,),
@@ -40,10 +58,115 @@ class LoggingListener(base.LoggingBase):
super(LoggingListener, self).__init__(engine,
task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for)
self._logger = log
if not self._logger:
if not log:
self._logger = LOG
else:
self._logger = log
self._level = level
def _log(self, message, *args, **kwargs):
self._logger.log(self._level, message, *args, **kwargs)
class DynamicLoggingListener(base.ListenerBase):
"""Listener that logs notifications it receives.
It listens for task and flow notifications and writes those notifications
to a provided logger, or logger of its module
(``taskflow.listeners.logging``) if none is provided. The log level
can *slightly* be configured and ``logging.DEBUG`` or ``logging.WARNING``
(unless overriden via a constructor parameter) will be selected
automatically based on the execution state and results produced.
The following flow states cause ``logging.WARNING`` (or provided
level) to be used:
* ``states.FAILURE``
* ``states.REVERTED``
The following task states cause ``logging.WARNING`` (or provided level)
to be used:
* ``states.FAILURE``
* ``states.RETRYING``
* ``states.REVERTING``
When a task produces a :py:class:`~taskflow.utils.misc.Failure` object as
its result (typically this happens when a task raises an exception) this
will **always** switch the logger to use ``logging.WARNING`` (if the
failure object contains a ``exc_info`` tuple this will also be logged to
provide a meaningful traceback).
"""
def __init__(self, engine,
task_listen_for=(misc.Notifier.ANY,),
flow_listen_for=(misc.Notifier.ANY,),
log=None, failure_level=logging.WARNING,
level=logging.DEBUG):
super(DynamicLoggingListener, self).__init__(
engine,
task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for)
self._failure_level = failure_level
self._level = level
if not log:
self._logger = LOG
else:
self._logger = log
def _flow_receiver(self, state, details):
# Gets called on flow state changes.
level = self._level
if state in (states.FAILURE, states.REVERTED):
level = self._failure_level
self._logger.log(level, "Flow '%s' (%s) transitioned into state '%s'"
" from state '%s'", details['flow_name'],
details['flow_uuid'], state, details.get('old_state'))
def _task_receiver(self, state, details):
# Gets called on task state changes.
if 'result' in details and state in base.FINISH_STATES:
# If the task failed, it's useful to show the exception traceback
# and any other available exception information.
result = details.get('result')
if isinstance(result, misc.Failure):
if result.exc_info:
exc_info = result.exc_info
manual_tb = ''
else:
# When a remote failure occurs (or somehow the failure
# object lost its traceback), we will not have a valid
# exc_info that can be used but we *should* have a string
# version that we can use instead...
exc_info = None
manual_tb = "\n%s" % result.pformat(traceback=True)
self._logger.log(self._failure_level,
"Task '%s' (%s) transitioned into state"
" '%s'%s", details['task_name'],
details['task_uuid'], state, manual_tb,
exc_info=exc_info)
else:
# Otherwise, depending on the enabled logging level/state we
# will show or hide results that the task may have produced
# during execution.
level = self._level
if state == states.FAILURE:
level = self._failure_level
if (_isEnabledFor(self._logger, self._level)
or state == states.FAILURE):
self._logger.log(level, "Task '%s' (%s) transitioned into"
" state '%s' with result '%s'",
details['task_name'],
details['task_uuid'], state,
result)
else:
self._logger.log(level, "Task '%s' (%s) transitioned into"
" state '%s'", details['task_name'],
details['task_uuid'], state)
else:
level = self._level
if state in (states.REVERTING, states.RETRYING):
level = self._failure_level
self._logger.log(level, "Task '%s' (%s) transitioned into state"
" '%s'", details['task_name'],
details['task_uuid'], state)

View File

@@ -14,6 +14,9 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import logging
import fixtures
from oslotest import base
from oslotest import mockpatch
@@ -236,3 +239,69 @@ class MockTestCase(TestCase):
def resetMasterMock(self):
self.master_mock.reset_mock()
class CapturingLoggingHandler(logging.Handler):
"""A handler that saves record contents for post-test analysis."""
def __init__(self, level=logging.DEBUG):
# It seems needed to use the old style of base class calling, we
# can remove this old style when we only support py3.x
logging.Handler.__init__(self, level=level)
self._records = []
@property
def counts(self):
"""Returns a dictionary with the number of records at each level."""
self.acquire()
try:
captured = collections.defaultdict(int)
for r in self._records:
captured[r.levelno] += 1
return captured
finally:
self.release()
@property
def messages(self):
"""Returns a dictionary with list of record messages at each level."""
self.acquire()
try:
captured = collections.defaultdict(list)
for r in self._records:
captured[r.levelno].append(r.getMessage())
return captured
finally:
self.release()
@property
def exc_infos(self):
"""Returns a list of all the record exc_info tuples captured."""
self.acquire()
try:
captured = []
for r in self._records:
if r.exc_info:
captured.append(r.exc_info)
return captured
finally:
self.release()
def emit(self, record):
self.acquire()
try:
self._records.append(record)
finally:
self.release()
def reset(self):
"""Resets *all* internally captured state."""
self.acquire()
try:
self._records = []
finally:
self.release()
def close(self):
logging.Handler.close(self)
self.reset()

View File

@@ -1,81 +0,0 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import time
import taskflow.engines
from taskflow import exceptions as exc
from taskflow.listeners import timing
from taskflow.patterns import linear_flow as lf
from taskflow.persistence.backends import impl_memory
from taskflow import task
from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils as t_utils
from taskflow.utils import persistence_utils as p_utils
class SleepyTask(task.Task):
def __init__(self, name, sleep_for=0.0):
super(SleepyTask, self).__init__(name=name)
self._sleep_for = float(sleep_for)
def execute(self):
if self._sleep_for <= 0:
return
else:
time.sleep(self._sleep_for)
class TestDuration(test.TestCase):
def make_engine(self, flow, flow_detail, backend):
e = taskflow.engines.load(flow,
flow_detail=flow_detail,
backend=backend)
e.compile()
return e
def test_duration(self):
with contextlib.closing(impl_memory.MemoryBackend({})) as be:
flow = lf.Flow("test")
flow.add(SleepyTask("test-1", sleep_for=0.1))
(lb, fd) = p_utils.temporary_flow_detail(be)
e = self.make_engine(flow, fd, be)
with timing.TimingListener(e):
e.run()
t_uuid = e.storage.get_atom_uuid("test-1")
td = fd.find(t_uuid)
self.assertIsNotNone(td)
self.assertIsNotNone(td.meta)
self.assertIn('duration', td.meta)
self.assertGreaterEqual(0.1, td.meta['duration'])
@mock.patch.object(timing.LOG, 'warn')
def test_record_ending_exception(self, mocked_warn):
with contextlib.closing(impl_memory.MemoryBackend({})) as be:
flow = lf.Flow("test")
flow.add(t_utils.TaskNoRequiresNoReturns("test-1"))
(lb, fd) = p_utils.temporary_flow_detail(be)
e = self.make_engine(flow, fd, be)
timing_listener = timing.TimingListener(e)
with mock.patch.object(timing_listener._engine.storage,
'update_atom_metadata') as mocked_uam:
mocked_uam.side_effect = exc.StorageFailure('Woot!')
with timing_listener:
e.run()
mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'test-1',
exc_info=True)

View File

@@ -0,0 +1,190 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging
import time
import taskflow.engines
from taskflow import exceptions as exc
from taskflow.listeners import logging as logging_listeners
from taskflow.listeners import timing
from taskflow.patterns import linear_flow as lf
from taskflow.persistence.backends import impl_memory
from taskflow import task
from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils as test_utils
from taskflow.utils import persistence_utils
from taskflow.utils import reflection
_LOG_LEVELS = frozenset([
logging.CRITICAL,
logging.DEBUG,
logging.ERROR,
logging.INFO,
logging.NOTSET,
logging.WARNING,
])
class SleepyTask(task.Task):
def __init__(self, name, sleep_for=0.0):
super(SleepyTask, self).__init__(name=name)
self._sleep_for = float(sleep_for)
def execute(self):
if self._sleep_for <= 0:
return
else:
time.sleep(self._sleep_for)
class EngineMakerMixin(object):
def _make_engine(self, flow, flow_detail=None, backend=None):
e = taskflow.engines.load(flow,
flow_detail=flow_detail,
backend=backend)
e.compile()
e.prepare()
return e
class TestTimingListener(test.TestCase, EngineMakerMixin):
def test_duration(self):
with contextlib.closing(impl_memory.MemoryBackend()) as be:
flow = lf.Flow("test")
flow.add(SleepyTask("test-1", sleep_for=0.1))
(lb, fd) = persistence_utils.temporary_flow_detail(be)
e = self._make_engine(flow, fd, be)
with timing.TimingListener(e):
e.run()
t_uuid = e.storage.get_atom_uuid("test-1")
td = fd.find(t_uuid)
self.assertIsNotNone(td)
self.assertIsNotNone(td.meta)
self.assertIn('duration', td.meta)
self.assertGreaterEqual(0.1, td.meta['duration'])
@mock.patch.object(timing.LOG, 'warn')
def test_record_ending_exception(self, mocked_warn):
with contextlib.closing(impl_memory.MemoryBackend()) as be:
flow = lf.Flow("test")
flow.add(test_utils.TaskNoRequiresNoReturns("test-1"))
(lb, fd) = persistence_utils.temporary_flow_detail(be)
e = self._make_engine(flow, fd, be)
timing_listener = timing.TimingListener(e)
with mock.patch.object(timing_listener._engine.storage,
'update_atom_metadata') as mocked_uam:
mocked_uam.side_effect = exc.StorageFailure('Woot!')
with timing_listener:
e.run()
mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'test-1',
exc_info=True)
class TestLoggingListeners(test.TestCase, EngineMakerMixin):
def _make_logger(self, level=logging.DEBUG):
log = logging.getLogger(
reflection.get_callable_name(self._get_test_method()))
log.propagate = False
for handler in reversed(log.handlers):
log.removeHandler(handler)
handler = test.CapturingLoggingHandler(level=level)
log.addHandler(handler)
log.setLevel(level)
self.addCleanup(handler.reset)
self.addCleanup(log.removeHandler, handler)
return (log, handler)
def test_basic(self):
flow = lf.Flow("test")
flow.add(test_utils.TaskNoRequiresNoReturns("test-1"))
e = self._make_engine(flow)
log, handler = self._make_logger()
with logging_listeners.LoggingListener(e, log=log):
e.run()
self.assertGreater(0, handler.counts[logging.DEBUG])
for levelno in _LOG_LEVELS - set([logging.DEBUG]):
self.assertEqual(0, handler.counts[levelno])
self.assertEqual([], handler.exc_infos)
def test_basic_customized(self):
flow = lf.Flow("test")
flow.add(test_utils.TaskNoRequiresNoReturns("test-1"))
e = self._make_engine(flow)
log, handler = self._make_logger()
listener = logging_listeners.LoggingListener(
e, log=log, level=logging.INFO)
with listener:
e.run()
self.assertGreater(0, handler.counts[logging.INFO])
for levelno in _LOG_LEVELS - set([logging.INFO]):
self.assertEqual(0, handler.counts[levelno])
self.assertEqual([], handler.exc_infos)
def test_basic_failure(self):
flow = lf.Flow("test")
flow.add(test_utils.TaskWithFailure("test-1"))
e = self._make_engine(flow)
log, handler = self._make_logger()
with logging_listeners.LoggingListener(e, log=log):
self.assertRaises(RuntimeError, e.run)
self.assertGreater(0, handler.counts[logging.DEBUG])
for levelno in _LOG_LEVELS - set([logging.DEBUG]):
self.assertEqual(0, handler.counts[levelno])
self.assertEqual(1, len(handler.exc_infos))
def test_dynamic(self):
flow = lf.Flow("test")
flow.add(test_utils.TaskNoRequiresNoReturns("test-1"))
e = self._make_engine(flow)
log, handler = self._make_logger()
with logging_listeners.DynamicLoggingListener(e, log=log):
e.run()
self.assertGreater(0, handler.counts[logging.DEBUG])
for levelno in _LOG_LEVELS - set([logging.DEBUG]):
self.assertEqual(0, handler.counts[levelno])
self.assertEqual([], handler.exc_infos)
def test_dynamic_failure(self):
flow = lf.Flow("test")
flow.add(test_utils.TaskWithFailure("test-1"))
e = self._make_engine(flow)
log, handler = self._make_logger()
with logging_listeners.DynamicLoggingListener(e, log=log):
self.assertRaises(RuntimeError, e.run)
self.assertGreater(0, handler.counts[logging.WARNING])
self.assertGreater(0, handler.counts[logging.DEBUG])
self.assertEqual(1, len(handler.exc_infos))
for levelno in _LOG_LEVELS - set([logging.DEBUG, logging.WARNING]):
self.assertEqual(0, handler.counts[levelno])
def test_dynamic_failure_customized_level(self):
flow = lf.Flow("test")
flow.add(test_utils.TaskWithFailure("test-1"))
e = self._make_engine(flow)
log, handler = self._make_logger()
listener = logging_listeners.DynamicLoggingListener(
e, log=log, failure_level=logging.ERROR)
with listener:
self.assertRaises(RuntimeError, e.run)
self.assertGreater(0, handler.counts[logging.ERROR])
self.assertGreater(0, handler.counts[logging.DEBUG])
self.assertEqual(1, len(handler.exc_infos))
for levelno in _LOG_LEVELS - set([logging.DEBUG, logging.ERROR]):
self.assertEqual(0, handler.counts[levelno])