diff --git a/tobiko/__init__.py b/tobiko/__init__.py index 9ca5ba1f3..9608bb9bb 100644 --- a/tobiko/__init__.py +++ b/tobiko/__init__.py @@ -21,7 +21,6 @@ from tobiko.common import _config from tobiko.common import _deprecation from tobiko.common import _detail from tobiko.common import _exception -from tobiko.common import _fail from tobiko.common import _fixture from tobiko.common import _logging from tobiko.common.managers import loader as loader_manager @@ -53,9 +52,6 @@ deprecated = _deprecation.deprecated details_content = _detail.details_content -FailureException = _fail.FailureException -fail = _fail.fail - tobiko_config = _config.tobiko_config tobiko_config_dir = _config.tobiko_config_dir tobiko_config_path = _config.tobiko_config_path @@ -129,7 +125,10 @@ skip_test = _skip.skip_test skip_unless = _skip.skip_unless skip = _skip.skip +add_cleanup = _testcase.add_cleanup assert_test_case_was_skipped = _testcase.assert_test_case_was_skipped +fail = _testcase.fail +FailureException = _testcase.FailureException get_test_case = _testcase.get_test_case pop_test_case = _testcase.pop_test_case push_test_case = _testcase.push_test_case diff --git a/tobiko/common/_fail.py b/tobiko/common/_fail.py deleted file mode 100644 index 9d52e0ebc..000000000 --- a/tobiko/common/_fail.py +++ /dev/null @@ -1,40 +0,0 @@ -# Copyright 2018 Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from __future__ import absolute_import - -import typing - -import testtools - - -FailureException = testtools.TestCase.failureException - - -def fail(msg: str, *args, **kwargs) -> typing.NoReturn: - """Fail immediately current test case execution, with the given message. - - Unconditionally raises a tobiko.FailureException as in below equivalent - code: - - raise FailureException(msg.format(*args, **kwargs)) - - :param msg: string message used to create FailureException - :param *args: positional arguments to be passed to str.format method - :param **kwargs: key-word arguments to be passed to str.format method - :returns: It never returns - :raises FailureException: - """ - if args or kwargs: - msg = msg.format(*args, **kwargs) - raise FailureException(msg) diff --git a/tobiko/common/_retry.py b/tobiko/common/_retry.py index 9f439364a..a818e2fed 100644 --- a/tobiko/common/_retry.py +++ b/tobiko/common/_retry.py @@ -22,7 +22,7 @@ from oslo_log import log import testtools from tobiko.common import _exception -from tobiko.common import _fail +from tobiko.common import _testcase from tobiko.common import _time @@ -274,7 +274,7 @@ def retry_test_case(*exceptions: Exception, typing.Callable[[typing.Callable], typing.Callable]: """Re-run test case method in case it fails """ - exceptions = exceptions or (_fail.FailureException,) + exceptions = exceptions or _testcase.FailureException return retry_on_exception(*exceptions, count=count, timeout=timeout, diff --git a/tobiko/common/_testcase.py b/tobiko/common/_testcase.py index e7595ef0d..b44a60de2 100644 --- a/tobiko/common/_testcase.py +++ b/tobiko/common/_testcase.py @@ -16,6 +16,7 @@ from __future__ import absolute_import import os import sys import typing +import unittest from oslo_log import log import testtools @@ -28,11 +29,11 @@ LOG = log.getLogger(__name__) os.environ.setdefault('PYTHON', sys.executable) -TestCase = testtools.TestCase +TestCase = unittest.TestCase class TestCaseEntry(typing.NamedTuple): - test_case: testtools.TestCase + test_case: unittest.TestCase start_time: float @@ -43,21 +44,21 @@ class TestCasesManager(object): def __init__(self): self._test_cases: typing.List[TestCaseEntry] = [] - def get_test_case(self) -> testtools.TestCase: + def get_test_case(self) -> unittest.TestCase: try: return self._test_cases[-1].test_case except IndexError: return DUMMY_TEST_CASE - def pop_test_case(self) -> testtools.TestCase: + def pop_test_case(self) -> unittest.TestCase: entry = self._test_cases.pop() elapsed_time = _time.time() - entry.start_time LOG.debug(f"Exit test case '{entry.test_case.id()}' after " f"{elapsed_time} seconds") return entry.test_case - def push_test_case(self, test_case: testtools.TestCase): - _exception.check_valid_type(test_case, testtools.TestCase) + def push_test_case(self, test_case: unittest.TestCase): + _exception.check_valid_type(test_case, unittest.TestCase) entry = TestCaseEntry(test_case=test_case, start_time=_time.time()) self._test_cases.append(entry) @@ -67,22 +68,22 @@ class TestCasesManager(object): TEST_CASES = TestCasesManager() -def push_test_case(test_case: testtools.TestCase, +def push_test_case(test_case: unittest.TestCase, manager: TestCasesManager = TEST_CASES): return manager.push_test_case(test_case=test_case) def pop_test_case(manager: TestCasesManager = TEST_CASES) -> \ - testtools.TestCase: + unittest.TestCase: return manager.pop_test_case() def get_test_case(manager: TestCasesManager = TEST_CASES) -> \ - testtools.TestCase: + unittest.TestCase: return manager.get_test_case() -class DummyTestCase(testtools.TestCase): +class DummyTestCase(unittest.TestCase): def runTest(self): pass @@ -91,10 +92,16 @@ class DummyTestCase(testtools.TestCase): DUMMY_TEST_CASE = DummyTestCase() -def run_test(test_case: testtools.TestCase, - test_result: testtools.TestResult = None) -> testtools.TestResult: - test_result = test_result or testtools.TestResult() - test_case.run(test_result) +def run_test(test_case: unittest.TestCase, + test_result: unittest.TestResult = None, + manager: TestCasesManager = TEST_CASES) -> unittest.TestResult: + test_result = test_result or unittest.TestResult() + push_test_case(test_case, manager=manager) + try: + test_case.run(test_result) + finally: + popped = pop_test_case(manager=manager) + assert test_case is popped return test_result @@ -103,16 +110,15 @@ def assert_in(needle, haystack, message: typing.Optional[str] = None, get_test_case(manager=manager).assertIn(needle, haystack, message) -def get_skipped_test_cases(test_result: testtools.TestResult, - skip_reason: typing.Optional[str] = None): - if skip_reason is not None: - assert_in(skip_reason, test_result.skip_reasons) - return test_result.skip_reasons[skip_reason] - else: - skipped_test_cases = list() - for cases in test_result.skip_reasons.values(): - skipped_test_cases.extend(cases) - return skipped_test_cases +def get_skipped_test_cases(test_result: unittest.TestResult, + skip_reason: str = None) \ + -> typing.List[unittest.TestCase]: + if isinstance(test_result, testtools.TestResult): + raise NotImplementedError( + f"Unsupported result type: {test_result}") + return [case + for case, reason in test_result.skipped + if skip_reason is None or skip_reason in reason] def assert_test_case_was_skipped(test_case: testtools.TestCase, @@ -122,3 +128,32 @@ def assert_test_case_was_skipped(test_case: testtools.TestCase, skipped_tests = get_skipped_test_cases(test_result=test_result, skip_reason=skip_reason) assert_in(test_case, skipped_tests, manager=manager) + + +FailureException = typing.cast( + typing.Tuple[Exception, ...], + (unittest.TestCase.failureException, + testtools.TestCase.failureException, + AssertionError)) + + +def fail(msg: str, + cause: typing.Type[Exception] = None) -> typing.NoReturn: + """Fail immediately current test case execution, with the given message. + + Unconditionally raises a tobiko.FailureException as in below equivalent + code: + + raise FailureException(msg.format(*args, **kwargs)) + + :param msg: string message used to create FailureException + :param cause: error that caused the failure + :returns: It never returns + :raises failure_type or FailureException exception type: + """ + failure_type = get_test_case().failureException + raise failure_type(msg) from cause + + +def add_cleanup(function: typing.Callable, *args, **kwargs): + get_test_case().addCleanup(function, *args, **kwargs) diff --git a/tobiko/openstack/tests/_nova.py b/tobiko/openstack/tests/_nova.py index 78b91b7d1..be8b2fa70 100644 --- a/tobiko/openstack/tests/_nova.py +++ b/tobiko/openstack/tests/_nova.py @@ -20,6 +20,7 @@ import typing # noqa import time # import testtools +import testtools import tobiko from tobiko.shell import ping @@ -81,6 +82,7 @@ def test_servers_creation(stack=TestServerCreationStack, # Create all servers stacks for fixture in fixtures: + assert isinstance(test_case, testtools.TestCase) test_case.useFixture(fixture) # Check every server ID is unique and new diff --git a/tobiko/shell/curl/_process.py b/tobiko/shell/curl/_process.py index 755ed2a13..d153c8793 100644 --- a/tobiko/shell/curl/_process.py +++ b/tobiko/shell/curl/_process.py @@ -349,7 +349,7 @@ def download_file(url: str, headers_file_name=headers_file_name, ssh_client=ssh_client, sudo=sudo) - except tobiko.FailureException: + except tobiko.FailureException: # type: ignore pass else: LOG.debug(f"File '{url}' already downloaded.") diff --git a/tobiko/shell/ping/_exception.py b/tobiko/shell/ping/_exception.py index eb7acc623..98d77bf51 100644 --- a/tobiko/shell/ping/_exception.py +++ b/tobiko/shell/ping/_exception.py @@ -48,7 +48,7 @@ class BadAddressPingError(PingError): message = "bad address: {address}" -class PingFailed(PingError, tobiko.FailureException): +class PingFailed(PingError, AssertionError): """Raised when ping timeout expires before reaching expected message count """ diff --git a/tobiko/shell/ping/_statistics.py b/tobiko/shell/ping/_statistics.py index bf72b0766..f25418bb5 100644 --- a/tobiko/shell/ping/_statistics.py +++ b/tobiko/shell/ping/_statistics.py @@ -122,8 +122,13 @@ class PingStatistics(object): """ - def __init__(self, source=None, destination=None, transmitted=0, - received=0, undelivered=0, begin_interval=None, + def __init__(self, + source=None, + destination=None, + transmitted: int = 0, + received: int = 0, + undelivered: int = 0, + begin_interval=None, end_interval=None): self.source = source self.destination = destination @@ -134,18 +139,17 @@ class PingStatistics(object): self.end_interval = end_interval @property - def unreceived(self): + def unreceived(self) -> int: return max(0, self.transmitted - self.received) @property - def delivered(self): + def delivered(self) -> int: return max(0, self.transmitted - self.undelivered) @property - def loss(self): - transmitted = max(0, float(self.transmitted)) - if transmitted: - return float(self.unreceived) / transmitted + def loss(self) -> float: + if self.transmitted > 0: + return float(self.unreceived) / float(self.transmitted) else: return 0. @@ -173,28 +177,20 @@ class PingStatistics(object): def assert_transmitted(self): if not self.transmitted: - tobiko.fail("{transmitted!r} package(s) has been transmitted " - "to {destination!r}", - transmitted=self.transmitted, - destination=self.destination) + tobiko.fail(f"{self.transmitted} package(s) has been transmitted " + f"to {self.destination}") def assert_not_transmitted(self): if self.transmitted: - tobiko.fail("{transmitted!r} package(s) has been transmitted to " - "{destination!r}", - transmitted=self.transmitted, - destination=self.destination) + tobiko.fail(f"{self.transmitted} package(s) has been " + f"transmitted to {self.destination}") def assert_replied(self): if not self.received: - tobiko.fail("{received!r} reply package(s) has been received from " - "{destination!r}", - received=self.received, - destination=self.destination) + tobiko.fail(f"{self.received} reply package(s) has been " + f"received from {self.destination}") def assert_not_replied(self): if self.received: - tobiko.fail("{received!r} reply package(s) has been received from " - "{destination!r}", - received=self.received, - destination=self.destination) + tobiko.fail(f"{self.received} reply package(s) has been received " + f"from {self.destination}") diff --git a/tobiko/tests/scenario/cloud/test_nodes.py b/tobiko/tests/scenario/cloud/test_nodes.py index 216790214..6b90b2b75 100644 --- a/tobiko/tests/scenario/cloud/test_nodes.py +++ b/tobiko/tests/scenario/cloud/test_nodes.py @@ -34,8 +34,8 @@ class OpenstackNodesTest(testtools.TestCase): ping.ping(node.public_ip).assert_replied() other = ips.setdefault(node.public_ip, node) if node is not other: - tobiko.fail("Nodes {!r} and {!r} have the same IP: {!s}", - node.name, other.name, node.public_ip) + tobiko.fail(f"Nodes {node.name} and {other.name} have the " + f"same IP: {node.public_ip}") def test_hostnames(self): hostnames = dict() @@ -44,8 +44,8 @@ class OpenstackNodesTest(testtools.TestCase): self.assertTrue(hostname.startswith(node.name)) other = hostnames.setdefault(hostname, node) if node is not other: - tobiko.fail("Nodes {!r} and {!r} have the same hostname: {!r}", - node.name, other.name, hostname) + tobiko.fail(f"Nodes {node.name} and {other.name} have the " + f"same hostname: {hostname}") def test_network_namespaces(self): for node in self.topology.nodes: @@ -56,6 +56,5 @@ class OpenstackNodesTest(testtools.TestCase): network_namespace=namespace) other_ips = namespaces_ips.setdefault(namespace, ips) if ips is not other_ips: - tobiko.fail("Duplicate network namespace {!r} in node " - "{!r}: {!r}, {!r}", namespace, node.name, - other_ips, ips) + tobiko.fail(f"Duplicate network namespace {namespace} in " + f"node {node.name}: {other_ips}, {ips}") diff --git a/tobiko/tests/unit/test_fail.py b/tobiko/tests/unit/test_fail.py deleted file mode 100644 index e01c08131..000000000 --- a/tobiko/tests/unit/test_fail.py +++ /dev/null @@ -1,38 +0,0 @@ -# Copyright 2019 Red Hat -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from __future__ import absolute_import - -import tobiko -from tobiko.tests import unit - - -class TestFail(unit.TobikoUnitTest): - - def test_fail(self): - self._test_fail('some_reason') - - def test_fail_with_args(self): - self._test_fail('some {1!r} {0!s}', 'reason', 'strange') - - def test_fail_with_kwargs(self): - self._test_fail('some {b!r} {a!s}', a='reason', b='strange') - - def _test_fail(self, reason, *args, **kwargs): - ex = self.assertRaises( - tobiko.FailureException, tobiko.fail, reason, *args, **kwargs) - if args or kwargs: - expected_reason = reason.format(*args, **kwargs) - else: - expected_reason = reason - self.assertEqual(expected_reason, str(ex)) diff --git a/tobiko/tests/unit/test_testcase.py b/tobiko/tests/unit/test_testcase.py index e3fa4dacd..c3867af35 100644 --- a/tobiko/tests/unit/test_testcase.py +++ b/tobiko/tests/unit/test_testcase.py @@ -13,7 +13,8 @@ # under the License. from __future__ import absolute_import -import testtools +import unittest +from unittest import mock import tobiko from tobiko.tests import unit @@ -38,25 +39,24 @@ class TestCaseTest(unit.TobikoUnitTest): def test_get_test_case_out_of_context(self): manager = tobiko.TestCasesManager() result = tobiko.get_test_case(manager=manager) - self.assertIsInstance(result, testtools.TestCase) + self.assertIsInstance(result, unittest.TestCase) self.assertEqual('tobiko.common._testcase.DummyTestCase.runTest', result.id()) def test_push_test_case(self): - class InnerTest(testtools.TestCase): + class InnerTest(unittest.TestCase): def runTest(self): pass inner_case = InnerTest() - tobiko.push_test_case(inner_case) self.assertIs(inner_case, tobiko.get_test_case()) def test_pop_test_case(self): - class InnerTest(testtools.TestCase): + class InnerTest(unittest.TestCase): def runTest(self): pass @@ -68,3 +68,63 @@ class TestCaseTest(unit.TobikoUnitTest): self.assertIs(inner_case, result) self.assertIs(self, tobiko.get_test_case()) + + def test_add_cleanup(self, + *args, + error: Exception = None, + failure: str = None, + **kwargs): + + mock_func = mock.Mock() + + class InnerTest(unittest.TestCase): + def runTest(self): + tobiko.add_cleanup(mock_func, *args, **kwargs) + if error is not None: + raise error + if failure is not None: + self.fail(failure) + + inner_case = InnerTest() + + mock_func.assert_not_called() + result = tobiko.run_test(inner_case) + self.assertEqual(1, result.testsRun) + mock_func.assert_called_once_with(*args, **kwargs) + + if error is not None: + self.assertEqual(1, len(result.errors)) + for _error in result.errors: + self.assertIs(inner_case, _error[0]) + self.assertIn(str(error), _error[1]) + else: + self.assertEqual([], result.errors) + + if failure is not None: + self.assertEqual(1, len(result.failures)) + for _failure in result.failures: + self.assertIs(inner_case, _failure[0]) + self.assertIn(failure, _failure[1]) + else: + self.assertEqual([], result.failures) + + def test_add_cleanup_with_args(self): + self.test_add_cleanup(1, 2, a='a', b='b') + + def test_add_cleanup_with_error(self): + self.test_add_cleanup(error=RuntimeError('some error')) + + def test_add_cleanup_with_failure(self): + self.test_add_cleanup(failure='some_failure') + + +class TestFail(unit.TobikoUnitTest): + + def test_fail(self, cause: Exception = None): + ex = self.assertRaises(tobiko.FailureException, tobiko.fail, + 'some_reason', cause=cause) + self.assertEqual('some_reason', str(ex)) + self.assertIs(cause, ex.__cause__) + + def test_fail_with_cause(self): + self.test_fail(cause=RuntimeError()) diff --git a/tobiko/tripleo/pacemaker.py b/tobiko/tripleo/pacemaker.py index e621ff007..4c3fd1e50 100644 --- a/tobiko/tripleo/pacemaker.py +++ b/tobiko/tripleo/pacemaker.py @@ -72,8 +72,7 @@ def get_pcs_resources_table(timeout=720, interval=2) -> pandas.DataFrame: break # exhausted all retries if failures: - tobiko.fail( - 'pcs status table import error\n{!s}', '\n'.join(failures)) + tobiko.fail('pcs status table import error\n' + '\n'.join(failures)) LOG.debug("Got pcs status :\n%s", table) return table