diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 52dd6cf3c..660c07771 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -26,6 +26,7 @@ from taskflow.engines import base from taskflow import exceptions as exc from taskflow.openstack.common import excutils +from taskflow.openstack.common import uuidutils from taskflow import states from taskflow import storage as t_storage @@ -45,7 +46,7 @@ class ActionEngine(base.EngineBase): def __init__(self, flow, flow_detail, backend, conf): super(ActionEngine, self).__init__(flow, flow_detail, backend, conf) - self._failures = [] + self._failures = {} # task uuid => failure self._root = None self._lock = threading.RLock() self._state_lock = threading.RLock() @@ -63,16 +64,13 @@ class ActionEngine(base.EngineBase): self._change_state(state) if state == states.SUSPENDED: return - misc.Failure.reraise_if_any(self._failures) + misc.Failure.reraise_if_any(self._failures.values()) if current_failure: current_failure.reraise() def __str__(self): return "%s: %s" % (reflection.get_class_name(self), id(self)) - def _reset(self): - self._failures = [] - def suspend(self): self._change_state(states.SUSPENDING) @@ -82,16 +80,13 @@ class ActionEngine(base.EngineBase): @lock_utils.locked def run(self): - if self.storage.get_flow_state() != states.SUSPENDED: - self.compile() - self._reset() + self.compile() + external_provides = set(self.storage.fetch_all().keys()) + missing = self._flow.requires - external_provides + if missing: + raise exc.MissingDependencies(self._flow, sorted(missing)) - external_provides = set(self.storage.fetch_all().keys()) - missing = self._flow.requires - external_provides - if missing: - raise exc.MissingDependencies(self._flow, sorted(missing)) - self._run() - elif self._failures: + if self._failures: self._revert() else: self._run() @@ -129,25 +124,51 @@ class ActionEngine(base.EngineBase): def on_task_state_change(self, task_action, state, result=None): if isinstance(result, misc.Failure): - self._failures.append(result) + self._failures[task_action.uuid] = result details = dict(engine=self, task_name=task_action.name, task_uuid=task_action.uuid, result=result) self.task_notifier.notify(state, details) - def _translate_flow_to_action(self): + def compile(self): + if self._root is not None: + return + assert self._graph_action is not None, ('Graph action class must be' ' specified') + self._change_state(states.RESUMING) # does nothing in PENDING state task_graph = flow_utils.flatten(self._flow) - ga = self._graph_action(task_graph) - for n in task_graph.nodes_iter(): - ga.add(n, task_action.TaskAction(n, self)) - return ga + self._root = self._graph_action(task_graph) + loaded_failures = {} - def compile(self): - if self._root is None: - self._root = self._translate_flow_to_action() + for task in task_graph.nodes_iter(): + try: + task_id = self.storage.get_uuid_by_name(task.name) + except exc.NotFound: + task_id = uuidutils.generate_uuid() + task_version = misc.get_version_string(task) + self.storage.add_task(task_name=task.name, uuid=task_id, + task_version=task_version) + try: + result = self.storage.get(task_id) + except exc.NotFound: + result = None + + if isinstance(result, misc.Failure): + # NOTE(imelnikov): old failure may have exc_info which + # might get lost during serialization, so we preserve + # old failure object if possible. + old_failure = self._failures.get(task_id, None) + if result.matches(old_failure): + loaded_failures[task_id] = old_failure + else: + loaded_failures[task_id] = result + + self.storage.set_result_mapping(task_id, task.save_as) + self._root.add(task, task_action.TaskAction(task, task_id)) + self._failures = loaded_failures + self._change_state(states.SUSPENDED) # does nothing in PENDING state @property def is_running(self): diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index dc46afae0..ffd2b5218 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -20,9 +20,7 @@ import contextlib import logging from taskflow.engines.action_engine import base_action as base -from taskflow import exceptions from taskflow.openstack.common import excutils -from taskflow.openstack.common import uuidutils from taskflow import states from taskflow.utils import misc @@ -45,21 +43,9 @@ def _autobind(task, bind_name, bind_func, **kwargs): class TaskAction(base.Action): - def __init__(self, task, engine): + def __init__(self, task, task_id): self._task = task - self._result_mapping = task.save_as - self._args_mapping = task.rebind - try: - self._id = engine.storage.get_uuid_by_name(self._task.name) - except exceptions.NotFound: - # TODO(harlowja): we might need to save whether the results of this - # task will be a tuple + other additional metadata when doing this - # add to the underlying storage backend for later resumption of - # this task. - self._id = uuidutils.generate_uuid() - engine.storage.add_task(task_name=self.name, uuid=self.uuid, - task_version=self.version) - engine.storage.set_result_mapping(self.uuid, self._result_mapping) + self._id = task_id @property def name(self): @@ -69,10 +55,6 @@ class TaskAction(base.Action): def uuid(self): return self._id - @property - def version(self): - return misc.get_version_string(self._task) - def _change_state(self, engine, state, result=None, progress=None): """Update result and change state.""" if state in RESET_TASK_STATES: @@ -109,7 +91,7 @@ class TaskAction(base.Action): 'update_progress', self._on_update_progress, engine=engine): try: - kwargs = engine.storage.fetch_mapped_args(self._args_mapping) + kwargs = engine.storage.fetch_mapped_args(self._task.rebind) result = self._task.execute(**kwargs) except Exception: failure = misc.Failure() @@ -127,7 +109,7 @@ class TaskAction(base.Action): with _autobind(self._task, 'update_progress', self._on_update_progress, engine=engine): - kwargs = engine.storage.fetch_mapped_args(self._args_mapping) + kwargs = engine.storage.fetch_mapped_args(self._task.rebind) try: self._task.revert(result=engine.storage.get(self._id), **kwargs) diff --git a/taskflow/examples/resume_from_backend.out.txt b/taskflow/examples/resume_from_backend.out.txt new file mode 100644 index 000000000..e4a30cf33 --- /dev/null +++ b/taskflow/examples/resume_from_backend.out.txt @@ -0,0 +1,21 @@ + +At the beginning, there is no state: +Flow state: None + +Running: +executing first==1.0 + +After running: +Flow state: SUSPENDED +boom==1.0: SUCCESS, result=None +first==1.0: SUCCESS, result=u'ok' +second==1.0: PENDING, result=None + +Resuming and running again: +executing second==1.0 + +At the end: +Flow state: SUCCESS +boom==1.0: SUCCESS, result=None +first==1.0: SUCCESS, result=u'ok' +second==1.0: SUCCESS, result=u'ok' diff --git a/taskflow/examples/resume_from_backend.py b/taskflow/examples/resume_from_backend.py new file mode 100644 index 000000000..b9df1caa6 --- /dev/null +++ b/taskflow/examples/resume_from_backend.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +import taskflow.engines +from taskflow.patterns import linear_flow as lf +from taskflow.persistence import backends +from taskflow import task +from taskflow.utils import persistence_utils as p_utils + + +### UTILITY FUNCTIONS ######################################### + + +def print_task_states(flowdetail, msg): + print(msg) + print('Flow state: %s' % flowdetail.state) + items = sorted((td.name, td.version, td.state, td.results) + for td in flowdetail) + for item in items: + print("%s==%s: %s, result=%r" % item) + + +def get_backend(): + try: + backend_uri = sys.argv[1] + except Exception: + backend_uri = 'sqlite://' + + backend = backends.fetch({'connection': backend_uri}) + backend.get_connection().upgrade() + return backend + + +def find_flow_detail(backend, lb_id, fd_id): + conn = backend.get_connection() + lb = conn.get_logbook(lb_id) + return lb.find(fd_id) + + +### CREATE FLOW ############################################### + + +class InterruptTask(task.Task): + def execute(self): + # DO NOT TRY THIS AT HOME + engine.suspend() + + +class TestTask(task.Task): + def execute(self): + print 'executing %s' % self + return 'ok' + + +def flow_factory(): + return lf.Flow('resume from backend example').add( + TestTask(name='first'), + InterruptTask(name='boom'), + TestTask(name='second')) + + +### INITIALIZE PERSISTENCE #################################### + +backend = get_backend() +logbook = p_utils.temporary_log_book(backend) + + +### CREATE AND RUN THE FLOW: FIRST ATTEMPT #################### + +flow = flow_factory() +flowdetail = p_utils.create_flow_detail(flow, logbook, backend) +engine = taskflow.engines.load(flow, flow_detail=flowdetail, + backend=backend) + +print_task_states(flowdetail, "\nAt the beginning, there is no state:") +print("\nRunning:") +engine.run() +print_task_states(flowdetail, "\nAfter running:") + + +### RE-CREATE, RESUME, RUN #################################### + +print("\nResuming and running again:") +# reload flowdetail from backend +flowdetail2 = find_flow_detail(backend, logbook.uuid, + flowdetail.uuid) +engine2 = taskflow.engines.load(flow_factory(), + flow_detail=flowdetail, + backend=backend) +engine2.run() +print_task_states(flowdetail, "\nAt the end:") diff --git a/taskflow/examples/resume_many_flows.out.txt b/taskflow/examples/resume_many_flows.out.txt new file mode 100644 index 000000000..bfaa07277 --- /dev/null +++ b/taskflow/examples/resume_many_flows.out.txt @@ -0,0 +1,32 @@ +Run flow: +Running flow example 18995b55-aaad-49fa-938f-006ac21ea4c7 +executing first==1.0 +executing boom==1.0 +> this time not exiting +executing second==1.0 + + +Run flow, something happens: +Running flow example f8f62ea6-1c9b-4e81-9ff9-1acaa299a648 +executing first==1.0 +executing boom==1.0 +> Critical error: boom = exit please + + +Run flow, something happens again: +Running flow example 16f11c15-4d8a-4552-b422-399565c873c4 +executing first==1.0 +executing boom==1.0 +> Critical error: boom = exit please + + +Resuming all failed flows +Resuming flow example f8f62ea6-1c9b-4e81-9ff9-1acaa299a648 +executing boom==1.0 +> this time not exiting +executing second==1.0 +Resuming flow example 16f11c15-4d8a-4552-b422-399565c873c4 +executing boom==1.0 +> this time not exiting +executing second==1.0 + diff --git a/taskflow/examples/resume_many_flows.py b/taskflow/examples/resume_many_flows.py new file mode 100644 index 000000000..bab583288 --- /dev/null +++ b/taskflow/examples/resume_many_flows.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import subprocess +import sys +import tempfile + + +def _exec(cmd, add_env=None): + env = None + if add_env: + env = os.environ.copy() + env.update(add_env) + + proc = subprocess.Popen(cmd, env=env, stdin=None, + stdout=subprocess.PIPE, + stderr=sys.stderr) + + stdout, stderr = proc.communicate() + rc = proc.returncode + if rc != 0: + raise RuntimeError("Could not run %s [%s]", cmd, rc) + print stdout + + +def _path_to(name): + return os.path.abspath(os.path.join(os.path.dirname(__file__), + 'resume_many_flows', name)) + + +def main(): + try: + fd, db_path = tempfile.mkstemp(prefix='tf-resume-example') + os.close(fd) + backend_uri = 'sqlite:///%s' % db_path + + def run_example(name, add_env=None): + _exec([sys.executable, _path_to(name), backend_uri], add_env) + + print('Run flow:') + run_example('run_flow.py') + + print('\nRun flow, something happens:') + run_example('run_flow.py', {'BOOM': 'exit please'}) + + print('\nRun flow, something happens again:') + run_example('run_flow.py', {'BOOM': 'exit please'}) + + print('\nResuming all failed flows') + run_example('resume_all.py') + finally: + os.unlink(db_path) + +if __name__ == '__main__': + main() diff --git a/taskflow/examples/resume_many_flows/my_flows.py b/taskflow/examples/resume_many_flows/my_flows.py new file mode 100644 index 000000000..4cd81e6dd --- /dev/null +++ b/taskflow/examples/resume_many_flows/my_flows.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +from taskflow.patterns import linear_flow as lf +from taskflow import task + + +class UnfortunateTask(task.Task): + def execute(self): + print('executing %s' % self) + boom = os.environ.get('BOOM') + if boom: + print('> Critical error: boom = %s' % boom) + raise SystemExit() + else: + print('> this time not exiting') + + +class TestTask(task.Task): + def execute(self): + print('executing %s' % self) + + +def flow_factory(): + return lf.Flow('example').add( + TestTask(name='first'), + UnfortunateTask(name='boom'), + TestTask(name='second')) diff --git a/taskflow/examples/resume_many_flows/my_utils.py b/taskflow/examples/resume_many_flows/my_utils.py new file mode 100644 index 000000000..dd1fbd5ed --- /dev/null +++ b/taskflow/examples/resume_many_flows/my_utils.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import sys + +from taskflow.persistence import backends + + +def get_backend(): + try: + backend_uri = sys.argv[1] + except Exception: + backend_uri = 'sqlite://' + backend = backends.fetch({'connection': backend_uri}) + backend.get_connection().upgrade() + return backend diff --git a/taskflow/examples/resume_many_flows/resume_all.py b/taskflow/examples/resume_many_flows/resume_all.py new file mode 100644 index 000000000..82be3073d --- /dev/null +++ b/taskflow/examples/resume_many_flows/resume_all.py @@ -0,0 +1,62 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +self_dir = os.path.abspath(os.path.dirname(__file__)) +top_dir = os.path.abspath( + os.path.join(self_dir, os.pardir, os.pardir, os.pardir)) + +sys.path.insert(0, top_dir) +sys.path.insert(0, self_dir) + + +import taskflow.engines + +from taskflow import states + +import my_flows # noqa +import my_utils # noqa + + +FINISHED_STATES = (states.SUCCESS, states.FAILURE, states.REVERTED) + + +def resume(flowdetail, backend): + print('Resuming flow %s %s' % (flowdetail.name, flowdetail.uuid)) + engine = taskflow.engines.load(my_flows.flow_factory(), + flow_detail=flowdetail, + backend=backend) + engine.run() + + +def main(): + backend = my_utils.get_backend() + logbooks = list(backend.get_connection().get_logbooks()) + for lb in logbooks: + for fd in lb: + if fd.state not in FINISHED_STATES: + resume(fd, backend) + + +if __name__ == '__main__': + main() diff --git a/taskflow/examples/resume_many_flows/run_flow.py b/taskflow/examples/resume_many_flows/run_flow.py new file mode 100644 index 000000000..523c98461 --- /dev/null +++ b/taskflow/examples/resume_many_flows/run_flow.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +self_dir = os.path.abspath(os.path.dirname(__file__)) +top_dir = os.path.abspath( + os.path.join(self_dir, os.pardir, os.pardir, os.pardir)) + +sys.path.insert(0, top_dir) +sys.path.insert(0, self_dir) + +import taskflow.engines +from taskflow.utils import persistence_utils as p_utils + +import my_flows # noqa +import my_utils # noqa + + +backend = my_utils.get_backend() +logbook = p_utils.temporary_log_book(backend) + +flow = my_flows.flow_factory() + +flowdetail = p_utils.create_flow_detail(flow, logbook, backend) +engine = taskflow.engines.load(flow, flow_detail=flowdetail, + backend=backend) + +print('Running flow %s %s' % (flowdetail.name, flowdetail.uuid)) +engine.run() diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index ef0dc5b1a..759416a83 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -647,6 +647,23 @@ class SuspendFlowTest(EngineTestBase): 'b reverted(5)', 'a reverted(5)']) + def test_storage_is_rechecked(self): + flow = lf.Flow('linear').add( + AutoSuspendingTask(self.values, 'b'), + TestTask(self.values, name='c') + ) + engine = self._make_engine(flow) + engine.storage.inject({'engine': engine, 'boo': True}) + engine.run() + self.assertEquals(engine.storage.get_flow_state(), states.SUSPENDED) + # uninject engine + engine.storage.save( + engine.storage.get_uuid_by_name(engine.storage.injector_name), + None, + states.FAILURE) + with self.assertRaises(exc.MissingDependencies): + engine.run() + class SingleThreadedEngineTest(EngineTaskTest, EngineLinearFlowTest, diff --git a/taskflow/tests/unit/test_utils_failure.py b/taskflow/tests/unit/test_utils_failure.py index 79acb17e2..38a3ff2fa 100644 --- a/taskflow/tests/unit/test_utils_failure.py +++ b/taskflow/tests/unit/test_utils_failure.py @@ -143,6 +143,7 @@ class FailureObjectTestCase(test.TestCase): copied = fail_obj.copy() self.assertIsNot(fail_obj, copied) self.assertEquals(fail_obj, copied) + self.assertTrue(fail_obj.matches(copied)) def test_failure_copy_recaptured(self): captured = _captured_failure('Woot!') @@ -153,6 +154,7 @@ class FailureObjectTestCase(test.TestCase): self.assertIsNot(fail_obj, copied) self.assertEquals(fail_obj, copied) self.assertFalse(fail_obj != copied) + self.assertTrue(fail_obj.matches(copied)) def test_recaptured_not_eq(self): captured = _captured_failure('Woot!') @@ -161,6 +163,29 @@ class FailureObjectTestCase(test.TestCase): exc_type_names=list(captured)) self.assertFalse(fail_obj == captured) self.assertTrue(fail_obj != captured) + self.assertTrue(fail_obj.matches(captured)) + + def test_two_captured_eq(self): + captured = _captured_failure('Woot!') + captured2 = _captured_failure('Woot!') + self.assertEquals(captured, captured2) + + def test_two_recaptured_neq(self): + captured = _captured_failure('Woot!') + fail_obj = misc.Failure(exception_str=captured.exception_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured)) + new_exc_str = captured.exception_str.replace('Woot', 'w00t') + fail_obj2 = misc.Failure(exception_str=new_exc_str, + traceback_str=captured.traceback_str, + exc_type_names=list(captured)) + self.assertNotEquals(fail_obj, fail_obj2) + self.assertFalse(fail_obj2.matches(fail_obj)) + + def test_compares_to_none(self): + captured = _captured_failure('Woot!') + self.assertNotEquals(captured, None) + self.assertFalse(captured.matches(None)) class WrappedFailureTestCase(test.TestCase): diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index d2afacfc3..fe848637f 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -233,11 +233,17 @@ def are_equal_exc_info_tuples(ei1, ei2): # NOTE(imelnikov): we can't compare exceptions with '==' # because we want exc_info be equal to it's copy made with # copy_exc_info above - return all((ei1[0] is ei2[0], - type(ei1[1]) == type(ei2[1]), + if ei1[0] is not ei2[0]: + return False + if not all((type(ei1[1]) == type(ei2[1]), str(ei1[1]) == str(ei2[1]), - repr(ei1[1]) == repr(ei2[1]), - ei1[2] == ei2[2])) + repr(ei1[1]) == repr(ei2[1]))): + return False + if ei1[2] == ei2[2]: + return True + tb1 = traceback.format_tb(ei1[2]) + tb2 = traceback.format_tb(ei2[2]) + return tb1 == tb2 class Failure(object): @@ -268,19 +274,32 @@ class Failure(object): raise TypeError('Failure.__init__ got unexpected keyword ' 'argument: %r' % kwargs.keys()[0]) + def _matches(self, other): + if self is other: + return True + return (self._exc_type_names == other._exc_type_names + and self.exception_str == other.exception_str + and self.traceback_str == other.traceback_str) + + def matches(self, other): + if not isinstance(other, Failure): + return False + if self.exc_info is None or other.exc_info is None: + return self._matches(other) + else: + return self == other + def __eq__(self, other): if not isinstance(other, Failure): return NotImplemented - return all((are_equal_exc_info_tuples(self.exc_info, other.exc_info), - self._exc_type_names == other._exc_type_names, - self.exception_str == other.exception_str, - self.traceback_str == other.traceback_str)) + return (self._matches(other) and + are_equal_exc_info_tuples(self.exc_info, other.exc_info)) def __ne__(self, other): return not (self == other) # NOTE(imelnikov): obj.__hash__() should return same values for equal - # objects, so we should redefine __hash__. Our equality semantics + # objects, so we should redefine __hash__. Failure equality semantics # is a bit complicated, so for now we just mark Failure objects as # unhashable. See python docs on object.__hash__ for more info: # http://docs.python.org/2/reference/datamodel.html#object.__hash__ @@ -321,6 +340,7 @@ class Failure(object): this failure is reraised. Else, WrappedFailure exception is raised with failures list as causes. """ + failures = list(failures) if len(failures) == 1: failures[0].reraise() elif len(failures) > 1: