Add a set of useful listeners

In order to understand how listeners are used it is pretty
nice to have a default set that do basic printing and logging
that can be used in debugging (or just as examples).

Include an example that uses this as well.

Fixes: bug 1224060

Change-Id: I7ba6e9dcbdca84d014b9d1f5054ce7a37eb766f2
This commit is contained in:
Joshua Harlow
2013-10-04 12:58:14 -07:00
parent 8750840ac8
commit c8f3903fa4
7 changed files with 303 additions and 1 deletions

View File

@@ -32,6 +32,7 @@ from taskflow import storage as t_storage
from taskflow.utils import flow_utils
from taskflow.utils import misc
from taskflow.utils import reflection
from taskflow.utils import threading_utils
@@ -66,6 +67,9 @@ class ActionEngine(base.EngineBase):
if current_failure:
current_failure.reraise()
def __str__(self):
return "%s: %s" % (reflection.get_class_name(self), id(self))
def _reset(self):
self._failures = []
@@ -108,7 +112,19 @@ class ActionEngine(base.EngineBase):
if not states.check_flow_transition(old_state, state):
return
self.storage.set_flow_state(state)
details = dict(engine=self, old_state=old_state)
try:
flow_uuid = self._flow.uuid
except AttributeError:
# NOTE(harlowja): if the flow was just a single task, then it will
# not itself have a uuid, but the constructed flow_detail will.
if self._flow_detail is not None:
flow_uuid = self._flow_detail.uuid
else:
flow_uuid = None
details = dict(engine=self,
flow_name=self._flow.name,
flow_uuid=flow_uuid,
old_state=old_state)
self.notifier.notify(state, details)
def on_task_state_change(self, task_action, state, result=None):

View File

@@ -25,6 +25,7 @@ class EngineBase(object):
def __init__(self, flow, flow_detail, backend, conf):
self._flow = flow
self._flow_detail = flow_detail
self.storage = self._storage_cls(flow_detail, backend)
@abc.abstractproperty

View File

@@ -0,0 +1,84 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging
import os
import random
import sys
import time
logging.basicConfig(level=logging.ERROR)
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
os.pardir))
sys.path.insert(0, top_dir)
from taskflow import engines
from taskflow.listeners import printing
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.utils import reflection
@contextlib.contextmanager
def show_time(name=''):
start = time.time()
yield
end = time.time()
print(" -- %s took %0.3f seconds" % (name, end - start))
MAX_CREATE_TIME = 3
VOLUME_COUNT = 5
SERIAL = False
class VolumeCreator(task.Task):
def __init__(self, volume_id):
base_name = reflection.get_callable_name(self)
super(VolumeCreator, self).__init__(name="%s-%s" % (base_name,
volume_id))
self._volume_id = volume_id
def execute(self):
print("Making volume %s" % (self._volume_id))
time.sleep(random.random() * MAX_CREATE_TIME)
print("Finished making volume %s" % (self._volume_id))
# Assume there is no ordering dependency between volumes
flow = uf.Flow("volume-maker")
for i in xrange(0, VOLUME_COUNT):
flow.add(VolumeCreator(volume_id="vol-%s" % (i)))
if SERIAL:
engine_conf = {
'engine': 'serial',
}
else:
engine_conf = {
'engine': 'parallel',
}
with show_time(name=flow.name.title()):
eng = engines.load(flow, engine_conf=engine_conf)
with printing.PrintingListener(eng):
eng.run()

View File

@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,95 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import abc
import logging
from taskflow import states
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
# NOTE(harlowja): on these states will results be useable, all other states
# do not produce results.
FINISH_STATES = (states.FAILURE, states.SUCCESS)
class LoggingBase(object):
"""This provides a simple listener that can be attached to an engine which
can be derived from to log the received actions to some logging backend. It
provides a useful context manager access to be able to register and
unregister with a given engine automatically when a context is entered and
when it is exited.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, engine,
listen_for=misc.TransitionNotifier.ANY):
self._listen_for = listen_for
self._engine = engine
self._registered = False
@abc.abstractmethod
def _log(self, message, *args, **kwargs):
raise NotImplementedError()
def _flow_receiver(self, state, details):
self._log("%s has moved flow '%s' (%s) into state '%s'",
details['engine'], details['flow_name'],
details['flow_uuid'], state)
def _task_receiver(self, state, details):
if state in FINISH_STATES:
result = details.get('result')
exc_info = None
was_failure = False
if isinstance(result, misc.Failure):
if result.exc_info:
exc_info = tuple(result.exc_info)
was_failure = True
self._log("%s has moved task '%s' (%s) into state '%s'"
" with result '%s' (failure=%s)",
details['engine'], details['task_name'],
details['task_uuid'], state, result, was_failure,
exc_info=exc_info)
else:
self._log("%s has moved task '%s' (%s) into state '%s'",
details['engine'], details['task_name'],
details['task_uuid'], state)
def __enter__(self):
if not self._registered:
self._engine.notifier.register(self._listen_for,
self._flow_receiver)
self._engine.task_notifier.register(self._listen_for,
self._task_receiver)
self._registered = True
def __exit__(self, type, value, tb):
try:
self._engine.notifier.deregister(self._listen_for,
self._flow_receiver)
self._engine.task_notifier.deregister(self._listen_for,
self._task_receiver)
self._registered = False
except Exception:
# Don't let deregistering throw exceptions
LOG.exception("Failed deregistering listeners from engine %s",
self._engine)

View File

@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import logging
from taskflow.listeners import base
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
class LoggingListener(base.LoggingBase):
"""Listens for task and flow notifications and writes those notifications
to a provided logging backend (if none is provided then this modules
logger is used instead) using a configurable logging level (logging.DEBUG
if not provided).
"""
def __init__(self, engine,
listen_for=misc.TransitionNotifier.ANY,
log=None,
level=logging.DEBUG):
super(LoggingListener, self).__init__(engine, listen_for)
self._logger = log
if not self._logger:
self._logger = LOG
self._level = level
def _log(self, message, *args, **kwargs):
self._logger.log(self._level, message, *args, **kwargs)

View File

@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import sys
import traceback
from taskflow.listeners import base
from taskflow.utils import misc
class PrintingListener(base.LoggingBase):
"""Writes the task and flow notifications messages to stdout or stderr"""
def __init__(self, engine,
listen_for=misc.TransitionNotifier.ANY, stderr=False):
super(PrintingListener, self).__init__(engine, listen_for)
if stderr:
self._file = sys.stderr
else:
self._file = sys.stdout
def _log(self, message, *args, **kwargs):
print(message % args, file=self._file)
exc_info = kwargs.get('exc_info')
if exc_info is not None:
traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
file=self._file)