Revert "Move taskflow.utils.misc.Failure to its own module"
This reverts commit 42ca240e81, which was a breaking change in a
library consumed by other OpenStack projects with no deprecation or
backwards compatibility considerations. It was able to merge because
openstack/taskflow is apparently not yet part of the integrated gate
via the proposed I202f4809afd689155e2cc4a00fc704fd772a0e92 change.
Change-Id: I96cf36dc317499df91e43502efc85221f8177395
Closes-Bug: #1300161
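For downstream consumers, the practical effect of this revert is that the
failure class stays importable from its pre-move location,
``taskflow.utils.misc.Failure``, instead of the short-lived
``taskflow.failure`` module. A minimal sketch of the usage that keeps
working (the ``CleanupVolumeTask`` below is a hypothetical downstream task,
not code from this tree)::

    from taskflow import task
    from taskflow.utils import misc  # Failure lives here again after the revert


    class CleanupVolumeTask(task.Task):  # hypothetical example task
        def execute(self, volume_id):
            return {'volume_id': volume_id}

        def revert(self, result, volume_id, **kwargs):
            # result is a misc.Failure when execute() raised; otherwise it is
            # whatever execute() previously returned.
            if isinstance(result, misc.Failure):
                print("cleanup failed: %s" % result.exception_str)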
@@ -339,7 +339,7 @@ For ``result`` value, two cases are possible:
 
 * if task is being reverted because it failed (an exception was raised from its
   |task.execute| method), ``result`` value is instance of
-  :py:class:`taskflow.failure.Failure` object that holds exception information;
+  :py:class:`taskflow.utils.misc.Failure` object that holds exception information;
 
 * if task is being reverted because some other task failed, and this task
   finished successfully, ``result`` value is task result fetched from storage:
@@ -349,9 +349,9 @@ All other arguments are fetched from storage in the same way it is done for
 |task.execute| method.
 
 To determine if task failed you can check whether ``result`` is instance of
-:py:class:`taskflow.failure.Failure`::
+:py:class:`taskflow.utils.misc.Failure`::
 
-    from taskflow.utils import failure
+    from taskflow.utils import misc
 
     class RevertingTask(task.Task):
 
@@ -359,7 +359,7 @@ To determine if task failed you can check whether ``result`` is instance of
             return do_something(spam, eggs)
 
         def revert(self, result, spam, eggs):
-            if isinstance(result, failure.Failure):
+            if isinstance(result, misc.Failure):
                 print("This task failed, exception: %s" % result.exception_str)
             else:
                 print("do_something returned %r" % result)
@@ -374,7 +374,7 @@ Retry Arguments
 
 A Retry controller works with arguments in the same way as a Task. But it has an additional parameter 'history' that is
 a list of tuples. Each tuple contains a result of the previous Retry run and a table where a key is a failed task and a value
-is a :py:class:`taskflow.failure.Failure`.
+is a :py:class:`taskflow.utils.misc.Failure`.
 
 Consider the following Retry::
 
@@ -396,13 +396,13 @@ Consider the following Retry::
 Imagine the following Retry had returned a value '5' and then some task 'A' failed with some exception.
 In this case ``on_failure`` method will receive the following history::
 
-    [('5', {'A': failure.Failure()})]
+    [('5', {'A': misc.Failure()})]
 
 Then the |retry.execute| method will be called again and it'll receive the same history.
 
-If the |retry.execute| method raises an exception, the |retry.revert| method of Retry will be called and :py:class:`taskflow.failure.Failure` object will be present
+If the |retry.execute| method raises an exception, the |retry.revert| method of Retry will be called and :py:class:`taskflow.utils.misc.Failure` object will be present
 in the history instead of Retry result::
 
-    [('5', {'A': failure.Failure()}), (failure.Failure(), {})]
+    [('5', {'A': misc.Failure()}), (misc.Failure(), {})]
 
 After the Retry has been reverted, the Retry history will be cleaned.
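Spelled out as a small helper, the history structure described above can be
walked like this (a sketch written against the documented
``[(result, {task_name: misc.Failure})]`` format; ``summarize_history`` is
not part of the taskflow API)::

    from taskflow.utils import misc


    def summarize_history(history):
        # history: [(previous_retry_result, {failed_task_name: misc.Failure}), ...]
        lines = []
        for previous_result, failed_tasks in history:
            for task_name, fail in failed_tasks.items():
                if isinstance(fail, misc.Failure):
                    lines.append("%s failed (retry had returned %r): %s"
                                 % (task_name, previous_result, fail.exception_str))
        return lines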
@@ -1,5 +0,0 @@
-=======
-Failure
-=======
-
-.. automodule:: taskflow.failure
@@ -19,7 +19,7 @@ Contents
    storage
    persistence
    exceptions
-   failure
+   utils
    states
 
 
doc/source/utils.rst (new file, 5 lines)
@@ -0,0 +1,5 @@
+-----
+Utils
+-----
+
+.. autoclass:: taskflow.utils.misc.Failure
@@ -24,7 +24,6 @@ from taskflow.engines.action_engine import task_action
 from taskflow.engines import base
 
 from taskflow import exceptions as exc
-from taskflow import failure
 from taskflow.openstack.common import excutils
 from taskflow import retry
 from taskflow import states
@@ -116,7 +115,7 @@ class ActionEngine(base.EngineBase):
         self._change_state(state)
         if state != states.SUSPENDED and state != states.SUCCESS:
             failures = self.storage.get_failures()
-            failure.Failure.reraise_if_any(failures.values())
+            misc.Failure.reraise_if_any(failures.values())
 
     @lock_utils.locked(lock='_state_lock')
     def _change_state(self, state):
@@ -19,8 +19,8 @@ import abc
 from concurrent import futures
 import six
 
-from taskflow import failure
 from taskflow.utils import async_utils
+from taskflow.utils import misc
 from taskflow.utils import threading_utils
 
 # Execution and reversion events.
@@ -35,7 +35,7 @@ def _execute_task(task, arguments, progress_callback):
     except Exception:
         # NOTE(imelnikov): wrap current exception with Failure
         # object and return it.
-        result = failure.Failure()
+        result = misc.Failure()
     return (task, EXECUTED, result)
 
 
@@ -49,7 +49,7 @@ def _revert_task(task, arguments, result, failures, progress_callback):
     except Exception:
         # NOTE(imelnikov): wrap current exception with Failure
         # object and return it.
-        result = failure.Failure()
+        result = misc.Failure()
     return (task, REVERTED, result)
 
 
@@ -16,10 +16,10 @@
 
 from taskflow.engines.action_engine import executor as ex
 from taskflow import exceptions as excp
-from taskflow import failure
 from taskflow import retry as r
 from taskflow import states as st
 from taskflow import task
+from taskflow.utils import misc
 
 
 _WAITING_TIMEOUT = 60 # in seconds
@@ -77,7 +77,7 @@ class FutureGraphAction(object):
                 node, event, result = future.result()
                 if isinstance(node, task.BaseTask):
                     self._complete_task(node, event, result)
-                if isinstance(result, failure.Failure):
+                if isinstance(result, misc.Failure):
                     if event == ex.EXECUTED:
                         self._process_atom_failure(node, result)
                     else:
@@ -88,7 +88,7 @@ class FutureGraphAction(object):
                 not_done.extend(self._schedule(next_nodes))
 
         if failures:
-            failure.Failure.reraise_if_any(failures)
+            misc.Failure.reraise_if_any(failures)
 
         if self._analyzer.get_next_nodes():
             return st.SUSPENDED
@@ -18,9 +18,9 @@ import logging
 
 from taskflow.engines.action_engine import executor as ex
 from taskflow import exceptions
-from taskflow import failure
 from taskflow import states
 from taskflow.utils import async_utils
+from taskflow.utils import misc
 
 LOG = logging.getLogger(__name__)
 
@@ -63,7 +63,7 @@ class RetryAction(object):
         try:
             result = retry.execute(**kwargs)
         except Exception:
-            result = failure.Failure()
+            result = misc.Failure()
             self.change_state(retry, states.FAILURE, result=result)
         else:
             self.change_state(retry, states.SUCCESS, result=result)
@@ -79,7 +79,7 @@ class RetryAction(object):
         try:
             result = retry.revert(**kwargs)
         except Exception:
-            result = failure.Failure()
+            result = misc.Failure()
             self.change_state(retry, states.FAILURE)
         else:
             self.change_state(retry, states.REVERTED)
@@ -17,8 +17,8 @@
 import logging
 
 from taskflow import exceptions
-from taskflow import failure
 from taskflow import states
+from taskflow.utils import misc
 
 LOG = logging.getLogger(__name__)
 
@@ -71,7 +71,7 @@ class TaskAction(object):
                             self._on_update_progress)
 
     def complete_execution(self, task, result):
-        if isinstance(result, failure.Failure):
+        if isinstance(result, misc.Failure):
             self.change_state(task, states.FAILURE, result=result)
         else:
             self.change_state(task, states.SUCCESS,
@@ -91,7 +91,7 @@ class TaskAction(object):
         return future
 
     def complete_reversion(self, task, rev_result):
-        if isinstance(rev_result, failure.Failure):
+        if isinstance(rev_result, misc.Failure):
             self.change_state(task, states.FAILURE)
         else:
             self.change_state(task, states.REVERTED, progress=1.0)
@@ -24,7 +24,6 @@ from taskflow.engines.worker_based import cache
 from taskflow.engines.worker_based import protocol as pr
 from taskflow.engines.worker_based import proxy
 from taskflow import exceptions as exc
-from taskflow import failure
 from taskflow.utils import async_utils
 from taskflow.utils import misc
 
@@ -126,7 +125,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
         """
-        LOG.debug("Request '%r' has expired.", request)
-        request.set_result(failure.Failure.from_exception(
+        LOG.debug("The '%r' request has expired.", request)
+        request.set_result(misc.Failure.from_exception(
             exc.RequestTimeout("The '%r' request has expired" % request)))
 
     def _on_wait(self):
@@ -162,11 +161,11 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
                                     reply_to=self._uuid,
                                     correlation_id=request.uuid)
         except Exception:
-            with failure.capture_failure() as fail:
+            with misc.capture_failure() as failure:
                 LOG.exception("Failed to submit the '%s' request." %
                               request)
                 self._requests_cache.delete(request.uuid)
-                request.set_result(fail)
+                request.set_result(failure)
 
     def _notify_topics(self):
         """Cyclically publish notify message to each topic."""
@@ -21,7 +21,6 @@ import six
 from concurrent import futures
 
 from taskflow.engines.action_engine import executor
-from taskflow import failure
 from taskflow.utils import misc
 from taskflow.utils import reflection
 
@@ -137,7 +136,7 @@ class Request(Message):
         return False
 
     def to_dict(self):
-        """Return json-serializable request, converting all `failure.Failure`
+        """Return json-serializable request, converting all `misc.Failure`
         objects into dictionaries.
         """
         request = dict(task_cls=self._task_cls, task_name=self._task.name,
@@ -145,15 +144,15 @@ class Request(Message):
                        arguments=self._arguments)
         if 'result' in self._kwargs:
             result = self._kwargs['result']
-            if isinstance(result, failure.Failure):
+            if isinstance(result, misc.Failure):
                 request['result'] = ('failure', result.to_dict())
             else:
                 request['result'] = ('success', result)
         if 'failures' in self._kwargs:
             failures = self._kwargs['failures']
             request['failures'] = {}
-            for task, fail in six.iteritems(failures):
-                request['failures'][task] = fail.to_dict()
+            for task, failure in six.iteritems(failures):
+                request['failures'][task] = failure.to_dict()
         return request
 
     def set_result(self, result):
@@ -183,7 +182,7 @@ class Response(Message):
         state = data['state']
         data = data['data']
         if state == FAILURE and 'result' in data:
-            data['result'] = failure.Failure.from_dict(data['result'])
+            data['result'] = misc.Failure.from_dict(data['result'])
         return cls(state, **data)
 
     @property
@@ -21,7 +21,7 @@ from kombu import exceptions as kombu_exc
 
 from taskflow.engines.worker_based import protocol as pr
 from taskflow.engines.worker_based import proxy
-from taskflow import failure
+from taskflow.utils import misc
 
 LOG = logging.getLogger(__name__)
 
@@ -77,22 +77,21 @@ class Server(object):
     @staticmethod
     def _parse_request(task_cls, task_name, action, arguments, result=None,
                        failures=None, **kwargs):
-        """Parse request before it can be processed.
-
-        All `failure.Failure` objects that have been converted to dict on the
-        remote side to be serializable are now converted back to objects.
+        """Parse request before it can be processed. All `misc.Failure` objects
+        that have been converted to dict on the remote side to be serializable
+        are now converted back to objects.
         """
         action_args = dict(arguments=arguments, task_name=task_name)
         if result is not None:
             data_type, data = result
             if data_type == 'failure':
-                action_args['result'] = failure.Failure.from_dict(data)
+                action_args['result'] = misc.Failure.from_dict(data)
             else:
                 action_args['result'] = data
         if failures is not None:
             action_args['failures'] = {}
             for k, v in failures.items():
-                action_args['failures'][k] = failure.Failure.from_dict(v)
+                action_args['failures'][k] = misc.Failure.from_dict(v)
         return task_cls, action, action_args
 
     @staticmethod
@@ -162,19 +161,19 @@ class Server(object):
             action_args.update(task_uuid=task_uuid,
                                progress_callback=progress_callback)
         except ValueError:
-            with failure.capture_failure() as fail:
+            with misc.capture_failure() as failure:
                 LOG.exception("Failed to parse request")
-                reply_callback(result=fail.to_dict())
+                reply_callback(result=failure.to_dict())
                 return
 
         # get task endpoint
         try:
             endpoint = self._endpoints[task_cls]
         except KeyError:
-            with failure.capture_failure() as fail:
+            with misc.capture_failure() as failure:
                 LOG.exception("The '%s' task endpoint does not exist",
                               task_cls)
-                reply_callback(result=fail.to_dict())
+                reply_callback(result=failure.to_dict())
                 return
         else:
             reply_callback(state=pr.RUNNING)
@@ -183,11 +182,11 @@ class Server(object):
         try:
             result = getattr(endpoint, action)(**action_args)
         except Exception:
-            with failure.capture_failure() as fail:
+            with misc.capture_failure() as failure:
                 LOG.exception("The %s task execution failed", endpoint)
-                reply_callback(result=fail.to_dict())
+                reply_callback(result=failure.to_dict())
         else:
-            if isinstance(result, failure.Failure):
+            if isinstance(result, misc.Failure):
                 reply_callback(result=result.to_dict())
             else:
                 reply_callback(state=pr.SUCCESS, result=result)
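The worker-based pieces above lean on the dictionary form of a failure:
``Request.to_dict()`` ships ``misc.Failure`` objects as plain dicts and
``Server._parse_request()`` rebuilds them with ``from_dict()``. A standalone
sketch of that round trip (the ``_captured_failure`` helper mirrors the one
used in taskflow's own failure tests; output values assume the post-revert
tree)::

    from taskflow.utils import misc


    def _captured_failure(msg):
        try:
            raise RuntimeError(msg)
        except Exception:
            return misc.Failure()


    fail = _captured_failure('worker blew up')
    wire = fail.to_dict()                   # json-serializable payload
    rebuilt = misc.Failure.from_dict(wire)  # what the other side reconstructs

    print(rebuilt.exception_str)        # 'worker blew up'
    print(rebuilt.check(RuntimeError))  # RuntimeError, matched by type name
    print(fail.matches(rebuilt))        # True; exc_info is lost but strings match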
@@ -31,9 +31,10 @@ sys.path.insert(0, top_dir)
 import taskflow.engines
 
 from taskflow import exceptions
-from taskflow import failure
 from taskflow.patterns import unordered_flow as uf
 from taskflow import task
+from taskflow.tests import utils
+from taskflow.utils import misc
 
 # INTRO: In this example we create two tasks which can trigger exceptions
 # based on various inputs to show how to analyze the thrown exceptions for
@@ -95,20 +96,20 @@ def run(**store):
         SecondTask()
     )
     try:
-        with failure.wrap_all_failures():
+        with utils.wrap_all_failures():
            taskflow.engines.run(flow, store=store,
                                 engine_conf='parallel')
     except exceptions.WrappedFailure as ex:
         unknown_failures = []
-        for fail in ex:
-            if fail.check(FirstException):
-                print("Got FirstException: %s" % fail.exception_str)
-            elif fail.check(SecondException):
-                print("Got SecondException: %s" % fail.exception_str)
+        for failure in ex:
+            if failure.check(FirstException):
+                print("Got FirstException: %s" % failure.exception_str)
+            elif failure.check(SecondException):
+                print("Got SecondException: %s" % failure.exception_str)
             else:
-                print("Unknown failure: %s" % fail)
-                unknown_failures.append(fail)
-        failure.Failure.reraise_if_any(unknown_failures)
+                print("Unknown failure: %s" % failure)
+                unknown_failures.append(failure)
+        misc.Failure.reraise_if_any(unknown_failures)
 
 
     print_wrapped("Raise and catch first exception only")
@@ -1,231 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2013-2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import sys
|
||||
import traceback
|
||||
|
||||
import six
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import reflection
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def wrap_all_failures():
|
||||
"""Convert any exceptions to WrappedFailure.
|
||||
|
||||
When you expect several failures, it may be convenient
|
||||
to wrap any exception with WrappedFailure in order to
|
||||
unify error handling.
|
||||
"""
|
||||
try:
|
||||
yield
|
||||
except Exception:
|
||||
raise exc.WrappedFailure([Failure()])
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def capture_failure():
|
||||
"""Save current exception, and yield back the failure (or raises a
|
||||
runtime error if no active exception is being handled).
|
||||
|
||||
In some cases the exception context can be cleared, resulting in None
|
||||
being attempted to be saved after an exception handler is run. This
|
||||
can happen when eventlet switches greenthreads or when running an
|
||||
exception handler, code raises and catches an exception. In both
|
||||
cases the exception context will be cleared.
|
||||
|
||||
To work around this, we save the exception state, yield a failure and
|
||||
then run other code.
|
||||
|
||||
For example::
|
||||
|
||||
except Exception:
|
||||
with capture_failure() as fail:
|
||||
LOG.warn("Activating cleanup")
|
||||
cleanup()
|
||||
save_failure(fail)
|
||||
"""
|
||||
exc_info = sys.exc_info()
|
||||
if not any(exc_info):
|
||||
raise RuntimeError("No active exception is being handled")
|
||||
else:
|
||||
yield Failure(exc_info=exc_info)
|
||||
|
||||
|
||||
class Failure(object):
|
||||
"""Object that represents failure.
|
||||
|
||||
Failure objects encapsulate exception information so that
|
||||
it can be re-used later to re-raise or inspect.
|
||||
"""
|
||||
DICT_VERSION = 1
|
||||
|
||||
def __init__(self, exc_info=None, **kwargs):
|
||||
if not kwargs:
|
||||
if exc_info is None:
|
||||
exc_info = sys.exc_info()
|
||||
self._exc_info = exc_info
|
||||
self._exc_type_names = list(
|
||||
reflection.get_all_class_names(exc_info[0], up_to=Exception))
|
||||
if not self._exc_type_names:
|
||||
raise TypeError('Invalid exception type: %r' % exc_info[0])
|
||||
self._exception_str = exc.exception_message(self._exc_info[1])
|
||||
self._traceback_str = ''.join(
|
||||
traceback.format_tb(self._exc_info[2]))
|
||||
else:
|
||||
self._exc_info = exc_info # may be None
|
||||
self._exception_str = kwargs.pop('exception_str')
|
||||
self._exc_type_names = kwargs.pop('exc_type_names', [])
|
||||
self._traceback_str = kwargs.pop('traceback_str', None)
|
||||
if kwargs:
|
||||
raise TypeError(
|
||||
'Failure.__init__ got unexpected keyword argument(s): %s'
|
||||
% ', '.join(six.iterkeys(kwargs)))
|
||||
|
||||
@classmethod
|
||||
def from_exception(cls, exception):
|
||||
return cls((type(exception), exception, None))
|
||||
|
||||
def _matches(self, other):
|
||||
if self is other:
|
||||
return True
|
||||
return (self._exc_type_names == other._exc_type_names
|
||||
and self.exception_str == other.exception_str
|
||||
and self.traceback_str == other.traceback_str)
|
||||
|
||||
def matches(self, other):
|
||||
if not isinstance(other, Failure):
|
||||
return False
|
||||
if self.exc_info is None or other.exc_info is None:
|
||||
return self._matches(other)
|
||||
else:
|
||||
return self == other
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, Failure):
|
||||
return NotImplemented
|
||||
return (self._matches(other) and
|
||||
misc.are_equal_exc_info_tuples(self.exc_info, other.exc_info))
|
||||
|
||||
def __ne__(self, other):
|
||||
return not (self == other)
|
||||
|
||||
# NOTE(imelnikov): obj.__hash__() should return same values for equal
|
||||
# objects, so we should redefine __hash__. Failure equality semantics
|
||||
# is a bit complicated, so for now we just mark Failure objects as
|
||||
# unhashable. See python docs on object.__hash__ for more info:
|
||||
# http://docs.python.org/2/reference/datamodel.html#object.__hash__
|
||||
__hash__ = None
|
||||
|
||||
@property
|
||||
def exception(self):
|
||||
"""Exception value, or None if exception value is not present.
|
||||
|
||||
Exception value may be lost during serialization.
|
||||
"""
|
||||
if self._exc_info:
|
||||
return self._exc_info[1]
|
||||
else:
|
||||
return None
|
||||
|
||||
@property
|
||||
def exception_str(self):
|
||||
"""String representation of exception."""
|
||||
return self._exception_str
|
||||
|
||||
@property
|
||||
def exc_info(self):
|
||||
"""Exception info tuple or None."""
|
||||
return self._exc_info
|
||||
|
||||
@property
|
||||
def traceback_str(self):
|
||||
"""Exception traceback as string."""
|
||||
return self._traceback_str
|
||||
|
||||
@staticmethod
|
||||
def reraise_if_any(failures):
|
||||
"""Re-raise exceptions if argument is not empty.
|
||||
|
||||
If argument is empty list, this method returns None. If
|
||||
argument is list with single Failure object in it,
|
||||
this failure is reraised. Else, WrappedFailure exception
|
||||
is raised with failures list as causes.
|
||||
"""
|
||||
failures = list(failures)
|
||||
if len(failures) == 1:
|
||||
failures[0].reraise()
|
||||
elif len(failures) > 1:
|
||||
raise exc.WrappedFailure(failures)
|
||||
|
||||
def reraise(self):
|
||||
"""Re-raise captured exception."""
|
||||
if self._exc_info:
|
||||
six.reraise(*self._exc_info)
|
||||
else:
|
||||
raise exc.WrappedFailure([self])
|
||||
|
||||
def check(self, *exc_classes):
|
||||
"""Check if any of exc_classes caused the failure.
|
||||
|
||||
Arguments of this method can be exception types or type
|
||||
names (stings). If captured exception is instance of
|
||||
exception of given type, the corresponding argument is
|
||||
returned. Else, None is returned.
|
||||
"""
|
||||
for cls in exc_classes:
|
||||
if isinstance(cls, type):
|
||||
err = reflection.get_class_name(cls)
|
||||
else:
|
||||
err = cls
|
||||
if err in self._exc_type_names:
|
||||
return cls
|
||||
return None
|
||||
|
||||
def __str__(self):
|
||||
return 'Failure: %s: %s' % (self._exc_type_names[0],
|
||||
self._exception_str)
|
||||
|
||||
def __iter__(self):
|
||||
"""Iterate over exception type names."""
|
||||
for et in self._exc_type_names:
|
||||
yield et
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data):
|
||||
data = dict(data)
|
||||
version = data.pop('version', None)
|
||||
if version != cls.DICT_VERSION:
|
||||
raise ValueError('Invalid dict version of failure object: %r'
|
||||
% version)
|
||||
return cls(**data)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'exception_str': self.exception_str,
|
||||
'traceback_str': self.traceback_str,
|
||||
'exc_type_names': list(self),
|
||||
'version': self.DICT_VERSION,
|
||||
}
|
||||
|
||||
def copy(self):
|
||||
return Failure(exc_info=misc.copy_exc_info(self.exc_info),
|
||||
exception_str=self.exception_str,
|
||||
traceback_str=self.traceback_str,
|
||||
exc_type_names=self._exc_type_names[:])
|
@@ -21,7 +21,6 @@ import logging
|
||||
|
||||
import six
|
||||
|
||||
from taskflow import failure
|
||||
from taskflow.openstack.common import excutils
|
||||
from taskflow import states
|
||||
from taskflow.utils import misc
|
||||
@@ -142,7 +141,7 @@ class LoggingBase(ListenerBase):
|
||||
result = details.get('result')
|
||||
exc_info = None
|
||||
was_failure = False
|
||||
if isinstance(result, failure.Failure):
|
||||
if isinstance(result, misc.Failure):
|
||||
if result.exc_info:
|
||||
exc_info = tuple(result.exc_info)
|
||||
was_failure = True
|
||||
|
@@ -31,7 +31,6 @@ from sqlalchemy import orm as sa_orm
|
||||
from sqlalchemy import pool as sa_pool
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import failure
|
||||
from taskflow.persistence.backends import base
|
||||
from taskflow.persistence.backends.sqlalchemy import migration
|
||||
from taskflow.persistence.backends.sqlalchemy import models
|
||||
@@ -268,7 +267,7 @@ class Connection(base.Connection):
|
||||
pass
|
||||
except sa_exc.OperationalError as ex:
|
||||
if _is_db_connection_error(six.text_type(ex.args[0])):
|
||||
failures.append(failure.Failure())
|
||||
failures.append(misc.Failure())
|
||||
return False
|
||||
return True
|
||||
|
||||
@@ -514,7 +513,7 @@ def _convert_ad_to_external(ad):
|
||||
# to change the internal sqlalchemy model easily by forcing a defined
|
||||
# interface (that isn't the sqlalchemy model itself).
|
||||
atom_cls = logbook.atom_detail_class(ad.atom_type)
|
||||
result = atom_cls.from_dict({
|
||||
return atom_cls.from_dict({
|
||||
'state': ad.state,
|
||||
'intention': ad.intention,
|
||||
'results': ad.results,
|
||||
@@ -524,7 +523,6 @@ def _convert_ad_to_external(ad):
|
||||
'name': ad.name,
|
||||
'uuid': ad.uuid,
|
||||
})
|
||||
return result
|
||||
|
||||
|
||||
def _convert_lb_to_external(lb_m):
|
||||
|
@@ -21,10 +21,10 @@ import logging
|
||||
|
||||
import six
|
||||
|
||||
from taskflow import failure
|
||||
from taskflow.openstack.common import timeutils
|
||||
from taskflow.openstack.common import uuidutils
|
||||
from taskflow import states
|
||||
from taskflow.utils import misc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@@ -311,11 +311,11 @@ class AtomDetail(object):
|
||||
|
||||
def _to_dict_shared(self):
|
||||
if self.failure:
|
||||
fail = self.failure.to_dict()
|
||||
failure = self.failure.to_dict()
|
||||
else:
|
||||
fail = None
|
||||
failure = None
|
||||
return {
|
||||
'failure': fail,
|
||||
'failure': failure,
|
||||
'meta': self.meta,
|
||||
'name': self.name,
|
||||
'results': self.results,
|
||||
@@ -331,9 +331,9 @@ class AtomDetail(object):
|
||||
self.results = data.get('results')
|
||||
self.version = data.get('version')
|
||||
self.meta = _fix_meta(data)
|
||||
fail = data.get('failure')
|
||||
if fail:
|
||||
self.failure = failure.Failure.from_dict(fail)
|
||||
failure = data.get('failure')
|
||||
if failure:
|
||||
self.failure = misc.Failure.from_dict(failure)
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
@@ -405,7 +405,7 @@ class RetryDetail(AtomDetail):
|
||||
for (data, failures) in results:
|
||||
new_failures = {}
|
||||
for (key, failure_data) in six.iteritems(failures):
|
||||
new_failures[key] = failure.Failure.from_dict(failure_data)
|
||||
new_failures[key] = misc.Failure.from_dict(failure_data)
|
||||
new_results.append((data, new_failures))
|
||||
return new_results
|
||||
|
||||
@@ -423,8 +423,8 @@ class RetryDetail(AtomDetail):
|
||||
new_results = []
|
||||
for (data, failures) in results:
|
||||
new_failures = {}
|
||||
for (key, fail) in six.iteritems(failures):
|
||||
new_failures[key] = fail.to_dict()
|
||||
for (key, failure) in six.iteritems(failures):
|
||||
new_failures[key] = failure.to_dict()
|
||||
new_results.append((data, new_failures))
|
||||
return new_results
|
||||
|
||||
@@ -443,11 +443,11 @@ class RetryDetail(AtomDetail):
|
||||
# contain tracebacks, which are not copyable.
|
||||
for (data, failures) in other.results:
|
||||
copied_failures = {}
|
||||
for (key, fail) in six.iteritems(failures):
|
||||
for (key, failure) in six.iteritems(failures):
|
||||
if deep_copy:
|
||||
copied_failures[key] = fail.copy()
|
||||
copied_failures[key] = failure.copy()
|
||||
else:
|
||||
copied_failures[key] = fail
|
||||
copied_failures[key] = failure
|
||||
results.append((data, copied_failures))
|
||||
self.results = results
|
||||
return self
|
||||
|
@@ -21,7 +21,6 @@ import logging
|
||||
import six
|
||||
|
||||
from taskflow import exceptions
|
||||
from taskflow import failure
|
||||
from taskflow.openstack.common import uuidutils
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow import states
|
||||
@@ -54,7 +53,7 @@ class Storage(object):
|
||||
self._lock = self._lock_cls()
|
||||
|
||||
# NOTE(imelnikov): failure serialization looses information,
|
||||
# so we cache failures here, in task name -> failure.Failure mapping.
|
||||
# so we cache failures here, in task name -> misc.Failure mapping.
|
||||
self._failures = {}
|
||||
for ad in self._flowdetail:
|
||||
if ad.failure is not None:
|
||||
@@ -326,7 +325,7 @@ class Storage(object):
|
||||
with self._lock.write_lock():
|
||||
ad = self._atomdetail_by_name(atom_name)
|
||||
ad.state = state
|
||||
if state == states.FAILURE and isinstance(data, failure.Failure):
|
||||
if state == states.FAILURE and isinstance(data, misc.Failure):
|
||||
# FIXME(harlowja): this seems like it should be internal logic
|
||||
# in the atom detail object and not in here. Fix that soon...
|
||||
#
|
||||
|
@@ -24,7 +24,7 @@ import fixtures
|
||||
import six
|
||||
|
||||
from taskflow import exceptions
|
||||
from taskflow import failure
|
||||
from taskflow.tests import utils
|
||||
from taskflow.utils import misc
|
||||
|
||||
|
||||
@@ -49,13 +49,13 @@ class FailureRegexpMatcher(object):
|
||||
self.exc_class = exc_class
|
||||
self.pattern = pattern
|
||||
|
||||
def match(self, fail):
|
||||
for cause in fail:
|
||||
def match(self, failure):
|
||||
for cause in failure:
|
||||
if cause.check(self.exc_class) is not None:
|
||||
return matchers.MatchesRegex(
|
||||
self.pattern).match(cause.exception_str)
|
||||
return matchers.Mismatch("The `%s` wasn't caused by the `%s`" %
|
||||
(fail, self.exc_class))
|
||||
(failure, self.exc_class))
|
||||
|
||||
|
||||
class ItemsEqual(object):
|
||||
@@ -171,7 +171,7 @@ class TestCase(testcase.TestCase):
|
||||
string matches to the given pattern.
|
||||
"""
|
||||
try:
|
||||
with failure.wrap_all_failures():
|
||||
with utils.wrap_all_failures():
|
||||
callable_obj(*args, **kwargs)
|
||||
except exceptions.WrappedFailure as e:
|
||||
self.assertThat(e, FailureRegexpMatcher(exc_class, pattern))
|
||||
|
@@ -17,10 +17,10 @@
|
||||
import contextlib
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import failure
|
||||
from taskflow.openstack.common import uuidutils
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow import states
|
||||
from taskflow.utils import misc
|
||||
|
||||
|
||||
class PersistenceTestMixin(object):
|
||||
@@ -147,7 +147,7 @@ class PersistenceTestMixin(object):
|
||||
try:
|
||||
raise RuntimeError('Woot!')
|
||||
except Exception:
|
||||
td.failure = failure.Failure()
|
||||
td.failure = misc.Failure()
|
||||
|
||||
fd.add(td)
|
||||
|
||||
@@ -161,10 +161,10 @@ class PersistenceTestMixin(object):
|
||||
lb2 = conn.get_logbook(lb_id)
|
||||
fd2 = lb2.find(fd.uuid)
|
||||
td2 = fd2.find(td.uuid)
|
||||
fail = td2.failure
|
||||
self.assertEqual(fail.exception_str, 'Woot!')
|
||||
self.assertIs(fail.check(RuntimeError), RuntimeError)
|
||||
self.assertEqual(fail.traceback_str, td.failure.traceback_str)
|
||||
failure = td2.failure
|
||||
self.assertEqual(failure.exception_str, 'Woot!')
|
||||
self.assertIs(failure.check(RuntimeError), RuntimeError)
|
||||
self.assertEqual(failure.traceback_str, td.failure.traceback_str)
|
||||
self.assertIsInstance(td2, logbook.TaskDetail)
|
||||
|
||||
def test_logbook_merge_flow_detail(self):
|
||||
@@ -269,7 +269,7 @@ class PersistenceTestMixin(object):
|
||||
fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid())
|
||||
lb.add(fd)
|
||||
rd = logbook.RetryDetail("retry-1", uuid=uuidutils.generate_uuid())
|
||||
fail = failure.Failure.from_exception(RuntimeError('fail'))
|
||||
fail = misc.Failure.from_exception(RuntimeError('fail'))
|
||||
rd.results.append((42, {'some-task': fail}))
|
||||
fd.add(rd)
|
||||
|
||||
@@ -286,7 +286,7 @@ class PersistenceTestMixin(object):
|
||||
rd2 = fd2.find(rd.uuid)
|
||||
self.assertIsInstance(rd2, logbook.RetryDetail)
|
||||
fail2 = rd2.results[0][1].get('some-task')
|
||||
self.assertIsInstance(fail2, failure.Failure)
|
||||
self.assertIsInstance(fail2, misc.Failure)
|
||||
self.assertTrue(fail.matches(fail2))
|
||||
|
||||
def test_retry_detail_save_intention(self):
|
||||
|
@@ -31,7 +31,6 @@ from taskflow.engines.action_engine import engine as eng
|
||||
from taskflow.engines.worker_based import engine as w_eng
|
||||
from taskflow.engines.worker_based import worker as wkr
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import failure
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow import states
|
||||
from taskflow import task
|
||||
@@ -39,6 +38,7 @@ from taskflow import test
|
||||
from taskflow.tests import utils
|
||||
|
||||
from taskflow.utils import eventlet_utils as eu
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import persistence_utils as p_utils
|
||||
|
||||
|
||||
@@ -488,7 +488,7 @@ class EngineCheckingTaskTest(utils.EngineTestBase):
|
||||
self.assertEqual(result, 'RESULT')
|
||||
self.assertEqual(list(flow_failures.keys()), ['fail1'])
|
||||
fail = flow_failures['fail1']
|
||||
self.assertIsInstance(fail, failure.Failure)
|
||||
self.assertIsInstance(fail, misc.Failure)
|
||||
self.assertEqual(str(fail), 'Failure: RuntimeError: Woot!')
|
||||
|
||||
flow = lf.Flow('test').add(
|
||||
|
@@ -21,11 +21,11 @@ from taskflow.patterns import unordered_flow as uf
|
||||
import taskflow.engines
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import failure
|
||||
from taskflow import retry
|
||||
from taskflow import states as st
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils
|
||||
from taskflow.utils import misc
|
||||
|
||||
|
||||
class RetryTest(utils.EngineTestBase):
|
||||
@@ -558,7 +558,7 @@ class RetryTest(utils.EngineTestBase):
|
||||
# we execute retry
|
||||
engine.storage.save('flow-1_retry', 1)
|
||||
# task fails
|
||||
fail = failure.Failure.from_exception(RuntimeError('foo')),
|
||||
fail = misc.Failure.from_exception(RuntimeError('foo')),
|
||||
engine.storage.save('task1', fail, state=st.FAILURE)
|
||||
if when == 'task fails':
|
||||
return engine
|
||||
@@ -634,7 +634,7 @@ class RetryTest(utils.EngineTestBase):
|
||||
self._make_engine(flow).run)
|
||||
self.assertEqual(len(r.history), 1)
|
||||
self.assertEqual(r.history[0][1], {})
|
||||
self.assertEqual(isinstance(r.history[0][0], failure.Failure), True)
|
||||
self.assertEqual(isinstance(r.history[0][0], misc.Failure), True)
|
||||
|
||||
def test_retry_revert_fails(self):
|
||||
|
||||
@@ -690,7 +690,7 @@ class RetryTest(utils.EngineTestBase):
|
||||
engine.storage.save('test2_retry', 1)
|
||||
engine.storage.save('b', 11)
|
||||
# pretend that 'c' failed
|
||||
fail = failure.Failure.from_exception(RuntimeError('Woot!'))
|
||||
fail = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
engine.storage.save('c', fail, st.FAILURE)
|
||||
|
||||
engine.run()
|
||||
|
@@ -20,13 +20,13 @@ import threading
|
||||
import mock
|
||||
|
||||
from taskflow import exceptions
|
||||
from taskflow import failure
|
||||
from taskflow.openstack.common import uuidutils
|
||||
from taskflow.persistence import backends
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow import states
|
||||
from taskflow import storage
|
||||
from taskflow import test
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import persistence_utils as p_utils
|
||||
|
||||
|
||||
@@ -127,46 +127,46 @@ class StorageTestMixin(object):
|
||||
self.assertEqual(s.get_atom_state('my task'), states.FAILURE)
|
||||
|
||||
def test_save_and_get_cached_failure(self):
|
||||
fail = failure.Failure.from_exception(RuntimeError('Woot!'))
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.save('my task', fail, states.FAILURE)
|
||||
self.assertEqual(s.get('my task'), fail)
|
||||
s.save('my task', failure, states.FAILURE)
|
||||
self.assertEqual(s.get('my task'), failure)
|
||||
self.assertEqual(s.get_atom_state('my task'), states.FAILURE)
|
||||
self.assertTrue(s.has_failures())
|
||||
self.assertEqual(s.get_failures(), {'my task': fail})
|
||||
self.assertEqual(s.get_failures(), {'my task': failure})
|
||||
|
||||
def test_save_and_get_non_cached_failure(self):
|
||||
fail = failure.Failure.from_exception(RuntimeError('Woot!'))
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.save('my task', fail, states.FAILURE)
|
||||
self.assertEqual(s.get('my task'), fail)
|
||||
s.save('my task', failure, states.FAILURE)
|
||||
self.assertEqual(s.get('my task'), failure)
|
||||
s._failures['my task'] = None
|
||||
self.assertTrue(fail.matches(s.get('my task')))
|
||||
self.assertTrue(failure.matches(s.get('my task')))
|
||||
|
||||
def test_get_failure_from_reverted_task(self):
|
||||
fail = failure.Failure.from_exception(RuntimeError('Woot!'))
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.save('my task', fail, states.FAILURE)
|
||||
s.save('my task', failure, states.FAILURE)
|
||||
|
||||
s.set_atom_state('my task', states.REVERTING)
|
||||
self.assertEqual(s.get('my task'), fail)
|
||||
self.assertEqual(s.get('my task'), failure)
|
||||
|
||||
s.set_atom_state('my task', states.REVERTED)
|
||||
self.assertEqual(s.get('my task'), fail)
|
||||
self.assertEqual(s.get('my task'), failure)
|
||||
|
||||
def test_get_failure_after_reload(self):
|
||||
fail = failure.Failure.from_exception(RuntimeError('Woot!'))
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
s = self._get_storage()
|
||||
s.ensure_task('my task')
|
||||
s.save('my task', fail, states.FAILURE)
|
||||
s.save('my task', failure, states.FAILURE)
|
||||
s2 = self._get_storage(s._flowdetail)
|
||||
self.assertTrue(s2.has_failures())
|
||||
self.assertEqual(1, len(s2.get_failures()))
|
||||
self.assertTrue(fail.matches(s2.get('my task')))
|
||||
self.assertTrue(failure.matches(s2.get('my task')))
|
||||
self.assertEqual(s2.get_atom_state('my task'), states.FAILURE)
|
||||
|
||||
def test_get_non_existing_var(self):
|
||||
@@ -483,15 +483,15 @@ class StorageTestMixin(object):
|
||||
self.assertEqual(s.fetch_all(), {})
|
||||
|
||||
def test_cached_retry_failure(self):
|
||||
fail = failure.Failure.from_exception(RuntimeError('Woot!'))
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
s = self._get_storage()
|
||||
s.ensure_retry('my retry', result_mapping={'x': 0})
|
||||
s.save('my retry', 'a')
|
||||
s.save('my retry', fail, states.FAILURE)
|
||||
s.save('my retry', failure, states.FAILURE)
|
||||
history = s.get_retry_history('my retry')
|
||||
self.assertEqual(history, [('a', {}), (fail, {})])
|
||||
self.assertEqual(history, [('a', {}), (failure, {})])
|
||||
self.assertIs(s.has_failures(), True)
|
||||
self.assertEqual(s.get_failures(), {'my retry': fail})
|
||||
self.assertEqual(s.get_failures(), {'my retry': failure})
|
||||
|
||||
def test_logbook_get_unknown_atom_type(self):
|
||||
self.assertRaisesRegexp(TypeError,
|
||||
|
@@ -19,7 +19,6 @@ import functools
|
||||
import sys
|
||||
import time
|
||||
|
||||
from taskflow import failure
|
||||
from taskflow import states
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils as test_utils
|
||||
@@ -285,8 +284,8 @@ class GetClassNameTest(test.TestCase):
|
||||
self.assertEqual(name, 'RuntimeError')
|
||||
|
||||
def test_global_class(self):
|
||||
name = reflection.get_class_name(failure.Failure)
|
||||
self.assertEqual(name, 'taskflow.failure.Failure')
|
||||
name = reflection.get_class_name(misc.Failure)
|
||||
self.assertEqual(name, 'taskflow.utils.misc.Failure')
|
||||
|
||||
def test_class(self):
|
||||
name = reflection.get_class_name(Class)
|
||||
|
@@ -20,14 +20,14 @@ from taskflow import exceptions
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils as test_utils
|
||||
|
||||
from taskflow import failure
|
||||
from taskflow.utils import misc
|
||||
|
||||
|
||||
def _captured_failure(msg):
|
||||
try:
|
||||
raise RuntimeError(msg)
|
||||
except Exception:
|
||||
return failure.Failure()
|
||||
return misc.Failure()
|
||||
|
||||
|
||||
class GeneralFailureObjTestsMixin(object):
|
||||
@@ -82,7 +82,7 @@ class ReCreatedFailureTestCase(test.TestCase, GeneralFailureObjTestsMixin):
|
||||
def setUp(self):
|
||||
super(ReCreatedFailureTestCase, self).setUp()
|
||||
fail_obj = _captured_failure('Woot!')
|
||||
self.fail_obj = failure.Failure(exception_str=fail_obj.exception_str,
|
||||
self.fail_obj = misc.Failure(exception_str=fail_obj.exception_str,
|
||||
traceback_str=fail_obj.traceback_str,
|
||||
exc_type_names=list(fail_obj))
|
||||
|
||||
@@ -102,7 +102,7 @@ class FromExceptionTestCase(test.TestCase, GeneralFailureObjTestsMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(FromExceptionTestCase, self).setUp()
|
||||
self.fail_obj = failure.Failure.from_exception(RuntimeError('Woot!'))
|
||||
self.fail_obj = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
|
||||
|
||||
class FailureObjectTestCase(test.TestCase):
|
||||
@@ -111,10 +111,10 @@ class FailureObjectTestCase(test.TestCase):
|
||||
try:
|
||||
raise SystemExit()
|
||||
except BaseException:
|
||||
self.assertRaises(TypeError, failure.Failure)
|
||||
self.assertRaises(TypeError, misc.Failure)
|
||||
|
||||
def test_unknown_argument(self):
|
||||
exc = self.assertRaises(TypeError, failure.Failure,
|
||||
exc = self.assertRaises(TypeError, misc.Failure,
|
||||
exception_str='Woot!',
|
||||
traceback_str=None,
|
||||
exc_type_names=['Exception'],
|
||||
@@ -123,12 +123,12 @@ class FailureObjectTestCase(test.TestCase):
|
||||
self.assertEqual(str(exc), expected)
|
||||
|
||||
def test_empty_does_not_reraise(self):
|
||||
self.assertIs(failure.Failure.reraise_if_any([]), None)
|
||||
self.assertIs(misc.Failure.reraise_if_any([]), None)
|
||||
|
||||
def test_reraises_one(self):
|
||||
fls = [_captured_failure('Woot!')]
|
||||
self.assertRaisesRegexp(RuntimeError, '^Woot!$',
|
||||
failure.Failure.reraise_if_any, fls)
|
||||
misc.Failure.reraise_if_any, fls)
|
||||
|
||||
def test_reraises_several(self):
|
||||
fls = [
|
||||
@@ -136,7 +136,7 @@ class FailureObjectTestCase(test.TestCase):
|
||||
_captured_failure('Oh, not again!')
|
||||
]
|
||||
exc = self.assertRaises(exceptions.WrappedFailure,
|
||||
failure.Failure.reraise_if_any, fls)
|
||||
misc.Failure.reraise_if_any, fls)
|
||||
self.assertEqual(list(exc), fls)
|
||||
|
||||
def test_failure_copy(self):
|
||||
@@ -149,7 +149,7 @@ class FailureObjectTestCase(test.TestCase):
|
||||
|
||||
def test_failure_copy_recaptured(self):
|
||||
captured = _captured_failure('Woot!')
|
||||
fail_obj = failure.Failure(exception_str=captured.exception_str,
|
||||
fail_obj = misc.Failure(exception_str=captured.exception_str,
|
||||
traceback_str=captured.traceback_str,
|
||||
exc_type_names=list(captured))
|
||||
copied = fail_obj.copy()
|
||||
@@ -160,7 +160,7 @@ class FailureObjectTestCase(test.TestCase):
|
||||
|
||||
def test_recaptured_not_eq(self):
|
||||
captured = _captured_failure('Woot!')
|
||||
fail_obj = failure.Failure(exception_str=captured.exception_str,
|
||||
fail_obj = misc.Failure(exception_str=captured.exception_str,
|
||||
traceback_str=captured.traceback_str,
|
||||
exc_type_names=list(captured))
|
||||
self.assertFalse(fail_obj == captured)
|
||||
@@ -174,11 +174,11 @@ class FailureObjectTestCase(test.TestCase):
|
||||
|
||||
def test_two_recaptured_neq(self):
|
||||
captured = _captured_failure('Woot!')
|
||||
fail_obj = failure.Failure(exception_str=captured.exception_str,
|
||||
fail_obj = misc.Failure(exception_str=captured.exception_str,
|
||||
traceback_str=captured.traceback_str,
|
||||
exc_type_names=list(captured))
|
||||
new_exc_str = captured.exception_str.replace('Woot', 'w00t')
|
||||
fail_obj2 = failure.Failure(exception_str=new_exc_str,
|
||||
fail_obj2 = misc.Failure(exception_str=new_exc_str,
|
||||
traceback_str=captured.traceback_str,
|
||||
exc_type_names=list(captured))
|
||||
self.assertNotEqual(fail_obj, fail_obj2)
|
||||
@@ -220,7 +220,7 @@ class WrappedFailureTestCase(test.TestCase):
|
||||
try:
|
||||
raise exceptions.WrappedFailure([f1, f2])
|
||||
except Exception:
|
||||
fail_obj = failure.Failure()
|
||||
fail_obj = misc.Failure()
|
||||
|
||||
wf = exceptions.WrappedFailure([fail_obj, f3])
|
||||
self.assertEqual(list(wf), [f1, f2, f3])
|
||||
@@ -230,13 +230,13 @@ class NonAsciiExceptionsTestCase(test.TestCase):
|
||||
|
||||
def test_exception_with_non_ascii_str(self):
|
||||
bad_string = chr(200)
|
||||
fail = failure.Failure.from_exception(ValueError(bad_string))
|
||||
fail = misc.Failure.from_exception(ValueError(bad_string))
|
||||
self.assertEqual(fail.exception_str, bad_string)
|
||||
self.assertEqual(str(fail), 'Failure: ValueError: %s' % bad_string)
|
||||
|
||||
def test_exception_non_ascii_unicode(self):
|
||||
hi_ru = u'привет'
|
||||
fail = failure.Failure.from_exception(ValueError(hi_ru))
|
||||
fail = misc.Failure.from_exception(ValueError(hi_ru))
|
||||
self.assertEqual(fail.exception_str, hi_ru)
|
||||
self.assertIsInstance(fail.exception_str, six.text_type)
|
||||
self.assertEqual(six.text_type(fail),
|
||||
@@ -246,7 +246,7 @@ class NonAsciiExceptionsTestCase(test.TestCase):
|
||||
hi_cn = u'嗨'
|
||||
fail = ValueError(hi_cn)
|
||||
self.assertEqual(hi_cn, exceptions.exception_message(fail))
|
||||
fail = failure.Failure.from_exception(fail)
|
||||
fail = misc.Failure.from_exception(fail)
|
||||
wrapped_fail = exceptions.WrappedFailure([fail])
|
||||
if six.PY2:
|
||||
# Python 2.x will unicode escape it, while python 3.3+ will not,
|
||||
@@ -261,12 +261,12 @@ class NonAsciiExceptionsTestCase(test.TestCase):
|
||||
|
||||
def test_failure_equality_with_non_ascii_str(self):
|
||||
bad_string = chr(200)
|
||||
fail = failure.Failure.from_exception(ValueError(bad_string))
|
||||
fail = misc.Failure.from_exception(ValueError(bad_string))
|
||||
copied = fail.copy()
|
||||
self.assertEqual(fail, copied)
|
||||
|
||||
def test_failure_equality_non_ascii_unicode(self):
|
||||
hi_ru = u'привет'
|
||||
fail = failure.Failure.from_exception(ValueError(hi_ru))
|
||||
fail = misc.Failure.from_exception(ValueError(hi_ru))
|
||||
copied = fail.copy()
|
||||
self.assertEqual(fail, copied)
|
||||
|
@@ -23,9 +23,9 @@ from kombu import exceptions as kombu_exc
|
||||
|
||||
from taskflow.engines.worker_based import executor
|
||||
from taskflow.engines.worker_based import protocol as pr
|
||||
from taskflow import failure
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils
|
||||
from taskflow.utils import misc
|
||||
|
||||
|
||||
class TestWorkerTaskExecutor(test.MockTestCase):
|
||||
@@ -111,8 +111,8 @@ class TestWorkerTaskExecutor(test.MockTestCase):
|
||||
self.assertEqual(self.message_mock.mock_calls, [mock.call.ack()])
|
||||
|
||||
def test_on_message_response_state_failure(self):
|
||||
fail = failure.Failure.from_exception(Exception('test'))
|
||||
failure_dict = fail.to_dict()
|
||||
failure = misc.Failure.from_exception(Exception('test'))
|
||||
failure_dict = failure.to_dict()
|
||||
response = pr.Response(pr.FAILURE, result=failure_dict)
|
||||
ex = self.executor()
|
||||
ex._requests_cache.set(self.task_uuid, self.request_inst_mock)
|
||||
@@ -120,7 +120,7 @@ class TestWorkerTaskExecutor(test.MockTestCase):
|
||||
|
||||
self.assertEqual(len(ex._requests_cache._data), 0)
|
||||
self.assertEqual(self.request_inst_mock.mock_calls, [
|
||||
mock.call.set_result(result=utils.FailureMatcher(fail))
|
||||
mock.call.set_result(result=utils.FailureMatcher(failure))
|
||||
])
|
||||
self.assertEqual(self.message_mock.mock_calls, [mock.call.ack()])
|
||||
|
||||
|
@@ -19,9 +19,9 @@ import mock
|
||||
from concurrent import futures
|
||||
|
||||
from taskflow.engines.worker_based import protocol as pr
|
||||
from taskflow import failure
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils
|
||||
from taskflow.utils import misc
|
||||
|
||||
|
||||
class TestProtocol(test.TestCase):
|
||||
@@ -81,15 +81,15 @@ class TestProtocol(test.TestCase):
|
||||
self.request_to_dict(result=('success', None)))
|
||||
|
||||
def test_to_dict_with_result_failure(self):
|
||||
fail = failure.Failure.from_exception(RuntimeError('Woot!'))
|
||||
expected = self.request_to_dict(result=('failure', fail.to_dict()))
|
||||
self.assertEqual(self.request(result=fail).to_dict(), expected)
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
expected = self.request_to_dict(result=('failure', failure.to_dict()))
|
||||
self.assertEqual(self.request(result=failure).to_dict(), expected)
|
||||
|
||||
def test_to_dict_with_failures(self):
|
||||
fail = failure.Failure.from_exception(RuntimeError('Woot!'))
|
||||
request = self.request(failures={self.task.name: fail})
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
request = self.request(failures={self.task.name: failure})
|
||||
expected = self.request_to_dict(
|
||||
failures={self.task.name: fail.to_dict()})
|
||||
failures={self.task.name: failure.to_dict()})
|
||||
self.assertEqual(request.to_dict(), expected)
|
||||
|
||||
@mock.patch('taskflow.engines.worker_based.protocol.misc.wallclock')
|
||||
|
@@ -23,9 +23,9 @@ from kombu import exceptions as exc
|
||||
from taskflow.engines.worker_based import endpoint as ep
|
||||
from taskflow.engines.worker_based import protocol as pr
|
||||
from taskflow.engines.worker_based import server
|
||||
from taskflow import failure
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils
|
||||
from taskflow.utils import misc
|
||||
|
||||
|
||||
class TestServer(test.MockTestCase):
|
||||
@@ -185,19 +185,19 @@ class TestServer(test.MockTestCase):
|
||||
result=1)))
|
||||
|
||||
def test_parse_request_with_failure_result(self):
|
||||
fail = failure.Failure.from_exception(Exception('test'))
|
||||
request = self.make_request(action='revert', result=fail)
|
||||
failure = misc.Failure.from_exception(Exception('test'))
|
||||
request = self.make_request(action='revert', result=failure)
|
||||
task_cls, action, task_args = server.Server._parse_request(**request)
|
||||
|
||||
self.assertEqual((task_cls, action, task_args),
|
||||
(self.task.name, 'revert',
|
||||
dict(task_name=self.task.name,
|
||||
arguments=self.task_args,
|
||||
result=utils.FailureMatcher(fail))))
|
||||
result=utils.FailureMatcher(failure))))
|
||||
|
||||
def test_parse_request_with_failures(self):
|
||||
failures = {'0': failure.Failure.from_exception(Exception('test1')),
|
||||
'1': failure.Failure.from_exception(Exception('test2'))}
|
||||
failures = {'0': misc.Failure.from_exception(Exception('test1')),
|
||||
'1': misc.Failure.from_exception(Exception('test2'))}
|
||||
request = self.make_request(action='revert', failures=failures)
|
||||
task_cls, action, task_args = server.Server._parse_request(**request)
|
||||
|
||||
@@ -274,16 +274,16 @@ class TestServer(test.MockTestCase):
|
||||
self.assertEqual(self.master_mock.mock_calls, [])
|
||||
self.assertTrue(mocked_exception.called)
|
||||
|
||||
@mock.patch.object(failure.Failure, 'from_dict')
|
||||
@mock.patch.object(failure.Failure, 'to_dict')
|
||||
@mock.patch.object(misc.Failure, 'from_dict')
|
||||
@mock.patch.object(misc.Failure, 'to_dict')
|
||||
def test_process_request_parse_request_failure(self, to_mock, from_mock):
|
||||
failure_dict = {
|
||||
'failure': 'failure',
|
||||
}
|
||||
fail = failure.Failure.from_exception(RuntimeError('Woot!'))
|
||||
failure = misc.Failure.from_exception(RuntimeError('Woot!'))
|
||||
to_mock.return_value = failure_dict
|
||||
from_mock.side_effect = ValueError('Woot!')
|
||||
request = self.make_request(result=fail)
|
||||
request = self.make_request(result=failure)
|
||||
|
||||
# create server and process request
|
||||
s = self.server(reset_master_mock=True)
|
||||
@@ -298,7 +298,7 @@ class TestServer(test.MockTestCase):
|
||||
]
|
||||
self.assertEqual(master_mock_calls, self.master_mock.mock_calls)
|
||||
|
||||
@mock.patch.object(failure.Failure, 'to_dict')
|
||||
@mock.patch.object(misc.Failure, 'to_dict')
|
||||
def test_process_request_endpoint_not_found(self, to_mock):
|
||||
failure_dict = {
|
||||
'failure': 'failure',
|
||||
@@ -319,7 +319,7 @@ class TestServer(test.MockTestCase):
|
||||
]
|
||||
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
|
||||
|
||||
@mock.patch.object(failure.Failure, 'to_dict')
|
||||
@mock.patch.object(misc.Failure, 'to_dict')
|
||||
def test_process_request_execution_failure(self, to_mock):
|
||||
failure_dict = {
|
||||
'failure': 'failure',
|
||||
@@ -344,7 +344,7 @@ class TestServer(test.MockTestCase):
|
||||
]
|
||||
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
|
||||
|
||||
@mock.patch.object(failure.Failure, 'to_dict')
|
||||
@mock.patch.object(misc.Failure, 'to_dict')
|
||||
def test_process_request_task_failure(self, to_mock):
|
||||
failure_dict = {
|
||||
'failure': 'failure',
|
||||
|
@@ -19,15 +19,31 @@ import threading
|
||||
|
||||
import six
|
||||
|
||||
from taskflow import exceptions
|
||||
from taskflow.persistence.backends import impl_memory
|
||||
from taskflow import retry
|
||||
from taskflow import task
|
||||
from taskflow.utils import misc
|
||||
|
||||
ARGS_KEY = '__args__'
|
||||
KWARGS_KEY = '__kwargs__'
|
||||
ORDER_KEY = '__order__'
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def wrap_all_failures():
|
||||
"""Convert any exceptions to WrappedFailure.
|
||||
|
||||
When you expect several failures, it may be convenient
|
||||
to wrap any exception with WrappedFailure in order to
|
||||
unify error handling.
|
||||
"""
|
||||
try:
|
||||
yield
|
||||
except Exception:
|
||||
raise exceptions.WrappedFailure([misc.Failure()])
|
||||
|
||||
|
||||
class DummyTask(task.Task):
|
||||
|
||||
def execute(self, context, *args, **kwargs):
|
||||
|
@@ -1,6 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2012-2014 Yahoo! Inc. All Rights Reserved.
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
@@ -16,6 +16,7 @@
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import contextlib
|
||||
import copy
|
||||
import errno
|
||||
import functools
|
||||
@@ -23,6 +24,7 @@ import keyword
|
||||
import logging
|
||||
import os
|
||||
import string
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
@@ -511,3 +513,195 @@ def are_equal_exc_info_tuples(ei1, ei2):
|
||||
tb1 = traceback.format_tb(ei1[2])
|
||||
tb2 = traceback.format_tb(ei2[2])
|
||||
return tb1 == tb2
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def capture_failure():
|
||||
"""Save current exception, and yield back the failure (or raises a
|
||||
runtime error if no active exception is being handled).
|
||||
|
||||
In some cases the exception context can be cleared, resulting in None
|
||||
being attempted to be saved after an exception handler is run. This
|
||||
can happen when eventlet switches greenthreads or when running an
|
||||
exception handler, code raises and catches an exception. In both
|
||||
cases the exception context will be cleared.
|
||||
|
||||
To work around this, we save the exception state, yield a failure and
|
||||
then run other code.
|
||||
|
||||
For example::
|
||||
|
||||
except Exception:
|
||||
with capture_failure() as fail:
|
||||
LOG.warn("Activating cleanup")
|
||||
cleanup()
|
||||
save_failure(fail)
|
||||
"""
|
||||
exc_info = sys.exc_info()
|
||||
if not any(exc_info):
|
||||
raise RuntimeError("No active exception is being handled")
|
||||
else:
|
||||
yield Failure(exc_info=exc_info)
|
||||
|
||||
|
||||
class Failure(object):
|
||||
"""Object that represents failure.
|
||||
|
||||
Failure objects encapsulate exception information so that
|
||||
it can be re-used later to re-raise or inspect.
|
||||
"""
|
||||
DICT_VERSION = 1
|
||||
|
||||
def __init__(self, exc_info=None, **kwargs):
|
||||
if not kwargs:
|
||||
if exc_info is None:
|
||||
exc_info = sys.exc_info()
|
||||
self._exc_info = exc_info
|
||||
self._exc_type_names = list(
|
||||
reflection.get_all_class_names(exc_info[0], up_to=Exception))
|
||||
if not self._exc_type_names:
|
||||
raise TypeError('Invalid exception type: %r' % exc_info[0])
|
||||
self._exception_str = exc.exception_message(self._exc_info[1])
|
||||
self._traceback_str = ''.join(
|
||||
traceback.format_tb(self._exc_info[2]))
|
||||
else:
|
||||
self._exc_info = exc_info # may be None
|
||||
self._exception_str = kwargs.pop('exception_str')
|
||||
self._exc_type_names = kwargs.pop('exc_type_names', [])
|
||||
self._traceback_str = kwargs.pop('traceback_str', None)
|
||||
if kwargs:
|
||||
raise TypeError(
|
||||
'Failure.__init__ got unexpected keyword argument(s): %s'
|
||||
% ', '.join(six.iterkeys(kwargs)))
|
||||
|
||||
@classmethod
|
||||
def from_exception(cls, exception):
|
||||
return cls((type(exception), exception, None))
|
||||
|
||||
def _matches(self, other):
|
||||
if self is other:
|
||||
return True
|
||||
return (self._exc_type_names == other._exc_type_names
|
||||
and self.exception_str == other.exception_str
|
||||
and self.traceback_str == other.traceback_str)
|
||||
|
||||
def matches(self, other):
|
||||
if not isinstance(other, Failure):
|
||||
return False
|
||||
if self.exc_info is None or other.exc_info is None:
|
||||
return self._matches(other)
|
||||
else:
|
||||
return self == other
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, Failure):
|
||||
return NotImplemented
|
||||
return (self._matches(other) and
|
||||
are_equal_exc_info_tuples(self.exc_info, other.exc_info))
|
||||
|
||||
def __ne__(self, other):
|
||||
return not (self == other)
|
||||
|
||||
# NOTE(imelnikov): obj.__hash__() should return same values for equal
|
||||
# objects, so we should redefine __hash__. Failure equality semantics
|
||||
# is a bit complicated, so for now we just mark Failure objects as
|
||||
# unhashable. See python docs on object.__hash__ for more info:
|
||||
# http://docs.python.org/2/reference/datamodel.html#object.__hash__
|
||||
__hash__ = None
|
||||
|
||||
@property
|
||||
def exception(self):
|
||||
"""Exception value, or None if exception value is not present.
|
||||
|
||||
Exception value may be lost during serialization.
|
||||
"""
|
||||
if self._exc_info:
|
||||
return self._exc_info[1]
|
||||
else:
|
||||
return None
|
||||
|
||||
@property
|
||||
def exception_str(self):
|
||||
"""String representation of exception."""
|
||||
return self._exception_str
|
||||
|
||||
@property
|
||||
def exc_info(self):
|
||||
"""Exception info tuple or None."""
|
||||
return self._exc_info
|
||||
|
||||
@property
|
||||
def traceback_str(self):
|
||||
"""Exception traceback as string."""
|
||||
return self._traceback_str
|
||||
|
||||
@staticmethod
|
||||
def reraise_if_any(failures):
|
||||
"""Re-raise exceptions if argument is not empty.
|
||||
|
||||
If argument is empty list, this method returns None. If
|
||||
argument is list with single Failure object in it,
|
||||
this failure is reraised. Else, WrappedFailure exception
|
||||
is raised with failures list as causes.
|
||||
"""
|
||||
failures = list(failures)
|
||||
if len(failures) == 1:
|
||||
failures[0].reraise()
|
||||
elif len(failures) > 1:
|
||||
raise exc.WrappedFailure(failures)
|
||||
|
||||
def reraise(self):
|
||||
"""Re-raise captured exception."""
|
||||
if self._exc_info:
|
||||
six.reraise(*self._exc_info)
|
||||
else:
|
||||
raise exc.WrappedFailure([self])
|
||||
|
||||
def check(self, *exc_classes):
|
||||
"""Check if any of exc_classes caused the failure.
|
||||
|
||||
Arguments of this method can be exception types or type
|
||||
names (stings). If captured exception is instance of
|
||||
exception of given type, the corresponding argument is
|
||||
returned. Else, None is returned.
|
||||
"""
|
||||
for cls in exc_classes:
|
||||
if isinstance(cls, type):
|
||||
err = reflection.get_class_name(cls)
|
||||
else:
|
||||
err = cls
|
||||
if err in self._exc_type_names:
|
||||
return cls
|
||||
return None
|
||||
|
||||
def __str__(self):
|
||||
return 'Failure: %s: %s' % (self._exc_type_names[0],
|
||||
self._exception_str)
|
||||
|
||||
def __iter__(self):
|
||||
"""Iterate over exception type names."""
|
||||
for et in self._exc_type_names:
|
||||
yield et
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data):
|
||||
data = dict(data)
|
||||
version = data.pop('version', None)
|
||||
if version != cls.DICT_VERSION:
|
||||
raise ValueError('Invalid dict version of failure object: %r'
|
||||
% version)
|
||||
return cls(**data)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
'exception_str': self.exception_str,
|
||||
'traceback_str': self.traceback_str,
|
||||
'exc_type_names': list(self),
|
||||
'version': self.DICT_VERSION,
|
||||
}
|
||||
|
||||
def copy(self):
|
||||
return Failure(exc_info=copy_exc_info(self.exc_info),
|
||||
exception_str=self.exception_str,
|
||||
traceback_str=self.traceback_str,
|
||||
exc_type_names=self._exc_type_names[:])
|
||||
|