Attach log listeners to other engines

Attach the created task/flow/engine listener to
the other usages of taskflow that exist in cinder
so that those locations can also benefit from
the same logging of state and activity.

Part of blueprint task-logging

Change-Id: I4ba7fe625a88967607adaa18d329bec56825201c
This commit is contained in:
Joshua Harlow 2014-06-20 18:54:55 -07:00
parent 581d8ada14
commit 09a369ae1b
5 changed files with 106 additions and 95 deletions

View File

@ -10,8 +10,17 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import logging as base_logging
# For more information please visit: https://wiki.openstack.org/wiki/TaskFlow # For more information please visit: https://wiki.openstack.org/wiki/TaskFlow
from taskflow.listeners import base as base_listener
from taskflow import states
from taskflow import task from taskflow import task
from taskflow.utils import misc
from cinder.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def _make_task_name(cls, addons=None): def _make_task_name(cls, addons=None):
@ -34,3 +43,88 @@ class CinderTask(task.Task):
super(CinderTask, self).__init__(_make_task_name(self.__class__, super(CinderTask, self).__init__(_make_task_name(self.__class__,
addons), addons),
**kwargs) **kwargs)
class DynamicLogListener(base_listener.ListenerBase):
"""This is used to attach to taskflow engines while they are running.
It provides a bunch of useful features that expose the actions happening
inside a taskflow engine, which can be useful for developers for debugging,
for operations folks for monitoring and tracking of the resource actions
and more...
"""
def __init__(self, engine,
task_listen_for=(misc.Notifier.ANY,),
flow_listen_for=(misc.Notifier.ANY,),
logger=None):
super(DynamicLogListener, self).__init__(
engine,
task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for)
if logger is None:
self._logger = LOG
else:
self._logger = logger
def _flow_receiver(self, state, details):
# Gets called on flow state changes.
level = base_logging.DEBUG
if state in (states.FAILURE, states.REVERTED):
level = base_logging.WARNING
self._logger.log(level,
_("Flow '%(flow_name)s' (%(flow_uuid)s) transitioned"
" into state '%(state)s' from state"
" '%(old_state)s'") %
{'flow_name': details['flow_name'],
'flow_uuid': details['flow_uuid'],
'state': state,
'old_state': details.get('old_state')})
def _task_receiver(self, state, details):
# Gets called on task state changes.
if 'result' in details and state in base_listener.FINISH_STATES:
# If the task failed, it's useful to show the exception traceback
# and any other available exception information.
result = details.get('result')
if isinstance(result, misc.Failure):
self._logger.warn(_("Task '%(task_name)s' (%(task_uuid)s)"
" transitioned into state '%(state)s'") %
{'task_name': details['task_name'],
'task_uuid': details['task_uuid'],
'state': state},
exc_info=tuple(result.exc_info))
else:
# Otherwise, depending on the enabled logging level/state we
# will show or hide results that the task may have produced
# during execution.
level = base_logging.DEBUG
if state == states.FAILURE:
level = base_logging.WARNING
if (self._logger.isEnabledFor(base_logging.DEBUG) or
state == states.FAILURE):
self._logger.log(level,
_("Task '%(task_name)s' (%(task_uuid)s)"
" transitioned into state '%(state)s'"
" with result %(result)s") %
{'task_name': details['task_name'],
'task_uuid': details['task_uuid'],
'state': state, 'result': result})
else:
self._logger.log(level,
_("Task '%(task_name)s' (%(task_uuid)s)"
" transitioned into state"
" '%(state)s'") %
{'task_name': details['task_name'],
'task_uuid': details['task_uuid'],
'state': state})
else:
level = base_logging.DEBUG
if state in (states.REVERTING, states.RETRYING):
level = base_logging.WARNING
self._logger.log(level,
_("Task '%(task_name)s' (%(task_uuid)s)"
" transitioned into state '%(state)s'") %
{'task_name': details['task_name'],
'task_uuid': details['task_uuid'],
'state': state})

View File

@ -25,6 +25,7 @@ from oslo import messaging
from cinder import context from cinder import context
from cinder import db from cinder import db
from cinder import exception from cinder import exception
from cinder import flow_utils
from cinder import manager from cinder import manager
from cinder.openstack.common import excutils from cinder.openstack.common import excutils
from cinder.openstack.common import importutils from cinder.openstack.common import importutils
@ -101,7 +102,9 @@ class SchedulerManager(manager.Manager):
LOG.exception(_("Failed to create scheduler manager volume flow")) LOG.exception(_("Failed to create scheduler manager volume flow"))
raise exception.CinderException( raise exception.CinderException(
_("Failed to create scheduler manager volume flow")) _("Failed to create scheduler manager volume flow"))
flow_engine.run()
with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
def request_service_capabilities(self, context): def request_service_capabilities(self, context):
volume_rpcapi.VolumeAPI().publish_service_capabilities(context) volume_rpcapi.VolumeAPI().publish_service_capabilities(context)

View File

@ -28,6 +28,7 @@ from oslo.config import cfg
from cinder import context from cinder import context
from cinder.db import base from cinder.db import base
from cinder import exception from cinder import exception
from cinder import flow_utils
from cinder.image import glance from cinder.image import glance
from cinder import keymgr from cinder import keymgr
from cinder.openstack.common import excutils from cinder.openstack.common import excutils
@ -40,7 +41,6 @@ from cinder import quota_utils
from cinder.scheduler import rpcapi as scheduler_rpcapi from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import utils from cinder import utils
from cinder.volume.flows.api import create_volume from cinder.volume.flows.api import create_volume
from cinder.volume.flows import common as flow_common
from cinder.volume import qos_specs from cinder.volume import qos_specs
from cinder.volume import rpcapi as volume_rpcapi from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils from cinder.volume import utils as volume_utils
@ -214,7 +214,7 @@ class API(base.Base):
# Attaching this listener will capture all of the notifications that # Attaching this listener will capture all of the notifications that
# taskflow sends out and redirect them to a more useful log for # taskflow sends out and redirect them to a more useful log for
# cinders debugging (or error reporting) usage. # cinders debugging (or error reporting) usage.
with flow_common.DynamicLogListener(flow_engine, logger=LOG): with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run() flow_engine.run()
return flow_engine.storage.fetch('volume') return flow_engine.storage.fetch('volume')

View File

@ -16,12 +16,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import logging as base_logging
import six import six
from taskflow.listeners import base as base_listener
from taskflow import states
from taskflow.utils import misc
from cinder import exception from cinder import exception
from cinder.openstack.common import log as logging from cinder.openstack.common import log as logging
@ -34,91 +29,6 @@ LOG = logging.getLogger(__name__)
REASON_LENGTH = 128 REASON_LENGTH = 128
class DynamicLogListener(base_listener.ListenerBase):
"""This is used to attach to taskflow engines while they are running.
It provides a bunch of useful features that expose the actions happening
inside a taskflow engine, which can be useful for developers for debugging,
for operations folks for monitoring and tracking of the resource actions
and more...
"""
def __init__(self, engine,
task_listen_for=(misc.Notifier.ANY,),
flow_listen_for=(misc.Notifier.ANY,),
logger=None):
super(DynamicLogListener, self).__init__(
engine,
task_listen_for=task_listen_for,
flow_listen_for=flow_listen_for)
if logger is None:
self._logger = LOG
else:
self._logger = logger
def _flow_receiver(self, state, details):
# Gets called on flow state changes.
level = base_logging.DEBUG
if state in (states.FAILURE, states.REVERTED):
level = base_logging.WARNING
self._logger.log(level,
_("Flow '%(flow_name)s' (%(flow_uuid)s) transitioned"
" into state '%(state)s' from state"
" '%(old_state)s'") %
{'flow_name': details['flow_name'],
'flow_uuid': details['flow_uuid'],
'state': state,
'old_state': details.get('old_state')})
def _task_receiver(self, state, details):
# Gets called on task state changes.
if 'result' in details and state in base_listener.FINISH_STATES:
# If the task failed, it's useful to show the exception traceback
# and any other available exception information.
result = details.get('result')
if isinstance(result, misc.Failure):
self._logger.warn(_("Task '%(task_name)s' (%(task_uuid)s)"
" transitioned into state '%(state)s'") %
{'task_name': details['task_name'],
'task_uuid': details['task_uuid'],
'state': state},
exc_info=tuple(result.exc_info))
else:
# Otherwise, depending on the enabled logging level/state we
# will show or hide results that the task may have produced
# during execution.
level = base_logging.DEBUG
if state == states.FAILURE:
level = base_logging.WARNING
if (self._logger.isEnabledFor(base_logging.DEBUG) or
state == states.FAILURE):
self._logger.log(level,
_("Task '%(task_name)s' (%(task_uuid)s)"
" transitioned into state '%(state)s'"
" with result %(result)s") %
{'task_name': details['task_name'],
'task_uuid': details['task_uuid'],
'state': state, 'result': result})
else:
self._logger.log(level,
_("Task '%(task_name)s' (%(task_uuid)s)"
" transitioned into state"
" '%(state)s'") %
{'task_name': details['task_name'],
'task_uuid': details['task_uuid'],
'state': state})
else:
level = base_logging.DEBUG
if state in (states.REVERTING, states.RETRYING):
level = base_logging.WARNING
self._logger.log(level,
_("Task '%(task_name)s' (%(task_uuid)s)"
" transitioned into state '%(state)s'") %
{'task_name': details['task_name'],
'task_uuid': details['task_uuid'],
'state': state})
def make_pretty_name(method): def make_pretty_name(method):
"""Makes a pretty name for a function/method.""" """Makes a pretty name for a function/method."""
meth_pieces = [method.__name__] meth_pieces = [method.__name__]

View File

@ -44,6 +44,7 @@ from oslo import messaging
from cinder import compute from cinder import compute
from cinder import context from cinder import context
from cinder import exception from cinder import exception
from cinder import flow_utils
from cinder.image import glance from cinder.image import glance
from cinder import manager from cinder import manager
from cinder.openstack.common import excutils from cinder.openstack.common import excutils
@ -325,7 +326,8 @@ class VolumeManager(manager.SchedulerDependentManager):
# flow reverts all job that was done and reraises an exception. # flow reverts all job that was done and reraises an exception.
# Otherwise, all data that was generated by flow becomes available # Otherwise, all data that was generated by flow becomes available
# in flow engine's storage. # in flow engine's storage.
flow_engine.run() with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
@utils.synchronized(locked_action, external=True) @utils.synchronized(locked_action, external=True)
def _run_flow_locked(): def _run_flow_locked():
@ -1303,7 +1305,9 @@ class VolumeManager(manager.SchedulerDependentManager):
LOG.exception(_("Failed to create manage_existing flow.")) LOG.exception(_("Failed to create manage_existing flow."))
raise exception.CinderException( raise exception.CinderException(
_("Failed to create manage existing flow.")) _("Failed to create manage existing flow."))
flow_engine.run()
with flow_utils.DynamicLogListener(flow_engine, logger=LOG):
flow_engine.run()
# Fetch created volume from storage # Fetch created volume from storage
volume_ref = flow_engine.storage.fetch('volume') volume_ref = flow_engine.storage.fetch('volume')