Merge "Update tobiko/common/_testcase.py module"

This commit is contained in:
Zuul 2022-01-03 12:40:24 +00:00 committed by Gerrit Code Review
commit 0249a9d64b
12 changed files with 160 additions and 148 deletions

View File

@ -21,7 +21,6 @@ from tobiko.common import _config
from tobiko.common import _deprecation
from tobiko.common import _detail
from tobiko.common import _exception
from tobiko.common import _fail
from tobiko.common import _fixture
from tobiko.common import _logging
from tobiko.common.managers import loader as loader_manager
@ -53,9 +52,6 @@ deprecated = _deprecation.deprecated
details_content = _detail.details_content
FailureException = _fail.FailureException
fail = _fail.fail
tobiko_config = _config.tobiko_config
tobiko_config_dir = _config.tobiko_config_dir
tobiko_config_path = _config.tobiko_config_path
@ -129,7 +125,10 @@ skip_test = _skip.skip_test
skip_unless = _skip.skip_unless
skip = _skip.skip
add_cleanup = _testcase.add_cleanup
assert_test_case_was_skipped = _testcase.assert_test_case_was_skipped
fail = _testcase.fail
FailureException = _testcase.FailureException
get_test_case = _testcase.get_test_case
pop_test_case = _testcase.pop_test_case
push_test_case = _testcase.push_test_case

View File

@ -1,40 +0,0 @@
# Copyright 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import typing
import testtools
FailureException = testtools.TestCase.failureException
def fail(msg: str, *args, **kwargs) -> typing.NoReturn:
"""Fail immediately current test case execution, with the given message.
Unconditionally raises a tobiko.FailureException as in below equivalent
code:
raise FailureException(msg.format(*args, **kwargs))
:param msg: string message used to create FailureException
:param *args: positional arguments to be passed to str.format method
:param **kwargs: key-word arguments to be passed to str.format method
:returns: It never returns
:raises FailureException:
"""
if args or kwargs:
msg = msg.format(*args, **kwargs)
raise FailureException(msg)

View File

@ -22,7 +22,7 @@ from oslo_log import log
import testtools
from tobiko.common import _exception
from tobiko.common import _fail
from tobiko.common import _testcase
from tobiko.common import _time
@ -274,7 +274,7 @@ def retry_test_case(*exceptions: Exception,
typing.Callable[[typing.Callable], typing.Callable]:
"""Re-run test case method in case it fails
"""
exceptions = exceptions or (_fail.FailureException,)
exceptions = exceptions or _testcase.FailureException
return retry_on_exception(*exceptions,
count=count,
timeout=timeout,

View File

@ -16,6 +16,7 @@ from __future__ import absolute_import
import os
import sys
import typing
import unittest
from oslo_log import log
import testtools
@ -28,11 +29,11 @@ LOG = log.getLogger(__name__)
os.environ.setdefault('PYTHON', sys.executable)
TestCase = testtools.TestCase
TestCase = unittest.TestCase
class TestCaseEntry(typing.NamedTuple):
test_case: testtools.TestCase
test_case: unittest.TestCase
start_time: float
@ -43,21 +44,21 @@ class TestCasesManager(object):
def __init__(self):
self._test_cases: typing.List[TestCaseEntry] = []
def get_test_case(self) -> testtools.TestCase:
def get_test_case(self) -> unittest.TestCase:
try:
return self._test_cases[-1].test_case
except IndexError:
return DUMMY_TEST_CASE
def pop_test_case(self) -> testtools.TestCase:
def pop_test_case(self) -> unittest.TestCase:
entry = self._test_cases.pop()
elapsed_time = _time.time() - entry.start_time
LOG.debug(f"Exit test case '{entry.test_case.id()}' after "
f"{elapsed_time} seconds")
return entry.test_case
def push_test_case(self, test_case: testtools.TestCase):
_exception.check_valid_type(test_case, testtools.TestCase)
def push_test_case(self, test_case: unittest.TestCase):
_exception.check_valid_type(test_case, unittest.TestCase)
entry = TestCaseEntry(test_case=test_case,
start_time=_time.time())
self._test_cases.append(entry)
@ -67,22 +68,22 @@ class TestCasesManager(object):
TEST_CASES = TestCasesManager()
def push_test_case(test_case: testtools.TestCase,
def push_test_case(test_case: unittest.TestCase,
manager: TestCasesManager = TEST_CASES):
return manager.push_test_case(test_case=test_case)
def pop_test_case(manager: TestCasesManager = TEST_CASES) -> \
testtools.TestCase:
unittest.TestCase:
return manager.pop_test_case()
def get_test_case(manager: TestCasesManager = TEST_CASES) -> \
testtools.TestCase:
unittest.TestCase:
return manager.get_test_case()
class DummyTestCase(testtools.TestCase):
class DummyTestCase(unittest.TestCase):
def runTest(self):
pass
@ -91,10 +92,16 @@ class DummyTestCase(testtools.TestCase):
DUMMY_TEST_CASE = DummyTestCase()
def run_test(test_case: testtools.TestCase,
test_result: testtools.TestResult = None) -> testtools.TestResult:
test_result = test_result or testtools.TestResult()
def run_test(test_case: unittest.TestCase,
test_result: unittest.TestResult = None,
manager: TestCasesManager = TEST_CASES) -> unittest.TestResult:
test_result = test_result or unittest.TestResult()
push_test_case(test_case, manager=manager)
try:
test_case.run(test_result)
finally:
popped = pop_test_case(manager=manager)
assert test_case is popped
return test_result
@ -103,16 +110,15 @@ def assert_in(needle, haystack, message: typing.Optional[str] = None,
get_test_case(manager=manager).assertIn(needle, haystack, message)
def get_skipped_test_cases(test_result: testtools.TestResult,
skip_reason: typing.Optional[str] = None):
if skip_reason is not None:
assert_in(skip_reason, test_result.skip_reasons)
return test_result.skip_reasons[skip_reason]
else:
skipped_test_cases = list()
for cases in test_result.skip_reasons.values():
skipped_test_cases.extend(cases)
return skipped_test_cases
def get_skipped_test_cases(test_result: unittest.TestResult,
skip_reason: str = None) \
-> typing.List[unittest.TestCase]:
if isinstance(test_result, testtools.TestResult):
raise NotImplementedError(
f"Unsupported result type: {test_result}")
return [case
for case, reason in test_result.skipped
if skip_reason is None or skip_reason in reason]
def assert_test_case_was_skipped(test_case: testtools.TestCase,
@ -122,3 +128,32 @@ def assert_test_case_was_skipped(test_case: testtools.TestCase,
skipped_tests = get_skipped_test_cases(test_result=test_result,
skip_reason=skip_reason)
assert_in(test_case, skipped_tests, manager=manager)
FailureException = typing.cast(
typing.Tuple[Exception, ...],
(unittest.TestCase.failureException,
testtools.TestCase.failureException,
AssertionError))
def fail(msg: str,
cause: typing.Type[Exception] = None) -> typing.NoReturn:
"""Fail immediately current test case execution, with the given message.
Unconditionally raises a tobiko.FailureException as in below equivalent
code:
raise FailureException(msg.format(*args, **kwargs))
:param msg: string message used to create FailureException
:param cause: error that caused the failure
:returns: It never returns
:raises failure_type or FailureException exception type:
"""
failure_type = get_test_case().failureException
raise failure_type(msg) from cause
def add_cleanup(function: typing.Callable, *args, **kwargs):
get_test_case().addCleanup(function, *args, **kwargs)

View File

@ -20,6 +20,7 @@ import typing # noqa
import time
# import testtools
import testtools
import tobiko
from tobiko.shell import ping
@ -81,6 +82,7 @@ def test_servers_creation(stack=TestServerCreationStack,
# Create all servers stacks
for fixture in fixtures:
assert isinstance(test_case, testtools.TestCase)
test_case.useFixture(fixture)
# Check every server ID is unique and new

View File

@ -349,7 +349,7 @@ def download_file(url: str,
headers_file_name=headers_file_name,
ssh_client=ssh_client,
sudo=sudo)
except tobiko.FailureException:
except tobiko.FailureException: # type: ignore
pass
else:
LOG.debug(f"File '{url}' already downloaded.")

View File

@ -48,7 +48,7 @@ class BadAddressPingError(PingError):
message = "bad address: {address}"
class PingFailed(PingError, tobiko.FailureException):
class PingFailed(PingError, AssertionError):
"""Raised when ping timeout expires before reaching expected message count
"""

View File

@ -122,8 +122,13 @@ class PingStatistics(object):
"""
def __init__(self, source=None, destination=None, transmitted=0,
received=0, undelivered=0, begin_interval=None,
def __init__(self,
source=None,
destination=None,
transmitted: int = 0,
received: int = 0,
undelivered: int = 0,
begin_interval=None,
end_interval=None):
self.source = source
self.destination = destination
@ -134,18 +139,17 @@ class PingStatistics(object):
self.end_interval = end_interval
@property
def unreceived(self):
def unreceived(self) -> int:
return max(0, self.transmitted - self.received)
@property
def delivered(self):
def delivered(self) -> int:
return max(0, self.transmitted - self.undelivered)
@property
def loss(self):
transmitted = max(0, float(self.transmitted))
if transmitted:
return float(self.unreceived) / transmitted
def loss(self) -> float:
if self.transmitted > 0:
return float(self.unreceived) / float(self.transmitted)
else:
return 0.
@ -173,28 +177,20 @@ class PingStatistics(object):
def assert_transmitted(self):
if not self.transmitted:
tobiko.fail("{transmitted!r} package(s) has been transmitted "
"to {destination!r}",
transmitted=self.transmitted,
destination=self.destination)
tobiko.fail(f"{self.transmitted} package(s) has been transmitted "
f"to {self.destination}")
def assert_not_transmitted(self):
if self.transmitted:
tobiko.fail("{transmitted!r} package(s) has been transmitted to "
"{destination!r}",
transmitted=self.transmitted,
destination=self.destination)
tobiko.fail(f"{self.transmitted} package(s) has been "
f"transmitted to {self.destination}")
def assert_replied(self):
if not self.received:
tobiko.fail("{received!r} reply package(s) has been received from "
"{destination!r}",
received=self.received,
destination=self.destination)
tobiko.fail(f"{self.received} reply package(s) has been "
f"received from {self.destination}")
def assert_not_replied(self):
if self.received:
tobiko.fail("{received!r} reply package(s) has been received from "
"{destination!r}",
received=self.received,
destination=self.destination)
tobiko.fail(f"{self.received} reply package(s) has been received "
f"from {self.destination}")

View File

@ -34,8 +34,8 @@ class OpenstackNodesTest(testtools.TestCase):
ping.ping(node.public_ip).assert_replied()
other = ips.setdefault(node.public_ip, node)
if node is not other:
tobiko.fail("Nodes {!r} and {!r} have the same IP: {!s}",
node.name, other.name, node.public_ip)
tobiko.fail(f"Nodes {node.name} and {other.name} have the "
f"same IP: {node.public_ip}")
def test_hostnames(self):
hostnames = dict()
@ -44,8 +44,8 @@ class OpenstackNodesTest(testtools.TestCase):
self.assertTrue(hostname.startswith(node.name))
other = hostnames.setdefault(hostname, node)
if node is not other:
tobiko.fail("Nodes {!r} and {!r} have the same hostname: {!r}",
node.name, other.name, hostname)
tobiko.fail(f"Nodes {node.name} and {other.name} have the "
f"same hostname: {hostname}")
def test_network_namespaces(self):
for node in self.topology.nodes:
@ -56,6 +56,5 @@ class OpenstackNodesTest(testtools.TestCase):
network_namespace=namespace)
other_ips = namespaces_ips.setdefault(namespace, ips)
if ips is not other_ips:
tobiko.fail("Duplicate network namespace {!r} in node "
"{!r}: {!r}, {!r}", namespace, node.name,
other_ips, ips)
tobiko.fail(f"Duplicate network namespace {namespace} in "
f"node {node.name}: {other_ips}, {ips}")

View File

@ -1,38 +0,0 @@
# Copyright 2019 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import tobiko
from tobiko.tests import unit
class TestFail(unit.TobikoUnitTest):
def test_fail(self):
self._test_fail('some_reason')
def test_fail_with_args(self):
self._test_fail('some {1!r} {0!s}', 'reason', 'strange')
def test_fail_with_kwargs(self):
self._test_fail('some {b!r} {a!s}', a='reason', b='strange')
def _test_fail(self, reason, *args, **kwargs):
ex = self.assertRaises(
tobiko.FailureException, tobiko.fail, reason, *args, **kwargs)
if args or kwargs:
expected_reason = reason.format(*args, **kwargs)
else:
expected_reason = reason
self.assertEqual(expected_reason, str(ex))

View File

@ -13,7 +13,8 @@
# under the License.
from __future__ import absolute_import
import testtools
import unittest
from unittest import mock
import tobiko
from tobiko.tests import unit
@ -38,25 +39,24 @@ class TestCaseTest(unit.TobikoUnitTest):
def test_get_test_case_out_of_context(self):
manager = tobiko.TestCasesManager()
result = tobiko.get_test_case(manager=manager)
self.assertIsInstance(result, testtools.TestCase)
self.assertIsInstance(result, unittest.TestCase)
self.assertEqual('tobiko.common._testcase.DummyTestCase.runTest',
result.id())
def test_push_test_case(self):
class InnerTest(testtools.TestCase):
class InnerTest(unittest.TestCase):
def runTest(self):
pass
inner_case = InnerTest()
tobiko.push_test_case(inner_case)
self.assertIs(inner_case, tobiko.get_test_case())
def test_pop_test_case(self):
class InnerTest(testtools.TestCase):
class InnerTest(unittest.TestCase):
def runTest(self):
pass
@ -68,3 +68,63 @@ class TestCaseTest(unit.TobikoUnitTest):
self.assertIs(inner_case, result)
self.assertIs(self, tobiko.get_test_case())
def test_add_cleanup(self,
*args,
error: Exception = None,
failure: str = None,
**kwargs):
mock_func = mock.Mock()
class InnerTest(unittest.TestCase):
def runTest(self):
tobiko.add_cleanup(mock_func, *args, **kwargs)
if error is not None:
raise error
if failure is not None:
self.fail(failure)
inner_case = InnerTest()
mock_func.assert_not_called()
result = tobiko.run_test(inner_case)
self.assertEqual(1, result.testsRun)
mock_func.assert_called_once_with(*args, **kwargs)
if error is not None:
self.assertEqual(1, len(result.errors))
for _error in result.errors:
self.assertIs(inner_case, _error[0])
self.assertIn(str(error), _error[1])
else:
self.assertEqual([], result.errors)
if failure is not None:
self.assertEqual(1, len(result.failures))
for _failure in result.failures:
self.assertIs(inner_case, _failure[0])
self.assertIn(failure, _failure[1])
else:
self.assertEqual([], result.failures)
def test_add_cleanup_with_args(self):
self.test_add_cleanup(1, 2, a='a', b='b')
def test_add_cleanup_with_error(self):
self.test_add_cleanup(error=RuntimeError('some error'))
def test_add_cleanup_with_failure(self):
self.test_add_cleanup(failure='some_failure')
class TestFail(unit.TobikoUnitTest):
def test_fail(self, cause: Exception = None):
ex = self.assertRaises(tobiko.FailureException, tobiko.fail,
'some_reason', cause=cause)
self.assertEqual('some_reason', str(ex))
self.assertIs(cause, ex.__cause__)
def test_fail_with_cause(self):
self.test_fail(cause=RuntimeError())

View File

@ -72,8 +72,7 @@ def get_pcs_resources_table(timeout=720, interval=2) -> pandas.DataFrame:
break
# exhausted all retries
if failures:
tobiko.fail(
'pcs status table import error\n{!s}', '\n'.join(failures))
tobiko.fail('pcs status table import error\n' + '\n'.join(failures))
LOG.debug("Got pcs status :\n%s", table)
return table