diff --git a/oslo_utils/excutils.py b/oslo_utils/excutils.py index 1faaf0d3..9a0d9ca6 100644 --- a/oslo_utils/excutils.py +++ b/oslo_utils/excutils.py @@ -17,6 +17,7 @@ Exception related utilities. """ +import functools import logging import os import sys @@ -278,3 +279,69 @@ def forever_retry_uncaught_exceptions(*args, **kwargs): return decorator(args[0]) else: return decorator + + +class exception_filter(object): + """A context manager that prevents some exceptions from being raised. + + Use this class as a decorator for a function that returns whether a given + exception should be ignored, in cases where complex logic beyond subclass + matching is required. e.g. + + >>> @exception_filter + >>> def ignore_test_assertions(ex): + ... return isinstance(ex, AssertionError) and 'test' in str(ex) + + The filter matching function can then be used as a context manager: + + >>> with ignore_test_assertions: + ... assert False, 'This is a test' + + or called directly: + + >>> try: + ... assert False, 'This is a test' + ... except Exception as ex: + ... ignore_test_assertions(ex) + + Any non-matching exception will be re-raised. When the filter is used as a + context manager, the traceback for re-raised exceptions is always + preserved. When the filter is called as a function, the traceback is + preserved provided that no other exceptions have been raised in the + intervening time. The context manager method is preferred for this reason + except in cases where the ignored exception affects control flow. + """ + + def __init__(self, should_ignore_ex): + self._should_ignore_ex = should_ignore_ex + + if all(hasattr(should_ignore_ex, a) + for a in functools.WRAPPER_ASSIGNMENTS): + functools.update_wrapper(self, should_ignore_ex) + + def __get__(self, obj, owner): + return type(self)(self._should_ignore_ex.__get__(obj, owner)) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_val is not None: + return self._should_ignore_ex(exc_val) + + def __call__(self, ex): + """Re-raise any exception value not being filtered out. + + If the exception was the last to be raised, it will be re-raised with + its original traceback. + """ + exc_type, exc_val, traceback = sys.exc_info() + + try: + if not self._should_ignore_ex(ex): + if exc_val is ex: + six.reraise(exc_type, exc_val, traceback) + else: + raise ex + finally: + del exc_type, exc_val, traceback diff --git a/oslo_utils/tests/test_excutils.py b/oslo_utils/tests/test_excutils.py index 5300da74..6f419725 100644 --- a/oslo_utils/tests/test_excutils.py +++ b/oslo_utils/tests/test_excutils.py @@ -320,3 +320,250 @@ class ForeverRetryUncaughtExceptionsTest(test_base.BaseTestCase): self.exc_retrier_sequence(exc_id=2, exc_count=1, after_timestamp_calls=[110, 111]) self.exc_retrier_common_end() + + +class ExceptionFilterTest(test_base.BaseTestCase): + + def _make_filter_func(self, ignore_classes=AssertionError): + @excutils.exception_filter + def ignore_exceptions(ex): + '''Ignore some exceptions F''' + return isinstance(ex, ignore_classes) + + return ignore_exceptions + + def _make_filter_method(self, ignore_classes=AssertionError): + class ExceptionIgnorer(object): + def __init__(self, ignore): + self.ignore = ignore + + @excutils.exception_filter + def ignore_exceptions(self, ex): + '''Ignore some exceptions M''' + return isinstance(ex, self.ignore) + + return ExceptionIgnorer(ignore_classes).ignore_exceptions + + def _make_filter_classmethod(self, ignore_classes=AssertionError): + class ExceptionIgnorer(object): + ignore = ignore_classes + + @excutils.exception_filter + @classmethod + def ignore_exceptions(cls, ex): + '''Ignore some exceptions C''' + return isinstance(ex, cls.ignore) + + return ExceptionIgnorer.ignore_exceptions + + def _make_filter_staticmethod(self, ignore_classes=AssertionError): + class ExceptionIgnorer(object): + @excutils.exception_filter + @staticmethod + def ignore_exceptions(ex): + '''Ignore some exceptions S''' + return isinstance(ex, ignore_classes) + + return ExceptionIgnorer.ignore_exceptions + + def test_filter_func_call(self): + ignore_assertion_error = self._make_filter_func() + + try: + assert False, "This is a test" + except Exception as exc: + ignore_assertion_error(exc) + + def test_raise_func_call(self): + ignore_assertion_error = self._make_filter_func() + + try: + raise RuntimeError + except Exception as exc: + self.assertRaises(RuntimeError, ignore_assertion_error, exc) + + def test_raise_previous_func_call(self): + ignore_assertion_error = self._make_filter_func() + + try: + raise RuntimeError + except Exception as exc1: + try: + raise RuntimeError + except Exception as exc2: + self.assertIsNot(exc1, exc2) + raised = self.assertRaises(RuntimeError, + ignore_assertion_error, + exc1) + self.assertIs(exc1, raised) + + def test_raise_previous_after_filtered_func_call(self): + ignore_assertion_error = self._make_filter_func() + + try: + raise RuntimeError + except Exception as exc1: + try: + assert False, "This is a test" + except Exception: + pass + self.assertRaises(RuntimeError, ignore_assertion_error, exc1) + + def test_raise_other_func_call(self): + @excutils.exception_filter + def translate_exceptions(ex): + raise RuntimeError + + try: + assert False, "This is a test" + except Exception as exc: + self.assertRaises(RuntimeError, translate_exceptions, exc) + + def test_filter_func_context_manager(self): + ignore_assertion_error = self._make_filter_func() + + with ignore_assertion_error: + assert False, "This is a test" + + def test_raise_func_context_manager(self): + ignore_assertion_error = self._make_filter_func() + + def try_runtime_err(): + with ignore_assertion_error: + raise RuntimeError + + self.assertRaises(RuntimeError, try_runtime_err) + + def test_raise_other_func_context_manager(self): + @excutils.exception_filter + def translate_exceptions(ex): + raise RuntimeError + + def try_assertion(): + with translate_exceptions: + assert False, "This is a test" + + self.assertRaises(RuntimeError, try_assertion) + + def test_noexc_func_context_manager(self): + ignore_assertion_error = self._make_filter_func() + + with ignore_assertion_error: + pass + + def test_noexc_nocall_func_context_manager(self): + @excutils.exception_filter + def translate_exceptions(ex): + raise RuntimeError + + with translate_exceptions: + pass + + def test_func_docstring(self): + ignore_func = self._make_filter_func() + self.assertEqual('Ignore some exceptions F', ignore_func.__doc__) + + def test_filter_method_call(self): + ignore_assertion_error = self._make_filter_method() + + try: + assert False, "This is a test" + except Exception as exc: + ignore_assertion_error(exc) + + def test_raise_method_call(self): + ignore_assertion_error = self._make_filter_method() + + try: + raise RuntimeError + except Exception as exc: + self.assertRaises(RuntimeError, ignore_assertion_error, exc) + + def test_filter_method_context_manager(self): + ignore_assertion_error = self._make_filter_method() + + with ignore_assertion_error: + assert False, "This is a test" + + def test_raise_method_context_manager(self): + ignore_assertion_error = self._make_filter_method() + + def try_runtime_err(): + with ignore_assertion_error: + raise RuntimeError + + self.assertRaises(RuntimeError, try_runtime_err) + + def test_method_docstring(self): + ignore_func = self._make_filter_method() + self.assertEqual('Ignore some exceptions M', ignore_func.__doc__) + + def test_filter_classmethod_call(self): + ignore_assertion_error = self._make_filter_classmethod() + + try: + assert False, "This is a test" + except Exception as exc: + ignore_assertion_error(exc) + + def test_raise_classmethod_call(self): + ignore_assertion_error = self._make_filter_classmethod() + + try: + raise RuntimeError + except Exception as exc: + self.assertRaises(RuntimeError, ignore_assertion_error, exc) + + def test_filter_classmethod_context_manager(self): + ignore_assertion_error = self._make_filter_classmethod() + + with ignore_assertion_error: + assert False, "This is a test" + + def test_raise_classmethod_context_manager(self): + ignore_assertion_error = self._make_filter_classmethod() + + def try_runtime_err(): + with ignore_assertion_error: + raise RuntimeError + + self.assertRaises(RuntimeError, try_runtime_err) + + def test_classmethod_docstring(self): + ignore_func = self._make_filter_classmethod() + self.assertEqual('Ignore some exceptions C', ignore_func.__doc__) + + def test_filter_staticmethod_call(self): + ignore_assertion_error = self._make_filter_staticmethod() + + try: + assert False, "This is a test" + except Exception as exc: + ignore_assertion_error(exc) + + def test_raise_staticmethod_call(self): + ignore_assertion_error = self._make_filter_staticmethod() + + try: + raise RuntimeError + except Exception as exc: + self.assertRaises(RuntimeError, ignore_assertion_error, exc) + + def test_filter_staticmethod_context_manager(self): + ignore_assertion_error = self._make_filter_staticmethod() + + with ignore_assertion_error: + assert False, "This is a test" + + def test_raise_staticmethod_context_manager(self): + ignore_assertion_error = self._make_filter_staticmethod() + + def try_runtime_err(): + with ignore_assertion_error: + raise RuntimeError + + self.assertRaises(RuntimeError, try_runtime_err) + + def test_staticmethod_docstring(self): + ignore_func = self._make_filter_staticmethod() + self.assertEqual('Ignore some exceptions S', ignore_func.__doc__)