From 42ca240e8157b840c298d14fbf478ae570376633 Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Sat, 29 Mar 2014 15:20:56 +0400 Subject: [PATCH] Move taskflow.utils.misc.Failure to its own module Failure class is important part of TaskFlow API, so it should be more visible and accessible. Breaking change: any client that used taskflow.utils.misc.Failure should be updated. Change-Id: Ib30000c9246bbcb227b34dfb0aba4d0b950bf926 --- doc/source/arguments_and_results.rst | 16 +- doc/source/failure.rst | 5 + doc/source/index.rst | 2 +- doc/source/utils.rst | 5 - taskflow/engines/action_engine/engine.py | 3 +- taskflow/engines/action_engine/executor.py | 6 +- .../engines/action_engine/graph_action.py | 6 +- .../engines/action_engine/retry_action.py | 6 +- taskflow/engines/action_engine/task_action.py | 6 +- taskflow/engines/worker_based/executor.py | 7 +- taskflow/engines/worker_based/protocol.py | 11 +- taskflow/engines/worker_based/server.py | 27 +- taskflow/examples/wrapped_exception.py | 21 +- taskflow/failure.py | 231 ++++++++++++++++++ taskflow/listeners/base.py | 3 +- .../persistence/backends/impl_sqlalchemy.py | 6 +- taskflow/persistence/logbook.py | 26 +- taskflow/storage.py | 5 +- taskflow/test.py | 10 +- taskflow/tests/unit/persistence/base.py | 16 +- taskflow/tests/unit/test_action_engine.py | 4 +- taskflow/tests/unit/test_retries.py | 8 +- taskflow/tests/unit/test_storage.py | 40 +-- taskflow/tests/unit/test_utils.py | 5 +- taskflow/tests/unit/test_utils_failure.py | 58 ++--- .../tests/unit/worker_based/test_executor.py | 8 +- .../tests/unit/worker_based/test_protocol.py | 14 +- .../tests/unit/worker_based/test_server.py | 26 +- taskflow/tests/utils.py | 16 -- taskflow/utils/misc.py | 196 +-------------- 30 files changed, 411 insertions(+), 382 deletions(-) create mode 100644 doc/source/failure.rst delete mode 100644 doc/source/utils.rst create mode 100644 taskflow/failure.py diff --git a/doc/source/arguments_and_results.rst b/doc/source/arguments_and_results.rst index ab1cc5fe..6e5236b4 100644 --- a/doc/source/arguments_and_results.rst +++ b/doc/source/arguments_and_results.rst @@ -339,7 +339,7 @@ For ``result`` value, two cases are possible: * if task is being reverted because it failed (an exception was raised from its |task.execute| method), ``result`` value is instance of - :py:class:`taskflow.utils.misc.Failure` object that holds exception information; + :py:class:`taskflow.failure.Failure` object that holds exception information; * if task is being reverted because some other task failed, and this task finished successfully, ``result`` value is task result fetched from storage: @@ -349,9 +349,9 @@ All other arguments are fetched from storage in the same way it is done for |task.execute| method. To determine if task failed you can check whether ``result`` is instance of -:py:class:`taskflow.utils.misc.Failure`:: +:py:class:`taskflow.failure.Failure`:: - from taskflow.utils import misc + from taskflow.utils import failure class RevertingTask(task.Task): @@ -359,7 +359,7 @@ To determine if task failed you can check whether ``result`` is instance of return do_something(spam, eggs) def revert(self, result, spam, eggs): - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): print("This task failed, exception: %s" % result.exception_str) else: print("do_something returned %r" % result) @@ -374,7 +374,7 @@ Retry Arguments A Retry controller works with arguments in the same way as a Task. But it has an additional parameter 'history' that is a list of tuples. Each tuple contains a result of the previous Retry run and a table where a key is a failed task and a value -is a :py:class:`taskflow.utils.misc.Failure`. +is a :py:class:`taskflow.failure.Failure`. Consider the following Retry:: @@ -396,13 +396,13 @@ Consider the following Retry:: Imagine the following Retry had returned a value '5' and then some task 'A' failed with some exception. In this case ``on_failure`` method will receive the following history:: - [('5', {'A': misc.Failure()})] + [('5', {'A': failure.Failure()})] Then the |retry.execute| method will be called again and it'll receive the same history. -If the |retry.execute| method raises an exception, the |retry.revert| method of Retry will be called and :py:class:`taskflow.utils.misc.Failure` object will be present +If the |retry.execute| method raises an exception, the |retry.revert| method of Retry will be called and :py:class:`taskflow.failure.Failure` object will be present in the history instead of Retry result:: - [('5', {'A': misc.Failure()}), (misc.Failure(), {})] + [('5', {'A': failure.Failure()}), (failure.Failure(), {})] After the Retry has been reverted, the Retry history will be cleaned. diff --git a/doc/source/failure.rst b/doc/source/failure.rst new file mode 100644 index 00000000..9ba6cfb1 --- /dev/null +++ b/doc/source/failure.rst @@ -0,0 +1,5 @@ +======= +Failure +======= + +.. automodule:: taskflow.failure diff --git a/doc/source/index.rst b/doc/source/index.rst index a2196f08..c1930562 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -19,7 +19,7 @@ Contents storage persistence exceptions - utils + failure states diff --git a/doc/source/utils.rst b/doc/source/utils.rst deleted file mode 100644 index 4880922e..00000000 --- a/doc/source/utils.rst +++ /dev/null @@ -1,5 +0,0 @@ ------ -Utils ------ - -.. autoclass:: taskflow.utils.misc.Failure diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 1ad00a0d..218f285a 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -24,6 +24,7 @@ from taskflow.engines.action_engine import task_action from taskflow.engines import base from taskflow import exceptions as exc +from taskflow import failure from taskflow.openstack.common import excutils from taskflow import retry from taskflow import states @@ -109,7 +110,7 @@ class ActionEngine(base.EngineBase): self._change_state(state) if state != states.SUSPENDED and state != states.SUCCESS: failures = self.storage.get_failures() - misc.Failure.reraise_if_any(failures.values()) + failure.Failure.reraise_if_any(failures.values()) @lock_utils.locked(lock='_state_lock') def _change_state(self, state): diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 846cc568..9837ad9b 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -19,8 +19,8 @@ import abc from concurrent import futures import six +from taskflow import failure from taskflow.utils import async_utils -from taskflow.utils import misc from taskflow.utils import threading_utils # Execution and reversion events. @@ -35,7 +35,7 @@ def _execute_task(task, arguments, progress_callback): except Exception: # NOTE(imelnikov): wrap current exception with Failure # object and return it. - result = misc.Failure() + result = failure.Failure() return (task, EXECUTED, result) @@ -49,7 +49,7 @@ def _revert_task(task, arguments, result, failures, progress_callback): except Exception: # NOTE(imelnikov): wrap current exception with Failure # object and return it. - result = misc.Failure() + result = failure.Failure() return (task, REVERTED, result) diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index ae7f9019..c7d54b86 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -16,10 +16,10 @@ from taskflow.engines.action_engine import executor as ex from taskflow import exceptions as excp +from taskflow import failure from taskflow import retry as r from taskflow import states as st from taskflow import task -from taskflow.utils import misc _WAITING_TIMEOUT = 60 # in seconds @@ -77,7 +77,7 @@ class FutureGraphAction(object): node, event, result = future.result() if isinstance(node, task.BaseTask): self._complete_task(node, event, result) - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): if event == ex.EXECUTED: self._process_atom_failure(node, result) else: @@ -88,7 +88,7 @@ class FutureGraphAction(object): not_done.extend(self._schedule(next_nodes)) if failures: - misc.Failure.reraise_if_any(failures) + failure.Failure.reraise_if_any(failures) if self._analyzer.get_next_nodes(): return st.SUSPENDED diff --git a/taskflow/engines/action_engine/retry_action.py b/taskflow/engines/action_engine/retry_action.py index a860f698..c6ca85b7 100644 --- a/taskflow/engines/action_engine/retry_action.py +++ b/taskflow/engines/action_engine/retry_action.py @@ -18,9 +18,9 @@ import logging from taskflow.engines.action_engine import executor as ex from taskflow import exceptions +from taskflow import failure from taskflow import states from taskflow.utils import async_utils -from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -63,7 +63,7 @@ class RetryAction(object): try: result = retry.execute(**kwargs) except Exception: - result = misc.Failure() + result = failure.Failure() self.change_state(retry, states.FAILURE, result=result) else: self.change_state(retry, states.SUCCESS, result=result) @@ -79,7 +79,7 @@ class RetryAction(object): try: result = retry.revert(**kwargs) except Exception: - result = misc.Failure() + result = failure.Failure() self.change_state(retry, states.FAILURE) else: self.change_state(retry, states.REVERTED) diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index 32c0a179..ac2c9c11 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -17,8 +17,8 @@ import logging from taskflow import exceptions +from taskflow import failure from taskflow import states -from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -71,7 +71,7 @@ class TaskAction(object): self._on_update_progress) def complete_execution(self, task, result): - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): self.change_state(task, states.FAILURE, result=result) else: self.change_state(task, states.SUCCESS, @@ -91,7 +91,7 @@ class TaskAction(object): return future def complete_reversion(self, task, rev_result): - if isinstance(rev_result, misc.Failure): + if isinstance(rev_result, failure.Failure): self.change_state(task, states.FAILURE) else: self.change_state(task, states.REVERTED, progress=1.0) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 4c4fc060..c6b259b5 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -24,6 +24,7 @@ from taskflow.engines.worker_based import cache from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy from taskflow import exceptions as exc +from taskflow import failure from taskflow.utils import async_utils from taskflow.utils import misc @@ -125,7 +126,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): """ LOG.debug("Request '%r' has expired.", request) LOG.debug("The '%r' request has expired.", request) - request.set_result(misc.Failure.from_exception( + request.set_result(failure.Failure.from_exception( exc.RequestTimeout("The '%r' request has expired" % request))) def _on_wait(self): @@ -161,11 +162,11 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): reply_to=self._uuid, correlation_id=request.uuid) except Exception: - with misc.capture_failure() as failure: + with failure.capture_failure() as fail: LOG.exception("Failed to submit the '%s' request." % request) self._requests_cache.delete(request.uuid) - request.set_result(failure) + request.set_result(fail) def _notify_topics(self): """Cyclically publish notify message to each topic.""" diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index 70859053..b8f89396 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -21,6 +21,7 @@ import six from concurrent import futures from taskflow.engines.action_engine import executor +from taskflow import failure from taskflow.utils import misc from taskflow.utils import reflection @@ -136,7 +137,7 @@ class Request(Message): return False def to_dict(self): - """Return json-serializable request, converting all `misc.Failure` + """Return json-serializable request, converting all `failure.Failure` objects into dictionaries. """ request = dict(task_cls=self._task_cls, task_name=self._task.name, @@ -144,15 +145,15 @@ class Request(Message): arguments=self._arguments) if 'result' in self._kwargs: result = self._kwargs['result'] - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): request['result'] = ('failure', result.to_dict()) else: request['result'] = ('success', result) if 'failures' in self._kwargs: failures = self._kwargs['failures'] request['failures'] = {} - for task, failure in six.iteritems(failures): - request['failures'][task] = failure.to_dict() + for task, fail in six.iteritems(failures): + request['failures'][task] = fail.to_dict() return request def set_result(self, result): @@ -182,7 +183,7 @@ class Response(Message): state = data['state'] data = data['data'] if state == FAILURE and 'result' in data: - data['result'] = misc.Failure.from_dict(data['result']) + data['result'] = failure.Failure.from_dict(data['result']) return cls(state, **data) @property diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index 02f56647..b026c0c2 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -21,7 +21,7 @@ from kombu import exceptions as kombu_exc from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy -from taskflow.utils import misc +from taskflow import failure LOG = logging.getLogger(__name__) @@ -77,21 +77,22 @@ class Server(object): @staticmethod def _parse_request(task_cls, task_name, action, arguments, result=None, failures=None, **kwargs): - """Parse request before it can be processed. All `misc.Failure` objects - that have been converted to dict on the remote side to be serializable - are now converted back to objects. + """Parse request before it can be processed. + + All `failure.Failure` objects that have been converted to dict on the + remote side to be serializable are now converted back to objects. """ action_args = dict(arguments=arguments, task_name=task_name) if result is not None: data_type, data = result if data_type == 'failure': - action_args['result'] = misc.Failure.from_dict(data) + action_args['result'] = failure.Failure.from_dict(data) else: action_args['result'] = data if failures is not None: action_args['failures'] = {} for k, v in failures.items(): - action_args['failures'][k] = misc.Failure.from_dict(v) + action_args['failures'][k] = failure.Failure.from_dict(v) return task_cls, action, action_args @staticmethod @@ -161,19 +162,19 @@ class Server(object): action_args.update(task_uuid=task_uuid, progress_callback=progress_callback) except ValueError: - with misc.capture_failure() as failure: + with failure.capture_failure() as fail: LOG.exception("Failed to parse request") - reply_callback(result=failure.to_dict()) + reply_callback(result=fail.to_dict()) return # get task endpoint try: endpoint = self._endpoints[task_cls] except KeyError: - with misc.capture_failure() as failure: + with failure.capture_failure() as fail: LOG.exception("The '%s' task endpoint does not exist", task_cls) - reply_callback(result=failure.to_dict()) + reply_callback(result=fail.to_dict()) return else: reply_callback(state=pr.RUNNING) @@ -182,11 +183,11 @@ class Server(object): try: result = getattr(endpoint, action)(**action_args) except Exception: - with misc.capture_failure() as failure: + with failure.capture_failure() as fail: LOG.exception("The %s task execution failed", endpoint) - reply_callback(result=failure.to_dict()) + reply_callback(result=fail.to_dict()) else: - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): reply_callback(result=result.to_dict()) else: reply_callback(state=pr.SUCCESS, result=result) diff --git a/taskflow/examples/wrapped_exception.py b/taskflow/examples/wrapped_exception.py index 17ae6322..843bd116 100644 --- a/taskflow/examples/wrapped_exception.py +++ b/taskflow/examples/wrapped_exception.py @@ -31,10 +31,9 @@ sys.path.insert(0, top_dir) import taskflow.engines from taskflow import exceptions +from taskflow import failure from taskflow.patterns import unordered_flow as uf from taskflow import task -from taskflow.tests import utils -from taskflow.utils import misc # INTRO: In this example we create two tasks which can trigger exceptions # based on various inputs to show how to analyze the thrown exceptions for @@ -96,20 +95,20 @@ def run(**store): SecondTask() ) try: - with utils.wrap_all_failures(): + with failure.wrap_all_failures(): taskflow.engines.run(flow, store=store, engine_conf='parallel') except exceptions.WrappedFailure as ex: unknown_failures = [] - for failure in ex: - if failure.check(FirstException): - print("Got FirstException: %s" % failure.exception_str) - elif failure.check(SecondException): - print("Got SecondException: %s" % failure.exception_str) + for fail in ex: + if fail.check(FirstException): + print("Got FirstException: %s" % fail.exception_str) + elif fail.check(SecondException): + print("Got SecondException: %s" % fail.exception_str) else: - print("Unknown failure: %s" % failure) - unknown_failures.append(failure) - misc.Failure.reraise_if_any(unknown_failures) + print("Unknown failure: %s" % fail) + unknown_failures.append(fail) + failure.Failure.reraise_if_any(unknown_failures) print_wrapped("Raise and catch first exception only") diff --git a/taskflow/failure.py b/taskflow/failure.py new file mode 100644 index 00000000..5dd825b0 --- /dev/null +++ b/taskflow/failure.py @@ -0,0 +1,231 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2013-2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib +import sys +import traceback + +import six + +from taskflow import exceptions as exc +from taskflow.utils import misc +from taskflow.utils import reflection + + +@contextlib.contextmanager +def wrap_all_failures(): + """Convert any exceptions to WrappedFailure. + + When you expect several failures, it may be convenient + to wrap any exception with WrappedFailure in order to + unify error handling. + """ + try: + yield + except Exception: + raise exc.WrappedFailure([Failure()]) + + +@contextlib.contextmanager +def capture_failure(): + """Save current exception, and yield back the failure (or raises a + runtime error if no active exception is being handled). + + In some cases the exception context can be cleared, resulting in None + being attempted to be saved after an exception handler is run. This + can happen when eventlet switches greenthreads or when running an + exception handler, code raises and catches an exception. In both + cases the exception context will be cleared. + + To work around this, we save the exception state, yield a failure and + then run other code. + + For example:: + + except Exception: + with capture_failure() as fail: + LOG.warn("Activating cleanup") + cleanup() + save_failure(fail) + """ + exc_info = sys.exc_info() + if not any(exc_info): + raise RuntimeError("No active exception is being handled") + else: + yield Failure(exc_info=exc_info) + + +class Failure(object): + """Object that represents failure. + + Failure objects encapsulate exception information so that + it can be re-used later to re-raise or inspect. + """ + DICT_VERSION = 1 + + def __init__(self, exc_info=None, **kwargs): + if not kwargs: + if exc_info is None: + exc_info = sys.exc_info() + self._exc_info = exc_info + self._exc_type_names = list( + reflection.get_all_class_names(exc_info[0], up_to=Exception)) + if not self._exc_type_names: + raise TypeError('Invalid exception type: %r' % exc_info[0]) + self._exception_str = exc.exception_message(self._exc_info[1]) + self._traceback_str = ''.join( + traceback.format_tb(self._exc_info[2])) + else: + self._exc_info = exc_info # may be None + self._exception_str = kwargs.pop('exception_str') + self._exc_type_names = kwargs.pop('exc_type_names', []) + self._traceback_str = kwargs.pop('traceback_str', None) + if kwargs: + raise TypeError( + 'Failure.__init__ got unexpected keyword argument(s): %s' + % ', '.join(six.iterkeys(kwargs))) + + @classmethod + def from_exception(cls, exception): + return cls((type(exception), exception, None)) + + def _matches(self, other): + if self is other: + return True + return (self._exc_type_names == other._exc_type_names + and self.exception_str == other.exception_str + and self.traceback_str == other.traceback_str) + + def matches(self, other): + if not isinstance(other, Failure): + return False + if self.exc_info is None or other.exc_info is None: + return self._matches(other) + else: + return self == other + + def __eq__(self, other): + if not isinstance(other, Failure): + return NotImplemented + return (self._matches(other) and + misc.are_equal_exc_info_tuples(self.exc_info, other.exc_info)) + + def __ne__(self, other): + return not (self == other) + + # NOTE(imelnikov): obj.__hash__() should return same values for equal + # objects, so we should redefine __hash__. Failure equality semantics + # is a bit complicated, so for now we just mark Failure objects as + # unhashable. See python docs on object.__hash__ for more info: + # http://docs.python.org/2/reference/datamodel.html#object.__hash__ + __hash__ = None + + @property + def exception(self): + """Exception value, or None if exception value is not present. + + Exception value may be lost during serialization. + """ + if self._exc_info: + return self._exc_info[1] + else: + return None + + @property + def exception_str(self): + """String representation of exception.""" + return self._exception_str + + @property + def exc_info(self): + """Exception info tuple or None.""" + return self._exc_info + + @property + def traceback_str(self): + """Exception traceback as string.""" + return self._traceback_str + + @staticmethod + def reraise_if_any(failures): + """Re-raise exceptions if argument is not empty. + + If argument is empty list, this method returns None. If + argument is list with single Failure object in it, + this failure is reraised. Else, WrappedFailure exception + is raised with failures list as causes. + """ + failures = list(failures) + if len(failures) == 1: + failures[0].reraise() + elif len(failures) > 1: + raise exc.WrappedFailure(failures) + + def reraise(self): + """Re-raise captured exception.""" + if self._exc_info: + six.reraise(*self._exc_info) + else: + raise exc.WrappedFailure([self]) + + def check(self, *exc_classes): + """Check if any of exc_classes caused the failure. + + Arguments of this method can be exception types or type + names (stings). If captured exception is instance of + exception of given type, the corresponding argument is + returned. Else, None is returned. + """ + for cls in exc_classes: + if isinstance(cls, type): + err = reflection.get_class_name(cls) + else: + err = cls + if err in self._exc_type_names: + return cls + return None + + def __str__(self): + return 'Failure: %s: %s' % (self._exc_type_names[0], + self._exception_str) + + def __iter__(self): + """Iterate over exception type names.""" + for et in self._exc_type_names: + yield et + + @classmethod + def from_dict(cls, data): + data = dict(data) + version = data.pop('version', None) + if version != cls.DICT_VERSION: + raise ValueError('Invalid dict version of failure object: %r' + % version) + return cls(**data) + + def to_dict(self): + return { + 'exception_str': self.exception_str, + 'traceback_str': self.traceback_str, + 'exc_type_names': list(self), + 'version': self.DICT_VERSION, + } + + def copy(self): + return Failure(exc_info=misc.copy_exc_info(self.exc_info), + exception_str=self.exception_str, + traceback_str=self.traceback_str, + exc_type_names=self._exc_type_names[:]) diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index e8f1674c..9c601d4a 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -21,6 +21,7 @@ import logging import six +from taskflow import failure from taskflow.openstack.common import excutils from taskflow import states from taskflow.utils import misc @@ -141,7 +142,7 @@ class LoggingBase(ListenerBase): result = details.get('result') exc_info = None was_failure = False - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): if result.exc_info: exc_info = tuple(result.exc_info) was_failure = True diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 2067ff5a..248c5453 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -31,6 +31,7 @@ from sqlalchemy import orm as sa_orm from sqlalchemy import pool as sa_pool from taskflow import exceptions as exc +from taskflow import failure from taskflow.persistence.backends import base from taskflow.persistence.backends.sqlalchemy import migration from taskflow.persistence.backends.sqlalchemy import models @@ -267,7 +268,7 @@ class Connection(base.Connection): pass except sa_exc.OperationalError as ex: if _is_db_connection_error(six.text_type(ex.args[0])): - failures.append(misc.Failure()) + failures.append(failure.Failure()) return False return True @@ -513,7 +514,7 @@ def _convert_ad_to_external(ad): # to change the internal sqlalchemy model easily by forcing a defined # interface (that isn't the sqlalchemy model itself). atom_cls = logbook.atom_detail_class(ad.atom_type) - return atom_cls.from_dict({ + result = atom_cls.from_dict({ 'state': ad.state, 'intention': ad.intention, 'results': ad.results, @@ -523,6 +524,7 @@ def _convert_ad_to_external(ad): 'name': ad.name, 'uuid': ad.uuid, }) + return result def _convert_lb_to_external(lb_m): diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index f378b677..4dece262 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -21,10 +21,10 @@ import logging import six +from taskflow import failure from taskflow.openstack.common import timeutils from taskflow.openstack.common import uuidutils from taskflow import states -from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -311,11 +311,11 @@ class AtomDetail(object): def _to_dict_shared(self): if self.failure: - failure = self.failure.to_dict() + fail = self.failure.to_dict() else: - failure = None + fail = None return { - 'failure': failure, + 'failure': fail, 'meta': self.meta, 'name': self.name, 'results': self.results, @@ -331,9 +331,9 @@ class AtomDetail(object): self.results = data.get('results') self.version = data.get('version') self.meta = _fix_meta(data) - failure = data.get('failure') - if failure: - self.failure = misc.Failure.from_dict(failure) + fail = data.get('failure') + if fail: + self.failure = failure.Failure.from_dict(fail) @property def uuid(self): @@ -405,7 +405,7 @@ class RetryDetail(AtomDetail): for (data, failures) in results: new_failures = {} for (key, failure_data) in six.iteritems(failures): - new_failures[key] = misc.Failure.from_dict(failure_data) + new_failures[key] = failure.Failure.from_dict(failure_data) new_results.append((data, new_failures)) return new_results @@ -423,8 +423,8 @@ class RetryDetail(AtomDetail): new_results = [] for (data, failures) in results: new_failures = {} - for (key, failure) in six.iteritems(failures): - new_failures[key] = failure.to_dict() + for (key, fail) in six.iteritems(failures): + new_failures[key] = fail.to_dict() new_results.append((data, new_failures)) return new_results @@ -443,11 +443,11 @@ class RetryDetail(AtomDetail): # contain tracebacks, which are not copyable. for (data, failures) in other.results: copied_failures = {} - for (key, failure) in six.iteritems(failures): + for (key, fail) in six.iteritems(failures): if deep_copy: - copied_failures[key] = failure.copy() + copied_failures[key] = fail.copy() else: - copied_failures[key] = failure + copied_failures[key] = fail results.append((data, copied_failures)) self.results = results return self diff --git a/taskflow/storage.py b/taskflow/storage.py index dca870f3..23c90d53 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -21,6 +21,7 @@ import logging import six from taskflow import exceptions +from taskflow import failure from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook from taskflow import states @@ -53,7 +54,7 @@ class Storage(object): self._lock = self._lock_cls() # NOTE(imelnikov): failure serialization looses information, - # so we cache failures here, in task name -> misc.Failure mapping. + # so we cache failures here, in task name -> failure.Failure mapping. self._failures = {} for ad in self._flowdetail: if ad.failure is not None: @@ -325,7 +326,7 @@ class Storage(object): with self._lock.write_lock(): ad = self._atomdetail_by_name(atom_name) ad.state = state - if state == states.FAILURE and isinstance(data, misc.Failure): + if state == states.FAILURE and isinstance(data, failure.Failure): # FIXME(harlowja): this seems like it should be internal logic # in the atom detail object and not in here. Fix that soon... # diff --git a/taskflow/test.py b/taskflow/test.py index ce99a373..0ecbce40 100644 --- a/taskflow/test.py +++ b/taskflow/test.py @@ -24,7 +24,7 @@ import fixtures import six from taskflow import exceptions -from taskflow.tests import utils +from taskflow import failure from taskflow.utils import misc @@ -49,13 +49,13 @@ class FailureRegexpMatcher(object): self.exc_class = exc_class self.pattern = pattern - def match(self, failure): - for cause in failure: + def match(self, fail): + for cause in fail: if cause.check(self.exc_class) is not None: return matchers.MatchesRegex( self.pattern).match(cause.exception_str) return matchers.Mismatch("The `%s` wasn't caused by the `%s`" % - (failure, self.exc_class)) + (fail, self.exc_class)) class ItemsEqual(object): @@ -171,7 +171,7 @@ class TestCase(testcase.TestCase): string matches to the given pattern. """ try: - with utils.wrap_all_failures(): + with failure.wrap_all_failures(): callable_obj(*args, **kwargs) except exceptions.WrappedFailure as e: self.assertThat(e, FailureRegexpMatcher(exc_class, pattern)) diff --git a/taskflow/tests/unit/persistence/base.py b/taskflow/tests/unit/persistence/base.py index 3d28695c..ca5a2db9 100644 --- a/taskflow/tests/unit/persistence/base.py +++ b/taskflow/tests/unit/persistence/base.py @@ -17,10 +17,10 @@ import contextlib from taskflow import exceptions as exc +from taskflow import failure from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook from taskflow import states -from taskflow.utils import misc class PersistenceTestMixin(object): @@ -147,7 +147,7 @@ class PersistenceTestMixin(object): try: raise RuntimeError('Woot!') except Exception: - td.failure = misc.Failure() + td.failure = failure.Failure() fd.add(td) @@ -161,10 +161,10 @@ class PersistenceTestMixin(object): lb2 = conn.get_logbook(lb_id) fd2 = lb2.find(fd.uuid) td2 = fd2.find(td.uuid) - failure = td2.failure - self.assertEqual(failure.exception_str, 'Woot!') - self.assertIs(failure.check(RuntimeError), RuntimeError) - self.assertEqual(failure.traceback_str, td.failure.traceback_str) + fail = td2.failure + self.assertEqual(fail.exception_str, 'Woot!') + self.assertIs(fail.check(RuntimeError), RuntimeError) + self.assertEqual(fail.traceback_str, td.failure.traceback_str) self.assertIsInstance(td2, logbook.TaskDetail) def test_logbook_merge_flow_detail(self): @@ -269,7 +269,7 @@ class PersistenceTestMixin(object): fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) - fail = misc.Failure.from_exception(RuntimeError('fail')) + fail = failure.Failure.from_exception(RuntimeError('fail')) rd.results.append((42, {'some-task': fail})) fd.add(rd) @@ -286,7 +286,7 @@ class PersistenceTestMixin(object): rd2 = fd2.find(rd.uuid) self.assertIsInstance(rd2, logbook.RetryDetail) fail2 = rd2.results[0][1].get('some-task') - self.assertIsInstance(fail2, misc.Failure) + self.assertIsInstance(fail2, failure.Failure) self.assertTrue(fail.matches(fail2)) def test_retry_detail_save_intention(self): diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index ad69ec59..d6292bbe 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -31,6 +31,7 @@ from taskflow.engines.action_engine import engine as eng from taskflow.engines.worker_based import engine as w_eng from taskflow.engines.worker_based import worker as wkr from taskflow import exceptions as exc +from taskflow import failure from taskflow.persistence import logbook from taskflow import states from taskflow import task @@ -38,7 +39,6 @@ from taskflow import test from taskflow.tests import utils from taskflow.utils import eventlet_utils as eu -from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils @@ -486,7 +486,7 @@ class EngineCheckingTaskTest(utils.EngineTestBase): self.assertEqual(result, 'RESULT') self.assertEqual(list(flow_failures.keys()), ['fail1']) fail = flow_failures['fail1'] - self.assertIsInstance(fail, misc.Failure) + self.assertIsInstance(fail, failure.Failure) self.assertEqual(str(fail), 'Failure: RuntimeError: Woot!') flow = lf.Flow('test').add( diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 42f4a5fe..44874f19 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -21,11 +21,11 @@ from taskflow.patterns import unordered_flow as uf import taskflow.engines from taskflow import exceptions as exc +from taskflow import failure from taskflow import retry from taskflow import states as st from taskflow import test from taskflow.tests import utils -from taskflow.utils import misc class RetryTest(utils.EngineTestBase): @@ -558,7 +558,7 @@ class RetryTest(utils.EngineTestBase): # we execute retry engine.storage.save('flow-1_retry', 1) # task fails - fail = misc.Failure.from_exception(RuntimeError('foo')), + fail = failure.Failure.from_exception(RuntimeError('foo')), engine.storage.save('task1', fail, state=st.FAILURE) if when == 'task fails': return engine @@ -634,7 +634,7 @@ class RetryTest(utils.EngineTestBase): self._make_engine(flow).run) self.assertEqual(len(r.history), 1) self.assertEqual(r.history[0][1], {}) - self.assertEqual(isinstance(r.history[0][0], misc.Failure), True) + self.assertEqual(isinstance(r.history[0][0], failure.Failure), True) def test_retry_revert_fails(self): @@ -690,7 +690,7 @@ class RetryTest(utils.EngineTestBase): engine.storage.save('test2_retry', 1) engine.storage.save('b', 11) # pretend that 'c' failed - fail = misc.Failure.from_exception(RuntimeError('Woot!')) + fail = failure.Failure.from_exception(RuntimeError('Woot!')) engine.storage.save('c', fail, st.FAILURE) engine.run() diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index eb088190..2c6fcd22 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -20,13 +20,13 @@ import threading import mock from taskflow import exceptions +from taskflow import failure from taskflow.openstack.common import uuidutils from taskflow.persistence import backends from taskflow.persistence import logbook from taskflow import states from taskflow import storage from taskflow import test -from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils @@ -127,46 +127,46 @@ class StorageTestMixin(object): self.assertEqual(s.get_atom_state('my task'), states.FAILURE) def test_save_and_get_cached_failure(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + fail = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_task('my task') - s.save('my task', failure, states.FAILURE) - self.assertEqual(s.get('my task'), failure) + s.save('my task', fail, states.FAILURE) + self.assertEqual(s.get('my task'), fail) self.assertEqual(s.get_atom_state('my task'), states.FAILURE) self.assertTrue(s.has_failures()) - self.assertEqual(s.get_failures(), {'my task': failure}) + self.assertEqual(s.get_failures(), {'my task': fail}) def test_save_and_get_non_cached_failure(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + fail = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_task('my task') - s.save('my task', failure, states.FAILURE) - self.assertEqual(s.get('my task'), failure) + s.save('my task', fail, states.FAILURE) + self.assertEqual(s.get('my task'), fail) s._failures['my task'] = None - self.assertTrue(failure.matches(s.get('my task'))) + self.assertTrue(fail.matches(s.get('my task'))) def test_get_failure_from_reverted_task(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + fail = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_task('my task') - s.save('my task', failure, states.FAILURE) + s.save('my task', fail, states.FAILURE) s.set_atom_state('my task', states.REVERTING) - self.assertEqual(s.get('my task'), failure) + self.assertEqual(s.get('my task'), fail) s.set_atom_state('my task', states.REVERTED) - self.assertEqual(s.get('my task'), failure) + self.assertEqual(s.get('my task'), fail) def test_get_failure_after_reload(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + fail = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_task('my task') - s.save('my task', failure, states.FAILURE) + s.save('my task', fail, states.FAILURE) s2 = self._get_storage(s._flowdetail) self.assertTrue(s2.has_failures()) self.assertEqual(1, len(s2.get_failures())) - self.assertTrue(failure.matches(s2.get('my task'))) + self.assertTrue(fail.matches(s2.get('my task'))) self.assertEqual(s2.get_atom_state('my task'), states.FAILURE) def test_get_non_existing_var(self): @@ -483,15 +483,15 @@ class StorageTestMixin(object): self.assertEqual(s.fetch_all(), {}) def test_cached_retry_failure(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + fail = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_retry('my retry', result_mapping={'x': 0}) s.save('my retry', 'a') - s.save('my retry', failure, states.FAILURE) + s.save('my retry', fail, states.FAILURE) history = s.get_retry_history('my retry') - self.assertEqual(history, [('a', {}), (failure, {})]) + self.assertEqual(history, [('a', {}), (fail, {})]) self.assertIs(s.has_failures(), True) - self.assertEqual(s.get_failures(), {'my retry': failure}) + self.assertEqual(s.get_failures(), {'my retry': fail}) def test_logbook_get_unknown_atom_type(self): self.assertRaisesRegexp(TypeError, diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index 1c5c197b..5923dc4b 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -19,6 +19,7 @@ import functools import sys import time +from taskflow import failure from taskflow import states from taskflow import test from taskflow.tests import utils as test_utils @@ -284,8 +285,8 @@ class GetClassNameTest(test.TestCase): self.assertEqual(name, 'RuntimeError') def test_global_class(self): - name = reflection.get_class_name(misc.Failure) - self.assertEqual(name, 'taskflow.utils.misc.Failure') + name = reflection.get_class_name(failure.Failure) + self.assertEqual(name, 'taskflow.failure.Failure') def test_class(self): name = reflection.get_class_name(Class) diff --git a/taskflow/tests/unit/test_utils_failure.py b/taskflow/tests/unit/test_utils_failure.py index 394abfd2..be42129a 100644 --- a/taskflow/tests/unit/test_utils_failure.py +++ b/taskflow/tests/unit/test_utils_failure.py @@ -20,14 +20,14 @@ from taskflow import exceptions from taskflow import test from taskflow.tests import utils as test_utils -from taskflow.utils import misc +from taskflow import failure def _captured_failure(msg): try: raise RuntimeError(msg) except Exception: - return misc.Failure() + return failure.Failure() class GeneralFailureObjTestsMixin(object): @@ -82,9 +82,9 @@ class ReCreatedFailureTestCase(test.TestCase, GeneralFailureObjTestsMixin): def setUp(self): super(ReCreatedFailureTestCase, self).setUp() fail_obj = _captured_failure('Woot!') - self.fail_obj = misc.Failure(exception_str=fail_obj.exception_str, - traceback_str=fail_obj.traceback_str, - exc_type_names=list(fail_obj)) + self.fail_obj = failure.Failure(exception_str=fail_obj.exception_str, + traceback_str=fail_obj.traceback_str, + exc_type_names=list(fail_obj)) def test_value_lost(self): self.assertIs(self.fail_obj.exception, None) @@ -102,7 +102,7 @@ class FromExceptionTestCase(test.TestCase, GeneralFailureObjTestsMixin): def setUp(self): super(FromExceptionTestCase, self).setUp() - self.fail_obj = misc.Failure.from_exception(RuntimeError('Woot!')) + self.fail_obj = failure.Failure.from_exception(RuntimeError('Woot!')) class FailureObjectTestCase(test.TestCase): @@ -111,10 +111,10 @@ class FailureObjectTestCase(test.TestCase): try: raise SystemExit() except BaseException: - self.assertRaises(TypeError, misc.Failure) + self.assertRaises(TypeError, failure.Failure) def test_unknown_argument(self): - exc = self.assertRaises(TypeError, misc.Failure, + exc = self.assertRaises(TypeError, failure.Failure, exception_str='Woot!', traceback_str=None, exc_type_names=['Exception'], @@ -123,12 +123,12 @@ class FailureObjectTestCase(test.TestCase): self.assertEqual(str(exc), expected) def test_empty_does_not_reraise(self): - self.assertIs(misc.Failure.reraise_if_any([]), None) + self.assertIs(failure.Failure.reraise_if_any([]), None) def test_reraises_one(self): fls = [_captured_failure('Woot!')] self.assertRaisesRegexp(RuntimeError, '^Woot!$', - misc.Failure.reraise_if_any, fls) + failure.Failure.reraise_if_any, fls) def test_reraises_several(self): fls = [ @@ -136,7 +136,7 @@ class FailureObjectTestCase(test.TestCase): _captured_failure('Oh, not again!') ] exc = self.assertRaises(exceptions.WrappedFailure, - misc.Failure.reraise_if_any, fls) + failure.Failure.reraise_if_any, fls) self.assertEqual(list(exc), fls) def test_failure_copy(self): @@ -149,9 +149,9 @@ class FailureObjectTestCase(test.TestCase): def test_failure_copy_recaptured(self): captured = _captured_failure('Woot!') - fail_obj = misc.Failure(exception_str=captured.exception_str, - traceback_str=captured.traceback_str, - exc_type_names=list(captured)) + fail_obj = failure.Failure(exception_str=captured.exception_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured)) copied = fail_obj.copy() self.assertIsNot(fail_obj, copied) self.assertEqual(fail_obj, copied) @@ -160,9 +160,9 @@ class FailureObjectTestCase(test.TestCase): def test_recaptured_not_eq(self): captured = _captured_failure('Woot!') - fail_obj = misc.Failure(exception_str=captured.exception_str, - traceback_str=captured.traceback_str, - exc_type_names=list(captured)) + fail_obj = failure.Failure(exception_str=captured.exception_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured)) self.assertFalse(fail_obj == captured) self.assertTrue(fail_obj != captured) self.assertTrue(fail_obj.matches(captured)) @@ -174,13 +174,13 @@ class FailureObjectTestCase(test.TestCase): def test_two_recaptured_neq(self): captured = _captured_failure('Woot!') - fail_obj = misc.Failure(exception_str=captured.exception_str, - traceback_str=captured.traceback_str, - exc_type_names=list(captured)) + fail_obj = failure.Failure(exception_str=captured.exception_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured)) new_exc_str = captured.exception_str.replace('Woot', 'w00t') - fail_obj2 = misc.Failure(exception_str=new_exc_str, - traceback_str=captured.traceback_str, - exc_type_names=list(captured)) + fail_obj2 = failure.Failure(exception_str=new_exc_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured)) self.assertNotEqual(fail_obj, fail_obj2) self.assertFalse(fail_obj2.matches(fail_obj)) @@ -220,7 +220,7 @@ class WrappedFailureTestCase(test.TestCase): try: raise exceptions.WrappedFailure([f1, f2]) except Exception: - fail_obj = misc.Failure() + fail_obj = failure.Failure() wf = exceptions.WrappedFailure([fail_obj, f3]) self.assertEqual(list(wf), [f1, f2, f3]) @@ -230,13 +230,13 @@ class NonAsciiExceptionsTestCase(test.TestCase): def test_exception_with_non_ascii_str(self): bad_string = chr(200) - fail = misc.Failure.from_exception(ValueError(bad_string)) + fail = failure.Failure.from_exception(ValueError(bad_string)) self.assertEqual(fail.exception_str, bad_string) self.assertEqual(str(fail), 'Failure: ValueError: %s' % bad_string) def test_exception_non_ascii_unicode(self): hi_ru = u'привет' - fail = misc.Failure.from_exception(ValueError(hi_ru)) + fail = failure.Failure.from_exception(ValueError(hi_ru)) self.assertEqual(fail.exception_str, hi_ru) self.assertIsInstance(fail.exception_str, six.text_type) self.assertEqual(six.text_type(fail), @@ -246,7 +246,7 @@ class NonAsciiExceptionsTestCase(test.TestCase): hi_cn = u'嗨' fail = ValueError(hi_cn) self.assertEqual(hi_cn, exceptions.exception_message(fail)) - fail = misc.Failure.from_exception(fail) + fail = failure.Failure.from_exception(fail) wrapped_fail = exceptions.WrappedFailure([fail]) if six.PY2: # Python 2.x will unicode escape it, while python 3.3+ will not, @@ -261,12 +261,12 @@ class NonAsciiExceptionsTestCase(test.TestCase): def test_failure_equality_with_non_ascii_str(self): bad_string = chr(200) - fail = misc.Failure.from_exception(ValueError(bad_string)) + fail = failure.Failure.from_exception(ValueError(bad_string)) copied = fail.copy() self.assertEqual(fail, copied) def test_failure_equality_non_ascii_unicode(self): hi_ru = u'привет' - fail = misc.Failure.from_exception(ValueError(hi_ru)) + fail = failure.Failure.from_exception(ValueError(hi_ru)) copied = fail.copy() self.assertEqual(fail, copied) diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index 75092003..fc162e9d 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -23,9 +23,9 @@ from kombu import exceptions as kombu_exc from taskflow.engines.worker_based import executor from taskflow.engines.worker_based import protocol as pr +from taskflow import failure from taskflow import test from taskflow.tests import utils -from taskflow.utils import misc class TestWorkerTaskExecutor(test.MockTestCase): @@ -111,8 +111,8 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertEqual(self.message_mock.mock_calls, [mock.call.ack()]) def test_on_message_response_state_failure(self): - failure = misc.Failure.from_exception(Exception('test')) - failure_dict = failure.to_dict() + fail = failure.Failure.from_exception(Exception('test')) + failure_dict = fail.to_dict() response = pr.Response(pr.FAILURE, result=failure_dict) ex = self.executor() ex._requests_cache.set(self.task_uuid, self.request_inst_mock) @@ -120,7 +120,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertEqual(len(ex._requests_cache._data), 0) self.assertEqual(self.request_inst_mock.mock_calls, [ - mock.call.set_result(result=utils.FailureMatcher(failure)) + mock.call.set_result(result=utils.FailureMatcher(fail)) ]) self.assertEqual(self.message_mock.mock_calls, [mock.call.ack()]) diff --git a/taskflow/tests/unit/worker_based/test_protocol.py b/taskflow/tests/unit/worker_based/test_protocol.py index 27d6e00d..ea18ab94 100644 --- a/taskflow/tests/unit/worker_based/test_protocol.py +++ b/taskflow/tests/unit/worker_based/test_protocol.py @@ -19,9 +19,9 @@ import mock from concurrent import futures from taskflow.engines.worker_based import protocol as pr +from taskflow import failure from taskflow import test from taskflow.tests import utils -from taskflow.utils import misc class TestProtocol(test.TestCase): @@ -81,15 +81,15 @@ class TestProtocol(test.TestCase): self.request_to_dict(result=('success', None))) def test_to_dict_with_result_failure(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) - expected = self.request_to_dict(result=('failure', failure.to_dict())) - self.assertEqual(self.request(result=failure).to_dict(), expected) + fail = failure.Failure.from_exception(RuntimeError('Woot!')) + expected = self.request_to_dict(result=('failure', fail.to_dict())) + self.assertEqual(self.request(result=fail).to_dict(), expected) def test_to_dict_with_failures(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) - request = self.request(failures={self.task.name: failure}) + fail = failure.Failure.from_exception(RuntimeError('Woot!')) + request = self.request(failures={self.task.name: fail}) expected = self.request_to_dict( - failures={self.task.name: failure.to_dict()}) + failures={self.task.name: fail.to_dict()}) self.assertEqual(request.to_dict(), expected) @mock.patch('taskflow.engines.worker_based.protocol.misc.wallclock') diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index a4eab7a8..b155d771 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -23,9 +23,9 @@ from kombu import exceptions as exc from taskflow.engines.worker_based import endpoint as ep from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import server +from taskflow import failure from taskflow import test from taskflow.tests import utils -from taskflow.utils import misc class TestServer(test.MockTestCase): @@ -185,19 +185,19 @@ class TestServer(test.MockTestCase): result=1))) def test_parse_request_with_failure_result(self): - failure = misc.Failure.from_exception(Exception('test')) - request = self.make_request(action='revert', result=failure) + fail = failure.Failure.from_exception(Exception('test')) + request = self.make_request(action='revert', result=fail) task_cls, action, task_args = server.Server._parse_request(**request) self.assertEqual((task_cls, action, task_args), (self.task.name, 'revert', dict(task_name=self.task.name, arguments=self.task_args, - result=utils.FailureMatcher(failure)))) + result=utils.FailureMatcher(fail)))) def test_parse_request_with_failures(self): - failures = {'0': misc.Failure.from_exception(Exception('test1')), - '1': misc.Failure.from_exception(Exception('test2'))} + failures = {'0': failure.Failure.from_exception(Exception('test1')), + '1': failure.Failure.from_exception(Exception('test2'))} request = self.make_request(action='revert', failures=failures) task_cls, action, task_args = server.Server._parse_request(**request) @@ -274,16 +274,16 @@ class TestServer(test.MockTestCase): self.assertEqual(self.master_mock.mock_calls, []) self.assertTrue(mocked_exception.called) - @mock.patch.object(misc.Failure, 'from_dict') - @mock.patch.object(misc.Failure, 'to_dict') + @mock.patch.object(failure.Failure, 'from_dict') + @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_parse_request_failure(self, to_mock, from_mock): failure_dict = { 'failure': 'failure', } - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + fail = failure.Failure.from_exception(RuntimeError('Woot!')) to_mock.return_value = failure_dict from_mock.side_effect = ValueError('Woot!') - request = self.make_request(result=failure) + request = self.make_request(result=fail) # create server and process request s = self.server(reset_master_mock=True) @@ -298,7 +298,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(master_mock_calls, self.master_mock.mock_calls) - @mock.patch.object(misc.Failure, 'to_dict') + @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_endpoint_not_found(self, to_mock): failure_dict = { 'failure': 'failure', @@ -319,7 +319,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - @mock.patch.object(misc.Failure, 'to_dict') + @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_execution_failure(self, to_mock): failure_dict = { 'failure': 'failure', @@ -344,7 +344,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - @mock.patch.object(misc.Failure, 'to_dict') + @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_task_failure(self, to_mock): failure_dict = { 'failure': 'failure', diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 20fc3758..5ed9b75a 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -19,31 +19,15 @@ import threading import six -from taskflow import exceptions from taskflow.persistence.backends import impl_memory from taskflow import retry from taskflow import task -from taskflow.utils import misc ARGS_KEY = '__args__' KWARGS_KEY = '__kwargs__' ORDER_KEY = '__order__' -@contextlib.contextmanager -def wrap_all_failures(): - """Convert any exceptions to WrappedFailure. - - When you expect several failures, it may be convenient - to wrap any exception with WrappedFailure in order to - unify error handling. - """ - try: - yield - except Exception: - raise exceptions.WrappedFailure([misc.Failure()]) - - class DummyTask(task.Task): def execute(self, context, *args, **kwargs): diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index 1da28a4c..1e8bcb1a 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# Copyright (C) 2012-2014 Yahoo! Inc. All Rights Reserved. # Copyright (C) 2013 Rackspace Hosting All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -16,7 +16,6 @@ # under the License. import collections -import contextlib import copy import errno import functools @@ -24,7 +23,6 @@ import keyword import logging import os import string -import sys import threading import time import traceback @@ -513,195 +511,3 @@ def are_equal_exc_info_tuples(ei1, ei2): tb1 = traceback.format_tb(ei1[2]) tb2 = traceback.format_tb(ei2[2]) return tb1 == tb2 - - -@contextlib.contextmanager -def capture_failure(): - """Save current exception, and yield back the failure (or raises a - runtime error if no active exception is being handled). - - In some cases the exception context can be cleared, resulting in None - being attempted to be saved after an exception handler is run. This - can happen when eventlet switches greenthreads or when running an - exception handler, code raises and catches an exception. In both - cases the exception context will be cleared. - - To work around this, we save the exception state, yield a failure and - then run other code. - - For example:: - - except Exception: - with capture_failure() as fail: - LOG.warn("Activating cleanup") - cleanup() - save_failure(fail) - """ - exc_info = sys.exc_info() - if not any(exc_info): - raise RuntimeError("No active exception is being handled") - else: - yield Failure(exc_info=exc_info) - - -class Failure(object): - """Object that represents failure. - - Failure objects encapsulate exception information so that - it can be re-used later to re-raise or inspect. - """ - DICT_VERSION = 1 - - def __init__(self, exc_info=None, **kwargs): - if not kwargs: - if exc_info is None: - exc_info = sys.exc_info() - self._exc_info = exc_info - self._exc_type_names = list( - reflection.get_all_class_names(exc_info[0], up_to=Exception)) - if not self._exc_type_names: - raise TypeError('Invalid exception type: %r' % exc_info[0]) - self._exception_str = exc.exception_message(self._exc_info[1]) - self._traceback_str = ''.join( - traceback.format_tb(self._exc_info[2])) - else: - self._exc_info = exc_info # may be None - self._exception_str = kwargs.pop('exception_str') - self._exc_type_names = kwargs.pop('exc_type_names', []) - self._traceback_str = kwargs.pop('traceback_str', None) - if kwargs: - raise TypeError( - 'Failure.__init__ got unexpected keyword argument(s): %s' - % ', '.join(six.iterkeys(kwargs))) - - @classmethod - def from_exception(cls, exception): - return cls((type(exception), exception, None)) - - def _matches(self, other): - if self is other: - return True - return (self._exc_type_names == other._exc_type_names - and self.exception_str == other.exception_str - and self.traceback_str == other.traceback_str) - - def matches(self, other): - if not isinstance(other, Failure): - return False - if self.exc_info is None or other.exc_info is None: - return self._matches(other) - else: - return self == other - - def __eq__(self, other): - if not isinstance(other, Failure): - return NotImplemented - return (self._matches(other) and - are_equal_exc_info_tuples(self.exc_info, other.exc_info)) - - def __ne__(self, other): - return not (self == other) - - # NOTE(imelnikov): obj.__hash__() should return same values for equal - # objects, so we should redefine __hash__. Failure equality semantics - # is a bit complicated, so for now we just mark Failure objects as - # unhashable. See python docs on object.__hash__ for more info: - # http://docs.python.org/2/reference/datamodel.html#object.__hash__ - __hash__ = None - - @property - def exception(self): - """Exception value, or None if exception value is not present. - - Exception value may be lost during serialization. - """ - if self._exc_info: - return self._exc_info[1] - else: - return None - - @property - def exception_str(self): - """String representation of exception.""" - return self._exception_str - - @property - def exc_info(self): - """Exception info tuple or None.""" - return self._exc_info - - @property - def traceback_str(self): - """Exception traceback as string.""" - return self._traceback_str - - @staticmethod - def reraise_if_any(failures): - """Re-raise exceptions if argument is not empty. - - If argument is empty list, this method returns None. If - argument is list with single Failure object in it, - this failure is reraised. Else, WrappedFailure exception - is raised with failures list as causes. - """ - failures = list(failures) - if len(failures) == 1: - failures[0].reraise() - elif len(failures) > 1: - raise exc.WrappedFailure(failures) - - def reraise(self): - """Re-raise captured exception.""" - if self._exc_info: - six.reraise(*self._exc_info) - else: - raise exc.WrappedFailure([self]) - - def check(self, *exc_classes): - """Check if any of exc_classes caused the failure. - - Arguments of this method can be exception types or type - names (stings). If captured exception is instance of - exception of given type, the corresponding argument is - returned. Else, None is returned. - """ - for cls in exc_classes: - if isinstance(cls, type): - err = reflection.get_class_name(cls) - else: - err = cls - if err in self._exc_type_names: - return cls - return None - - def __str__(self): - return 'Failure: %s: %s' % (self._exc_type_names[0], - self._exception_str) - - def __iter__(self): - """Iterate over exception type names.""" - for et in self._exc_type_names: - yield et - - @classmethod - def from_dict(cls, data): - data = dict(data) - version = data.pop('version', None) - if version != cls.DICT_VERSION: - raise ValueError('Invalid dict version of failure object: %r' - % version) - return cls(**data) - - def to_dict(self): - return { - 'exception_str': self.exception_str, - 'traceback_str': self.traceback_str, - 'exc_type_names': list(self), - 'version': self.DICT_VERSION, - } - - def copy(self): - return Failure(exc_info=copy_exc_info(self.exc_info), - exception_str=self.exception_str, - traceback_str=self.traceback_str, - exc_type_names=self._exc_type_names[:])