diff --git a/doc/source/arguments_and_results.rst b/doc/source/arguments_and_results.rst index d7b960950..3082e2fc3 100644 --- a/doc/source/arguments_and_results.rst +++ b/doc/source/arguments_and_results.rst @@ -350,7 +350,7 @@ For ``result`` value, two cases are possible: * If the task is being reverted because it failed (an exception was raised from its |task.execute| method), the ``result`` value is an instance of a - :py:class:`~taskflow.utils.misc.Failure` object that holds the exception + :py:class:`~taskflow.types.failure.Failure` object that holds the exception information. * If the task is being reverted because some other task failed, and this task @@ -361,9 +361,9 @@ All other arguments are fetched from storage in the same way it is done for |task.execute| method. To determine if a task failed you can check whether ``result`` is instance of -:py:class:`~taskflow.utils.misc.Failure`:: +:py:class:`~taskflow.types.failure.Failure`:: - from taskflow.utils import misc + from taskflow.types import failure class RevertingTask(task.Task): @@ -371,7 +371,7 @@ To determine if a task failed you can check whether ``result`` is instance of return do_something(spam, eggs) def revert(self, result, spam, eggs): - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): print("This task failed, exception: %s" % result.exception_str) else: @@ -389,7 +389,7 @@ A |Retry| controller works with arguments in the same way as a |Task|. But it has an additional parameter ``'history'`` that is a list of tuples. Each tuple contains a result of the previous retry run and a table where the key is a failed task and the value is a -:py:class:`~taskflow.utils.misc.Failure` object. +:py:class:`~taskflow.types.failure.Failure` object. Consider the following implementation:: @@ -412,7 +412,7 @@ Imagine the above retry had returned a value ``'5'`` and then some task ``'A'`` failed with some exception. In this case the above retrys ``on_failure`` method will receive the following history:: - [('5', {'A': misc.Failure()})] + [('5', {'A': failure.Failure()})] At this point (since the implementation returned ``RETRY``) the |retry.execute| method will be called again and it will receive the same @@ -421,10 +421,10 @@ there behavior. If instead the |retry.execute| method raises an exception, the |retry.revert| method of the implementation will be called and -a :py:class:`~taskflow.utils.misc.Failure` object will be present in the +a :py:class:`~taskflow.types.failure.Failure` object will be present in the history instead of the typical result:: - [('5', {'A': misc.Failure()}), (misc.Failure(), {})] + [('5', {'A': failure.Failure()}), (failure.Failure(), {})] .. note:: diff --git a/doc/source/workers.rst b/doc/source/workers.rst index caac6aa04..4bb5b3622 100644 --- a/doc/source/workers.rst +++ b/doc/source/workers.rst @@ -135,7 +135,7 @@ engine executor in the following manner: executes the task). 2. If dispatched succeeded then the worker sends a confirmation response to the executor otherwise the worker sends a failed response along with - a serialized :py:class:`failure ` object + a serialized :py:class:`failure ` object that contains what has failed (and why). 3. The worker executes the task and once it is finished sends the result back to the originating executor (every time a task progress event is @@ -152,11 +152,11 @@ engine executor in the following manner: .. note:: - :py:class:`~taskflow.utils.misc.Failure` objects are not json-serializable - (they contain references to tracebacks which are not serializable), so they - are converted to dicts before sending and converted from dicts after - receiving on both executor & worker sides (this translation is lossy since - the traceback won't be fully retained). + :py:class:`~taskflow.types.failure.Failure` objects are not directly + json-serializable (they contain references to tracebacks which are not + serializable), so they are converted to dicts before sending and converted + from dicts after receiving on both executor & worker sides (this + translation is lossy since the traceback won't be fully retained). Executor request format ~~~~~~~~~~~~~~~~~~~~~~~ @@ -165,7 +165,7 @@ Executor request format * **action** - task action to be performed (e.g. execute, revert) * **arguments** - arguments the task action to be called with * **result** - task execution result (result or - :py:class:`~taskflow.utils.misc.Failure`) *[passed to revert only]* + :py:class:`~taskflow.types.failure.Failure`) *[passed to revert only]* Additionally, the following parameters are added to the request message: @@ -222,7 +222,7 @@ When **failed:** { "event": , - "result": , + "result": , "state": "FAILURE" } diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 22db3ed7b..a501c45da 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -26,6 +26,7 @@ from taskflow.engines import base from taskflow import exceptions as exc from taskflow import states from taskflow import storage as atom_storage +from taskflow.types import failure from taskflow.utils import lock_utils from taskflow.utils import misc from taskflow.utils import reflection @@ -129,7 +130,7 @@ class ActionEngine(base.EngineBase): closed = False for (last_state, failures) in runner.run_iter(timeout=timeout): if failures: - misc.Failure.reraise_if_any(failures) + failure.Failure.reraise_if_any(failures) if closed: continue try: @@ -152,7 +153,7 @@ class ActionEngine(base.EngineBase): self._change_state(last_state) if last_state not in [states.SUSPENDED, states.SUCCESS]: failures = self.storage.get_failures() - misc.Failure.reraise_if_any(failures.values()) + failure.Failure.reraise_if_any(failures.values()) def _change_state(self, state): with self._state_lock: diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 83da3b6bb..78d16ff9a 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -19,9 +19,9 @@ import abc import six from taskflow import task as _task +from taskflow.types import failure from taskflow.types import futures from taskflow.utils import async_utils -from taskflow.utils import misc from taskflow.utils import threading_utils # Execution and reversion events. @@ -37,7 +37,7 @@ def _execute_task(task, arguments, progress_callback): except Exception: # NOTE(imelnikov): wrap current exception with Failure # object and return it. - result = misc.Failure() + result = failure.Failure() finally: task.post_execute() return (task, EXECUTED, result) @@ -54,7 +54,7 @@ def _revert_task(task, arguments, result, failures, progress_callback): except Exception: # NOTE(imelnikov): wrap current exception with Failure # object and return it. - result = misc.Failure() + result = failure.Failure() finally: task.post_revert() return (task, REVERTED, result) diff --git a/taskflow/engines/action_engine/retry_action.py b/taskflow/engines/action_engine/retry_action.py index 3bf6f491c..0ffa4b2a0 100644 --- a/taskflow/engines/action_engine/retry_action.py +++ b/taskflow/engines/action_engine/retry_action.py @@ -18,8 +18,8 @@ import logging from taskflow.engines.action_engine import executor as ex from taskflow import states +from taskflow.types import failure from taskflow.types import futures -from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -65,12 +65,12 @@ class RetryAction(object): try: result = retry.execute(**kwargs) except Exception: - result = misc.Failure() + result = failure.Failure() return (retry, ex.EXECUTED, result) def _on_done_callback(fut): result = fut.result()[-1] - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): self.change_state(retry, states.FAILURE, result=result) else: self.change_state(retry, states.SUCCESS, result=result) @@ -88,12 +88,12 @@ class RetryAction(object): try: result = retry.revert(**kwargs) except Exception: - result = misc.Failure() + result = failure.Failure() return (retry, ex.REVERTED, result) def _on_done_callback(fut): result = fut.result()[-1] - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): self.change_state(retry, states.FAILURE) else: self.change_state(retry, states.REVERTED) diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/runner.py index c2b1788ad..79ebc657d 100644 --- a/taskflow/engines/action_engine/runner.py +++ b/taskflow/engines/action_engine/runner.py @@ -17,8 +17,8 @@ import logging from taskflow import states as st +from taskflow.types import failure from taskflow.types import fsm -from taskflow.utils import misc # Waiting state timeout (in seconds). _WAITING_TIMEOUT = 60 @@ -132,15 +132,15 @@ class _MachineBuilder(object): try: node, event, result = fut.result() retain = self._completer.complete(node, event, result) - if retain and isinstance(result, misc.Failure): + if retain and isinstance(result, failure.Failure): memory.failures.append(result) except Exception: - memory.failures.append(misc.Failure()) + memory.failures.append(failure.Failure()) else: try: more_nodes = self._analyzer.get_next_nodes(node) except Exception: - memory.failures.append(misc.Failure()) + memory.failures.append(failure.Failure()) else: next_nodes.update(more_nodes) if self.runnable() and next_nodes and not memory.failures: diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index c0c58367b..06959f29e 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -24,6 +24,7 @@ from taskflow import exceptions as excp from taskflow import retry as retry_atom from taskflow import states as st from taskflow import task as task_atom +from taskflow.types import failure from taskflow.utils import misc @@ -155,7 +156,7 @@ class Completer(object): """ if isinstance(node, task_atom.BaseTask): self._complete_task(node, event, result) - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): if event == ex.EXECUTED: self._process_atom_failure(node, result) else: @@ -270,5 +271,5 @@ class Scheduler(object): # Immediately stop scheduling future work so that we can # exit execution early (rather than later) if a single task # fails to schedule correctly. - return (futures, [misc.Failure()]) + return (futures, [failure.Failure()]) return (futures, []) diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index 3503df7cc..eb9510f9f 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -17,7 +17,7 @@ import logging from taskflow import states -from taskflow.utils import misc +from taskflow.types import failure LOG = logging.getLogger(__name__) @@ -91,7 +91,7 @@ class TaskAction(object): self._on_update_progress) def complete_execution(self, task, result): - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): self.change_state(task, states.FAILURE, result=result) else: self.change_state(task, states.SUCCESS, @@ -112,7 +112,7 @@ class TaskAction(object): return future def complete_reversion(self, task, rev_result): - if isinstance(rev_result, misc.Failure): + if isinstance(rev_result, failure.Failure): self.change_state(task, states.FAILURE) else: self.change_state(task, states.REVERTED, progress=1.0) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 235f3c934..ae8e0e40a 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -172,9 +172,9 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): " seconds for it to transition out of (%s) states" % (request, request_age, ", ".join(pr.WAITING_STATES))) except exc.RequestTimeout: - with misc.capture_failure() as fail: - LOG.debug(fail.exception_str) - request.set_result(fail) + with misc.capture_failure() as failure: + LOG.debug(failure.exception_str) + request.set_result(failure) def _on_wait(self): """This function is called cyclically between draining events.""" diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index a97240a99..3cd7e1781 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -26,9 +26,9 @@ import six from taskflow.engines.action_engine import executor from taskflow import exceptions as excp +from taskflow.types import failure as ft from taskflow.types import timing as tt from taskflow.utils import lock_utils -from taskflow.utils import misc from taskflow.utils import reflection # NOTE(skudriashev): This is protocol states and events, which are not @@ -270,15 +270,15 @@ class Request(Message): """Return json-serializable request. To convert requests that have failed due to some exception this will - convert all `misc.Failure` objects into dictionaries (which will then - be reconstituted by the receiver). + convert all `failure.Failure` objects into dictionaries (which will + then be reconstituted by the receiver). """ request = dict(task_cls=self._task_cls, task_name=self._task.name, task_version=self._task.version, action=self._action, arguments=self._arguments) if 'result' in self._kwargs: result = self._kwargs['result'] - if isinstance(result, misc.Failure): + if isinstance(result, ft.Failure): request['result'] = ('failure', result.to_dict()) else: request['result'] = ('success', result) @@ -417,7 +417,7 @@ class Response(Message): state = data['state'] data = data['data'] if state == FAILURE and 'result' in data: - data['result'] = misc.Failure.from_dict(data['result']) + data['result'] = ft.Failure.from_dict(data['result']) return cls(state, **data) @property diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index 9440c96ac..db61edc60 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -21,6 +21,7 @@ import six from taskflow.engines.worker_based import protocol as pr from taskflow.engines.worker_based import proxy +from taskflow.types import failure as ft from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -69,20 +70,20 @@ class Server(object): failures=None, **kwargs): """Parse request before it can be further processed. - All `misc.Failure` objects that have been converted to dict on the - remote side will now converted back to `misc.Failure` objects. + All `failure.Failure` objects that have been converted to dict on the + remote side will now converted back to `failure.Failure` objects. """ action_args = dict(arguments=arguments, task_name=task_name) if result is not None: data_type, data = result if data_type == 'failure': - action_args['result'] = misc.Failure.from_dict(data) + action_args['result'] = ft.Failure.from_dict(data) else: action_args['result'] = data if failures is not None: action_args['failures'] = {} - for k, v in failures.items(): - action_args['failures'][k] = misc.Failure.from_dict(v) + for key, data in six.iteritems(failures): + action_args['failures'][key] = ft.Failure.from_dict(data) return task_cls, action, action_args @staticmethod @@ -218,7 +219,7 @@ class Server(object): message.delivery_tag, exc_info=True) reply_callback(result=failure.to_dict()) else: - if isinstance(result, misc.Failure): + if isinstance(result, ft.Failure): reply_callback(result=result.to_dict()) else: reply_callback(state=pr.SUCCESS, result=result) diff --git a/taskflow/examples/wrapped_exception.py b/taskflow/examples/wrapped_exception.py index dff6b2b4a..78b5ad06e 100644 --- a/taskflow/examples/wrapped_exception.py +++ b/taskflow/examples/wrapped_exception.py @@ -33,7 +33,7 @@ from taskflow import exceptions from taskflow.patterns import unordered_flow as uf from taskflow import task from taskflow.tests import utils -from taskflow.utils import misc +from taskflow.types import failure import example_utils as eu # noqa @@ -96,15 +96,15 @@ def run(**store): engine='parallel') except exceptions.WrappedFailure as ex: unknown_failures = [] - for failure in ex: - if failure.check(FirstException): - print("Got FirstException: %s" % failure.exception_str) - elif failure.check(SecondException): - print("Got SecondException: %s" % failure.exception_str) + for a_failure in ex: + if a_failure.check(FirstException): + print("Got FirstException: %s" % a_failure.exception_str) + elif a_failure.check(SecondException): + print("Got SecondException: %s" % a_failure.exception_str) else: - print("Unknown failure: %s" % failure) - unknown_failures.append(failure) - misc.Failure.reraise_if_any(unknown_failures) + print("Unknown failure: %s" % a_failure) + unknown_failures.append(a_failure) + failure.Failure.reraise_if_any(unknown_failures) eu.print_wrapped("Raise and catch first exception only") diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index cdffe0f95..876a3e3b7 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -178,8 +178,8 @@ class WrappedFailure(Exception): See the failure class documentation for a more comprehensive set of reasons why this object *may* be reraised instead of the original exception. - :param causes: the :py:class:`~taskflow.utils.misc.Failure` objects that - caused this this exception to be raised. + :param causes: the :py:class:`~taskflow.types.failure.Failure` objects + that caused this this exception to be raised. """ def __init__(self, causes): diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index ca2ee6e26..e1d475f76 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -23,8 +23,8 @@ from oslo.utils import excutils import six from taskflow import states +from taskflow.types import failure from taskflow.types import notifier -from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -142,7 +142,7 @@ class LoggingBase(ListenerBase): result = details.get('result') exc_info = None was_failure = False - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): if result.exc_info: exc_info = tuple(result.exc_info) was_failure = True diff --git a/taskflow/listeners/logging.py b/taskflow/listeners/logging.py index 87528f37d..3629bb2c7 100644 --- a/taskflow/listeners/logging.py +++ b/taskflow/listeners/logging.py @@ -21,8 +21,8 @@ import sys from taskflow.listeners import base from taskflow import states +from taskflow.types import failure from taskflow.types import notifier -from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -92,8 +92,8 @@ class DynamicLoggingListener(base.ListenerBase): * ``states.RETRYING`` * ``states.REVERTING`` - When a task produces a :py:class:`~taskflow.utils.misc.Failure` object as - its result (typically this happens when a task raises an exception) this + When a task produces a :py:class:`~taskflow.types.failure.Failure` object + as its result (typically this happens when a task raises an exception) this will **always** switch the logger to use ``logging.WARNING`` (if the failure object contains a ``exc_info`` tuple this will also be logged to provide a meaningful traceback). @@ -130,7 +130,7 @@ class DynamicLoggingListener(base.ListenerBase): # If the task failed, it's useful to show the exception traceback # and any other available exception information. result = details.get('result') - if isinstance(result, misc.Failure): + if isinstance(result, failure.Failure): if result.exc_info: exc_info = result.exc_info manual_tb = '' diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 29ab8c97c..4b12b782c 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -37,6 +37,7 @@ from taskflow.persistence.backends import base from taskflow.persistence.backends.sqlalchemy import migration from taskflow.persistence.backends.sqlalchemy import models from taskflow.persistence import logbook +from taskflow.types import failure from taskflow.utils import async_utils from taskflow.utils import misc @@ -328,7 +329,7 @@ class Connection(base.Connection): pass except sa_exc.OperationalError as ex: if _is_db_connection_error(six.text_type(ex.args[0])): - failures.append(misc.Failure()) + failures.append(failure.Failure()) return False return True diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index 3d60aa523..f66f68efe 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -25,7 +25,7 @@ import six from taskflow import exceptions as exc from taskflow.openstack.common import uuidutils from taskflow import states -from taskflow.utils import misc +from taskflow.types import failure as ft LOG = logging.getLogger(__name__) @@ -50,7 +50,7 @@ def _safe_unmarshal_time(when): def _was_failure(state, result): - return state == states.FAILURE and isinstance(result, misc.Failure) + return state == states.FAILURE and isinstance(result, ft.Failure) def _fix_meta(data): @@ -363,7 +363,7 @@ class AtomDetail(object): self.meta = _fix_meta(data) failure = data.get('failure') if failure: - self.failure = misc.Failure.from_dict(failure) + self.failure = ft.Failure.from_dict(failure) @property def uuid(self): @@ -467,8 +467,8 @@ class RetryDetail(AtomDetail): new_results = [] for (data, failures) in results: new_failures = {} - for (key, failure_data) in six.iteritems(failures): - new_failures[key] = misc.Failure.from_dict(failure_data) + for (key, data) in six.iteritems(failures): + new_failures[key] = ft.Failure.from_dict(data) new_results.append((data, new_failures)) return new_results diff --git a/taskflow/storage.py b/taskflow/storage.py index 6ee10f602..c667509b4 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -26,6 +26,7 @@ from taskflow.persistence import logbook from taskflow import retry from taskflow import states from taskflow import task +from taskflow.types import failure from taskflow.utils import lock_utils from taskflow.utils import misc from taskflow.utils import reflection @@ -425,7 +426,7 @@ class Storage(object): with self._lock.write_lock(): ad = self._atomdetail_by_name(atom_name) ad.put(state, data) - if state == states.FAILURE and isinstance(data, misc.Failure): + if state == states.FAILURE and isinstance(data, failure.Failure): # NOTE(imelnikov): failure serialization looses information, # so we cache failures here, in atom name -> failure mapping. self._failures[ad.name] = data diff --git a/taskflow/tests/unit/persistence/base.py b/taskflow/tests/unit/persistence/base.py index 3d28695c6..6d96df667 100644 --- a/taskflow/tests/unit/persistence/base.py +++ b/taskflow/tests/unit/persistence/base.py @@ -20,7 +20,7 @@ from taskflow import exceptions as exc from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook from taskflow import states -from taskflow.utils import misc +from taskflow.types import failure class PersistenceTestMixin(object): @@ -147,7 +147,7 @@ class PersistenceTestMixin(object): try: raise RuntimeError('Woot!') except Exception: - td.failure = misc.Failure() + td.failure = failure.Failure() fd.add(td) @@ -161,10 +161,9 @@ class PersistenceTestMixin(object): lb2 = conn.get_logbook(lb_id) fd2 = lb2.find(fd.uuid) td2 = fd2.find(td.uuid) - failure = td2.failure - self.assertEqual(failure.exception_str, 'Woot!') - self.assertIs(failure.check(RuntimeError), RuntimeError) - self.assertEqual(failure.traceback_str, td.failure.traceback_str) + self.assertEqual(td2.failure.exception_str, 'Woot!') + self.assertIs(td2.failure.check(RuntimeError), RuntimeError) + self.assertEqual(td2.failure.traceback_str, td.failure.traceback_str) self.assertIsInstance(td2, logbook.TaskDetail) def test_logbook_merge_flow_detail(self): @@ -269,7 +268,7 @@ class PersistenceTestMixin(object): fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) lb.add(fd) rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid()) - fail = misc.Failure.from_exception(RuntimeError('fail')) + fail = failure.Failure.from_exception(RuntimeError('fail')) rd.results.append((42, {'some-task': fail})) fd.add(rd) @@ -286,7 +285,7 @@ class PersistenceTestMixin(object): rd2 = fd2.find(rd.uuid) self.assertIsInstance(rd2, logbook.RetryDetail) fail2 = rd2.results[0][1].get('some-task') - self.assertIsInstance(fail2, misc.Failure) + self.assertIsInstance(fail2, failure.Failure) self.assertTrue(fail.matches(fail2)) def test_retry_detail_save_intention(self): diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 0823002d6..d1c758b46 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -32,10 +32,10 @@ from taskflow import states from taskflow import task from taskflow import test from taskflow.tests import utils +from taskflow.types import failure from taskflow.types import futures from taskflow.types import graph as gr from taskflow.utils import async_utils as au -from taskflow.utils import misc from taskflow.utils import persistence_utils as p_utils @@ -529,7 +529,7 @@ class EngineCheckingTaskTest(utils.EngineTestBase): self.assertEqual(result, 'RESULT') self.assertEqual(list(flow_failures.keys()), ['fail1']) fail = flow_failures['fail1'] - self.assertIsInstance(fail, misc.Failure) + self.assertIsInstance(fail, failure.Failure) self.assertEqual(str(fail), 'Failure: RuntimeError: Woot!') flow = lf.Flow('test').add( diff --git a/taskflow/tests/unit/test_failure.py b/taskflow/tests/unit/test_failure.py index 3f4d001ed..64e1c44d0 100644 --- a/taskflow/tests/unit/test_failure.py +++ b/taskflow/tests/unit/test_failure.py @@ -22,7 +22,6 @@ from taskflow import exceptions from taskflow import test from taskflow.tests import utils as test_utils from taskflow.types import failure -from taskflow.utils import misc def _captured_failure(msg): @@ -217,7 +216,7 @@ class FailureObjectTestCase(test.TestCase): def test_pformat_traceback_captured_no_exc_info(self): captured = _captured_failure('Woot!') - captured = misc.Failure.from_dict(captured.to_dict()) + captured = failure.Failure.from_dict(captured.to_dict()) text = captured.pformat(traceback=True) self.assertIn("Traceback (most recent call last):", text) diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 7d3b55b6e..deb4db4fa 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -25,7 +25,7 @@ from taskflow import states from taskflow import storage from taskflow import test from taskflow.tests import utils as test_utils -from taskflow.utils import misc +from taskflow.types import failure from taskflow.utils import persistence_utils as p_utils @@ -128,46 +128,46 @@ class StorageTestMixin(object): self.assertEqual(s.get_atom_state('my task'), states.FAILURE) def test_save_and_get_cached_failure(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_atom(test_utils.NoopTask('my task')) - s.save('my task', failure, states.FAILURE) - self.assertEqual(s.get('my task'), failure) + s.save('my task', a_failure, states.FAILURE) + self.assertEqual(s.get('my task'), a_failure) self.assertEqual(s.get_atom_state('my task'), states.FAILURE) self.assertTrue(s.has_failures()) - self.assertEqual(s.get_failures(), {'my task': failure}) + self.assertEqual(s.get_failures(), {'my task': a_failure}) def test_save_and_get_non_cached_failure(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_atom(test_utils.NoopTask('my task')) - s.save('my task', failure, states.FAILURE) - self.assertEqual(s.get('my task'), failure) + s.save('my task', a_failure, states.FAILURE) + self.assertEqual(s.get('my task'), a_failure) s._failures['my task'] = None - self.assertTrue(failure.matches(s.get('my task'))) + self.assertTrue(a_failure.matches(s.get('my task'))) def test_get_failure_from_reverted_task(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_atom(test_utils.NoopTask('my task')) - s.save('my task', failure, states.FAILURE) + s.save('my task', a_failure, states.FAILURE) s.set_atom_state('my task', states.REVERTING) - self.assertEqual(s.get('my task'), failure) + self.assertEqual(s.get('my task'), a_failure) s.set_atom_state('my task', states.REVERTED) - self.assertEqual(s.get('my task'), failure) + self.assertEqual(s.get('my task'), a_failure) def test_get_failure_after_reload(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_atom(test_utils.NoopTask('my task')) - s.save('my task', failure, states.FAILURE) + s.save('my task', a_failure, states.FAILURE) s2 = self._get_storage(s._flowdetail) self.assertTrue(s2.has_failures()) self.assertEqual(1, len(s2.get_failures())) - self.assertTrue(failure.matches(s2.get('my task'))) + self.assertTrue(a_failure.matches(s2.get('my task'))) self.assertEqual(s2.get_atom_state('my task'), states.FAILURE) def test_get_non_existing_var(self): @@ -486,15 +486,15 @@ class StorageTestMixin(object): self.assertEqual(s.fetch_all(), {}) def test_cached_retry_failure(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) s = self._get_storage() s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x'])) s.save('my retry', 'a') - s.save('my retry', failure, states.FAILURE) + s.save('my retry', a_failure, states.FAILURE) history = s.get_retry_history('my retry') - self.assertEqual(history, [('a', {}), (failure, {})]) + self.assertEqual(history, [('a', {}), (a_failure, {})]) self.assertIs(s.has_failures(), True) - self.assertEqual(s.get_failures(), {'my retry': failure}) + self.assertEqual(s.get_failures(), {'my retry': a_failure}) def test_logbook_get_unknown_atom_type(self): self.assertRaisesRegexp(TypeError, diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index 3e494e88d..d2b97bfee 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -24,7 +24,7 @@ from taskflow.engines.worker_based import protocol as pr from taskflow import test from taskflow.test import mock from taskflow.tests import utils as test_utils -from taskflow.utils import misc +from taskflow.types import failure from taskflow.utils import threading_utils @@ -111,8 +111,8 @@ class TestWorkerTaskExecutor(test.MockTestCase): [mock.call.on_progress(progress=1.0)]) def test_on_message_response_state_failure(self): - failure = misc.Failure.from_exception(Exception('test')) - failure_dict = failure.to_dict() + a_failure = failure.Failure.from_exception(Exception('test')) + failure_dict = a_failure.to_dict() response = pr.Response(pr.FAILURE, result=failure_dict) ex = self.executor() ex._requests_cache[self.task_uuid] = self.request_inst_mock @@ -121,7 +121,7 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.assertEqual(len(ex._requests_cache), 0) expected_calls = [ mock.call.transition_and_log_error(pr.FAILURE, logger=mock.ANY), - mock.call.set_result(result=test_utils.FailureMatcher(failure)) + mock.call.set_result(result=test_utils.FailureMatcher(a_failure)) ] self.assertEqual(expected_calls, self.request_inst_mock.mock_calls) diff --git a/taskflow/tests/unit/worker_based/test_pipeline.py b/taskflow/tests/unit/worker_based/test_pipeline.py index ae11efd25..b86cedd07 100644 --- a/taskflow/tests/unit/worker_based/test_pipeline.py +++ b/taskflow/tests/unit/worker_based/test_pipeline.py @@ -24,7 +24,7 @@ from taskflow.engines.worker_based import server as worker_server from taskflow.openstack.common import uuidutils from taskflow import test from taskflow.tests import utils as test_utils -from taskflow.utils import misc +from taskflow.types import failure TEST_EXCHANGE, TEST_TOPIC = ('test-exchange', 'test-topic') @@ -94,5 +94,5 @@ class TestPipeline(test.TestCase): executor.wait_for_any([f]) _t2, _action, result = f.result() - self.assertIsInstance(result, misc.Failure) + self.assertIsInstance(result, failure.Failure) self.assertEqual(RuntimeError, result.check(RuntimeError)) diff --git a/taskflow/tests/unit/worker_based/test_protocol.py b/taskflow/tests/unit/worker_based/test_protocol.py index 6df2bb414..4c9c4b77e 100644 --- a/taskflow/tests/unit/worker_based/test_protocol.py +++ b/taskflow/tests/unit/worker_based/test_protocol.py @@ -23,7 +23,7 @@ from taskflow.openstack.common import uuidutils from taskflow import test from taskflow.test import mock from taskflow.tests import utils -from taskflow.utils import misc +from taskflow.types import failure class TestProtocolValidation(test.TestCase): @@ -149,15 +149,16 @@ class TestProtocol(test.TestCase): self.request_to_dict(result=('success', None))) def test_to_dict_with_result_failure(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) - expected = self.request_to_dict(result=('failure', failure.to_dict())) - self.assertEqual(self.request(result=failure).to_dict(), expected) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) + expected = self.request_to_dict(result=('failure', + a_failure.to_dict())) + self.assertEqual(self.request(result=a_failure).to_dict(), expected) def test_to_dict_with_failures(self): - failure = misc.Failure.from_exception(RuntimeError('Woot!')) - request = self.request(failures={self.task.name: failure}) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) + request = self.request(failures={self.task.name: a_failure}) expected = self.request_to_dict( - failures={self.task.name: failure.to_dict()}) + failures={self.task.name: a_failure.to_dict()}) self.assertEqual(request.to_dict(), expected) def test_pending_not_expired(self): diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index b6e626711..5e9129ae6 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -22,7 +22,7 @@ from taskflow.engines.worker_based import server from taskflow import test from taskflow.test import mock from taskflow.tests import utils -from taskflow.utils import misc +from taskflow.types import failure class TestServer(test.MockTestCase): @@ -121,19 +121,19 @@ class TestServer(test.MockTestCase): result=1))) def test_parse_request_with_failure_result(self): - failure = misc.Failure.from_exception(Exception('test')) - request = self.make_request(action='revert', result=failure) + a_failure = failure.Failure.from_exception(Exception('test')) + request = self.make_request(action='revert', result=a_failure) task_cls, action, task_args = server.Server._parse_request(**request) self.assertEqual((task_cls, action, task_args), (self.task.name, 'revert', dict(task_name=self.task.name, arguments=self.task_args, - result=utils.FailureMatcher(failure)))) + result=utils.FailureMatcher(a_failure)))) def test_parse_request_with_failures(self): - failures = {'0': misc.Failure.from_exception(Exception('test1')), - '1': misc.Failure.from_exception(Exception('test2'))} + failures = {'0': failure.Failure.from_exception(Exception('test1')), + '1': failure.Failure.from_exception(Exception('test2'))} request = self.make_request(action='revert', failures=failures) task_cls, action, task_args = server.Server._parse_request(**request) @@ -220,16 +220,16 @@ class TestServer(test.MockTestCase): self.assertEqual(self.master_mock.mock_calls, []) self.assertTrue(mocked_exception.called) - @mock.patch.object(misc.Failure, 'from_dict') - @mock.patch.object(misc.Failure, 'to_dict') + @mock.patch.object(failure.Failure, 'from_dict') + @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_parse_request_failure(self, to_mock, from_mock): failure_dict = { 'failure': 'failure', } - failure = misc.Failure.from_exception(RuntimeError('Woot!')) + a_failure = failure.Failure.from_exception(RuntimeError('Woot!')) to_mock.return_value = failure_dict from_mock.side_effect = ValueError('Woot!') - request = self.make_request(result=failure) + request = self.make_request(result=a_failure) # create server and process request s = self.server(reset_master_mock=True) @@ -244,7 +244,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(master_mock_calls, self.master_mock.mock_calls) - @mock.patch.object(misc.Failure, 'to_dict') + @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_endpoint_not_found(self, to_mock): failure_dict = { 'failure': 'failure', @@ -265,7 +265,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - @mock.patch.object(misc.Failure, 'to_dict') + @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_execution_failure(self, to_mock): failure_dict = { 'failure': 'failure', @@ -287,7 +287,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - @mock.patch.object(misc.Failure, 'to_dict') + @mock.patch.object(failure.Failure, 'to_dict') def test_process_request_task_failure(self, to_mock): failure_dict = { 'failure': 'failure', diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index d01f91a30..a0b2ff0d2 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -23,8 +23,8 @@ from taskflow import exceptions from taskflow.persistence.backends import impl_memory from taskflow import retry from taskflow import task +from taskflow.types import failure from taskflow.utils import kazoo_utils -from taskflow.utils import misc from taskflow.utils import threading_utils ARGS_KEY = '__args__' @@ -50,7 +50,7 @@ def wrap_all_failures(): try: yield except Exception: - raise exceptions.WrappedFailure([misc.Failure()]) + raise exceptions.WrappedFailure([failure.Failure()]) def zookeeper_available(min_version, timeout=3): diff --git a/taskflow/types/failure.py b/taskflow/types/failure.py index 93829ad6d..c1f28be89 100644 --- a/taskflow/types/failure.py +++ b/taskflow/types/failure.py @@ -68,7 +68,7 @@ class Failure(object): exception and desire to reraise it to the user/caller of the WBE based engine for appropriate handling (this matches the behavior of non-remote engines). To accomplish this a failure object (or a - :py:meth:`~misc.Failure.to_dict` form) would be sent over the WBE channel + :py:meth:`~.Failure.to_dict` form) would be sent over the WBE channel and the WBE based engine would deserialize it and use this objects :meth:`.reraise` method to cause an exception that contains similar/equivalent information as the original exception to be reraised,