From c8f3903fa4f1dd3d4359be51ffb5295ecb21c897 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 4 Oct 2013 12:58:14 -0700 Subject: [PATCH] Add a set of useful listeners In order to understand how listeners are used it is pretty nice to have a default set that do basic printing and logging that can be used in debugging (or just as examples). Include an example that uses this as well. Fixes: bug 1224060 Change-Id: I7ba6e9dcbdca84d014b9d1f5054ce7a37eb766f2 --- taskflow/engines/action_engine/engine.py | 18 +++- taskflow/engines/base.py | 1 + taskflow/examples/create_parallel_volume.py | 84 ++++++++++++++++++ taskflow/listeners/__init__.py | 17 ++++ taskflow/listeners/base.py | 95 +++++++++++++++++++++ taskflow/listeners/logging.py | 46 ++++++++++ taskflow/listeners/printing.py | 43 ++++++++++ 7 files changed, 303 insertions(+), 1 deletion(-) create mode 100644 taskflow/examples/create_parallel_volume.py create mode 100644 taskflow/listeners/__init__.py create mode 100644 taskflow/listeners/base.py create mode 100644 taskflow/listeners/logging.py create mode 100644 taskflow/listeners/printing.py diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index b9915664..c9b2af5e 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -32,6 +32,7 @@ from taskflow import storage as t_storage from taskflow.utils import flow_utils from taskflow.utils import misc +from taskflow.utils import reflection from taskflow.utils import threading_utils @@ -66,6 +67,9 @@ class ActionEngine(base.EngineBase): if current_failure: current_failure.reraise() + def __str__(self): + return "%s: %s" % (reflection.get_class_name(self), id(self)) + def _reset(self): self._failures = [] @@ -108,7 +112,19 @@ class ActionEngine(base.EngineBase): if not states.check_flow_transition(old_state, state): return self.storage.set_flow_state(state) - details = dict(engine=self, old_state=old_state) + try: + flow_uuid = self._flow.uuid + except AttributeError: + # NOTE(harlowja): if the flow was just a single task, then it will + # not itself have a uuid, but the constructed flow_detail will. + if self._flow_detail is not None: + flow_uuid = self._flow_detail.uuid + else: + flow_uuid = None + details = dict(engine=self, + flow_name=self._flow.name, + flow_uuid=flow_uuid, + old_state=old_state) self.notifier.notify(state, details) def on_task_state_change(self, task_action, state, result=None): diff --git a/taskflow/engines/base.py b/taskflow/engines/base.py index 1debd771..ac51857b 100644 --- a/taskflow/engines/base.py +++ b/taskflow/engines/base.py @@ -25,6 +25,7 @@ class EngineBase(object): def __init__(self, flow, flow_detail, backend, conf): self._flow = flow + self._flow_detail = flow_detail self.storage = self._storage_cls(flow_detail, backend) @abc.abstractproperty diff --git a/taskflow/examples/create_parallel_volume.py b/taskflow/examples/create_parallel_volume.py new file mode 100644 index 00000000..d14a5418 --- /dev/null +++ b/taskflow/examples/create_parallel_volume.py @@ -0,0 +1,84 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import logging +import os +import random +import sys +import time + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +from taskflow import engines +from taskflow.listeners import printing +from taskflow.patterns import unordered_flow as uf +from taskflow import task +from taskflow.utils import reflection + + +@contextlib.contextmanager +def show_time(name=''): + start = time.time() + yield + end = time.time() + print(" -- %s took %0.3f seconds" % (name, end - start)) + + +MAX_CREATE_TIME = 3 +VOLUME_COUNT = 5 +SERIAL = False + + +class VolumeCreator(task.Task): + def __init__(self, volume_id): + base_name = reflection.get_callable_name(self) + super(VolumeCreator, self).__init__(name="%s-%s" % (base_name, + volume_id)) + self._volume_id = volume_id + + def execute(self): + print("Making volume %s" % (self._volume_id)) + time.sleep(random.random() * MAX_CREATE_TIME) + print("Finished making volume %s" % (self._volume_id)) + + +# Assume there is no ordering dependency between volumes +flow = uf.Flow("volume-maker") +for i in xrange(0, VOLUME_COUNT): + flow.add(VolumeCreator(volume_id="vol-%s" % (i))) + +if SERIAL: + engine_conf = { + 'engine': 'serial', + } +else: + engine_conf = { + 'engine': 'parallel', + } + + +with show_time(name=flow.name.title()): + eng = engines.load(flow, engine_conf=engine_conf) + with printing.PrintingListener(eng): + eng.run() diff --git a/taskflow/listeners/__init__.py b/taskflow/listeners/__init__.py new file mode 100644 index 00000000..408f9de3 --- /dev/null +++ b/taskflow/listeners/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py new file mode 100644 index 00000000..15e86327 --- /dev/null +++ b/taskflow/listeners/base.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import + +import abc +import logging + +from taskflow import states +from taskflow.utils import misc + +LOG = logging.getLogger(__name__) + +# NOTE(harlowja): on these states will results be useable, all other states +# do not produce results. +FINISH_STATES = (states.FAILURE, states.SUCCESS) + + +class LoggingBase(object): + """This provides a simple listener that can be attached to an engine which + can be derived from to log the received actions to some logging backend. It + provides a useful context manager access to be able to register and + unregister with a given engine automatically when a context is entered and + when it is exited. + """ + __metaclass__ = abc.ABCMeta + + def __init__(self, engine, + listen_for=misc.TransitionNotifier.ANY): + self._listen_for = listen_for + self._engine = engine + self._registered = False + + @abc.abstractmethod + def _log(self, message, *args, **kwargs): + raise NotImplementedError() + + def _flow_receiver(self, state, details): + self._log("%s has moved flow '%s' (%s) into state '%s'", + details['engine'], details['flow_name'], + details['flow_uuid'], state) + + def _task_receiver(self, state, details): + if state in FINISH_STATES: + result = details.get('result') + exc_info = None + was_failure = False + if isinstance(result, misc.Failure): + if result.exc_info: + exc_info = tuple(result.exc_info) + was_failure = True + self._log("%s has moved task '%s' (%s) into state '%s'" + " with result '%s' (failure=%s)", + details['engine'], details['task_name'], + details['task_uuid'], state, result, was_failure, + exc_info=exc_info) + else: + self._log("%s has moved task '%s' (%s) into state '%s'", + details['engine'], details['task_name'], + details['task_uuid'], state) + + def __enter__(self): + if not self._registered: + self._engine.notifier.register(self._listen_for, + self._flow_receiver) + self._engine.task_notifier.register(self._listen_for, + self._task_receiver) + self._registered = True + + def __exit__(self, type, value, tb): + try: + self._engine.notifier.deregister(self._listen_for, + self._flow_receiver) + self._engine.task_notifier.deregister(self._listen_for, + self._task_receiver) + self._registered = False + except Exception: + # Don't let deregistering throw exceptions + LOG.exception("Failed deregistering listeners from engine %s", + self._engine) diff --git a/taskflow/listeners/logging.py b/taskflow/listeners/logging.py new file mode 100644 index 00000000..d6cb9926 --- /dev/null +++ b/taskflow/listeners/logging.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import + +import logging + +from taskflow.listeners import base +from taskflow.utils import misc + +LOG = logging.getLogger(__name__) + + +class LoggingListener(base.LoggingBase): + """Listens for task and flow notifications and writes those notifications + to a provided logging backend (if none is provided then this modules + logger is used instead) using a configurable logging level (logging.DEBUG + if not provided). + """ + def __init__(self, engine, + listen_for=misc.TransitionNotifier.ANY, + log=None, + level=logging.DEBUG): + super(LoggingListener, self).__init__(engine, listen_for) + self._logger = log + if not self._logger: + self._logger = LOG + self._level = level + + def _log(self, message, *args, **kwargs): + self._logger.log(self._level, message, *args, **kwargs) diff --git a/taskflow/listeners/printing.py b/taskflow/listeners/printing.py new file mode 100644 index 00000000..42be1383 --- /dev/null +++ b/taskflow/listeners/printing.py @@ -0,0 +1,43 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import print_function + +import sys +import traceback + +from taskflow.listeners import base +from taskflow.utils import misc + + +class PrintingListener(base.LoggingBase): + """Writes the task and flow notifications messages to stdout or stderr""" + def __init__(self, engine, + listen_for=misc.TransitionNotifier.ANY, stderr=False): + super(PrintingListener, self).__init__(engine, listen_for) + if stderr: + self._file = sys.stderr + else: + self._file = sys.stdout + + def _log(self, message, *args, **kwargs): + print(message % args, file=self._file) + exc_info = kwargs.get('exc_info') + if exc_info is not None: + traceback.print_exception(exc_info[0], exc_info[1], exc_info[2], + file=self._file)