From 3c9871d8c3e5746db23a8749df0123818c6d9f55 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Sat, 18 Oct 2014 19:22:08 -0700 Subject: [PATCH] Remove direct usage of the deprecated failure location Internally we should be using the new location and not the deprecated location wherever possible. This avoids emitting warnings messages on our own code, which is a dirty habit. Change-Id: Idac5a772eca7529d92542ada3be1cea092880e25 --- doc/source/arguments_and_results.rst | 16 ++++---- doc/source/workers.rst | 16 ++++---- taskflow/engines/action_engine/engine.py | 5 ++- taskflow/engines/action_engine/executor.py | 6 +-- .../engines/action_engine/retry_action.py | 10 ++--- taskflow/engines/action_engine/runner.py | 8 ++-- taskflow/engines/action_engine/runtime.py | 5 ++- taskflow/engines/action_engine/task_action.py | 6 +-- taskflow/engines/worker_based/executor.py | 6 +-- taskflow/engines/worker_based/protocol.py | 10 ++--- taskflow/engines/worker_based/server.py | 13 +++--- taskflow/examples/wrapped_exception.py | 18 ++++----- taskflow/exceptions.py | 4 +- taskflow/listeners/base.py | 4 +- taskflow/listeners/logging.py | 8 ++-- .../persistence/backends/impl_sqlalchemy.py | 3 +- taskflow/persistence/logbook.py | 10 ++--- taskflow/storage.py | 3 +- taskflow/tests/unit/persistence/base.py | 15 ++++--- taskflow/tests/unit/test_engines.py | 4 +- taskflow/tests/unit/test_failure.py | 3 +- taskflow/tests/unit/test_storage.py | 40 +++++++++---------- .../tests/unit/worker_based/test_executor.py | 8 ++-- .../tests/unit/worker_based/test_pipeline.py | 4 +- .../tests/unit/worker_based/test_protocol.py | 15 +++---- .../tests/unit/worker_based/test_server.py | 26 ++++++------ taskflow/tests/utils.py | 4 +- taskflow/types/failure.py | 2 +- 28 files changed, 138 insertions(+), 134 deletions(-) diff --git a/doc/source/arguments_and_results.rst b/doc/source/arguments_and_results.rst index d7b96095..3082e2fc 100644 --- a/doc/source/arguments_and_results.rst +++ b/doc/source/arguments_and_results.rst @@ -350,7 +350,7 @@ For ``result`` value, two cases are possible: * If the task is being reverted because it failed (an exception was raised from its |task.execute| method), the ``result`` value is an instance of a - :py:class:`~taskflow.utils.misc.Failure` object that holds the exception + :py:class:`~taskflow.types.failure.Failure` object that holds the exception information. * If the task is being reverted because some other task failed, and this task @@ -361,9 +361,9 @@ All other arguments are fetched from storage in the same way it is done for |task.execute| method. To determine if a task failed you can check whether ``result`` is instance of -:py:class:`~taskflow.utils.misc.Failure`:: +:py:class:`~taskflow.types.failure.Failure`:: - from taskflow.utils import misc + from taskflow.types import failure class RevertingTask(task.Task): @@ -371,7 +371,7 @@ To determine if a task failed you can check whether ``result`` is instance of return do_something(spam, eggs) def revert(self, result, spam, eggs): - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): print("This task failed, exception: %s" % result.exception_str) else: @@ -389,7 +389,7 @@ A |Retry| controller works with arguments in the same way as a |Task|. But it has an additional parameter ``'history'`` that is a list of tuples. Each tuple contains a result of the previous retry run and a table where the key is a failed task and the value is a -:py:class:`~taskflow.utils.misc.Failure` object. +:py:class:`~taskflow.types.failure.Failure` object. Consider the following implementation:: @@ -412,7 +412,7 @@ Imagine the above retry had returned a value ``'5'`` and then some task ``'A'`` failed with some exception. In this case the above retrys ``on_failure`` method will receive the following history:: - [('5', {'A': misc.Failure()})] + [('5', {'A': failure.Failure()})] At this point (since the implementation returned ``RETRY``) the |retry.execute| method will be called again and it will receive the same @@ -421,10 +421,10 @@ there behavior. If instead the |retry.execute| method raises an exception, the |retry.revert| method of the implementation will be called and -a :py:class:`~taskflow.utils.misc.Failure` object will be present in the +a :py:class:`~taskflow.types.failure.Failure` object will be present in the history instead of the typical result:: - [('5', {'A': misc.Failure()}), (misc.Failure(), {})] + [('5', {'A': failure.Failure()}), (failure.Failure(), {})] .. note:: diff --git a/doc/source/workers.rst b/doc/source/workers.rst index caac6aa0..4bb5b362 100644 --- a/doc/source/workers.rst +++ b/doc/source/workers.rst @@ -135,7 +135,7 @@ engine executor in the following manner: executes the task). 2. If dispatched succeeded then the worker sends a confirmation response to the executor otherwise the worker sends a failed response along with - a serialized :py:class:`failure ` object + a serialized :py:class:`failure ` object that contains what has failed (and why). 3. The worker executes the task and once it is finished sends the result back to the originating executor (every time a task progress event is @@ -152,11 +152,11 @@ engine executor in the following manner: .. note:: - :py:class:`~taskflow.utils.misc.Failure` objects are not json-serializable - (they contain references to tracebacks which are not serializable), so they - are converted to dicts before sending and converted from dicts after - receiving on both executor & worker sides (this translation is lossy since - the traceback won't be fully retained). + :py:class:`~taskflow.types.failure.Failure` objects are not directly + json-serializable (they contain references to tracebacks which are not + serializable), so they are converted to dicts before sending and converted + from dicts after receiving on both executor & worker sides (this + translation is lossy since the traceback won't be fully retained). Executor request format ~~~~~~~~~~~~~~~~~~~~~~~ @@ -165,7 +165,7 @@ Executor request format * **action** - task action to be performed (e.g. execute, revert) * **arguments** - arguments the task action to be called with * **result** - task execution result (result or - :py:class:`~taskflow.utils.misc.Failure`) *[passed to revert only]* + :py:class:`~taskflow.types.failure.Failure`) *[passed to revert only]* Additionally, the following parameters are added to the request message: @@ -222,7 +222,7 @@ When **failed:** { "event": , - "result": , + "result": , "state": "FAILURE" } diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 22db3ed7..a501c45d 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -26,6 +26,7 @@ from taskflow.engines import base from taskflow import exceptions as exc from taskflow import states from taskflow import storage as atom_storage +from taskflow.types import failure from taskflow.utils import lock_utils from taskflow.utils import misc from taskflow.utils import reflection @@ -129,7 +130,7 @@ class ActionEngine(base.EngineBase): closed = False for (last_state, failures) in runner.run_iter(timeout=timeout): if failures: - misc.Failure.reraise_if_any(failures) + failure.Failure.reraise_if_any(failures) if closed: continue try: @@ -152,7 +153,7 @@ class ActionEngine(base.EngineBase): self._change_state(last_state) if last_state not in [states.SUSPENDED, states.SUCCESS]: failures = self.storage.get_failures() - misc.Failure.reraise_if_any(failures.values()) + failure.Failure.reraise_if_any(failures.values()) def _change_state(self, state): with self._state_lock: diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 83da3b6b..78d16ff9 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -19,9 +19,9 @@ import abc import six from taskflow import task as _task +from taskflow.types import failure from taskflow.types import futures from taskflow.utils import async_utils -from taskflow.utils import misc from taskflow.utils import threading_utils # Execution and reversion events. @@ -37,7 +37,7 @@ def _execute_task(task, arguments, progress_callback): except Exception: # NOTE(imelnikov): wrap current exception with Failure # object and return it. - result = misc.Failure() + result = failure.Failure() finally: task.post_execute() return (task, EXECUTED, result) @@ -54,7 +54,7 @@ def _revert_task(task, arguments, result, failures, progress_callback): except Exception: # NOTE(imelnikov): wrap current exception with Failure # object and return it. - result = misc.Failure() + result = failure.Failure() finally: task.post_revert() return (task, REVERTED, result) diff --git a/taskflow/engines/action_engine/retry_action.py b/taskflow/engines/action_engine/retry_action.py index 3bf6f491..0ffa4b2a 100644 --- a/taskflow/engines/action_engine/retry_action.py +++ b/taskflow/engines/action_engine/retry_action.py @@ -18,8 +18,8 @@ import logging from taskflow.engines.action_engine import executor as ex from taskflow import states +from taskflow.types import failure from taskflow.types import futures -from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -65,12 +65,12 @@ class RetryAction(object): try: result = retry.execute(**kwargs) except Exception: - result = misc.Failure() + result = failure.Failure() return (retry, ex.EXECUTED, result) def _on_done_callback(fut): result = fut.result()[-1] - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): self.change_state(retry, states.FAILURE, result=result) else: self.change_state(retry, states.SUCCESS, result=result) @@ -88,12 +88,12 @@ class RetryAction(object): try: result = retry.revert(**kwargs) except Exception: - result = misc.Failure() + result = failure.Failure() return (retry, ex.REVERTED, result) def _on_done_callback(fut): result = fut.result()[-1] - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): self.change_state(retry, states.FAILURE) else: self.change_state(retry, states.REVERTED) diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/runner.py index c2b1788a..79ebc657 100644 --- a/taskflow/engines/action_engine/runner.py +++ b/taskflow/engines/action_engine/runner.py @@ -17,8 +17,8 @@ import logging from taskflow import states as st +from taskflow.types import failure from taskflow.types import fsm -from taskflow.utils import misc # Waiting state timeout (in seconds). _WAITING_TIMEOUT = 60 @@ -132,15 +132,15 @@ class _MachineBuilder(object): try: node, event, result = fut.result() retain = self._completer.complete(node, event, result) - if retain and isinstance(result, misc.Failure): + if retain and isinstance(result, failure.Failure): memory.failures.append(result) except Exception: - memory.failures.append(misc.Failure()) + memory.failures.append(failure.Failure()) else: try: more_nodes = self._analyzer.get_next_nodes(node) except Exception: - memory.failures.append(misc.Failure()) + memory.failures.append(failure.Failure()) else: next_nodes.update(more_nodes) if self.runnable() and next_nodes and not memory.failures: diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index c0c58367..06959f29 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -24,6 +24,7 @@ from taskflow import exceptions as excp from taskflow import retry as retry_atom from taskflow import states as st from taskflow import task as task_atom +from taskflow.types import failure from taskflow.utils import misc @@ -155,7 +156,7 @@ class Completer(object): """ if isinstance(node, task_atom.BaseTask): self._complete_task(node, event, result) - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): if event == ex.EXECUTED: self._process_atom_failure(node, result) else: @@ -270,5 +271,5 @@ class Scheduler(object): # Immediately stop scheduling future work so that we can # exit execution early (rather than later) if a single task # fails to schedule correctly. - return (futures, [misc.Failure()]) + return (futures, [failure.Failure()]) return (futures, []) diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index 3503df7c..eb9510f9 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -17,7 +17,7 @@ import logging from taskflow import states -from taskflow.utils import misc +from taskflow.types import failure LOG = logging.getLogger(__name__) @@ -91,7 +91,7 @@ class TaskAction(object): self._on_update_progress) def complete_execution(self, task, result): - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): self.change_state(task, states.FAILURE, result=result) else: self.change_state(task, states.SUCCESS, @@ -112,7 +112,7 @@ class TaskAction(object): return future def complete_reversion(self, task, rev_result): - if isinstance(rev_result, misc.Failure): + if isinstance(rev_result, failure.Failure): self.change_state(task, states.FAILURE) else: self.change_state(task, states.REVERTED, progress=1.0) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 235f3c93..ae8e0e40 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -172,9 +172,9 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): " seconds for it to transition out of (%s) states" % (request, request_age, ", ".join(pr.WAITING_STATES))) except exc.RequestTimeout: - with misc.capture_failure() as fail: - LOG.debug(fail.exception_str) - request.set_result(fail) + with misc.capture_failure() as failure: + LOG.debug(failure.exception_str) + request.set_result(failure) def _on_wait(self): """This function is called cyclically between draining events.""" diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index a97240a9..3cd7e178 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -26,9 +26,9 @@ import six from taskflow.engines.action_engine import executor from taskflow import exceptions as excp +from taskflow.types import failure as ft from taskflow.types import timing as tt from taskflow.utils import lock_utils -from taskflow.utils import misc from taskflow.utils import reflection # NOTE(skudriashev): This is protocol states and events, which are not @@ -270,15 +270,15 @@ class Request(Message): """Return json-serializable request. To convert requests that have failed due to some exception this will - convert all `misc.Failure` objects into dictionaries (which will then - be reconstituted by the receiver). + convert all `failure.Failure` objects into dictionaries (which will + then be reconstituted by the receiver). """ request = dict(task_cls=self._task_cls, task_name=self._task.name, task_version=self._task.version, action=self._action, arguments=self._arguments) if 'result' in self._kwargs: result = self._kwargs['result'] - if isinstance(result, misc.Failure): + if isinstance(result, ft.Failure): request['result'] = ('failure', result.to_dict()) else: request['result'] = ('success', result) @@ -417,7 +417,7 @@ class Response(Message): state = data['state'] data = data['data'] if state == FAILURE and 'result' in data: - data['result'] = misc.Failure.from_dict(data['result']) + data['result'] = ft.Failure.from_dict(data['result']) return cls(state, **data) @property diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index 9440c96a..db61edc6 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -21,6 +21,7 @@ import six from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy +from taskflow.types import failure as ft from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -69,20 +70,20 @@ class Server(object): failures=None, **kwargs): """Parse request before it can be further processed. - All `misc.Failure` objects that have been converted to dict on the - remote side will now converted back to `misc.Failure` objects. + All `failure.Failure` objects that have been converted to dict on the + remote side will now converted back to `failure.Failure` objects. """ action_args = dict(arguments=arguments, task_name=task_name) if result is not None: data_type, data = result if data_type == 'failure': - action_args['result'] = misc.Failure.from_dict(data) + action_args['result'] = ft.Failure.from_dict(data) else: action_args['result'] = data if failures is not None: action_args['failures'] = {} - for k, v in failures.items(): - action_args['failures'][k] = misc.Failure.from_dict(v) + for key, data in six.iteritems(failures): + action_args['failures'][key] = ft.Failure.from_dict(data) return task_cls, action, action_args @staticmethod @@ -218,7 +219,7 @@ class Server(object): message.delivery_tag, exc_info=True) reply_callback(result=failure.to_dict()) else: - if isinstance(result, misc.Failure): + if isinstance(result, ft.Failure): reply_callback(result=result.to_dict()) else: reply_callback(state=pr.SUCCESS, result=result) diff --git a/taskflow/examples/wrapped_exception.py b/taskflow/examples/wrapped_exception.py index dff6b2b4..78b5ad06 100644 --- a/taskflow/examples/wrapped_exception.py +++ b/taskflow/examples/wrapped_exception.py @@ -33,7 +33,7 @@ from taskflow import exceptions from taskflow.patterns import unordered_flow as uf from taskflow import task from taskflow.tests import utils -from taskflow.utils import misc +from taskflow.types import failure import example_utils as eu # noqa @@ -96,15 +96,15 @@ def run(**store): engine='parallel') except exceptions.WrappedFailure as ex: unknown_failures = [] - for failure in ex: - if failure.check(FirstException): - print("Got FirstException: %s" % failure.exception_str) - elif failure.check(SecondException): - print("Got SecondException: %s" % failure.exception_str) + for a_failure in ex: + if a_failure.check(FirstException): + print("Got FirstException: %s" % a_failure.exception_str) + elif a_failure.check(SecondException): + print("Got SecondException: %s" % a_failure.exception_str) else: - print("Unknown failure: %s" % failure) - unknown_failures.append(failure) - misc.Failure.reraise_if_any(unknown_failures) + print("Unknown failure: %s" % a_failure) + unknown_failures.append(a_failure) + failure.Failure.reraise_if_any(unknown_failures) eu.print_wrapped("Raise and catch first exception only") diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index cdffe0f9..876a3e3b 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -178,8 +178,8 @@ class WrappedFailure(Exception): See the failure class documentation for a more comprehensive set of reasons why this object *may* be reraised instead of the original exception. - :param causes: the :py:class:`~taskflow.utils.misc.Failure` objects that - caused this this exception to be raised. + :param causes: the :py:class:`~taskflow.types.failure.Failure` objects + that caused this this exception to be raised. """ def __init__(self, causes): diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index ca2ee6e2..e1d475f7 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -23,8 +23,8 @@ from oslo.utils import excutils import six from taskflow import states +from taskflow.types import failure from taskflow.types import notifier -from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -142,7 +142,7 @@ class LoggingBase(ListenerBase): result = details.get('result') exc_info = None was_failure = False - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): if result.exc_info: exc_info = tuple(result.exc_info) was_failure = True diff --git a/taskflow/listeners/logging.py b/taskflow/listeners/logging.py index 87528f37..3629bb2c 100644 --- a/taskflow/listeners/logging.py +++ b/taskflow/listeners/logging.py @@ -21,8 +21,8 @@ import sys from taskflow.listeners import base from taskflow import states +from taskflow.types import failure from taskflow.types import notifier -from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -92,8 +92,8 @@ class DynamicLoggingListener(base.ListenerBase): * ``states.RETRYING`` * ``states.REVERTING`` - When a task produces a :py:class:`~taskflow.utils.misc.Failure` object as - its result (typically this happens when a task raises an exception) this + When a task produces a :py:class:`~taskflow.types.failure.Failure` object + as its result (typically this happens when a task raises an exception) this will **always** switch the logger to use ``logging.WARNING`` (if the failure object contains a ``exc_info`` tuple this will also be logged to provide a meaningful traceback). @@ -130,7 +130,7 @@ class DynamicLoggingListener(base.ListenerBase): # If the task failed, it's useful to show the exception traceback # and any other available exception information. result = details.get('result') - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): if result.exc_info: exc_info = result.exc_info manual_tb = '' diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 29ab8c97..4b12b782 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -37,6 +37,7 @@ from taskflow.persistence.backends import base from taskflow.persistence.backends.sqlalchemy import migration from taskflow.persistence.backends.sqlalchemy import models from taskflow.persistence import logbook +from taskflow.types import failure from taskflow.utils import async_utils from taskflow.utils import misc @@ -328,7 +329,7 @@ class Connection(base.Connection): pass except sa_exc.OperationalError as ex: if _is_db_connection_error(six.text_type(ex.args[0])): - failures.append(misc.Failure()) + failures.append(failure.Failure()) return False return True diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index 3d60aa52..f66f68ef 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -25,7 +25,7 @@ import six from taskflow import exceptions as exc from taskflow.openstack.common import uuidutils from taskflow import states -from taskflow.utils import misc +from taskflow.types import failure as ft LOG = logging.getLogger(__name__) @@ -50,7 +50,7 @@ def _safe_unmarshal_time(when): def _was_failure(state, result): - return state == states.FAILURE and isinstance(result, misc.Failure) + return state == states.FAILURE and isinstance(result, ft.Failure) def _fix_meta(data): @@ -363,7 +363,7 @@ class AtomDetail(object): self.meta = _fix_meta(data) failure = data.get('failure') if failure: - self.failure = misc.Failure.from_dict(failure) + self.failure = ft.Failure.from_dict(failure) @property def uuid(self): @@ -467,8 +467,8 @@ class RetryDetail(AtomDetail): new_results = [] for (data, failures) in results: new_failures = {} - for (key, failure_data) in six.iteritems(failures): - new_failures[key] = misc.Failure.from_dict(failure_data) + for (key, data) in six.iteritems(failures): + new_failures[key] = ft.Failure.from_dict(data) new_results.append((data, new_failures)) return new_results diff --git a/taskflow/storage.py b/taskflow/storage.py index 6ee10f60..c667509b 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -26,6 +26,7 @@ from taskflow.persistence import logbook from taskflow import retry from taskflow import states from taskflow import task +from taskflow.types import failure from taskflow.utils import lock_utils from taskflow.utils import misc from taskflow.utils import reflection @@ -425,7 +426,7 @@ class Storage(object): with self._lock.write_lock(): ad = self._atomdetail_by_name(atom_name) ad.put(state, data) - if state == states.FAILURE and isinstance(data, misc.Failure): + if state == states.FAILURE and isinstance(data, failure.Failure): # NOTE(imelnikov): failure serialization looses information, # so we cache failures here, in atom name -> failure mapping. self._failures[ad.name] = data diff --git a/taskflow/tests/unit/persistence/base.py b/taskflow/tests/unit/persistence/base.py index 3d28695c..6d96df66 100644 --- a/taskflow/tests/unit/persistence/base.py +++ b/taskflow/tests/unit/persistence/base.py @@ -20,7 +20,7 @@ from taskflow import exceptions as exc from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook from taskflow import states -from taskflow.utils import misc +from taskflow.types import failure class PersistenceTestMixin(object): @@ -147,7 +147,7 @@ class PersistenceTestMixin(object): try: raise RuntimeError('Woot!') except Exception: - td.failure = misc.Failure() + td.failure = failure.Failure() fd.add(td) @@ -161,10 +161,9 @@ class PersistenceTestMixin(object): lb2 = conn.get_logbook(lb_id) fd2 = lb2.find(fd.uuid) td2 = fd2.find(td.uuid) - failure = td2.failure - self.assertEqual(failure.exception_str, 'Woot!') - self.assertIs(failure.check(RuntimeError), RuntimeError) - self.assertEqual(failure.traceback_str, td.failure.traceback_str) + self.assertEqual(td2.failure.exception_str, 'Woot!') + self.assertIs(td2.failure.check(RuntimeError), RuntimeError) + self.assertEqual(td2.failure.traceback_str, td.failure.traceback_str) self.assertIsInstance(td2, logbook.TaskDetail) def test_logbook_merge_flow_detail(self): @@ -269,7 +268,7 @@ class PersistenceTestMixin(object): fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) - fail = misc.Failure.from_exception(RuntimeError('fail')) + fail = failure.Failure.from_exception(RuntimeError('fail')) rd.results.append((42, {'some-task': fail})) fd.add(rd) @@ -286,7 +285,7 @@ class PersistenceTestMixin(object): rd2 = fd2.find(rd.uuid) self.assertIsInstance(rd2, logbook.RetryDetail) fail2 = rd2.results[0][1].get('some-task') - self.assertIsInstance(fail2, misc.Failure) + self.assertIsInstance(fail2, failure.Failure) self.assertTrue(fail.matches(fail2)) def test_retry_detail_save_intention(self): diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 0823002d..d1c758b4 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -32,10 +32,10 @@ from taskflow import states from taskflow import task from taskflow import test from taskflow.tests import utils +from taskflow.types import failure from taskflow.types import futures from taskflow.types import graph as gr from taskflow.utils import async_utils as au -from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils @@ -529,7 +529,7 @@ class EngineCheckingTaskTest(utils.EngineTestBase): self.assertEqual(result, 'RESULT') self.assertEqual(list(flow_failures.keys()), ['fail1']) fail = flow_failures['fail1'] - self.assertIsInstance(fail, misc.Failure) + self.assertIsInstance(fail, failure.Failure) self.assertEqual(str(fail), 'Failure: RuntimeError: Woot!') flow = lf.Flow('test').add( diff --git a/taskflow/tests/unit/test_failure.py b/taskflow/tests/unit/test_failure.py index 3f4d001e..64e1c44d 100644 --- a/taskflow/tests/unit/test_failure.py +++ b/taskflow/tests/unit/test_failure.py @@ -22,7 +22,6 @@ from taskflow import exceptions from taskflow import test from taskflow.tests import utils as test_utils from taskflow.types import failure -from taskflow.utils import misc def _captured_failure(msg): @@ -217,7 +216,7 @@ class FailureObjectTestCase(test.TestCase): def test_pformat_traceback_captured_no_exc_info(self): captured = _captured_failure('Woot!') - captured = misc.Failure.from_dict(captured.to_dict()) + captured = failure.Failure.from_dict(captured.to_dict()) text = captured.pformat(traceback=True) self.assertIn("Traceback (most recent call last):", text) diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 7d3b55b6..deb4db4f 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -25,7 +25,7 @@ from taskflow import states from taskflow import storage from taskflow import test from taskflow.tests import utils as test_utils -from taskflow.utils import misc +from taskflow.types import failure from taskflow.utils import persistence_utils as p_utils @@ -128,46 +128,46 @@ class StorageTestMixin(object): self.assertEqual(s.get_atom_state('my task'), states.FAILURE) def test_save_and_get_cached_failure(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_atom(test_utils.NoopTask('my task')) - s.save('my task', failure, states.FAILURE) - self.assertEqual(s.get('my task'), failure) + s.save('my task', a_failure, states.FAILURE) + self.assertEqual(s.get('my task'), a_failure) self.assertEqual(s.get_atom_state('my task'), states.FAILURE) self.assertTrue(s.has_failures()) - self.assertEqual(s.get_failures(), {'my task': failure}) + self.assertEqual(s.get_failures(), {'my task': a_failure}) def test_save_and_get_non_cached_failure(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_atom(test_utils.NoopTask('my task')) - s.save('my task', failure, states.FAILURE) - self.assertEqual(s.get('my task'), failure) + s.save('my task', a_failure, states.FAILURE) + self.assertEqual(s.get('my task'), a_failure) s._failures['my task'] = None - self.assertTrue(failure.matches(s.get('my task'))) + self.assertTrue(a_failure.matches(s.get('my task'))) def test_get_failure_from_reverted_task(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_atom(test_utils.NoopTask('my task')) - s.save('my task', failure, states.FAILURE) + s.save('my task', a_failure, states.FAILURE) s.set_atom_state('my task', states.REVERTING) - self.assertEqual(s.get('my task'), failure) + self.assertEqual(s.get('my task'), a_failure) s.set_atom_state('my task', states.REVERTED) - self.assertEqual(s.get('my task'), failure) + self.assertEqual(s.get('my task'), a_failure) def test_get_failure_after_reload(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_atom(test_utils.NoopTask('my task')) - s.save('my task', failure, states.FAILURE) + s.save('my task', a_failure, states.FAILURE) s2 = self._get_storage(s._flowdetail) self.assertTrue(s2.has_failures()) self.assertEqual(1, len(s2.get_failures())) - self.assertTrue(failure.matches(s2.get('my task'))) + self.assertTrue(a_failure.matches(s2.get('my task'))) self.assertEqual(s2.get_atom_state('my task'), states.FAILURE) def test_get_non_existing_var(self): @@ -486,15 +486,15 @@ class StorageTestMixin(object): self.assertEqual(s.fetch_all(), {}) def test_cached_retry_failure(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x'])) s.save('my retry', 'a') - s.save('my retry', failure, states.FAILURE) + s.save('my retry', a_failure, states.FAILURE) history = s.get_retry_history('my retry') - self.assertEqual(history, [('a', {}), (failure, {})]) + self.assertEqual(history, [('a', {}), (a_failure, {})]) self.assertIs(s.has_failures(), True) - self.assertEqual(s.get_failures(), {'my retry': failure}) + self.assertEqual(s.get_failures(), {'my retry': a_failure}) def test_logbook_get_unknown_atom_type(self): self.assertRaisesRegexp(TypeError, diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index 3e494e88..d2b97bfe 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -24,7 +24,7 @@ from taskflow.engines.worker_based import protocol as pr from taskflow import test from taskflow.test import mock from taskflow.tests import utils as test_utils -from taskflow.utils import misc +from taskflow.types import failure from taskflow.utils import threading_utils @@ -111,8 +111,8 @@ class TestWorkerTaskExecutor(test.MockTestCase): [mock.call.on_progress(progress=1.0)]) def test_on_message_response_state_failure(self): - failure = misc.Failure.from_exception(Exception('test')) - failure_dict = failure.to_dict() + a_failure = failure.Failure.from_exception(Exception('test')) + failure_dict = a_failure.to_dict() response = pr.Response(pr.FAILURE, result=failure_dict) ex = self.executor() ex._requests_cache[self.task_uuid] = self.request_inst_mock @@ -121,7 +121,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertEqual(len(ex._requests_cache), 0) expected_calls = [ mock.call.transition_and_log_error(pr.FAILURE, logger=mock.ANY), - mock.call.set_result(result=test_utils.FailureMatcher(failure)) + mock.call.set_result(result=test_utils.FailureMatcher(a_failure)) ] self.assertEqual(expected_calls, self.request_inst_mock.mock_calls) diff --git a/taskflow/tests/unit/worker_based/test_pipeline.py b/taskflow/tests/unit/worker_based/test_pipeline.py index ae11efd2..b86cedd0 100644 --- a/taskflow/tests/unit/worker_based/test_pipeline.py +++ b/taskflow/tests/unit/worker_based/test_pipeline.py @@ -24,7 +24,7 @@ from taskflow.engines.worker_based import server as worker_server from taskflow.openstack.common import uuidutils from taskflow import test from taskflow.tests import utils as test_utils -from taskflow.utils import misc +from taskflow.types import failure TEST_EXCHANGE, TEST_TOPIC = ('test-exchange', 'test-topic') @@ -94,5 +94,5 @@ class TestPipeline(test.TestCase): executor.wait_for_any([f]) _t2, _action, result = f.result() - self.assertIsInstance(result, misc.Failure) + self.assertIsInstance(result, failure.Failure) self.assertEqual(RuntimeError, result.check(RuntimeError)) diff --git a/taskflow/tests/unit/worker_based/test_protocol.py b/taskflow/tests/unit/worker_based/test_protocol.py index 6df2bb41..4c9c4b77 100644 --- a/taskflow/tests/unit/worker_based/test_protocol.py +++ b/taskflow/tests/unit/worker_based/test_protocol.py @@ -23,7 +23,7 @@ from taskflow.openstack.common import uuidutils from taskflow import test from taskflow.test import mock from taskflow.tests import utils -from taskflow.utils import misc +from taskflow.types import failure class TestProtocolValidation(test.TestCase): @@ -149,15 +149,16 @@ class TestProtocol(test.TestCase): self.request_to_dict(result=('success', None))) def test_to_dict_with_result_failure(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) - expected = self.request_to_dict(result=('failure', failure.to_dict())) - self.assertEqual(self.request(result=failure).to_dict(), expected) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) + expected = self.request_to_dict(result=('failure', + a_failure.to_dict())) + self.assertEqual(self.request(result=a_failure).to_dict(), expected) def test_to_dict_with_failures(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) - request = self.request(failures={self.task.name: failure}) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) + request = self.request(failures={self.task.name: a_failure}) expected = self.request_to_dict( - failures={self.task.name: failure.to_dict()}) + failures={self.task.name: a_failure.to_dict()}) self.assertEqual(request.to_dict(), expected) def test_pending_not_expired(self): diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index b6e62671..5e9129ae 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -22,7 +22,7 @@ from taskflow.engines.worker_based import server from taskflow import test from taskflow.test import mock from taskflow.tests import utils -from taskflow.utils import misc +from taskflow.types import failure class TestServer(test.MockTestCase): @@ -121,19 +121,19 @@ class TestServer(test.MockTestCase): result=1))) def test_parse_request_with_failure_result(self): - failure = misc.Failure.from_exception(Exception('test')) - request = self.make_request(action='revert', result=failure) + a_failure = failure.Failure.from_exception(Exception('test')) + request = self.make_request(action='revert', result=a_failure) task_cls, action, task_args = server.Server._parse_request(**request) self.assertEqual((task_cls, action, task_args), (self.task.name, 'revert', dict(task_name=self.task.name, arguments=self.task_args, - result=utils.FailureMatcher(failure)))) + result=utils.FailureMatcher(a_failure)))) def test_parse_request_with_failures(self): - failures = {'0': misc.Failure.from_exception(Exception('test1')), - '1': misc.Failure.from_exception(Exception('test2'))} + failures = {'0': failure.Failure.from_exception(Exception('test1')), + '1': failure.Failure.from_exception(Exception('test2'))} request = self.make_request(action='revert', failures=failures) task_cls, action, task_args = server.Server._parse_request(**request) @@ -220,16 +220,16 @@ class TestServer(test.MockTestCase): self.assertEqual(self.master_mock.mock_calls, []) self.assertTrue(mocked_exception.called) - @mock.patch.object(misc.Failure, 'from_dict') - @mock.patch.object(misc.Failure, 'to_dict') + @mock.patch.object(failure.Failure, 'from_dict') + @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_parse_request_failure(self, to_mock, from_mock): failure_dict = { 'failure': 'failure', } - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) to_mock.return_value = failure_dict from_mock.side_effect = ValueError('Woot!') - request = self.make_request(result=failure) + request = self.make_request(result=a_failure) # create server and process request s = self.server(reset_master_mock=True) @@ -244,7 +244,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(master_mock_calls, self.master_mock.mock_calls) - @mock.patch.object(misc.Failure, 'to_dict') + @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_endpoint_not_found(self, to_mock): failure_dict = { 'failure': 'failure', @@ -265,7 +265,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - @mock.patch.object(misc.Failure, 'to_dict') + @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_execution_failure(self, to_mock): failure_dict = { 'failure': 'failure', @@ -287,7 +287,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - @mock.patch.object(misc.Failure, 'to_dict') + @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_task_failure(self, to_mock): failure_dict = { 'failure': 'failure', diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index d01f91a3..a0b2ff0d 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -23,8 +23,8 @@ from taskflow import exceptions from taskflow.persistence.backends import impl_memory from taskflow import retry from taskflow import task +from taskflow.types import failure from taskflow.utils import kazoo_utils -from taskflow.utils import misc from taskflow.utils import threading_utils ARGS_KEY = '__args__' @@ -50,7 +50,7 @@ def wrap_all_failures(): try: yield except Exception: - raise exceptions.WrappedFailure([misc.Failure()]) + raise exceptions.WrappedFailure([failure.Failure()]) def zookeeper_available(min_version, timeout=3): diff --git a/taskflow/types/failure.py b/taskflow/types/failure.py index 93829ad6..c1f28be8 100644 --- a/taskflow/types/failure.py +++ b/taskflow/types/failure.py @@ -68,7 +68,7 @@ class Failure(object): exception and desire to reraise it to the user/caller of the WBE based engine for appropriate handling (this matches the behavior of non-remote engines). To accomplish this a failure object (or a - :py:meth:`~misc.Failure.to_dict` form) would be sent over the WBE channel + :py:meth:`~.Failure.to_dict` form) would be sent over the WBE channel and the WBE based engine would deserialize it and use this objects :meth:`.reraise` method to cause an exception that contains similar/equivalent information as the original exception to be reraised,