Merge "Remove direct usage of the deprecated failure location"
@@ -350,7 +350,7 @@ For ``result`` value, two cases are possible:

* If the task is being reverted because it failed (an exception was raised
  from its |task.execute| method), the ``result`` value is an instance of a
- :py:class:`~taskflow.utils.misc.Failure` object that holds the exception
+ :py:class:`~taskflow.types.failure.Failure` object that holds the exception
  information.

* If the task is being reverted because some other task failed, and this task
@@ -361,9 +361,9 @@ All other arguments are fetched from storage in the same way it is done for
|task.execute| method.

To determine if a task failed you can check whether ``result`` is instance of
- :py:class:`~taskflow.utils.misc.Failure`::
+ :py:class:`~taskflow.types.failure.Failure`::

-     from taskflow.utils import misc
+     from taskflow.types import failure

      class RevertingTask(task.Task):

@@ -371,7 +371,7 @@ To determine if a task failed you can check whether ``result`` is instance of
              return do_something(spam, eggs)

          def revert(self, result, spam, eggs):
-             if isinstance(result, misc.Failure):
+             if isinstance(result, failure.Failure):
                  print("This task failed, exception: %s"
                        % result.exception_str)
              else:
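For anyone trying the relocated class outside of an engine run, a minimal sketch of building and inspecting a failure the same way a ``revert`` would see it (the exception and message are made up; only ``Failure.from_exception``, ``check`` and ``exception_str`` from ``taskflow.types.failure``, all exercised elsewhere in this change, are assumed)::

    from taskflow.types import failure

    # Capture an exception into a Failure, much like an engine does when
    # a task's execute() raises.
    a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))

    # check() returns the matching exception class (or None), which is how
    # a revert() can branch on the kind of error that occurred.
    if a_failure.check(RuntimeError):
        print("Task failed with: %s" % a_failure.exception_str)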
@@ -389,7 +389,7 @@ A |Retry| controller works with arguments in the same way as a |Task|. But
it has an additional parameter ``'history'`` that is a list of tuples. Each
tuple contains a result of the previous retry run and a table where the key
is a failed task and the value is a
- :py:class:`~taskflow.utils.misc.Failure` object.
+ :py:class:`~taskflow.types.failure.Failure` object.

Consider the following implementation::

@@ -412,7 +412,7 @@ Imagine the above retry had returned a value ``'5'`` and then some task ``'A'``
failed with some exception. In this case the above retrys ``on_failure``
method will receive the following history::

-     [('5', {'A': misc.Failure()})]
+     [('5', {'A': failure.Failure()})]

At this point (since the implementation returned ``RETRY``) the
|retry.execute| method will be called again and it will receive the same
@@ -421,10 +421,10 @@ there behavior.

If instead the |retry.execute| method raises an exception, the |retry.revert|
method of the implementation will be called and
- a :py:class:`~taskflow.utils.misc.Failure` object will be present in the
+ a :py:class:`~taskflow.types.failure.Failure` object will be present in the
history instead of the typical result::

-     [('5', {'A': misc.Failure()}), (misc.Failure(), {})]
+     [('5', {'A': failure.Failure()}), (failure.Failure(), {})]

.. note::

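To make the history shape above concrete, here is a small sketch of a retry controller that gives up after two attempts. This assumes the ``Retry`` base class and the ``RETRY``/``REVERT`` decision constants exposed by ``taskflow.retry``, the method signatures shown here, and the ``(result, {task_name: Failure})`` tuple layout described above; the class name is made up::

    from taskflow import retry
    from taskflow.types import failure

    class ReportingRetry(retry.Retry):

        def execute(self, history, *args, **kwargs):
            # Return a fresh value per attempt; earlier attempts (and the
            # failures that triggered them) are visible through 'history'.
            return len(history)

        def revert(self, history, *args, **kwargs):
            # Nothing to undo in this sketch.
            pass

        def on_failure(self, history, *args, **kwargs):
            for result, failures in history:
                for atom_name, fail in failures.items():
                    # Each value is a taskflow.types.failure.Failure.
                    print("%s failed: %s" % (atom_name, fail.exception_str))
            return retry.RETRY if len(history) < 2 else retry.REVERT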
@@ -135,7 +135,7 @@ engine executor in the following manner:
   executes the task).
2. If dispatched succeeded then the worker sends a confirmation response
   to the executor otherwise the worker sends a failed response along with
-  a serialized :py:class:`failure <taskflow.utils.misc.Failure>` object
+  a serialized :py:class:`failure <taskflow.types.failure.Failure>` object
   that contains what has failed (and why).
3. The worker executes the task and once it is finished sends the result
   back to the originating executor (every time a task progress event is
@@ -152,11 +152,11 @@ engine executor in the following manner:

.. note::

-   :py:class:`~taskflow.utils.misc.Failure` objects are not json-serializable
-   (they contain references to tracebacks which are not serializable), so they
-   are converted to dicts before sending and converted from dicts after
-   receiving on both executor & worker sides (this translation is lossy since
-   the traceback won't be fully retained).
+   :py:class:`~taskflow.types.failure.Failure` objects are not directly
+   json-serializable (they contain references to tracebacks which are not
+   serializable), so they are converted to dicts before sending and converted
+   from dicts after receiving on both executor & worker sides (this
+   translation is lossy since the traceback won't be fully retained).

Executor request format
~~~~~~~~~~~~~~~~~~~~~~~
@@ -165,7 +165,7 @@ Executor request format
* **action** - task action to be performed (e.g. execute, revert)
* **arguments** - arguments the task action to be called with
* **result** - task execution result (result or
-  :py:class:`~taskflow.utils.misc.Failure`) *[passed to revert only]*
+  :py:class:`~taskflow.types.failure.Failure`) *[passed to revert only]*

Additionally, the following parameters are added to the request message:

@@ -222,7 +222,7 @@ When **failed:**

    {
        "event": <event>,
-       "result": <misc.Failure>,
+       "result": <types.failure.Failure>,
        "state": "FAILURE"
    }

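A short sketch of the dict round-trip that the message formats above rely on (only ``to_dict``, ``from_dict`` and ``matches`` from ``taskflow.types.failure`` are assumed; the ``event`` value and the surrounding dict are placeholders, not the exact wire payload)::

    from taskflow.types import failure

    try:
        raise RuntimeError('boom')
    except Exception:
        # Constructed inside an except block, Failure captures the
        # currently active exception.
        fail = failure.Failure()

    # Roughly what a worker would place into a failed response message.
    message = {
        "event": "completed",
        "result": fail.to_dict(),
        "state": "FAILURE",
    }

    # What the executor side rebuilds on receipt; the traceback is not
    # fully retained, so the copies are compared with matches() rather
    # than by identity.
    rebuilt = failure.Failure.from_dict(message["result"])
    print(rebuilt.matches(fail))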
@@ -26,6 +26,7 @@ from taskflow.engines import base
from taskflow import exceptions as exc
from taskflow import states
from taskflow import storage as atom_storage
+ from taskflow.types import failure
from taskflow.utils import lock_utils
from taskflow.utils import misc
from taskflow.utils import reflection
@@ -129,7 +130,7 @@ class ActionEngine(base.EngineBase):
        closed = False
        for (last_state, failures) in runner.run_iter(timeout=timeout):
            if failures:
-               misc.Failure.reraise_if_any(failures)
+               failure.Failure.reraise_if_any(failures)
            if closed:
                continue
            try:
@@ -152,7 +153,7 @@ class ActionEngine(base.EngineBase):
        self._change_state(last_state)
        if last_state not in [states.SUSPENDED, states.SUCCESS]:
            failures = self.storage.get_failures()
-           misc.Failure.reraise_if_any(failures.values())
+           failure.Failure.reraise_if_any(failures.values())

    def _change_state(self, state):
        with self._state_lock:
@@ -19,9 +19,9 @@ import abc
import six

from taskflow import task as _task
+ from taskflow.types import failure
from taskflow.types import futures
from taskflow.utils import async_utils
- from taskflow.utils import misc
from taskflow.utils import threading_utils

# Execution and reversion events.
@@ -37,7 +37,7 @@ def _execute_task(task, arguments, progress_callback):
    except Exception:
        # NOTE(imelnikov): wrap current exception with Failure
        # object and return it.
-       result = misc.Failure()
+       result = failure.Failure()
    finally:
        task.post_execute()
    return (task, EXECUTED, result)
@@ -54,7 +54,7 @@ def _revert_task(task, arguments, result, failures, progress_callback):
    except Exception:
        # NOTE(imelnikov): wrap current exception with Failure
        # object and return it.
-       result = misc.Failure()
+       result = failure.Failure()
    finally:
        task.post_revert()
    return (task, REVERTED, result)
@@ -19,8 +19,8 @@ import logging
from taskflow.engines.action_engine import executor as ex
from taskflow import retry as rt
from taskflow import states
+ from taskflow.types import failure
from taskflow.types import futures
- from taskflow.utils import misc

LOG = logging.getLogger(__name__)

@@ -69,12 +69,12 @@ class RetryAction(object):
            try:
                result = retry.execute(**kwargs)
            except Exception:
-               result = misc.Failure()
+               result = failure.Failure()
            return (retry, ex.EXECUTED, result)

        def _on_done_callback(fut):
            result = fut.result()[-1]
-           if isinstance(result, misc.Failure):
+           if isinstance(result, failure.Failure):
                self.change_state(retry, states.FAILURE, result=result)
            else:
                self.change_state(retry, states.SUCCESS, result=result)
@@ -91,12 +91,12 @@ class RetryAction(object):
            try:
                result = retry.revert(**kwargs)
            except Exception:
-               result = misc.Failure()
+               result = failure.Failure()
            return (retry, ex.REVERTED, result)

        def _on_done_callback(fut):
            result = fut.result()[-1]
-           if isinstance(result, misc.Failure):
+           if isinstance(result, failure.Failure):
                self.change_state(retry, states.FAILURE)
            else:
                self.change_state(retry, states.REVERTED)
@@ -17,8 +17,8 @@
import logging

from taskflow import states as st
+ from taskflow.types import failure
from taskflow.types import fsm
- from taskflow.utils import misc

# Waiting state timeout (in seconds).
_WAITING_TIMEOUT = 60
@@ -132,15 +132,15 @@ class _MachineBuilder(object):
            try:
                node, event, result = fut.result()
                retain = self._completer.complete(node, event, result)
-               if retain and isinstance(result, misc.Failure):
+               if retain and isinstance(result, failure.Failure):
                    memory.failures.append(result)
            except Exception:
-               memory.failures.append(misc.Failure())
+               memory.failures.append(failure.Failure())
            else:
                try:
                    more_nodes = self._analyzer.get_next_nodes(node)
                except Exception:
-                   memory.failures.append(misc.Failure())
+                   memory.failures.append(failure.Failure())
                else:
                    next_nodes.update(more_nodes)
            if self.runnable() and next_nodes and not memory.failures:
@@ -24,6 +24,7 @@ from taskflow import exceptions as excp
from taskflow import retry as retry_atom
from taskflow import states as st
from taskflow import task as task_atom
+ from taskflow.types import failure
from taskflow.utils import misc


@@ -155,7 +156,7 @@ class Completer(object):
        """
        if isinstance(node, task_atom.BaseTask):
            self._complete_task(node, event, result)
-           if isinstance(result, misc.Failure):
+           if isinstance(result, failure.Failure):
                if event == ex.EXECUTED:
                    self._process_atom_failure(node, result)
                else:
@@ -270,5 +271,5 @@ class Scheduler(object):
                # Immediately stop scheduling future work so that we can
                # exit execution early (rather than later) if a single task
                # fails to schedule correctly.
-               return (futures, [misc.Failure()])
+               return (futures, [failure.Failure()])
        return (futures, [])
@@ -17,7 +17,7 @@
import logging

from taskflow import states
- from taskflow.utils import misc
+ from taskflow.types import failure

LOG = logging.getLogger(__name__)

@@ -91,7 +91,7 @@ class TaskAction(object):
                                  self._on_update_progress)

    def complete_execution(self, task, result):
-       if isinstance(result, misc.Failure):
+       if isinstance(result, failure.Failure):
            self.change_state(task, states.FAILURE, result=result)
        else:
            self.change_state(task, states.SUCCESS,
@@ -112,7 +112,7 @@ class TaskAction(object):
        return future

    def complete_reversion(self, task, rev_result):
-       if isinstance(rev_result, misc.Failure):
+       if isinstance(rev_result, failure.Failure):
            self.change_state(task, states.FAILURE)
        else:
            self.change_state(task, states.REVERTED, progress=1.0)
@@ -172,9 +172,9 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
                          " seconds for it to transition out of (%s) states"
                          % (request, request_age, ", ".join(pr.WAITING_STATES)))
            except exc.RequestTimeout:
-               with misc.capture_failure() as fail:
-                   LOG.debug(fail.exception_str)
-                   request.set_result(fail)
+               with misc.capture_failure() as failure:
+                   LOG.debug(failure.exception_str)
+                   request.set_result(failure)

    def _on_wait(self):
        """This function is called cyclically between draining events."""
@@ -26,9 +26,9 @@ import six

from taskflow.engines.action_engine import executor
from taskflow import exceptions as excp
+ from taskflow.types import failure as ft
from taskflow.types import timing as tt
from taskflow.utils import lock_utils
- from taskflow.utils import misc
from taskflow.utils import reflection

# NOTE(skudriashev): This is protocol states and events, which are not
@@ -270,15 +270,15 @@ class Request(Message):
        """Return json-serializable request.

        To convert requests that have failed due to some exception this will
-       convert all `misc.Failure` objects into dictionaries (which will then
-       be reconstituted by the receiver).
+       convert all `failure.Failure` objects into dictionaries (which will
+       then be reconstituted by the receiver).
        """
        request = dict(task_cls=self._task_cls, task_name=self._task.name,
                       task_version=self._task.version, action=self._action,
                       arguments=self._arguments)
        if 'result' in self._kwargs:
            result = self._kwargs['result']
-           if isinstance(result, misc.Failure):
+           if isinstance(result, ft.Failure):
                request['result'] = ('failure', result.to_dict())
            else:
                request['result'] = ('success', result)
@@ -417,7 +417,7 @@ class Response(Message):
        state = data['state']
        data = data['data']
        if state == FAILURE and 'result' in data:
-           data['result'] = misc.Failure.from_dict(data['result'])
+           data['result'] = ft.Failure.from_dict(data['result'])
        return cls(state, **data)

    @property
@@ -21,6 +21,7 @@ import six

from taskflow.engines.worker_based import protocol as pr
from taskflow.engines.worker_based import proxy
+ from taskflow.types import failure as ft
from taskflow.utils import misc

LOG = logging.getLogger(__name__)
@@ -69,20 +70,20 @@ class Server(object):
                       failures=None, **kwargs):
        """Parse request before it can be further processed.

-       All `misc.Failure` objects that have been converted to dict on the
-       remote side will now converted back to `misc.Failure` objects.
+       All `failure.Failure` objects that have been converted to dict on the
+       remote side will now converted back to `failure.Failure` objects.
        """
        action_args = dict(arguments=arguments, task_name=task_name)
        if result is not None:
            data_type, data = result
            if data_type == 'failure':
-               action_args['result'] = misc.Failure.from_dict(data)
+               action_args['result'] = ft.Failure.from_dict(data)
            else:
                action_args['result'] = data
        if failures is not None:
            action_args['failures'] = {}
-           for k, v in failures.items():
-               action_args['failures'][k] = misc.Failure.from_dict(v)
+           for key, data in six.iteritems(failures):
+               action_args['failures'][key] = ft.Failure.from_dict(data)
        return task_cls, action, action_args

    @staticmethod
@@ -218,7 +219,7 @@ class Server(object):
                      message.delivery_tag, exc_info=True)
            reply_callback(result=failure.to_dict())
        else:
-           if isinstance(result, misc.Failure):
+           if isinstance(result, ft.Failure):
                reply_callback(result=result.to_dict())
            else:
                reply_callback(state=pr.SUCCESS, result=result)
@@ -33,7 +33,7 @@ from taskflow import exceptions
from taskflow.patterns import unordered_flow as uf
from taskflow import task
from taskflow.tests import utils
- from taskflow.utils import misc
+ from taskflow.types import failure

import example_utils as eu  # noqa

@@ -96,15 +96,15 @@ def run(**store):
                     engine='parallel')
    except exceptions.WrappedFailure as ex:
        unknown_failures = []
-       for failure in ex:
-           if failure.check(FirstException):
-               print("Got FirstException: %s" % failure.exception_str)
-           elif failure.check(SecondException):
-               print("Got SecondException: %s" % failure.exception_str)
+       for a_failure in ex:
+           if a_failure.check(FirstException):
+               print("Got FirstException: %s" % a_failure.exception_str)
+           elif a_failure.check(SecondException):
+               print("Got SecondException: %s" % a_failure.exception_str)
            else:
-               print("Unknown failure: %s" % failure)
-               unknown_failures.append(failure)
-       misc.Failure.reraise_if_any(unknown_failures)
+               print("Unknown failure: %s" % a_failure)
+               unknown_failures.append(a_failure)
+       failure.Failure.reraise_if_any(unknown_failures)


eu.print_wrapped("Raise and catch first exception only")
@@ -178,8 +178,8 @@ class WrappedFailure(Exception):
    See the failure class documentation for a more comprehensive set of reasons
    why this object *may* be reraised instead of the original exception.

-   :param causes: the :py:class:`~taskflow.utils.misc.Failure` objects that
-       caused this this exception to be raised.
+   :param causes: the :py:class:`~taskflow.types.failure.Failure` objects
+       that caused this this exception to be raised.
    """

    def __init__(self, causes):
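The example change earlier in this commit shows the intended consumption pattern; a condensed, self-contained sketch of unpacking a ``WrappedFailure`` (the causes are built by hand here to stand in for what a parallel engine raises when several tasks fail at once; only APIs that appear elsewhere in this change are used)::

    from taskflow import exceptions
    from taskflow.types import failure

    causes = [failure.Failure.from_exception(RuntimeError('task A died')),
              failure.Failure.from_exception(ValueError('task B died'))]
    try:
        raise exceptions.WrappedFailure(causes)
    except exceptions.WrappedFailure as ex:
        unknown = []
        for a_failure in ex:
            # Iterating a WrappedFailure yields the individual Failure
            # objects that caused it.
            if a_failure.check(RuntimeError):
                print("Known error: %s" % a_failure.exception_str)
            else:
                unknown.append(a_failure)
        # Re-raise anything that was not recognized above (here that will
        # re-raise the ValueError failure).
        failure.Failure.reraise_if_any(unknown)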
@@ -23,8 +23,8 @@ from oslo.utils import excutils
import six

from taskflow import states
+ from taskflow.types import failure
from taskflow.types import notifier
- from taskflow.utils import misc

LOG = logging.getLogger(__name__)

@@ -142,7 +142,7 @@ class LoggingBase(ListenerBase):
        result = details.get('result')
        exc_info = None
        was_failure = False
-       if isinstance(result, misc.Failure):
+       if isinstance(result, failure.Failure):
            if result.exc_info:
                exc_info = tuple(result.exc_info)
            was_failure = True
@@ -21,8 +21,8 @@ import sys

from taskflow.listeners import base
from taskflow import states
+ from taskflow.types import failure
from taskflow.types import notifier
- from taskflow.utils import misc

LOG = logging.getLogger(__name__)

@@ -92,8 +92,8 @@ class DynamicLoggingListener(base.ListenerBase):
      * ``states.RETRYING``
      * ``states.REVERTING``

-   When a task produces a :py:class:`~taskflow.utils.misc.Failure` object as
-   its result (typically this happens when a task raises an exception) this
+   When a task produces a :py:class:`~taskflow.types.failure.Failure` object
+   as its result (typically this happens when a task raises an exception) this
    will **always** switch the logger to use ``logging.WARNING`` (if the
    failure object contains a ``exc_info`` tuple this will also be logged to
    provide a meaningful traceback).
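The same pattern the listeners use can be applied in any task-event callback; a small self-contained sketch (the callback name and the hand-built ``details`` dict are illustrative, whereas an engine would supply real event details)::

    import logging

    from taskflow.types import failure

    LOG = logging.getLogger(__name__)

    def log_task_result(details):
        result = details.get('result')
        if isinstance(result, failure.Failure):
            # A Failure result means the task raised; include the captured
            # traceback in the log record when it is available.
            exc_info = tuple(result.exc_info) if result.exc_info else None
            LOG.warning("task failed: %s", result, exc_info=exc_info)
        else:
            LOG.debug("task finished: %r", result)

    try:
        raise RuntimeError('Woot!')
    except Exception:
        log_task_result({'result': failure.Failure()})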
@@ -130,7 +130,7 @@ class DynamicLoggingListener(base.ListenerBase):
        # If the task failed, it's useful to show the exception traceback
        # and any other available exception information.
        result = details.get('result')
-       if isinstance(result, misc.Failure):
+       if isinstance(result, failure.Failure):
            if result.exc_info:
                exc_info = result.exc_info
                manual_tb = ''

@@ -37,6 +37,7 @@ from taskflow.persistence.backends import base
from taskflow.persistence.backends.sqlalchemy import migration
from taskflow.persistence.backends.sqlalchemy import models
from taskflow.persistence import logbook
+ from taskflow.types import failure
from taskflow.utils import async_utils
from taskflow.utils import misc

@@ -328,7 +329,7 @@ class Connection(base.Connection):
                pass
            except sa_exc.OperationalError as ex:
                if _is_db_connection_error(six.text_type(ex.args[0])):
-                   failures.append(misc.Failure())
+                   failures.append(failure.Failure())
                    return False
        return True

@@ -25,7 +25,7 @@ import six
from taskflow import exceptions as exc
from taskflow.openstack.common import uuidutils
from taskflow import states
- from taskflow.utils import misc
+ from taskflow.types import failure as ft

LOG = logging.getLogger(__name__)

@@ -50,7 +50,7 @@ def _safe_unmarshal_time(when):


def _was_failure(state, result):
-   return state == states.FAILURE and isinstance(result, misc.Failure)
+   return state == states.FAILURE and isinstance(result, ft.Failure)


def _fix_meta(data):
@@ -363,7 +363,7 @@ class AtomDetail(object):
        self.meta = _fix_meta(data)
        failure = data.get('failure')
        if failure:
-           self.failure = misc.Failure.from_dict(failure)
+           self.failure = ft.Failure.from_dict(failure)

    @property
    def uuid(self):
@@ -467,8 +467,8 @@ class RetryDetail(AtomDetail):
        new_results = []
        for (data, failures) in results:
            new_failures = {}
-           for (key, failure_data) in six.iteritems(failures):
-               new_failures[key] = misc.Failure.from_dict(failure_data)
+           for (key, data) in six.iteritems(failures):
+               new_failures[key] = ft.Failure.from_dict(data)
            new_results.append((data, new_failures))
        return new_results

@@ -26,6 +26,7 @@ from taskflow.persistence import logbook
from taskflow import retry
from taskflow import states
from taskflow import task
+ from taskflow.types import failure
from taskflow.utils import lock_utils
from taskflow.utils import misc
from taskflow.utils import reflection
@@ -425,7 +426,7 @@ class Storage(object):
        with self._lock.write_lock():
            ad = self._atomdetail_by_name(atom_name)
            ad.put(state, data)
-           if state == states.FAILURE and isinstance(data, misc.Failure):
+           if state == states.FAILURE and isinstance(data, failure.Failure):
                # NOTE(imelnikov): failure serialization looses information,
                # so we cache failures here, in atom name -> failure mapping.
                self._failures[ad.name] = data
@@ -20,7 +20,7 @@ from taskflow import exceptions as exc
from taskflow.openstack.common import uuidutils
from taskflow.persistence import logbook
from taskflow import states
- from taskflow.utils import misc
+ from taskflow.types import failure


class PersistenceTestMixin(object):
@@ -147,7 +147,7 @@ class PersistenceTestMixin(object):
        try:
            raise RuntimeError('Woot!')
        except Exception:
-           td.failure = misc.Failure()
+           td.failure = failure.Failure()

        fd.add(td)

@@ -161,10 +161,9 @@ class PersistenceTestMixin(object):
            lb2 = conn.get_logbook(lb_id)
        fd2 = lb2.find(fd.uuid)
        td2 = fd2.find(td.uuid)
-       failure = td2.failure
-       self.assertEqual(failure.exception_str, 'Woot!')
-       self.assertIs(failure.check(RuntimeError), RuntimeError)
-       self.assertEqual(failure.traceback_str, td.failure.traceback_str)
+       self.assertEqual(td2.failure.exception_str, 'Woot!')
+       self.assertIs(td2.failure.check(RuntimeError), RuntimeError)
+       self.assertEqual(td2.failure.traceback_str, td.failure.traceback_str)
        self.assertIsInstance(td2, logbook.TaskDetail)

    def test_logbook_merge_flow_detail(self):
@@ -269,7 +268,7 @@ class PersistenceTestMixin(object):
        fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
        lb.add(fd)
        rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid())
-       fail = misc.Failure.from_exception(RuntimeError('fail'))
+       fail = failure.Failure.from_exception(RuntimeError('fail'))
        rd.results.append((42, {'some-task': fail}))
        fd.add(rd)

@@ -286,7 +285,7 @@ class PersistenceTestMixin(object):
        rd2 = fd2.find(rd.uuid)
        self.assertIsInstance(rd2, logbook.RetryDetail)
        fail2 = rd2.results[0][1].get('some-task')
-       self.assertIsInstance(fail2, misc.Failure)
+       self.assertIsInstance(fail2, failure.Failure)
        self.assertTrue(fail.matches(fail2))

    def test_retry_detail_save_intention(self):
@@ -31,10 +31,10 @@ from taskflow import states
from taskflow import task
from taskflow import test
from taskflow.tests import utils
+ from taskflow.types import failure
from taskflow.types import futures
from taskflow.types import graph as gr
from taskflow.utils import async_utils as au
- from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
from taskflow.utils import threading_utils as tu

@@ -529,7 +529,7 @@ class EngineCheckingTaskTest(utils.EngineTestBase):
            self.assertEqual(result, 'RESULT')
            self.assertEqual(list(flow_failures.keys()), ['fail1'])
            fail = flow_failures['fail1']
-           self.assertIsInstance(fail, misc.Failure)
+           self.assertIsInstance(fail, failure.Failure)
            self.assertEqual(str(fail), 'Failure: RuntimeError: Woot!')

        flow = lf.Flow('test').add(
@@ -22,7 +22,6 @@ from taskflow import exceptions
from taskflow import test
from taskflow.tests import utils as test_utils
from taskflow.types import failure
- from taskflow.utils import misc


def _captured_failure(msg):
@@ -225,7 +224,7 @@ class FailureObjectTestCase(test.TestCase):

    def test_pformat_traceback_captured_no_exc_info(self):
        captured = _captured_failure('Woot!')
-       captured = misc.Failure.from_dict(captured.to_dict())
+       captured = failure.Failure.from_dict(captured.to_dict())
        text = captured.pformat(traceback=True)
        self.assertIn("Traceback (most recent call last):", text)
@@ -25,7 +25,7 @@ from taskflow import states
from taskflow import storage
from taskflow import test
from taskflow.tests import utils as test_utils
- from taskflow.utils import misc
+ from taskflow.types import failure
from taskflow.utils import persistence_utils as p_utils


@@ -128,46 +128,46 @@ class StorageTestMixin(object):
        self.assertEqual(s.get_atom_state('my task'), states.FAILURE)

    def test_save_and_get_cached_failure(self):
-       failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+       a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
        s = self._get_storage()
        s.ensure_atom(test_utils.NoopTask('my task'))
-       s.save('my task', failure, states.FAILURE)
-       self.assertEqual(s.get('my task'), failure)
+       s.save('my task', a_failure, states.FAILURE)
+       self.assertEqual(s.get('my task'), a_failure)
        self.assertEqual(s.get_atom_state('my task'), states.FAILURE)
        self.assertTrue(s.has_failures())
-       self.assertEqual(s.get_failures(), {'my task': failure})
+       self.assertEqual(s.get_failures(), {'my task': a_failure})

    def test_save_and_get_non_cached_failure(self):
-       failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+       a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
        s = self._get_storage()
        s.ensure_atom(test_utils.NoopTask('my task'))
-       s.save('my task', failure, states.FAILURE)
-       self.assertEqual(s.get('my task'), failure)
+       s.save('my task', a_failure, states.FAILURE)
+       self.assertEqual(s.get('my task'), a_failure)
        s._failures['my task'] = None
-       self.assertTrue(failure.matches(s.get('my task')))
+       self.assertTrue(a_failure.matches(s.get('my task')))

    def test_get_failure_from_reverted_task(self):
-       failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+       a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))

        s = self._get_storage()
        s.ensure_atom(test_utils.NoopTask('my task'))
-       s.save('my task', failure, states.FAILURE)
+       s.save('my task', a_failure, states.FAILURE)

        s.set_atom_state('my task', states.REVERTING)
-       self.assertEqual(s.get('my task'), failure)
+       self.assertEqual(s.get('my task'), a_failure)

        s.set_atom_state('my task', states.REVERTED)
-       self.assertEqual(s.get('my task'), failure)
+       self.assertEqual(s.get('my task'), a_failure)

    def test_get_failure_after_reload(self):
-       failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+       a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
        s = self._get_storage()
        s.ensure_atom(test_utils.NoopTask('my task'))
-       s.save('my task', failure, states.FAILURE)
+       s.save('my task', a_failure, states.FAILURE)
        s2 = self._get_storage(s._flowdetail)
        self.assertTrue(s2.has_failures())
        self.assertEqual(1, len(s2.get_failures()))
-       self.assertTrue(failure.matches(s2.get('my task')))
+       self.assertTrue(a_failure.matches(s2.get('my task')))
        self.assertEqual(s2.get_atom_state('my task'), states.FAILURE)

    def test_get_non_existing_var(self):
@@ -486,15 +486,15 @@ class StorageTestMixin(object):
        self.assertEqual(s.fetch_all(), {})

    def test_cached_retry_failure(self):
-       failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+       a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
        s = self._get_storage()
        s.ensure_atom(test_utils.NoopRetry('my retry', provides=['x']))
        s.save('my retry', 'a')
-       s.save('my retry', failure, states.FAILURE)
+       s.save('my retry', a_failure, states.FAILURE)
        history = s.get_retry_history('my retry')
-       self.assertEqual(history, [('a', {}), (failure, {})])
+       self.assertEqual(history, [('a', {}), (a_failure, {})])
        self.assertIs(s.has_failures(), True)
-       self.assertEqual(s.get_failures(), {'my retry': failure})
+       self.assertEqual(s.get_failures(), {'my retry': a_failure})

    def test_logbook_get_unknown_atom_type(self):
        self.assertRaisesRegexp(TypeError,
@@ -24,7 +24,7 @@ from taskflow.engines.worker_based import protocol as pr
from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils as test_utils
- from taskflow.utils import misc
+ from taskflow.types import failure
from taskflow.utils import threading_utils


@@ -111,8 +111,8 @@ class TestWorkerTaskExecutor(test.MockTestCase):
                         [mock.call.on_progress(progress=1.0)])

    def test_on_message_response_state_failure(self):
-       failure = misc.Failure.from_exception(Exception('test'))
-       failure_dict = failure.to_dict()
+       a_failure = failure.Failure.from_exception(Exception('test'))
+       failure_dict = a_failure.to_dict()
        response = pr.Response(pr.FAILURE, result=failure_dict)
        ex = self.executor()
        ex._requests_cache[self.task_uuid] = self.request_inst_mock
@@ -121,7 +121,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
        self.assertEqual(len(ex._requests_cache), 0)
        expected_calls = [
            mock.call.transition_and_log_error(pr.FAILURE, logger=mock.ANY),
-           mock.call.set_result(result=test_utils.FailureMatcher(failure))
+           mock.call.set_result(result=test_utils.FailureMatcher(a_failure))
        ]
        self.assertEqual(expected_calls, self.request_inst_mock.mock_calls)

@@ -24,7 +24,7 @@ from taskflow.engines.worker_based import server as worker_server
from taskflow.openstack.common import uuidutils
from taskflow import test
from taskflow.tests import utils as test_utils
- from taskflow.utils import misc
+ from taskflow.types import failure


TEST_EXCHANGE, TEST_TOPIC = ('test-exchange', 'test-topic')
@@ -94,5 +94,5 @@ class TestPipeline(test.TestCase):
        executor.wait_for_any([f])

        _t2, _action, result = f.result()
-       self.assertIsInstance(result, misc.Failure)
+       self.assertIsInstance(result, failure.Failure)
        self.assertEqual(RuntimeError, result.check(RuntimeError))
@@ -23,7 +23,7 @@ from taskflow.openstack.common import uuidutils
from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils
- from taskflow.utils import misc
+ from taskflow.types import failure


class TestProtocolValidation(test.TestCase):
@@ -149,15 +149,16 @@ class TestProtocol(test.TestCase):
                         self.request_to_dict(result=('success', None)))

    def test_to_dict_with_result_failure(self):
-       failure = misc.Failure.from_exception(RuntimeError('Woot!'))
-       expected = self.request_to_dict(result=('failure', failure.to_dict()))
-       self.assertEqual(self.request(result=failure).to_dict(), expected)
+       a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
+       expected = self.request_to_dict(result=('failure',
+                                               a_failure.to_dict()))
+       self.assertEqual(self.request(result=a_failure).to_dict(), expected)

    def test_to_dict_with_failures(self):
-       failure = misc.Failure.from_exception(RuntimeError('Woot!'))
-       request = self.request(failures={self.task.name: failure})
+       a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
+       request = self.request(failures={self.task.name: a_failure})
        expected = self.request_to_dict(
-           failures={self.task.name: failure.to_dict()})
+           failures={self.task.name: a_failure.to_dict()})
        self.assertEqual(request.to_dict(), expected)

    def test_pending_not_expired(self):
@@ -22,7 +22,7 @@ from taskflow.engines.worker_based import server
from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils
- from taskflow.utils import misc
+ from taskflow.types import failure


class TestServer(test.MockTestCase):
@@ -121,19 +121,19 @@ class TestServer(test.MockTestCase):
                                          result=1)))

    def test_parse_request_with_failure_result(self):
-       failure = misc.Failure.from_exception(Exception('test'))
-       request = self.make_request(action='revert', result=failure)
+       a_failure = failure.Failure.from_exception(Exception('test'))
+       request = self.make_request(action='revert', result=a_failure)
        task_cls, action, task_args = server.Server._parse_request(**request)

        self.assertEqual((task_cls, action, task_args),
                         (self.task.name, 'revert',
                          dict(task_name=self.task.name,
                               arguments=self.task_args,
-                              result=utils.FailureMatcher(failure))))
+                              result=utils.FailureMatcher(a_failure))))

    def test_parse_request_with_failures(self):
-       failures = {'0': misc.Failure.from_exception(Exception('test1')),
-                   '1': misc.Failure.from_exception(Exception('test2'))}
+       failures = {'0': failure.Failure.from_exception(Exception('test1')),
+                   '1': failure.Failure.from_exception(Exception('test2'))}
        request = self.make_request(action='revert', failures=failures)
        task_cls, action, task_args = server.Server._parse_request(**request)

@@ -220,16 +220,16 @@ class TestServer(test.MockTestCase):
        self.assertEqual(self.master_mock.mock_calls, [])
        self.assertTrue(mocked_exception.called)

-   @mock.patch.object(misc.Failure, 'from_dict')
-   @mock.patch.object(misc.Failure, 'to_dict')
+   @mock.patch.object(failure.Failure, 'from_dict')
+   @mock.patch.object(failure.Failure, 'to_dict')
    def test_process_request_parse_request_failure(self, to_mock, from_mock):
        failure_dict = {
            'failure': 'failure',
        }
-       failure = misc.Failure.from_exception(RuntimeError('Woot!'))
+       a_failure = failure.Failure.from_exception(RuntimeError('Woot!'))
        to_mock.return_value = failure_dict
        from_mock.side_effect = ValueError('Woot!')
-       request = self.make_request(result=failure)
+       request = self.make_request(result=a_failure)

        # create server and process request
        s = self.server(reset_master_mock=True)
@@ -244,7 +244,7 @@ class TestServer(test.MockTestCase):
        ]
        self.assertEqual(master_mock_calls, self.master_mock.mock_calls)

-   @mock.patch.object(misc.Failure, 'to_dict')
+   @mock.patch.object(failure.Failure, 'to_dict')
    def test_process_request_endpoint_not_found(self, to_mock):
        failure_dict = {
            'failure': 'failure',
@@ -265,7 +265,7 @@ class TestServer(test.MockTestCase):
        ]
        self.assertEqual(self.master_mock.mock_calls, master_mock_calls)

-   @mock.patch.object(misc.Failure, 'to_dict')
+   @mock.patch.object(failure.Failure, 'to_dict')
    def test_process_request_execution_failure(self, to_mock):
        failure_dict = {
            'failure': 'failure',
@@ -287,7 +287,7 @@ class TestServer(test.MockTestCase):
        ]
        self.assertEqual(self.master_mock.mock_calls, master_mock_calls)

-   @mock.patch.object(misc.Failure, 'to_dict')
+   @mock.patch.object(failure.Failure, 'to_dict')
    def test_process_request_task_failure(self, to_mock):
        failure_dict = {
            'failure': 'failure',
@@ -23,8 +23,8 @@ from taskflow import exceptions
from taskflow.persistence.backends import impl_memory
from taskflow import retry
from taskflow import task
+ from taskflow.types import failure
from taskflow.utils import kazoo_utils
- from taskflow.utils import misc
from taskflow.utils import threading_utils

ARGS_KEY = '__args__'
@@ -50,7 +50,7 @@ def wrap_all_failures():
    try:
        yield
    except Exception:
-       raise exceptions.WrappedFailure([misc.Failure()])
+       raise exceptions.WrappedFailure([failure.Failure()])


def zookeeper_available(min_version, timeout=3):
@@ -68,7 +68,7 @@ class Failure(object):
    exception and desire to reraise it to the user/caller of the WBE based
    engine for appropriate handling (this matches the behavior of non-remote
    engines). To accomplish this a failure object (or a
-   :py:meth:`~misc.Failure.to_dict` form) would be sent over the WBE channel
+   :py:meth:`~.Failure.to_dict` form) would be sent over the WBE channel
    and the WBE based engine would deserialize it and use this objects
    :meth:`.reraise` method to cause an exception that contains
    similar/equivalent information as the original exception to be reraised,
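A compact sketch of the round trip this docstring describes, outside of any actual WBE transport (only ``to_dict``, ``from_dict`` and ``reraise`` are assumed; the helper function and the exception are made up)::

    from taskflow.types import failure

    def remote_side():
        # On the worker, the raised exception is captured and flattened
        # into a json-friendly dict for the trip over the channel.
        try:
            raise IOError('disk on fire')
        except Exception:
            return failure.Failure().to_dict()

    payload = remote_side()

    # On the engine side the dict is rebuilt and reraise() throws again.
    try:
        failure.Failure.from_dict(payload).reraise()
    except Exception as e:
        # Depending on how much could be preserved this may be the original
        # exception type or a WrappedFailure carrying it (the conversion is
        # lossy, as noted above).
        print("got it back: %s" % e)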