Remove direct usage of the deprecated failure location

Internally we should be using the new location and not the
deprecated location wherever possible. This avoids emitting
warning messages from our own code, which is a dirty habit.

Change-Id: Idac5a772eca7529d92542ada3be1cea092880e25
Author: Joshua Harlow
Date: 2014-10-18 19:22:08 -07:00
Committed by: Joshua Harlow
Parent: 58f27fcd2d
Commit: 3c9871d8c3
28 changed files with 138 additions and 134 deletions
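The change is mechanical: internal call sites switch from the deprecated import location to the new one, so only external users of the old path trigger deprecation warnings. A minimal, hypothetical sketch of the underlying pattern (simplified names, not taskflow's actual shim code):

```python
import warnings


class Failure(object):
    """Stand-in for the class at its new, canonical location."""

    def __init__(self, exception_str=''):
        self.exception_str = exception_str


def deprecated_failure(*args, **kwargs):
    """Alias left behind at the old location; warns whenever it is used."""
    warnings.warn("Failure has moved; import it from its new location",
                  DeprecationWarning, stacklevel=2)
    return Failure(*args, **kwargs)


# Internal code calls Failure directly (no warning), while external callers
# of the old alias keep working but see a DeprecationWarning.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    new_style = Failure('boom')             # no warning emitted
    old_style = deprecated_failure('boom')  # emits DeprecationWarning

print(len(caught))
```

External code importing the old alias keeps working during the deprecation window, while the project's own modules stay warning-free — which is exactly why this commit rewrites internal imports.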


@@ -350,7 +350,7 @@ For ``result`` value, two cases are possible:
 * If the task is being reverted because it failed (an exception was raised
   from its |task.execute| method), the ``result`` value is an instance of a
-  :py:class:`~taskflow.utils.misc.Failure` object that holds the exception
+  :py:class:`~taskflow.types.failure.Failure` object that holds the exception
   information.
 * If the task is being reverted because some other task failed, and this task
@@ -361,9 +361,9 @@ All other arguments are fetched from storage in the same way it is done for
 |task.execute| method.
 To determine if a task failed you can check whether ``result`` is instance of
-:py:class:`~taskflow.utils.misc.Failure`::
+:py:class:`~taskflow.types.failure.Failure`::
-    from taskflow.utils import misc
+    from taskflow.types import failure
     class RevertingTask(task.Task):
@@ -371,7 +371,7 @@ To determine if a task failed you can check whether ``result`` is instance of
             return do_something(spam, eggs)
         def revert(self, result, spam, eggs):
-            if isinstance(result, misc.Failure):
+            if isinstance(result, failure.Failure):
                 print("This task failed, exception: %s"
                       % result.exception_str)
             else:
@@ -389,7 +389,7 @@ A |Retry| controller works with arguments in the same way as a |Task|. But
 it has an additional parameter ``'history'`` that is a list of tuples. Each
 tuple contains a result of the previous retry run and a table where the key
 is a failed task and the value is a
-:py:class:`~taskflow.utils.misc.Failure` object.
+:py:class:`~taskflow.types.failure.Failure` object.
 Consider the following implementation::
@@ -412,7 +412,7 @@ Imagine the above retry had returned a value ``'5'`` and then some task ``'A'``
 failed with some exception. In this case the above retrys ``on_failure``
 method will receive the following history::
-    [('5', {'A': misc.Failure()})]
+    [('5', {'A': failure.Failure()})]
 At this point (since the implementation returned ``RETRY``) the
 |retry.execute| method will be called again and it will receive the same
@@ -421,10 +421,10 @@ there behavior.
 If instead the |retry.execute| method raises an exception, the |retry.revert|
 method of the implementation will be called and
-a :py:class:`~taskflow.utils.misc.Failure` object will be present in the
+a :py:class:`~taskflow.types.failure.Failure` object will be present in the
 history instead of the typical result::
-    [('5', {'A': misc.Failure()}), (misc.Failure(), {})]
+    [('5', {'A': failure.Failure()}), (failure.Failure(), {})]
 .. note::


@@ -135,7 +135,7 @@ engine executor in the following manner:
    executes the task).
 2. If dispatched succeeded then the worker sends a confirmation response
    to the executor otherwise the worker sends a failed response along with
-   a serialized :py:class:`failure <taskflow.utils.misc.Failure>` object
+   a serialized :py:class:`failure <taskflow.types.failure.Failure>` object
    that contains what has failed (and why).
 3. The worker executes the task and once it is finished sends the result
    back to the originating executor (every time a task progress event is
@@ -152,11 +152,11 @@ engine executor in the following manner:
 .. note::
-    :py:class:`~taskflow.utils.misc.Failure` objects are not json-serializable
-    (they contain references to tracebacks which are not serializable), so they
-    are converted to dicts before sending and converted from dicts after
-    receiving on both executor & worker sides (this translation is lossy since
-    the traceback won't be fully retained).
+    :py:class:`~taskflow.types.failure.Failure` objects are not directly
+    json-serializable (they contain references to tracebacks which are not
+    serializable), so they are converted to dicts before sending and converted
+    from dicts after receiving on both executor & worker sides (this
+    translation is lossy since the traceback won't be fully retained).
 Executor request format
 ~~~~~~~~~~~~~~~~~~~~~~~
@@ -165,7 +165,7 @@ Executor request format
 * **action** - task action to be performed (e.g. execute, revert)
 * **arguments** - arguments the task action to be called with
 * **result** - task execution result (result or
-  :py:class:`~taskflow.utils.misc.Failure`) *[passed to revert only]*
+  :py:class:`~taskflow.types.failure.Failure`) *[passed to revert only]*
 Additionally, the following parameters are added to the request message:
@@ -222,7 +222,7 @@ When **failed:**
     {
         "event": <event>,
-        "result": <misc.Failure>,
+        "result": <types.failure.Failure>,
         "state": "FAILURE"
     }


@@ -26,6 +26,7 @@ from taskflow.engines import base
 from taskflow import exceptions as exc
 from taskflow import states
 from taskflow import storage as atom_storage
+from taskflow.types import failure
 from taskflow.utils import lock_utils
 from taskflow.utils import misc
 from taskflow.utils import reflection
@@ -129,7 +130,7 @@ class ActionEngine(base.EngineBase):
         closed = False
         for (last_state, failures) in runner.run_iter(timeout=timeout):
             if failures:
-                misc.Failure.reraise_if_any(failures)
+                failure.Failure.reraise_if_any(failures)
             if closed:
                 continue
             try:
@@ -152,7 +153,7 @@ class ActionEngine(base.EngineBase):
             self._change_state(last_state)
             if last_state not in [states.SUSPENDED, states.SUCCESS]:
                 failures = self.storage.get_failures()
-                misc.Failure.reraise_if_any(failures.values())
+                failure.Failure.reraise_if_any(failures.values())
     def _change_state(self, state):
         with self._state_lock:


@@ -19,9 +19,9 @@ import abc
 import six
 from taskflow import task as _task
+from taskflow.types import failure
 from taskflow.types import futures
 from taskflow.utils import async_utils
-from taskflow.utils import misc
 from taskflow.utils import threading_utils
 # Execution and reversion events.
@@ -37,7 +37,7 @@ def _execute_task(task, arguments, progress_callback):
     except Exception:
         # NOTE(imelnikov): wrap current exception with Failure
         # object and return it.
-        result = misc.Failure()
+        result = failure.Failure()
     finally:
         task.post_execute()
     return (task, EXECUTED, result)
@@ -54,7 +54,7 @@ def _revert_task(task, arguments, result, failures, progress_callback):
     except Exception:
         # NOTE(imelnikov): wrap current exception with Failure
         # object and return it.
-        result = misc.Failure()
+        result = failure.Failure()
     finally:
         task.post_revert()
     return (task, REVERTED, result)


@@ -18,8 +18,8 @@ import logging
 from taskflow.engines.action_engine import executor as ex
 from taskflow import states
+from taskflow.types import failure
 from taskflow.types import futures
-from taskflow.utils import misc
 LOG = logging.getLogger(__name__)
@@ -65,12 +65,12 @@ class RetryAction(object):
             try:
                 result = retry.execute(**kwargs)
             except Exception:
-                result = misc.Failure()
+                result = failure.Failure()
             return (retry, ex.EXECUTED, result)
         def _on_done_callback(fut):
             result = fut.result()[-1]
-            if isinstance(result, misc.Failure):
+            if isinstance(result, failure.Failure):
                 self.change_state(retry, states.FAILURE, result=result)
             else:
                 self.change_state(retry, states.SUCCESS, result=result)
@@ -88,12 +88,12 @@ class RetryAction(object):
             try:
                 result = retry.revert(**kwargs)
             except Exception:
-                result = misc.Failure()
+                result = failure.Failure()
             return (retry, ex.REVERTED, result)
         def _on_done_callback(fut):
             result = fut.result()[-1]
-            if isinstance(result, misc.Failure):
+            if isinstance(result, failure.Failure):
                 self.change_state(retry, states.FAILURE)
             else:
                 self.change_state(retry, states.REVERTED)


@@ -17,8 +17,8 @@
 import logging
 from taskflow import states as st
+from taskflow.types import failure
 from taskflow.types import fsm
-from taskflow.utils import misc
 # Waiting state timeout (in seconds).
 _WAITING_TIMEOUT = 60
@@ -132,15 +132,15 @@ class _MachineBuilder(object):
                 try:
                     node, event, result = fut.result()
                     retain = self._completer.complete(node, event, result)
-                    if retain and isinstance(result, misc.Failure):
+                    if retain and isinstance(result, failure.Failure):
                         memory.failures.append(result)
                 except Exception:
-                    memory.failures.append(misc.Failure())
+                    memory.failures.append(failure.Failure())
                 else:
                     try:
                         more_nodes = self._analyzer.get_next_nodes(node)
                     except Exception:
-                        memory.failures.append(misc.Failure())
+                        memory.failures.append(failure.Failure())
                     else:
                         next_nodes.update(more_nodes)
             if self.runnable() and next_nodes and not memory.failures:


@@ -24,6 +24,7 @@ from taskflow import exceptions as excp
 from taskflow import retry as retry_atom
 from taskflow import states as st
 from taskflow import task as task_atom
+from taskflow.types import failure
 from taskflow.utils import misc
@@ -155,7 +156,7 @@ class Completer(object):
         """
         if isinstance(node, task_atom.BaseTask):
             self._complete_task(node, event, result)
-        if isinstance(result, misc.Failure):
+        if isinstance(result, failure.Failure):
             if event == ex.EXECUTED:
                 self._process_atom_failure(node, result)
             else:
@@ -270,5 +271,5 @@ class Scheduler(object):
                 # Immediately stop scheduling future work so that we can
                 # exit execution early (rather than later) if a single task
                 # fails to schedule correctly.
-                return (futures, [misc.Failure()])
+                return (futures, [failure.Failure()])
         return (futures, [])


@@ -17,7 +17,7 @@
 import logging
 from taskflow import states
-from taskflow.utils import misc
+from taskflow.types import failure
 LOG = logging.getLogger(__name__)
@@ -91,7 +91,7 @@ class TaskAction(object):
                           self._on_update_progress)
     def complete_execution(self, task, result):
-        if isinstance(result, misc.Failure):
+        if isinstance(result, failure.Failure):
             self.change_state(task, states.FAILURE, result=result)
         else:
             self.change_state(task, states.SUCCESS,
@@ -112,7 +112,7 @@ class TaskAction(object):
         return future
     def complete_reversion(self, task, rev_result):
-        if isinstance(rev_result, misc.Failure):
+        if isinstance(rev_result, failure.Failure):
             self.change_state(task, states.FAILURE)
         else:
             self.change_state(task, states.REVERTED, progress=1.0)


@@ -172,9 +172,9 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
                     " seconds for it to transition out of (%s) states"
                     % (request, request_age, ", ".join(pr.WAITING_STATES)))
             except exc.RequestTimeout:
-                with misc.capture_failure() as fail:
-                    LOG.debug(fail.exception_str)
-                    request.set_result(fail)
+                with misc.capture_failure() as failure:
+                    LOG.debug(failure.exception_str)
+                    request.set_result(failure)
     def _on_wait(self):
         """This function is called cyclically between draining events."""


@@ -26,9 +26,9 @@ import six
 from taskflow.engines.action_engine import executor
 from taskflow import exceptions as excp
+from taskflow.types import failure as ft
 from taskflow.types import timing as tt
 from taskflow.utils import lock_utils
-from taskflow.utils import misc
 from taskflow.utils import reflection
 # NOTE(skudriashev): This is protocol states and events, which are not
@@ -270,15 +270,15 @@ class Request(Message):
         """Return json-serializable request.
         To convert requests that have failed due to some exception this will
-        convert all `misc.Failure` objects into dictionaries (which will then
-        be reconstituted by the receiver).
+        convert all `failure.Failure` objects into dictionaries (which will
+        then be reconstituted by the receiver).
         """
         request = dict(task_cls=self._task_cls, task_name=self._task.name,
                        task_version=self._task.version, action=self._action,
                        arguments=self._arguments)
         if 'result' in self._kwargs:
             result = self._kwargs['result']
-            if isinstance(result, misc.Failure):
+            if isinstance(result, ft.Failure):
                 request['result'] = ('failure', result.to_dict())
             else:
                 request['result'] = ('success', result)
@@ -417,7 +417,7 @@ class Response(Message):
         state = data['state']
         data = data['data']
         if state == FAILURE and 'result' in data:
-            data['result'] = misc.Failure.from_dict(data['result'])
+            data['result'] = ft.Failure.from_dict(data['result'])
         return cls(state, **data)
     @property


@@ -21,6 +21,7 @@ import six
 from taskflow.engines.worker_based import protocol as pr
 from taskflow.engines.worker_based import proxy
+from taskflow.types import failure as ft
 from taskflow.utils import misc
 LOG = logging.getLogger(__name__)
@@ -69,20 +70,20 @@ class Server(object):
                        failures=None, **kwargs):
         """Parse request before it can be further processed.
-        All `misc.Failure` objects that have been converted to dict on the
-        remote side will now converted back to `misc.Failure` objects.
+        All `failure.Failure` objects that have been converted to dict on the
+        remote side will now converted back to `failure.Failure` objects.
         """
         action_args = dict(arguments=arguments, task_name=task_name)
         if result is not None:
             data_type, data = result
             if data_type == 'failure':
-                action_args['result'] = misc.Failure.from_dict(data)
+                action_args['result'] = ft.Failure.from_dict(data)
             else:
                 action_args['result'] = data
         if failures is not None:
             action_args['failures'] = {}
-            for k, v in failures.items():
-                action_args['failures'][k] = misc.Failure.from_dict(v)
+            for key, data in six.iteritems(failures):
+                action_args['failures'][key] = ft.Failure.from_dict(data)
         return task_cls, action, action_args
     @staticmethod
@@ -218,7 +219,7 @@ class Server(object):
                           message.delivery_tag, exc_info=True)
                 reply_callback(result=failure.to_dict())
             else:
-                if isinstance(result, misc.Failure):
+                if isinstance(result, ft.Failure):
                     reply_callback(result=result.to_dict())
                 else:
                     reply_callback(state=pr.SUCCESS, result=result)


@@ -33,7 +33,7 @@ from taskflow import exceptions
 from taskflow.patterns import unordered_flow as uf
 from taskflow import task
 from taskflow.tests import utils
-from taskflow.utils import misc
+from taskflow.types import failure
 import example_utils as eu  # noqa
@@ -96,15 +96,15 @@ def run(**store):
                    engine='parallel')
     except exceptions.WrappedFailure as ex:
         unknown_failures = []
-        for failure in ex:
-            if failure.check(FirstException):
-                print("Got FirstException: %s" % failure.exception_str)
-            elif failure.check(SecondException):
-                print("Got SecondException: %s" % failure.exception_str)
+        for a_failure in ex:
+            if a_failure.check(FirstException):
+                print("Got FirstException: %s" % a_failure.exception_str)
+            elif a_failure.check(SecondException):
+                print("Got SecondException: %s" % a_failure.exception_str)
             else:
-                print("Unknown failure: %s" % failure)
-                unknown_failures.append(failure)
-        misc.Failure.reraise_if_any(unknown_failures)
+                print("Unknown failure: %s" % a_failure)
+                unknown_failures.append(a_failure)
+        failure.Failure.reraise_if_any(unknown_failures)
 eu.print_wrapped("Raise and catch first exception only")


@@ -178,8 +178,8 @@ class WrappedFailure(Exception):
     See the failure class documentation for a more comprehensive set of reasons
     why this object *may* be reraised instead of the original exception.
-    :param causes: the :py:class:`~taskflow.utils.misc.Failure` objects that
-        caused this this exception to be raised.
+    :param causes: the :py:class:`~taskflow.types.failure.Failure` objects
+        that caused this this exception to be raised.
     """
     def __init__(self, causes):


@@ -23,8 +23,8 @@ from oslo.utils import excutils
 import six
 from taskflow import states
+from taskflow.types import failure
 from taskflow.types import notifier
-from taskflow.utils import misc
 LOG = logging.getLogger(__name__)
@@ -142,7 +142,7 @@ class LoggingBase(ListenerBase):
         result = details.get('result')
         exc_info = None
         was_failure = False
-        if isinstance(result, misc.Failure):
+        if isinstance(result, failure.Failure):
             if result.exc_info:
                 exc_info = tuple(result.exc_info)
             was_failure = True


@@ -21,8 +21,8 @@ import sys
 from taskflow.listeners import base
 from taskflow import states
+from taskflow.types import failure
 from taskflow.types import notifier
-from taskflow.utils import misc
 LOG = logging.getLogger(__name__)
@@ -92,8 +92,8 @@ class DynamicLoggingListener(base.ListenerBase):
     * ``states.RETRYING``
     * ``states.REVERTING``
-    When a task produces a :py:class:`~taskflow.utils.misc.Failure` object as
-    its result (typically this happens when a task raises an exception) this
+    When a task produces a :py:class:`~taskflow.types.failure.Failure` object
+    as its result (typically this happens when a task raises an exception) this
     will **always** switch the logger to use ``logging.WARNING`` (if the
     failure object contains a ``exc_info`` tuple this will also be logged to
     provide a meaningful traceback).
@@ -130,7 +130,7 @@ class DynamicLoggingListener(base.ListenerBase):
         # If the task failed, it's useful to show the exception traceback
         # and any other available exception information.
         result = details.get('result')
-        if isinstance(result, misc.Failure):
+        if isinstance(result, failure.Failure):
             if result.exc_info:
                 exc_info = result.exc_info
                 manual_tb = ''


@@ -37,6 +37,7 @@ from taskflow.persistence.backends import base
 from taskflow.persistence.backends.sqlalchemy import migration
 from taskflow.persistence.backends.sqlalchemy import models
 from taskflow.persistence import logbook
+from taskflow.types import failure
 from taskflow.utils import async_utils
 from taskflow.utils import misc
@@ -328,7 +329,7 @@ class Connection(base.Connection):
             pass
         except sa_exc.OperationalError as ex:
             if _is_db_connection_error(six.text_type(ex.args[0])):
-                failures.append(misc.Failure())
+                failures.append(failure.Failure())
                 return False
         return True


@@ -25,7 +25,7 @@ import six
 from taskflow import exceptions as exc
 from taskflow.openstack.common import uuidutils
 from taskflow import states
-from taskflow.utils import misc
+from taskflow.types import failure as ft
 LOG = logging.getLogger(__name__)
@@ -50,7 +50,7 @@ def _safe_unmarshal_time(when):
 def _was_failure(state, result):
-    return state == states.FAILURE and isinstance(result, misc.Failure)
+    return state == states.FAILURE and isinstance(result, ft.Failure)
 def _fix_meta(data):
@@ -363,7 +363,7 @@ class AtomDetail(object):
         self.meta = _fix_meta(data)
         failure = data.get('failure')
         if failure:
-            self.failure = misc.Failure.from_dict(failure)
+            self.failure = ft.Failure.from_dict(failure)
     @property
     def uuid(self):
@@ -467,8 +467,8 @@ class RetryDetail(AtomDetail):
         new_results = []
         for (data, failures) in results:
             new_failures = {}
-            for (key, failure_data) in six.iteritems(failures):
-                new_failures[key] = misc.Failure.from_dict(failure_data)
+            for (key, data) in six.iteritems(failures):
+                new_failures[key] = ft.Failure.from_dict(data)
             new_results.append((data, new_failures))
         return new_results


@@ -26,6 +26,7 @@ from taskflow.persistence import logbook
 from taskflow import retry
 from taskflow import states
 from taskflow import task
+from taskflow.types import failure
 from taskflow.utils import lock_utils
 from taskflow.utils import misc
 from taskflow.utils import reflection
@@ -425,7 +426,7 @@ class Storage(object):
         with self._lock.write_lock():
             ad = self._atomdetail_by_name(atom_name)
             ad.put(state, data)
-            if state == states.FAILURE and isinstance(data, misc.Failure):
+            if state == states.FAILURE and isinstance(data, failure.Failure):
                 # NOTE(imelnikov): failure serialization looses information,
                 # so we cache failures here, in atom name -> failure mapping.
                 self._failures[ad.name] = data


@@ -20,7 +20,7 @@ from taskflow import exceptions as exc
 from taskflow.openstack.common import uuidutils
 from taskflow.persistence import logbook
 from taskflow import states
-from taskflow.utils import misc
+from taskflow.types import failure
 class PersistenceTestMixin(object):
@@ -147,7 +147,7 @@ class PersistenceTestMixin(object):
         try:
             raise RuntimeError('Woot!')
         except Exception:
-            td.failure = misc.Failure()
+            td.failure = failure.Failure()
         fd.add(td)
@@ -161,10 +161,9 @@ class PersistenceTestMixin(object):
             lb2 = conn.get_logbook(lb_id)
         fd2 = lb2.find(fd.uuid)
         td2 = fd2.find(td.uuid)
-        failure = td2.failure
-        self.assertEqual(failure.exception_str, 'Woot!')
-        self.assertIs(failure.check(RuntimeError), RuntimeError)
-        self.assertEqual(failure.traceback_str, td.failure.traceback_str)
+        self.assertEqual(td2.failure.exception_str, 'Woot!')
+        self.assertIs(td2.failure.check(RuntimeError), RuntimeError)
+        self.assertEqual(td2.failure.traceback_str, td.failure.traceback_str)
         self.assertIsInstance(td2, logbook.TaskDetail)
     def test_logbook_merge_flow_detail(self):
@@ -269,7 +268,7 @@ class PersistenceTestMixin(object):
         fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
         lb.add(fd)
         rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid())
-        fail = misc.Failure.from_exception(RuntimeError('fail'))
+        fail = failure.Failure.from_exception(RuntimeError('fail'))
         rd.results.append((42, {'some-task': fail}))
         fd.add(rd)
@@ -286,7 +285,7 @@ class PersistenceTestMixin(object):
         rd2 = fd2.find(rd.uuid)
         self.assertIsInstance(rd2, logbook.RetryDetail)
         fail2 = rd2.results[0][1].get('some-task')
-        self.assertIsInstance(fail2, misc.Failure)
+        self.assertIsInstance(fail2, failure.Failure)
         self.assertTrue(fail.matches(fail2))
     def test_retry_detail_save_intention(self):


@@ -32,10 +32,10 @@ from taskflow import states
 from taskflow import task
 from taskflow import test
 from taskflow.tests import utils
+from taskflow.types import failure
 from taskflow.types import futures
 from taskflow.types import graph as gr
 from taskflow.utils import async_utils as au
-from taskflow.utils import misc
 from taskflow.utils import persistence_utils as p_utils
@@ -529,7 +529,7 @@ class EngineCheckingTaskTest(utils.EngineTestBase):
         self.assertEqual(result, 'RESULT')
         self.assertEqual(list(flow_failures.keys()), ['fail1'])
         fail = flow_failures['fail1']
-        self.assertIsInstance(fail, misc.Failure)
+        self.assertIsInstance(fail, failure.Failure)
         self.assertEqual(str(fail), 'Failure: RuntimeError: Woot!')
         flow = lf.Flow('test').add(


@@ -22,7 +22,6 @@ from taskflow import exceptions
 from taskflow import test
 from taskflow.tests import utils as test_utils
 from taskflow.types import failure
-from taskflow.utils import misc

 def _captured_failure(msg):
@@ -217,7 +216,7 @@ class FailureObjectTestCase(test.TestCase):
     def test_pformat_traceback_captured_no_exc_info(self):
         captured = _captured_failure('Woot!')
-        captured = misc.Failure.from_dict(captured.to_dict())
+        captured = failure.Failure.from_dict(captured.to_dict())
         text = captured.pformat(traceback=True)
         self.assertIn("Traceback (most recent call last):", text)


@@ -25,7 +25,7 @@ from taskflow import states
 from taskflow import storage
 from taskflow import test
 from taskflow.tests import utils as test_utils
-from taskflow.utils import misc
+from taskflow.types import failure
 from taskflow.utils import persistence_utils as p_utils
@@ -128,46 +128,46 @@ class StorageTestMixin(object):
         self.assertEqual(s.get_atom_state('my task'), states.FAILURE)

     def test_save_and_get_cached_failure(self):
-        failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+        a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
         s = self._get_storage()
         s.ensure_atom(test_utils.NoopTask('my task'))
-        s.save('my task', failure, states.FAILURE)
-        self.assertEqual(s.get('my task'), failure)
+        s.save('my task', a_failure, states.FAILURE)
+        self.assertEqual(s.get('my task'), a_failure)
         self.assertEqual(s.get_atom_state('my task'), states.FAILURE)
         self.assertTrue(s.has_failures())
-        self.assertEqual(s.get_failures(), {'my task': failure})
+        self.assertEqual(s.get_failures(), {'my task': a_failure})

     def test_save_and_get_non_cached_failure(self):
-        failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+        a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
         s = self._get_storage()
         s.ensure_atom(test_utils.NoopTask('my task'))
-        s.save('my task', failure, states.FAILURE)
-        self.assertEqual(s.get('my task'), failure)
+        s.save('my task', a_failure, states.FAILURE)
+        self.assertEqual(s.get('my task'), a_failure)
         s._failures['my task'] = None
-        self.assertTrue(failure.matches(s.get('my task')))
+        self.assertTrue(a_failure.matches(s.get('my task')))

     def test_get_failure_from_reverted_task(self):
-        failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+        a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
         s = self._get_storage()
         s.ensure_atom(test_utils.NoopTask('my task'))
-        s.save('my task', failure, states.FAILURE)
+        s.save('my task', a_failure, states.FAILURE)
         s.set_atom_state('my task', states.REVERTING)
-        self.assertEqual(s.get('my task'), failure)
+        self.assertEqual(s.get('my task'), a_failure)
         s.set_atom_state('my task', states.REVERTED)
-        self.assertEqual(s.get('my task'), failure)
+        self.assertEqual(s.get('my task'), a_failure)

     def test_get_failure_after_reload(self):
-        failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+        a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
         s = self._get_storage()
         s.ensure_atom(test_utils.NoopTask('my task'))
-        s.save('my task', failure, states.FAILURE)
+        s.save('my task', a_failure, states.FAILURE)
         s2 = self._get_storage(s._flowdetail)
         self.assertTrue(s2.has_failures())
         self.assertEqual(1, len(s2.get_failures()))
-        self.assertTrue(failure.matches(s2.get('my task')))
+        self.assertTrue(a_failure.matches(s2.get('my task')))
         self.assertEqual(s2.get_atom_state('my task'), states.FAILURE)

     def test_get_non_existing_var(self):
@@ -486,15 +486,15 @@ class StorageTestMixin(object):
         self.assertEqual(s.fetch_all(), {})

     def test_cached_retry_failure(self):
-        failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+        a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
         s = self._get_storage()
         s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x']))
         s.save('my retry', 'a')
-        s.save('my retry', failure, states.FAILURE)
+        s.save('my retry', a_failure, states.FAILURE)
         history = s.get_retry_history('my retry')
-        self.assertEqual(history, [('a', {}), (failure, {})])
+        self.assertEqual(history, [('a', {}), (a_failure, {})])
         self.assertIs(s.has_failures(), True)
-        self.assertEqual(s.get_failures(), {'my retry': failure})
+        self.assertEqual(s.get_failures(), {'my retry': a_failure})

     def test_logbook_get_unknown_atom_type(self):
         self.assertRaisesRegexp(TypeError,


@@ -24,7 +24,7 @@ from taskflow.engines.worker_based import protocol as pr
 from taskflow import test
 from taskflow.test import mock
 from taskflow.tests import utils as test_utils
-from taskflow.utils import misc
+from taskflow.types import failure
 from taskflow.utils import threading_utils
@@ -111,8 +111,8 @@ class TestWorkerTaskExecutor(test.MockTestCase):
                          [mock.call.on_progress(progress=1.0)])

     def test_on_message_response_state_failure(self):
-        failure = misc.Failure.from_exception(Exception('test'))
-        failure_dict = failure.to_dict()
+        a_failure = failure.Failure.from_exception(Exception('test'))
+        failure_dict = a_failure.to_dict()
         response = pr.Response(pr.FAILURE, result=failure_dict)
         ex = self.executor()
         ex._requests_cache[self.task_uuid] = self.request_inst_mock
@@ -121,7 +121,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
         self.assertEqual(len(ex._requests_cache), 0)
         expected_calls = [
             mock.call.transition_and_log_error(pr.FAILURE, logger=mock.ANY),
-            mock.call.set_result(result=test_utils.FailureMatcher(failure))
+            mock.call.set_result(result=test_utils.FailureMatcher(a_failure))
         ]
         self.assertEqual(expected_calls, self.request_inst_mock.mock_calls)


@@ -24,7 +24,7 @@ from taskflow.engines.worker_based import server as worker_server
 from taskflow.openstack.common import uuidutils
 from taskflow import test
 from taskflow.tests import utils as test_utils
-from taskflow.utils import misc
+from taskflow.types import failure

 TEST_EXCHANGE, TEST_TOPIC = ('test-exchange', 'test-topic')
@@ -94,5 +94,5 @@ class TestPipeline(test.TestCase):
         executor.wait_for_any([f])
         _t2, _action, result = f.result()
-        self.assertIsInstance(result, misc.Failure)
+        self.assertIsInstance(result, failure.Failure)
         self.assertEqual(RuntimeError, result.check(RuntimeError))


@@ -23,7 +23,7 @@ from taskflow.openstack.common import uuidutils
 from taskflow import test
 from taskflow.test import mock
 from taskflow.tests import utils
-from taskflow.utils import misc
+from taskflow.types import failure

 class TestProtocolValidation(test.TestCase):
@@ -149,15 +149,16 @@ class TestProtocol(test.TestCase):
                          self.request_to_dict(result=('success', None)))

     def test_to_dict_with_result_failure(self):
-        failure = misc.Failure.from_exception(RuntimeError('Woot!'))
-        expected = self.request_to_dict(result=('failure', failure.to_dict()))
-        self.assertEqual(self.request(result=failure).to_dict(), expected)
+        a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
+        expected = self.request_to_dict(result=('failure',
+                                                a_failure.to_dict()))
+        self.assertEqual(self.request(result=a_failure).to_dict(), expected)

     def test_to_dict_with_failures(self):
-        failure = misc.Failure.from_exception(RuntimeError('Woot!'))
-        request = self.request(failures={self.task.name: failure})
+        a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
+        request = self.request(failures={self.task.name: a_failure})
         expected = self.request_to_dict(
-            failures={self.task.name: failure.to_dict()})
+            failures={self.task.name: a_failure.to_dict()})
         self.assertEqual(request.to_dict(), expected)

     def test_pending_not_expired(self):


@@ -22,7 +22,7 @@ from taskflow.engines.worker_based import server
 from taskflow import test
 from taskflow.test import mock
 from taskflow.tests import utils
-from taskflow.utils import misc
+from taskflow.types import failure

 class TestServer(test.MockTestCase):
@@ -121,19 +121,19 @@ class TestServer(test.MockTestCase):
                                             result=1)))

     def test_parse_request_with_failure_result(self):
-        failure = misc.Failure.from_exception(Exception('test'))
-        request = self.make_request(action='revert', result=failure)
+        a_failure = failure.Failure.from_exception(Exception('test'))
+        request = self.make_request(action='revert', result=a_failure)
         task_cls, action, task_args = server.Server._parse_request(**request)
         self.assertEqual((task_cls, action, task_args),
                          (self.task.name, 'revert',
                           dict(task_name=self.task.name,
                                arguments=self.task_args,
-                               result=utils.FailureMatcher(failure))))
+                               result=utils.FailureMatcher(a_failure))))

     def test_parse_request_with_failures(self):
-        failures = {'0': misc.Failure.from_exception(Exception('test1')),
-                    '1': misc.Failure.from_exception(Exception('test2'))}
+        failures = {'0': failure.Failure.from_exception(Exception('test1')),
+                    '1': failure.Failure.from_exception(Exception('test2'))}
         request = self.make_request(action='revert', failures=failures)
         task_cls, action, task_args = server.Server._parse_request(**request)
@@ -220,16 +220,16 @@ class TestServer(test.MockTestCase):
         self.assertEqual(self.master_mock.mock_calls, [])
         self.assertTrue(mocked_exception.called)

-    @mock.patch.object(misc.Failure, 'from_dict')
-    @mock.patch.object(misc.Failure, 'to_dict')
+    @mock.patch.object(failure.Failure, 'from_dict')
+    @mock.patch.object(failure.Failure, 'to_dict')
     def test_process_request_parse_request_failure(self, to_mock, from_mock):
         failure_dict = {
             'failure': 'failure',
         }
-        failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+        a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
         to_mock.return_value = failure_dict
         from_mock.side_effect = ValueError('Woot!')
-        request = self.make_request(result=failure)
+        request = self.make_request(result=a_failure)

         # create server and process request
         s = self.server(reset_master_mock=True)
@@ -244,7 +244,7 @@ class TestServer(test.MockTestCase):
         ]
         self.assertEqual(master_mock_calls, self.master_mock.mock_calls)

-    @mock.patch.object(misc.Failure, 'to_dict')
+    @mock.patch.object(failure.Failure, 'to_dict')
     def test_process_request_endpoint_not_found(self, to_mock):
         failure_dict = {
             'failure': 'failure',
@@ -265,7 +265,7 @@ class TestServer(test.MockTestCase):
         ]
         self.assertEqual(self.master_mock.mock_calls, master_mock_calls)

-    @mock.patch.object(misc.Failure, 'to_dict')
+    @mock.patch.object(failure.Failure, 'to_dict')
     def test_process_request_execution_failure(self, to_mock):
         failure_dict = {
             'failure': 'failure',
@@ -287,7 +287,7 @@ class TestServer(test.MockTestCase):
         ]
         self.assertEqual(self.master_mock.mock_calls, master_mock_calls)

-    @mock.patch.object(misc.Failure, 'to_dict')
+    @mock.patch.object(failure.Failure, 'to_dict')
     def test_process_request_task_failure(self, to_mock):
         failure_dict = {
             'failure': 'failure',


@@ -23,8 +23,8 @@ from taskflow import exceptions
 from taskflow.persistence.backends import impl_memory
 from taskflow import retry
 from taskflow import task
+from taskflow.types import failure
 from taskflow.utils import kazoo_utils
-from taskflow.utils import misc
 from taskflow.utils import threading_utils

 ARGS_KEY = '__args__'
@@ -50,7 +50,7 @@ def wrap_all_failures():
     try:
         yield
     except Exception:
-        raise exceptions.WrappedFailure([misc.Failure()])
+        raise exceptions.WrappedFailure([failure.Failure()])

 def zookeeper_available(min_version, timeout=3):


@@ -68,7 +68,7 @@ class Failure(object):
     exception and desire to reraise it to the user/caller of the WBE based
     engine for appropriate handling (this matches the behavior of non-remote
     engines). To accomplish this a failure object (or a
-    :py:meth:`~misc.Failure.to_dict` form) would be sent over the WBE channel
+    :py:meth:`~.Failure.to_dict` form) would be sent over the WBE channel
     and the WBE based engine would deserialize it and use this objects
     :meth:`.reraise` method to cause an exception that contains
     similar/equivalent information as the original exception to be reraised,
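
The docstring above describes the round trip that motivates keeping `Failure` in a stable location: capture an exception into a failure object, ship its `to_dict()` form over the WBE channel, rebuild it on the receiving side, and `reraise()` an equivalent exception. A minimal stand-in sketch of that pattern is below; it is not taskflow's actual implementation (the real `taskflow.types.failure.Failure` records much more, such as the full exception type hierarchy), and only the method names mirror the real class:

```python
import builtins
import traceback


class Failure(object):
    """Illustrative stand-in for taskflow.types.failure.Failure.

    Captures just enough about an exception to serialize it across a
    message bus and rebuild a similar exception on the other side.
    """

    def __init__(self, exc_type_name, exc_args, traceback_str):
        self.exc_type_name = exc_type_name
        self.exc_args = exc_args
        self.traceback_str = traceback_str

    @classmethod
    def from_exception(cls, exc):
        # Record the type name, arguments and formatted traceback.
        tb = ''.join(traceback.format_exception(
            type(exc), exc, exc.__traceback__))
        return cls(type(exc).__name__, exc.args, tb)

    def to_dict(self):
        # A JSON-friendly form suitable for sending over a channel.
        return {'exc_type_name': self.exc_type_name,
                'exc_args': list(self.exc_args),
                'traceback_str': self.traceback_str}

    @classmethod
    def from_dict(cls, data):
        return cls(data['exc_type_name'], tuple(data['exc_args']),
                   data['traceback_str'])

    def reraise(self):
        # Raise an exception carrying similar/equivalent information;
        # fall back to RuntimeError for non-builtin exception types.
        exc_cls = getattr(builtins, self.exc_type_name, RuntimeError)
        raise exc_cls(*self.exc_args)
```

Round-tripping through `Failure.from_dict(fail.to_dict())` preserves the type name and arguments, which is what lets the receiving engine reraise an equivalent exception even though the original traceback object never crosses the wire.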