From 6134ff689a1d18a5ff23c3bb3fdcc140680f2387 Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Fri, 20 Sep 2013 16:51:08 +0400 Subject: [PATCH] Wrapping and serializing failures * save task failures to sqlalchemy backend and restore them from there; * for any wrapped exception use common WrappedFailre exception type, which makes it easier to handle particular exception types; * several minor improvements. Affects-db: this change adds db migration that looses exception information for saved task details. Change-Id: I575282002e6999646bbf51f492b82a7e3525787a --- taskflow/engines/action_engine/engine.py | 10 +- .../engines/action_engine/graph_action.py | 8 +- taskflow/examples/wrapped_exception.py | 106 ++++++++++ taskflow/exceptions.py | 75 ++++--- taskflow/flow.py | 7 +- taskflow/persistence/backends/impl_dir.py | 7 +- .../persistence/backends/impl_sqlalchemy.py | 20 +- .../1c783c0c2875_replace_exception_an.py | 29 +++ .../persistence/backends/sqlalchemy/models.py | 26 ++- taskflow/persistence/logbook.py | 12 +- taskflow/storage.py | 14 +- taskflow/task.py | 2 +- taskflow/tests/unit/persistence/base.py | 31 +++ taskflow/tests/unit/test_action_engine.py | 79 +++---- taskflow/tests/unit/test_utils.py | 83 ++++++++ taskflow/tests/unit/test_utils_failure.py | 199 ++++++++++++++++++ taskflow/utils/misc.py | 156 ++++++++++++-- taskflow/utils/persistence_utils.py | 42 +++- taskflow/utils/reflection.py | 25 +++ 19 files changed, 797 insertions(+), 134 deletions(-) create mode 100644 taskflow/examples/wrapped_exception.py create mode 100644 taskflow/persistence/backends/sqlalchemy/alembic/versions/1c783c0c2875_replace_exception_an.py create mode 100644 taskflow/tests/unit/test_utils_failure.py diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index fdd01c06..d0bcf538 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -57,14 +57,8 @@ class ActionEngine(base.EngineBase): if state == states.SUSPENDED: return self._change_state(states.FAILURE) - if self._failures: - if len(self._failures) == 1: - self._failures[0].reraise() - else: - exc_infos = [f.exc_info for f in self._failures] - raise exc.LinkedException.link(exc_infos) - else: - current_failure.reraise() + misc.Failure.reraise_if_any(self._failures) + current_failure.reraise() def _reset(self): self._failures = [] diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index d19f876d..77f61682 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -23,7 +23,6 @@ import threading from concurrent import futures from taskflow.engines.action_engine import base_action as base -from taskflow import exceptions as exc from taskflow import states as st from taskflow.utils import misc @@ -198,12 +197,7 @@ class ParallelGraphAction(SequentialGraphAction): pass except Exception: failures.append(misc.Failure()) - if len(failures) > 1: - raise exc.LinkedException.link([fail.exc_info - for fail in failures]) - elif len(failures) == 1: - failures[0].reraise() - + misc.Failure.reraise_if_any(failures) if was_suspended.is_set(): return st.SUSPENDED else: diff --git a/taskflow/examples/wrapped_exception.py b/taskflow/examples/wrapped_exception.py new file mode 100644 index 00000000..255c2853 --- /dev/null +++ b/taskflow/examples/wrapped_exception.py @@ -0,0 +1,106 @@ + +import contextlib +import logging +import os +import sys +import time + + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + + +import taskflow.engines + +from taskflow import exceptions +from taskflow.patterns import unordered_flow as uf +from taskflow import task +from taskflow.utils import misc + + +@contextlib.contextmanager +def wrap_all_failures(): + """Convert any exceptions to WrappedFailure. + + When you expect several failures, it may be convenient + to wrap any exception with WrappedFailure in order to + unify error handling. + """ + try: + yield + except Exception: + raise exceptions.WrappedFailure([misc.Failure()]) + + +class FirstException(Exception): + """Exception that first task raises""" + + +class SecondException(Exception): + """Exception that second task raises""" + + +class FirstTask(task.Task): + def execute(self, sleep1, raise1): + time.sleep(sleep1) + if not isinstance(raise1, bool): + raise TypeError('Bad raise1 value: %r' % raise1) + if raise1: + raise FirstException('First task failed') + + +class SecondTask(task.Task): + def execute(self, sleep2, raise2): + time.sleep(sleep2) + if not isinstance(raise2, bool): + raise TypeError('Bad raise2 value: %r' % raise2) + if raise2: + raise SecondException('Second task failed') + + +def run(**store): + flow = uf.Flow('flow').add( + FirstTask(), + SecondTask() + ) + try: + with wrap_all_failures(): + taskflow.engines.run(flow, store=store, + engine_conf='parallel') + except exceptions.WrappedFailure as ex: + unknown_failures = [] + for failure in ex: + if failure.check(FirstException): + print("Got FirstException: %s" % failure.exception_str) + elif failure.check(SecondException): + print("Got SecondException: %s" % failure.exception_str) + else: + print("Unknown failure: %s" % failure) + unknown_failures.append(failure) + misc.Failure.reraise_if_any(unknown_failures) + + +print("== Raise and catch first exception only ==") +run(sleep1=0.0, raise1=True, + sleep2=0.0, raise2=False) + +print("\n== Raise and catch both exceptions ==") +# NOTE(imelnikov): in general, sleeping does not guarantee that +# we'll have both task running before one of them fails, but +# with current implementation this works most of times, +# which is enough for our purposes here. +run(sleep1=1.0, raise1=True, + sleep2=1.0, raise2=True) + +print("\n== Handle one exception, and re-raise another ==") +try: + run(sleep1=1.0, raise1=True, + sleep2=1.0, raise2='boom') +except TypeError as ex: + print("As expected, TypeError is here: %s" % ex) +else: + assert False, "TypeError expected" diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 1fcb1c34..c286424e 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -16,10 +16,6 @@ # License for the specific language governing permissions and limitations # under the License. -import StringIO - -import traceback - class TaskFlowException(Exception): """Base class for exceptions emitted from this library.""" @@ -31,33 +27,6 @@ class Duplicate(TaskFlowException): pass -class LinkedException(TaskFlowException): - """A linked chain of many exceptions.""" - def __init__(self, message, cause, tb): - super(LinkedException, self).__init__(message) - self.cause = cause - self.tb = tb - self.next = None - - @classmethod - def link(cls, exc_infos): - first = None - previous = None - for i, exc_info in enumerate(exc_infos): - if not all(exc_info[0:2]) or len(exc_info) != 3: - raise ValueError("Invalid exc_info for index %s" % (i)) - buf = StringIO.StringIO() - traceback.print_exception(exc_info[0], exc_info[1], exc_info[2], - file=buf) - exc = cls(str(exc_info[1]), exc_info[1], buf.getvalue()) - if previous is not None: - previous.next = exc - else: - first = exc - previous = exc - return first - - class StorageError(TaskFlowException): """Raised when logbook can not be read/saved/deleted.""" @@ -117,3 +86,47 @@ class MissingDependencies(InvalidStateException): class DependencyFailure(TaskFlowException): """Raised when flow can't resolve dependency.""" pass + + +class WrappedFailure(TaskFlowException): + """Wraps one or several failures + + When exception cannot be re-raised (for example, because + the value and traceback is lost in serialization) or + there are several exceptions, we wrap corresponding Failure + objects into this exception class. + """ + + def __init__(self, causes): + self._causes = [] + for cause in causes: + if cause.check(type(self)) and cause.exception: + # flatten wrapped failures + self._causes.extend(cause.exception) + else: + self._causes.append(cause) + + def __iter__(self): + """Iterate over failures that caused the exception""" + return iter(self._causes) + + def __len__(self): + """Return number of wrapped failures""" + return len(self._causes) + + def check(self, *exc_classes): + """Check if any of exc_classes caused (part of) the failure. + + Arguments of this method can be exception types or type names + (stings). If any of wrapped failures were caused by exception + of given type, the corresponding argument is returned. Else, + None is returned. + """ + for cause in self: + result = cause.check(*exc_classes) + if result: + return result + return None + + def __str__(self): + return 'Wrapped Failure: %s' % self._causes diff --git a/taskflow/flow.py b/taskflow/flow.py index 4c079d16..00a23e8d 100644 --- a/taskflow/flow.py +++ b/taskflow/flow.py @@ -19,10 +19,7 @@ import abc from taskflow.openstack.common import uuidutils - - -def _class_name(obj): - return ".".join([obj.__class__.__module__, obj.__class__.__name__]) +from taskflow.utils import reflection class Flow(object): @@ -60,7 +57,7 @@ class Flow(object): raise NotImplementedError() def __str__(self): - lines = ["%s: %s" % (_class_name(self), self.name)] + lines = ["%s: %s" % (reflection.get_class_name(self), self.name)] lines.append("%s" % (self.uuid)) lines.append("%s" % (len(self))) return "; ".join(lines) diff --git a/taskflow/persistence/backends/impl_dir.py b/taskflow/persistence/backends/impl_dir.py index cb129fc0..131578d6 100644 --- a/taskflow/persistence/backends/impl_dir.py +++ b/taskflow/persistence/backends/impl_dir.py @@ -121,6 +121,7 @@ class Connection(base.Connection): except exc.TaskFlowException: raise except Exception as e: + LOG.exception("Failed running locking file based session") # NOTE(harlowja): trap all other errors as storage errors. raise exc.StorageError("Failed running locking file based " "session: %s" % e, e) @@ -415,11 +416,10 @@ def _str_2_datetime(text): def _format_task_detail(task_detail): return { - 'exception': task_detail.exception, + 'failure': p_utils.failure_to_dict(task_detail.failure), 'meta': task_detail.meta, 'name': task_detail.name, 'results': task_detail.results, - 'stacktrace': task_detail.stacktrace, 'state': task_detail.state, 'version': task_detail.version, } @@ -429,8 +429,7 @@ def _unformat_task_detail(uuid, td_data): td = logbook.TaskDetail(name=td_data['name'], uuid=uuid) td.state = td_data.get('state') td.results = td_data.get('results') - td.exception = td_data.get('exception') - td.stacktrace = td_data.get('stacktrace') + td.failure = p_utils.failure_from_dict(td_data.get('failure')) td.meta = td_data.get('meta') td.version = td_data.get('version') return td diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 470fc083..8f165517 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -277,6 +277,7 @@ class Connection(base.Connection): with session.begin(): return functor(session, *args, **kwargs) except sa_exc.SQLAlchemyError as e: + LOG.exception('Failed running database session') raise exc.StorageError("Failed running database session: %s" % e, e) @@ -284,6 +285,7 @@ class Connection(base.Connection): try: return self._session_maker() except sa_exc.SQLAlchemyError as e: + LOG.exception('Failed creating database session') raise exc.StorageError("Failed creating database session: %s" % e, e) @@ -301,6 +303,7 @@ class Connection(base.Connection): else: migration.db_sync(conn) except sa_exc.SQLAlchemyError as e: + LOG.exception('Failed upgrading database version') raise exc.StorageError("Failed upgrading database version: %s" % e, e) @@ -311,6 +314,7 @@ class Connection(base.Connection): try: return session.query(models.LogBook).delete() except sa_exc.DBAPIError as e: + LOG.exception('Failed clearing all entries') raise exc.StorageError("Failed clearing all entries: %s" % e, e) def clear_all(self): @@ -345,6 +349,7 @@ class Connection(base.Connection): lb = _logbook_get_model(lb_id, session=session) session.delete(lb) except sa_exc.DBAPIError as e: + LOG.exception('Failed destroying logbook') raise exc.StorageError("Failed destroying" " logbook %s: %s" % (lb_id, e), e) @@ -366,6 +371,7 @@ class Connection(base.Connection): lb_m = session.merge(lb_m) return _convert_lb_to_external(lb_m) except sa_exc.DBAPIError as e: + LOG.exception('Failed saving logbook') raise exc.StorageError("Failed saving logbook %s: %s" % (lb.uuid, e), e) @@ -378,6 +384,7 @@ class Connection(base.Connection): lb = _logbook_get_model(book_uuid, session=session) return _convert_lb_to_external(lb) except sa_exc.DBAPIError as e: + LOG.exception('Failed getting logbook') raise exc.StorageError("Failed getting logbook %s: %s" % (book_uuid, e), e) @@ -387,6 +394,7 @@ class Connection(base.Connection): raw_books = session.query(models.LogBook).all() books = [_convert_lb_to_external(lb) for lb in raw_books] except sa_exc.DBAPIError as e: + LOG.exception('Failed getting logbooks') raise exc.StorageError("Failed getting logbooks: %s" % e, e) for lb in books: yield lb @@ -421,8 +429,7 @@ def _convert_fd_to_internal(fd, parent_uuid): def _convert_td_to_internal(td, parent_uuid): return models.TaskDetail(name=td.name, uuid=td.uuid, state=td.state, results=td.results, - exception=td.exception, meta=td.meta, - stacktrace=td.stacktrace, + failure=td.failure, meta=td.meta, version=td.version, parent_uuid=parent_uuid) @@ -433,8 +440,7 @@ def _convert_td_to_external(td): td_c = logbook.TaskDetail(td.name, uuid=td.uuid) td_c.state = td.state td_c.results = td.results - td_c.exception = td.exception - td_c.stacktrace = td.stacktrace + td_c.failure = td.failure td_c.meta = td.meta td_c.version = td.version return td_c @@ -518,10 +524,8 @@ def _taskdetails_merge(td_m, td): td_m.state = td.state if td_m.results != td.results: td_m.results = td.results - if td_m.exception != td.exception: - td_m.exception = td.exception - if td_m.stacktrace != td.stacktrace: - td_m.stacktrace = td.stacktrace + if td_m.failure != td.failure: + td_m.failure = td.failure if td_m.meta != td.meta: td_m.meta = td.meta return td_m diff --git a/taskflow/persistence/backends/sqlalchemy/alembic/versions/1c783c0c2875_replace_exception_an.py b/taskflow/persistence/backends/sqlalchemy/alembic/versions/1c783c0c2875_replace_exception_an.py new file mode 100644 index 00000000..a994e1ab --- /dev/null +++ b/taskflow/persistence/backends/sqlalchemy/alembic/versions/1c783c0c2875_replace_exception_an.py @@ -0,0 +1,29 @@ +"""Replace exception and stacktrace with failure column + +Revision ID: 1c783c0c2875 +Revises: 1cea328f0f65 +Create Date: 2013-09-26 12:33:30.970122 + +""" + +# revision identifiers, used by Alembic. +revision = '1c783c0c2875' +down_revision = '1cea328f0f65' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.add_column('taskdetails', + sa.Column('failure', sa.Text(), nullable=True)) + op.drop_column('taskdetails', 'exception') + op.drop_column('taskdetails', 'stacktrace') + + +def downgrade(): + op.drop_column('taskdetails', 'failure') + op.add_column('taskdetails', + sa.Column('stacktrace', sa.Text(), nullable=True)) + op.add_column('taskdetails', + sa.Column('exception', sa.Text(), nullable=True)) diff --git a/taskflow/persistence/backends/sqlalchemy/models.py b/taskflow/persistence/backends/sqlalchemy/models.py index 5cef48f7..21de076d 100644 --- a/taskflow/persistence/backends/sqlalchemy/models.py +++ b/taskflow/persistence/backends/sqlalchemy/models.py @@ -28,6 +28,8 @@ from taskflow.openstack.common import jsonutils from taskflow.openstack.common import timeutils from taskflow.openstack.common import uuidutils +from taskflow.utils import persistence_utils + BASE = declarative_base() @@ -47,6 +49,27 @@ class Json(types.TypeDecorator, types.MutableType): return jsonutils.loads(value) +class Failure(types.TypeDecorator, types.MutableType): + """Put misc.Failure object into database column. + + We convert Failure object to dict, serialize that dict into + JSON and save it. None is stored as NULL. + + The conversion is lossy since we cannot save exc_info. + """ + impl = types.Text + + def process_bind_param(self, value, dialect): + if value is None: + return None + return jsonutils.dumps(persistence_utils.failure_to_dict(value)) + + def process_result_value(self, value, dialect): + if value is None: + return None + return persistence_utils.failure_from_dict(jsonutils.loads(value)) + + class ModelBase(TimestampMixin): """Base model for all taskflow objects""" uuid = Column(String, default=uuidutils.generate_uuid, @@ -88,8 +111,7 @@ class TaskDetail(BASE, ModelBase): # Member variables state = Column(String) results = Column(Json) - exception = Column(Json) - stacktrace = Column(Json) + failure = Column(Failure) version = Column(String) # Relationships diff --git a/taskflow/persistence/logbook.py b/taskflow/persistence/logbook.py index ca28d74c..18a0a44d 100644 --- a/taskflow/persistence/logbook.py +++ b/taskflow/persistence/logbook.py @@ -160,12 +160,9 @@ class TaskDetail(object): self.state = None # The results it may have produced (useful for reverting). self.results = None - # An exception that it may have thrown (or part of it), useful for - # knowing what failed. - self.exception = None - # Any stack trace the exception may have had, useful for debugging or - # examining the failure in more depth. - self.stacktrace = None + # An Failure object that holds exception the task may have thrown + # (or part of it), useful for knowing what failed. + self.failure = None # Any other metadata to include about this task while storing. For # example timing information could be stored here, other misc. task # related items. @@ -181,8 +178,7 @@ class TaskDetail(object): return self.state = td.state self.meta = td.meta - self.stacktrace = td.stacktrace - self.exception = td.exception + self.failure = td.failure self.results = td.results self.version = td.version diff --git a/taskflow/storage.py b/taskflow/storage.py index f0254c81..f06fc08f 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -180,18 +180,22 @@ class Storage(object): """Put result for task with id 'uuid' to storage""" td = self._taskdetail_by_uuid(uuid) td.state = state - td.results = data - self._with_connection(self._save_task_detail, task_detail=td) - - # Warn if result was incomplete - if not isinstance(data, misc.Failure): + if state == states.FAILURE and isinstance(data, misc.Failure): + td.results = None + td.failure = data + else: + td.results = data + td.failure = None self._check_all_results_provided(uuid, td.name, data) + self._with_connection(self._save_task_detail, task_detail=td) def get(self, uuid): """Get result for task with id 'uuid' to storage""" td = self._taskdetail_by_uuid(uuid) if td.state not in STATES_WITH_RESULTS: raise exceptions.NotFound("Result for task %r is not known" % uuid) + if td.failure: + return td.failure return td.results def reset(self, uuid, state=states.PENDING): diff --git a/taskflow/task.py b/taskflow/task.py index d20bf121..50d40174 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -227,7 +227,7 @@ class Task(BaseTask): auto_extract=True, rebind=None): """Initialize task instance""" if name is None: - name = reflection.get_callable_name(self) + name = reflection.get_class_name(self) if provides is None: provides = self.default_provides super(Task, self).__init__(name, diff --git a/taskflow/tests/unit/persistence/base.py b/taskflow/tests/unit/persistence/base.py index f362f0c7..b2499a45 100644 --- a/taskflow/tests/unit/persistence/base.py +++ b/taskflow/tests/unit/persistence/base.py @@ -21,6 +21,7 @@ import contextlib from taskflow import exceptions as exc from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook +from taskflow.utils import misc class PersistenceTestMixin(object): @@ -90,6 +91,36 @@ class PersistenceTestMixin(object): conn.update_flow_details(fd) conn.update_task_details(td) + def test_task_detail_with_failure(self): + lb_id = uuidutils.generate_uuid() + lb_name = 'lb-%s' % (lb_id) + lb = logbook.LogBook(name=lb_name, uuid=lb_id) + fd = logbook.FlowDetail('test', uuid=uuidutils.generate_uuid()) + lb.add(fd) + td = logbook.TaskDetail("detail-1", uuid=uuidutils.generate_uuid()) + + try: + raise RuntimeError('Woot!') + except Exception: + td.failure = misc.Failure() + + fd.add(td) + + with contextlib.closing(self._get_connection()) as conn: + conn.save_logbook(lb) + conn.update_flow_details(fd) + conn.update_task_details(td) + + # Read failure back + with contextlib.closing(self._get_connection()) as conn: + lb2 = conn.get_logbook(lb_id) + fd2 = lb2.find(fd.uuid) + td2 = fd2.find(td.uuid) + failure = td2.failure + self.assertEquals(failure.exception_str, 'Woot!') + self.assertIs(failure.check(RuntimeError), RuntimeError) + self.assertEquals(failure.traceback_str, td.failure.traceback_str) + def test_logbook_merge_flow_detail(self): lb_id = uuidutils.generate_uuid() lb_name = 'lb-%s' % (lb_id) diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 6f2e053f..25646ae5 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -194,17 +194,18 @@ class EngineTaskTest(EngineTestBase): kwargs={'values': self.values}) with self.assertRaisesRegexp(RuntimeError, '^Woot'): engine.run() - self.assertEquals(self.values, - ['flow RUNNING', - 'fail RUNNING', - 'fail FAILURE', - 'flow REVERTING', - 'fail REVERTING', - 'fail reverted(Failure: RuntimeError: Woot!)', - 'fail REVERTED', - 'fail PENDING', - 'flow REVERTED', - 'flow FAILURE']) + self.assertEquals( + self.values, + ['flow RUNNING', + 'fail RUNNING', + 'fail FAILURE', + 'flow REVERTING', + 'fail REVERTING', + 'fail reverted(Failure: exceptions.RuntimeError: Woot!)', + 'fail REVERTED', + 'fail PENDING', + 'flow REVERTED', + 'flow FAILURE']) def test_invalid_flow_raises(self): value = 'i am string, not task/flow, sorry' @@ -379,8 +380,9 @@ class EngineLinearFlowTest(EngineTestBase): engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Woot'): engine.run() - self.assertEquals(self.values, - ['fail reverted(Failure: RuntimeError: Woot!)']) + self.assertEquals( + self.values, + ['fail reverted(Failure: exceptions.RuntimeError: Woot!)']) def test_correctly_reverts_children(self): flow = lf.Flow('root-1').add( @@ -393,10 +395,11 @@ class EngineLinearFlowTest(EngineTestBase): engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Woot'): engine.run() - self.assertEquals(self.values, - ['task1', 'task2', - 'fail reverted(Failure: RuntimeError: Woot!)', - 'task2 reverted(5)', 'task1 reverted(5)']) + self.assertEquals( + self.values, + ['task1', 'task2', + 'fail reverted(Failure: exceptions.RuntimeError: Woot!)', + 'task2 reverted(5)', 'task1 reverted(5)']) class EngineParallelFlowTest(EngineTestBase): @@ -534,10 +537,11 @@ class EngineGraphFlowTest(EngineTestBase): engine = self._make_engine(flow) with self.assertRaisesRegexp(RuntimeError, '^Woot'): engine.run() - self.assertEquals(self.values, - ['task1', 'task2', - 'task3 reverted(Failure: RuntimeError: Woot!)', - 'task2 reverted(5)', 'task1 reverted(5)']) + self.assertEquals( + self.values, + ['task1', 'task2', + 'task3 reverted(Failure: exceptions.RuntimeError: Woot!)', + 'task2 reverted(5)', 'task1 reverted(5)']) def test_graph_flow_four_tasks_revert_failure(self): flow = gf.Flow('g-3-nasty').add( @@ -627,20 +631,21 @@ class SuspendFlowTest(EngineTestBase): engine.storage.inject({'engine': engine}) engine.run() self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED) - self.assertEquals(self.values, - ['a', - 'b', - 'c reverted(Failure: RuntimeError: Woot!)', - 'b reverted(5)']) + self.assertEquals( + self.values, + ['a', 'b', + 'c reverted(Failure: exceptions.RuntimeError: Woot!)', + 'b reverted(5)']) with self.assertRaisesRegexp(RuntimeError, '^Woot'): engine.run() self.assertEquals(engine.storage.get_flow_state(), states.FAILURE) - self.assertEquals(self.values, - ['a', - 'b', - 'c reverted(Failure: RuntimeError: Woot!)', - 'b reverted(5)', - 'a reverted(5)']) + self.assertEquals( + self.values, + ['a', + 'b', + 'c reverted(Failure: exceptions.RuntimeError: Woot!)', + 'b reverted(5)', + 'a reverted(5)']) class SingleThreadedEngineTest(EngineTaskTest, @@ -826,10 +831,12 @@ class MultiThreadedEngineTest(EngineTaskTest, # Since this is an unordered flow we can not guarantee that task1 or # task2 will exist and be reverted, although they may exist depending # on how the OS thread scheduling and execution graph algorithm... - possible_result = set(['task1', 'task1 reverted(5)', - 'task2', 'task2 reverted(5)', - 'task3', 'task3 reverted(5)', - 'fail reverted(Failure: RuntimeError: Woot!)']) + possible_result = set([ + 'task1', 'task1 reverted(5)', + 'task2', 'task2 reverted(5)', + 'task3', 'task3 reverted(5)', + 'fail reverted(Failure: exceptions.RuntimeError: Woot!)' + ]) self.assertIsSubset(possible_result, result) def test_linear_nested_to_parallel_revert_exception(self): diff --git a/taskflow/tests/unit/test_utils.py b/taskflow/tests/unit/test_utils.py index 796875b9..ad0af721 100644 --- a/taskflow/tests/unit/test_utils.py +++ b/taskflow/tests/unit/test_utils.py @@ -16,8 +16,11 @@ # License for the specific language governing permissions and limitations # under the License. +import sys + from taskflow import decorators from taskflow import test +from taskflow.utils import misc from taskflow.utils import reflection @@ -142,3 +145,83 @@ class AcceptsKwargsTest(test.TestCase): def test_with_kwargs(self): self.assertEquals( reflection.accepts_kwargs(function_with_kwargs), True) + + +class GetClassNameTest(test.TestCase): + + def test_std_class(self): + name = reflection.get_class_name(RuntimeError) + self.assertEquals(name, 'exceptions.RuntimeError') + + def test_class(self): + name = reflection.get_class_name(Class) + self.assertEquals(name, '.'.join((__name__, 'Class'))) + + def test_instance(self): + name = reflection.get_class_name(Class()) + self.assertEquals(name, '.'.join((__name__, 'Class'))) + + def test_int(self): + name = reflection.get_class_name(42) + self.assertEquals(name, '__builtin__.int') + + +class GetAllClassNamesTest(test.TestCase): + + def test_std_class(self): + names = list(reflection.get_all_class_names(RuntimeError)) + self.assertEquals(names, [ + 'exceptions.RuntimeError', + 'exceptions.StandardError', + 'exceptions.Exception', + 'exceptions.BaseException', + '__builtin__.object']) + + def test_std_class_up_to(self): + names = list(reflection.get_all_class_names(RuntimeError, + up_to=Exception)) + self.assertEquals(names, [ + 'exceptions.RuntimeError', + 'exceptions.StandardError', + 'exceptions.Exception']) + + +class ExcInfoUtilsTest(test.TestCase): + + def _make_ex_info(self): + try: + raise RuntimeError('Woot!') + except Exception: + return sys.exc_info() + + def test_copy_none(self): + result = misc.copy_exc_info(None) + self.assertIsNone(result) + + def test_copy_exc_info(self): + exc_info = self._make_ex_info() + result = misc.copy_exc_info(exc_info) + self.assertIsNot(result, exc_info) + self.assertIs(result[0], RuntimeError) + self.assertIsNot(result[1], exc_info[1]) + self.assertIs(result[2], exc_info[2]) + + def test_none_equals(self): + self.assertTrue(misc.are_equal_exc_info_tuples(None, None)) + + def test_none_ne_tuple(self): + exc_info = self._make_ex_info() + self.assertFalse(misc.are_equal_exc_info_tuples(None, exc_info)) + + def test_tuple_nen_none(self): + exc_info = self._make_ex_info() + self.assertFalse(misc.are_equal_exc_info_tuples(exc_info, None)) + + def test_tuple_equals_itself(self): + exc_info = self._make_ex_info() + self.assertTrue(misc.are_equal_exc_info_tuples(exc_info, exc_info)) + + def test_typle_equals_copy(self): + exc_info = self._make_ex_info() + copied = misc.copy_exc_info(exc_info) + self.assertTrue(misc.are_equal_exc_info_tuples(exc_info, copied)) diff --git a/taskflow/tests/unit/test_utils_failure.py b/taskflow/tests/unit/test_utils_failure.py new file mode 100644 index 00000000..79acb17e --- /dev/null +++ b/taskflow/tests/unit/test_utils_failure.py @@ -0,0 +1,199 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from taskflow import exceptions +from taskflow import test + +from taskflow.utils import misc + + +def _captured_failure(msg): + try: + raise RuntimeError(msg) + except Exception: + return misc.Failure() + + +class GeneralFailureObjTestsMixin(object): + + def test_captures_message(self): + self.assertEquals(self.fail_obj.exception_str, 'Woot!') + + def test_str(self): + self.assertEquals(str(self.fail_obj), + 'Failure: exceptions.RuntimeError: Woot!') + + def test_exception_types(self): + self.assertEquals(list(self.fail_obj), + ['exceptions.RuntimeError', + 'exceptions.StandardError', + 'exceptions.Exception']) + + def test_check_str(self): + val = 'exceptions.StandardError' + self.assertEquals(self.fail_obj.check(val), val) + + def test_check_str_not_there(self): + val = 'exceptions.ValueError' + self.assertEquals(self.fail_obj.check(val), None) + + def test_check_type(self): + self.assertIs(self.fail_obj.check(RuntimeError), RuntimeError) + + def test_check_type_not_there(self): + self.assertIs(self.fail_obj.check(ValueError), None) + + +class CaptureFailureTestCase(test.TestCase, GeneralFailureObjTestsMixin): + + def setUp(self): + super(CaptureFailureTestCase, self).setUp() + self.fail_obj = _captured_failure('Woot!') + + def test_captures_value(self): + self.assertIsInstance(self.fail_obj.exception, RuntimeError) + + def test_captures_exc_info(self): + exc_info = self.fail_obj.exc_info + self.assertEquals(len(exc_info), 3) + self.assertEquals(exc_info[0], RuntimeError) + self.assertIs(exc_info[1], self.fail_obj.exception) + + def test_reraises(self): + with self.assertRaisesRegexp(RuntimeError, '^Woot!$'): + self.fail_obj.reraise() + + +class ReCreatedFailureTestCase(test.TestCase, GeneralFailureObjTestsMixin): + + def setUp(self): + super(ReCreatedFailureTestCase, self).setUp() + fail_obj = _captured_failure('Woot!') + self.fail_obj = misc.Failure(exception_str=fail_obj.exception_str, + traceback_str=fail_obj.traceback_str, + exc_type_names=list(fail_obj)) + + def test_value_lost(self): + self.assertIs(self.fail_obj.exception, None) + + def test_no_exc_info(self): + self.assertIs(self.fail_obj.exc_info, None) + + def test_reraises(self): + with self.assertRaises(exceptions.WrappedFailure) as ctx: + self.fail_obj.reraise() + exc = ctx.exception + self.assertIs(exc.check(RuntimeError), RuntimeError) + + +class FailureObjectTestCase(test.TestCase): + + def test_dont_catch_base_exception(self): + try: + raise SystemExit() + except BaseException: + self.assertRaises(TypeError, misc.Failure) + + def test_unknown_argument(self): + with self.assertRaises(TypeError) as ctx: + misc.Failure( + exception_str='Woot!', + traceback_str=None, + exc_type_names=['exceptions.Exception'], + hi='hi there') + expected = "Failure.__init__ got unexpected keyword argument: 'hi'" + self.assertEquals(str(ctx.exception), expected) + + def test_empty_does_not_reraise(self): + self.assertIs(misc.Failure.reraise_if_any([]), None) + + def test_reraises_one(self): + fls = [_captured_failure('Woot!')] + with self.assertRaisesRegexp(RuntimeError, '^Woot!$'): + misc.Failure.reraise_if_any(fls) + + def test_reraises_several(self): + fls = [ + _captured_failure('Woot!'), + _captured_failure('Oh, not again!') + ] + with self.assertRaises(exceptions.WrappedFailure) as ctx: + misc.Failure.reraise_if_any(fls) + self.assertEquals(list(ctx.exception), fls) + + def test_failure_copy(self): + fail_obj = _captured_failure('Woot!') + + copied = fail_obj.copy() + self.assertIsNot(fail_obj, copied) + self.assertEquals(fail_obj, copied) + + def test_failure_copy_recaptured(self): + captured = _captured_failure('Woot!') + fail_obj = misc.Failure(exception_str=captured.exception_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured)) + copied = fail_obj.copy() + self.assertIsNot(fail_obj, copied) + self.assertEquals(fail_obj, copied) + self.assertFalse(fail_obj != copied) + + def test_recaptured_not_eq(self): + captured = _captured_failure('Woot!') + fail_obj = misc.Failure(exception_str=captured.exception_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured)) + self.assertFalse(fail_obj == captured) + self.assertTrue(fail_obj != captured) + + +class WrappedFailureTestCase(test.TestCase): + + def test_simple_iter(self): + fail_obj = _captured_failure('Woot!') + wf = exceptions.WrappedFailure([fail_obj]) + self.assertEquals(len(wf), 1) + self.assertEquals(list(wf), [fail_obj]) + + def test_simple_check(self): + fail_obj = _captured_failure('Woot!') + wf = exceptions.WrappedFailure([fail_obj]) + self.assertEquals(wf.check(RuntimeError), RuntimeError) + self.assertEquals(wf.check(ValueError), None) + + def test_two_failures(self): + fls = [ + _captured_failure('Woot!'), + _captured_failure('Oh, not again!') + ] + wf = exceptions.WrappedFailure(fls) + self.assertEquals(len(wf), 2) + self.assertEquals(list(wf), fls) + + def test_flattening(self): + f1 = _captured_failure('Wrap me') + f2 = _captured_failure('Wrap me, too') + f3 = _captured_failure('Woot!') + try: + raise exceptions.WrappedFailure([f1, f2]) + except Exception: + fail_obj = misc.Failure() + + wf = exceptions.WrappedFailure([fail_obj, f3]) + self.assertEquals(list(wf), [f1, f2, f3]) diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index f084011e..18c9c48e 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -25,6 +25,10 @@ import errno import logging import os import sys +import traceback + +from taskflow import exceptions +from taskflow.utils import reflection import six @@ -176,31 +180,153 @@ class TransitionNotifier(object): break -class Failure(object): - """Indicates failure""" - # NOTE(imelnikov): flow_utils.FlowFailure uses runner, but - # engine code does not, so we need separate class +def copy_exc_info(exc_info): + """Make copy of exception info tuple, as deep as possible""" + if exc_info is None: + return None + exc_type, exc_value, tb = exc_info + # NOTE(imelnikov): there is no need to copy type, and + # we can't copy traceback + return (exc_type, copy.deepcopy(exc_value), tb) - def __init__(self, exc_info=None): - if exc_info is not None: + +def are_equal_exc_info_tuples(ei1, ei2): + if ei1 == ei2: + return True + if ei1 is None or ei2 is None: + return False # if both are None, we returned True above + + # NOTE(imelnikov): we can't compare exceptions with '==' + # because we want exc_info be equal to it's copy made with + # copy_exc_info above + return all((ei1[0] is ei2[0], + type(ei1[1]) == type(ei2[1]), + str(ei1[1]) == str(ei2[1]), + repr(ei1[1]) == repr(ei2[1]), + ei1[2] == ei2[2])) + + +class Failure(object): + """Object that represents failure. + + Failure objects encapsulate exception information so that + it can be re-used later to re-raise or inspect. + """ + + def __init__(self, exc_info=None, **kwargs): + if not kwargs: + if exc_info is None: + exc_info = sys.exc_info() self._exc_info = exc_info + self._exc_type_names = list( + reflection.get_all_class_names(exc_info[0], up_to=Exception)) + if not self._exc_type_names: + raise TypeError('Invalid exception type: %r' % exc_info[0]) + self._exception_str = str(self._exc_info[1]) + self._traceback_str = ''.join( + traceback.format_tb(self._exc_info[2])) else: - self._exc_info = sys.exc_info() + self._exc_info = exc_info # may be None + self._exception_str = kwargs.pop('exception_str') + self._exc_type_names = kwargs.pop('exc_type_names', []) + self._traceback_str = kwargs.pop('traceback_str', None) + if kwargs: + raise TypeError('Failure.__init__ got unexpected keyword ' + 'argument: %r' % kwargs.keys()[0]) + + def __eq__(self, other): + if not isinstance(other, Failure): + return NotImplemented + return all((are_equal_exc_info_tuples(self.exc_info, other.exc_info), + self._exc_type_names == other._exc_type_names, + self.exception_str == other.exception_str, + self.traceback_str == other.traceback_str)) + + def __ne__(self, other): + return not (self == other) + + # NOTE(imelnikov): obj.__hash__() should return same values for equal + # objects, so we should redefine __hash__. Our equality semantics + # is a bit complicated, so for now we just mark Failure objects as + # unhashable. See python docs on object.__hash__ for more info: + # http://docs.python.org/2/reference/datamodel.html#object.__hash__ + __hash__ = None + + @property + def exception(self): + """Exception value, or None if exception value is not present. + + Exception value may be lost during serialization. + """ + if self._exc_info: + return self._exc_info[1] + else: + return None + + @property + def exception_str(self): + """String representation of exception.""" + return self._exception_str @property def exc_info(self): + """Exception info tuple or None.""" return self._exc_info @property - def exc(self): - return self._exc_info[1] + def traceback_str(self): + """Exception traceback as string.""" + return self._traceback_str + + @staticmethod + def reraise_if_any(failures): + """Re-raise exceptions if argument is not empty. + + If argument is empty list, this method returns None. If + argument is list with single Failure object in it, + this failure is reraised. Else, WrappedFailure exception + is raised with failures list as causes. + """ + if len(failures) == 1: + failures[0].reraise() + elif len(failures) > 1: + raise exceptions.WrappedFailure(failures) def reraise(self): - raise self.exc_info[0], self.exc_info[1], self.exc_info[2] + """Re-raise captured exception""" + if self._exc_info: + six.reraise(*self._exc_info) + else: + raise exceptions.WrappedFailure([self]) + + def check(self, *exc_classes): + """Check if any of exc_classes caused the failure + + Arguments of this method can be exception types or type + names (stings). If captured excption is instance of + exception of given type, the corresponding argument is + returned. Else, None is returned. + """ + for cls in exc_classes: + if isinstance(cls, type): + err = reflection.get_class_name(cls) + else: + err = cls + if err in self._exc_type_names: + return cls + return None def __str__(self): - try: - exc_name = self.exc_info[0].__name__ - except AttributeError: - exc_name = str(self.exc_info) - return 'Failure: %s: %s' % (exc_name, self.exc_info[1]) + return 'Failure: %s: %s' % (self._exc_type_names[0], + self._exception_str) + + def __iter__(self): + """Iterate over exception type names""" + for et in self._exc_type_names: + yield et + + def copy(self): + return Failure(exc_info=copy_exc_info(self.exc_info), + exception_str=self.exception_str, + traceback_str=self.traceback_str, + exc_type_names=self._exc_type_names[:]) diff --git a/taskflow/utils/persistence_utils.py b/taskflow/utils/persistence_utils.py index 8f85050e..63f7aea5 100644 --- a/taskflow/utils/persistence_utils.py +++ b/taskflow/utils/persistence_utils.py @@ -22,6 +22,7 @@ import logging from taskflow.openstack.common import uuidutils from taskflow.persistence import logbook +from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -111,10 +112,13 @@ def task_details_merge(td_e, td_new, deep_copy=False): td_e.state = td_new.state if td_e.results != td_new.results: td_e.results = copy_fn(td_new.results) - if td_e.exception != td_new.exception: - td_e.exception = copy_fn(td_new.exception) - if td_e.stacktrace != td_new.stacktrace: - td_e.stacktrace = copy_fn(td_new.stacktrace) + if td_e.failure != td_new.failure: + # NOTE(imelnikov): we can't just deep copy Failures, as they + # contain tracebacks, which are not copyable. + if deep_copy: + td_e.failure = td_new.failure.copy() + else: + td_e.failure = td_new.failure if td_e.meta != td_new.meta: td_e.meta = copy_fn(td_new.meta) return td_e @@ -155,3 +159,33 @@ def logbook_merge(lb_e, lb_new, deep_copy=False): if lb_e.meta != lb_new.meta: lb_e.meta = copy_fn(lb_new.meta) return lb_e + + +def failure_to_dict(failure): + """Convert misc.Failure object to JSON-serializable dict""" + if not failure: + return None + if not isinstance(failure, misc.Failure): + raise TypeError('Failure object expected, but got %r' + % failure) + return { + 'exception_str': failure.exception_str, + 'traceback_str': failure.traceback_str, + 'exc_type_names': list(failure), + 'version': 1 + } + + +def failure_from_dict(data): + """Restore misc.Failure object from dict. + + The dict should be similar to what failure_to_dict() function + produces. + """ + if not data: + return None + version = data.pop('version', None) + if version != 1: + raise ValueError('Invalid version of saved Failure object: %r' + % version) + return misc.Failure(**data) diff --git a/taskflow/utils/reflection.py b/taskflow/utils/reflection.py index b35804cf..87f9d655 100644 --- a/taskflow/utils/reflection.py +++ b/taskflow/utils/reflection.py @@ -20,6 +20,31 @@ import inspect import types +def get_class_name(obj): + """Get class name for object. + + If object is a type, fully qualified name of the type is returned. + Else, fully qualified name of the type of the object is returned. + """ + if not isinstance(obj, type): + obj = type(obj) + return '.'.join((obj.__module__, obj.__name__)) + + +def get_all_class_names(obj, up_to=object): + """Get class names of object parent classes + + Iterate over all class names object is instance or subclass of, + in order of method resolution (mro). If up_to parameter is provided, + only name of classes that are sublcasses to that class are returned. + """ + if not isinstance(obj, type): + obj = type(obj) + for cls in obj.mro(): + if issubclass(cls, up_to): + yield get_class_name(cls) + + def get_callable_name(function): """Generate a name from callable