# -*- coding: utf-8 -*-

# vim: tabstop=4 shiftwidth=4 softtabstop=4

#    Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from taskflow import exceptions
from taskflow import test

from taskflow.utils import misc


def _captured_failure(msg):
    """Raise ``RuntimeError(msg)`` and return a ``misc.Failure`` built
    (with no arguments) while that exception is being handled.
    """
    try:
        raise RuntimeError(msg)
    except Exception:
        return misc.Failure()


class GeneralFailureObjTestsMixin(object):
    """Checks shared by every test case that exposes ``self.fail_obj``.

    The concrete test case is expected to assign ``self.fail_obj`` in its
    ``setUp`` and to supply the ``assert*`` methods used below.

    NOTE(review): the expected type names use the ``exceptions.`` prefix and
    include ``StandardError``, which presumably only holds on Python 2 --
    confirm before running these on Python 3.
    """

    def test_captures_message(self):
        self.assertEqual(self.fail_obj.exception_str, 'Woot!')

    def test_str(self):
        self.assertEqual(str(self.fail_obj),
                         'Failure: exceptions.RuntimeError: Woot!')

    def test_exception_types(self):
        # Iterating the failure object yields the exception type names.
        self.assertEqual(list(self.fail_obj),
                         ['exceptions.RuntimeError',
                          'exceptions.StandardError',
                          'exceptions.Exception'])

    def test_check_str(self):
        val = 'exceptions.StandardError'
        self.assertEqual(self.fail_obj.check(val), val)

    def test_check_str_not_there(self):
        val = 'exceptions.ValueError'
        self.assertEqual(self.fail_obj.check(val), None)

    def test_check_type(self):
        self.assertIs(self.fail_obj.check(RuntimeError), RuntimeError)

    def test_check_type_not_there(self):
        self.assertIs(self.fail_obj.check(ValueError), None)


class CaptureFailureTestCase(test.TestCase, GeneralFailureObjTestsMixin):
    """Tests for a failure object captured from a live exception."""

    def setUp(self):
        super(CaptureFailureTestCase, self).setUp()
        self.fail_obj = _captured_failure('Woot!')

    def test_captures_value(self):
        self.assertIsInstance(self.fail_obj.exception, RuntimeError)

    def test_captures_exc_info(self):
        exc_info = self.fail_obj.exc_info
        self.assertEqual(len(exc_info), 3)
        self.assertEqual(exc_info[0], RuntimeError)
        self.assertIs(exc_info[1], self.fail_obj.exception)

    def test_reraises(self):
        with self.assertRaisesRegexp(RuntimeError, '^Woot!$'):
            self.fail_obj.reraise()


class ReCreatedFailureTestCase(test.TestCase, GeneralFailureObjTestsMixin):
    """Tests for a failure object rebuilt from the string fields of a
    captured one (exception string, traceback string and type names).
    """

    def setUp(self):
        super(ReCreatedFailureTestCase, self).setUp()
        fail_obj = _captured_failure('Woot!')
        self.fail_obj = misc.Failure(exception_str=fail_obj.exception_str,
                                     traceback_str=fail_obj.traceback_str,
                                     exc_type_names=list(fail_obj))

    def test_value_lost(self):
        self.assertIs(self.fail_obj.exception, None)

    def test_no_exc_info(self):
        self.assertIs(self.fail_obj.exc_info, None)

    def test_reraises(self):
        with self.assertRaises(exceptions.WrappedFailure) as ctx:
            self.fail_obj.reraise()
        # These checks must sit outside the ``with`` block: nothing after
        # the raising call inside the block would ever execute.
        exc = ctx.exception
        self.assertIs(exc.check(RuntimeError), RuntimeError)


class FailureObjectTestCase(test.TestCase):
    """Tests for ``misc.Failure`` construction, re-raising, copying and
    equality.
    """

    def test_dont_catch_base_exception(self):
        try:
            raise SystemExit()
        except BaseException:
            # Building a Failure while SystemExit is being handled is
            # expected to raise TypeError.
            self.assertRaises(TypeError, misc.Failure)

    def test_unknown_argument(self):
        with self.assertRaises(TypeError) as ctx:
            misc.Failure(
                exception_str='Woot!',
                traceback_str=None,
                exc_type_names=['exceptions.Exception'],
                hi='hi there')
        expected = "Failure.__init__ got unexpected keyword argument: 'hi'"
        self.assertEqual(str(ctx.exception), expected)

    def test_empty_does_not_reraise(self):
        self.assertIs(misc.Failure.reraise_if_any([]), None)

    def test_reraises_one(self):
        fls = [_captured_failure('Woot!')]
        with self.assertRaisesRegexp(RuntimeError, '^Woot!$'):
            misc.Failure.reraise_if_any(fls)

    def test_reraises_several(self):
        fls = [
            _captured_failure('Woot!'),
            _captured_failure('Oh, not again!')
        ]
        with self.assertRaises(exceptions.WrappedFailure) as ctx:
            misc.Failure.reraise_if_any(fls)
        self.assertEqual(list(ctx.exception), fls)

    def test_failure_copy(self):
        fail_obj = _captured_failure('Woot!')

        copied = fail_obj.copy()
        self.assertIsNot(fail_obj, copied)
        self.assertEqual(fail_obj, copied)

    def test_failure_copy_recaptured(self):
        captured = _captured_failure('Woot!')
        fail_obj = misc.Failure(exception_str=captured.exception_str,
                                traceback_str=captured.traceback_str,
                                exc_type_names=list(captured))
        copied = fail_obj.copy()
        self.assertIsNot(fail_obj, copied)
        self.assertEqual(fail_obj, copied)
        # Exercise ``!=`` explicitly in addition to ``==``.
        self.assertFalse(fail_obj != copied)

    def test_recaptured_not_eq(self):
        captured = _captured_failure('Woot!')
        fail_obj = misc.Failure(exception_str=captured.exception_str,
                                traceback_str=captured.traceback_str,
                                exc_type_names=list(captured))
        self.assertFalse(fail_obj == captured)
        self.assertTrue(fail_obj != captured)


class WrappedFailureTestCase(test.TestCase):
    """Tests for ``exceptions.WrappedFailure`` as a container of failures."""

    def test_simple_iter(self):
        fail_obj = _captured_failure('Woot!')
        wf = exceptions.WrappedFailure([fail_obj])
        self.assertEqual(len(wf), 1)
        self.assertEqual(list(wf), [fail_obj])

    def test_simple_check(self):
        fail_obj = _captured_failure('Woot!')
        wf = exceptions.WrappedFailure([fail_obj])
        self.assertEqual(wf.check(RuntimeError), RuntimeError)
        self.assertEqual(wf.check(ValueError), None)

    def test_two_failures(self):
        fls = [
            _captured_failure('Woot!'),
            _captured_failure('Oh, not again!')
        ]
        wf = exceptions.WrappedFailure(fls)
        self.assertEqual(len(wf), 2)
        self.assertEqual(list(wf), fls)

    def test_flattening(self):
        f1 = _captured_failure('Wrap me')
        f2 = _captured_failure('Wrap me, too')
        f3 = _captured_failure('Woot!')
        try:
            raise exceptions.WrappedFailure([f1, f2])
        except Exception:
            fail_obj = misc.Failure()

        # Wrapping a failure that itself captured a WrappedFailure is
        # expected to yield the inner failures directly, not a nested one.
        wf = exceptions.WrappedFailure([fail_obj, f3])
        self.assertEqual(list(wf), [f1, f2, f3])