Make ResourceFixtures configurable

With this patch, ResourceFixtures can be declared with
tobiko.required_fixture using kwargs.
Those kwargs can be used to modify default values from attributes used
within the ResourceFixtures subclass constructors

Change-Id: Ib3dbff002cec51336ed56aaa2a86e8dc207d9807
This commit is contained in:
Eduardo Olivares 2023-09-21 16:11:31 +02:00
parent 798ccdad50
commit 6845550e0f
7 changed files with 222 additions and 33 deletions

View File

@ -47,27 +47,31 @@ def is_fixture(obj: typing.Any) -> bool:
@typing.overload
def get_fixture(obj: typing.Type[F],
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F:
manager: 'FixtureManager' = None,
**kwargs) -> F:
pass
@typing.overload
def get_fixture(obj: F,
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F:
manager: 'FixtureManager' = None,
**kwargs) -> F:
pass
@typing.overload
def get_fixture(obj: str,
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> fixtures.Fixture:
manager: 'FixtureManager' = None,
**kwargs) -> fixtures.Fixture:
pass
def get_fixture(obj: FixtureType,
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F:
manager: 'FixtureManager' = None,
**kwargs) -> F:
"""Returns a fixture identified by given :param obj:
It returns registered fixture for given :param obj:. If none has been
@ -95,7 +99,7 @@ def get_fixture(obj: FixtureType,
if isinstance(obj, fixtures.Fixture):
return typing.cast(F, obj)
return fixture_manager(obj, manager).get_fixture(
obj, fixture_id=fixture_id)
obj, fixture_id=fixture_id, **kwargs)
def get_fixture_name(obj) -> str:
@ -153,21 +157,24 @@ def remove_fixture(obj: FixtureType,
@typing.overload
def setup_fixture(obj: typing.Type[F],
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F:
manager: 'FixtureManager' = None,
**kwargs) -> F:
pass
@typing.overload
def setup_fixture(obj: F,
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F:
manager: 'FixtureManager' = None,
**kwargs) -> F:
pass
def setup_fixture(obj: FixtureType,
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None,
alternative: FixtureType = None) \
alternative: FixtureType = None,
**kwargs) \
-> F:
"""I setups registered fixture
@ -183,7 +190,8 @@ def setup_fixture(obj: FixtureType,
fixture: F = typing.cast(F,
get_fixture(_obj,
fixture_id=fixture_id,
manager=manager))
manager=manager,
**kwargs))
try:
fixture.setUp()
break
@ -210,22 +218,26 @@ def handle_setup_error(ex_type, ex_value, ex_tb):
@typing.overload
def reset_fixture(obj: typing.Type[F],
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F:
manager: 'FixtureManager' = None,
**kwargs) -> F:
pass
@typing.overload
def reset_fixture(obj: F,
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F:
manager: 'FixtureManager' = None,
**kwargs) -> F:
pass
def reset_fixture(obj: FixtureType,
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F:
manager: 'FixtureManager' = None,
**kwargs) -> F:
"""It cleanups and setups registered fixture"""
fixture: F = get_fixture(obj, fixture_id=fixture_id, manager=manager)
fixture: F = get_fixture(
obj, fixture_id=fixture_id, manager=manager, **kwargs)
with _exception.handle_multiple_exceptions():
fixture.reset()
return fixture
@ -258,26 +270,30 @@ def cleanup_fixture(obj: FixtureType,
@typing.overload
def use_fixture(obj: typing.Type[F],
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F:
manager: 'FixtureManager' = None,
**kwargs) -> F:
pass
@typing.overload
def use_fixture(obj: F,
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F:
manager: 'FixtureManager' = None,
**kwargs) -> F:
pass
def use_fixture(obj: FixtureType,
fixture_id: typing.Any = None,
manager: 'FixtureManager' = None) -> F:
manager: 'FixtureManager' = None,
**kwargs) -> F:
"""It setups registered fixture and then register it for cleanup
At the end of the test case execution it will call cleanup_fixture
with on the fixture
"""
fixture = setup_fixture(obj, fixture_id=fixture_id, manager=manager)
fixture = setup_fixture(
obj, fixture_id=fixture_id, manager=manager, **kwargs)
_case.add_cleanup(cleanup_fixture, fixture)
return fixture
@ -402,13 +418,14 @@ def get_required_fixture_properties(cls):
def init_fixture(obj: typing.Union[typing.Type[F], F],
name: str,
fixture_id: typing.Any = None) -> F:
fixture_id: typing.Any = None,
**kwargs) -> F:
fixture: F
if isinstance(obj, fixtures.Fixture):
fixture = obj
fixture = obj(**kwargs)
elif inspect.isclass(obj) and issubclass(obj, fixtures.Fixture):
try:
fixture = obj()
fixture = obj(**kwargs)
except Exception as ex:
raise TypeError(f"Error creating fixture '{name}' from class "
f"{obj!r}.") from ex
@ -467,7 +484,8 @@ class FixtureManager:
def get_fixture(self,
obj: FixtureType,
fixture_id: typing.Any = None) -> F:
fixture_id: typing.Any = None,
**kwargs) -> F:
name, obj = get_name_and_object(obj)
if fixture_id:
name += f'-{fixture_id}'
@ -476,7 +494,8 @@ class FixtureManager:
except KeyError:
fixture: F = self.init_fixture(obj=obj,
name=name,
fixture_id=fixture_id)
fixture_id=fixture_id,
**kwargs)
assert isinstance(fixture, fixtures.Fixture)
self.fixtures[name] = fixture
return fixture
@ -484,10 +503,12 @@ class FixtureManager:
@staticmethod
def init_fixture(obj: typing.Union[typing.Type[F], F],
name: str,
fixture_id: typing.Any) -> F:
fixture_id: typing.Any,
**kwargs) -> F:
return init_fixture(obj=obj,
name=name,
fixture_id=fixture_id)
fixture_id=fixture_id,
**kwargs)
def remove_fixture(self,
obj: FixtureType,
@ -625,7 +646,7 @@ class RequiredFixture(property, typing.Generic[G]):
def setup_fixture(self, _instance=None) -> G:
fixture = self.fixture
setup_fixture(fixture)
setup_fixture(fixture, **self.kwargs)
if (hasattr(_instance, 'addCleanup') and
hasattr(_instance, 'getDetails')):
_instance.addCleanup(_detail.gather_details,

View File

@ -146,6 +146,7 @@ get_subnet_pool = _subnet_pool.get_subnet_pool
create_subnet_pool = _subnet_pool.create_subnet_pool
delete_subnet_pool = _subnet_pool.delete_subnet_pool
find_subnet_pool = _subnet_pool.find_subnet_pool
list_subnet_pools = _subnet_pool.list_subnet_pools
list_security_groups = _security_group.list_security_groups
get_security_group = _security_group.get_security_group

View File

@ -56,10 +56,6 @@ class ResourceFixture(tobiko.SharedFixture, abc.ABC):
_resource: typing.Optional[object] = None
_not_found_exception_tuple: typing.Type[Exception] = (Exception)
def __init__(self):
self.name = self.fixture_name
super().__init__()
@property
def resource_id(self):
if self.resource:
@ -88,6 +84,7 @@ class ResourceFixture(tobiko.SharedFixture, abc.ABC):
pass
def setup_fixture(self):
self.name = self.fixture_name
if config.get_bool_env('TOBIKO_PREVENT_CREATE'):
LOG.debug("%r should have been already created: %r",
self.name,
@ -95,7 +92,9 @@ class ResourceFixture(tobiko.SharedFixture, abc.ABC):
else:
self.try_create_resource()
if self.resource:
if self.resource is None:
tobiko.fail("%r not found!", self.name)
else:
tobiko.addme_to_shared_resource(__name__, self.name)
def try_create_resource(self):

View File

@ -302,6 +302,11 @@ class SubnetPoolFixture(_fixture.ResourceFixture):
_not_found_exception_tuple: typing.Type[
neutron.NoSuchSubnetPool] = (neutron.NoSuchSubnetPool)
def __init__(self, prefixes=None, default_prefixlen=None):
super().__init__()
self.prefixes = prefixes or self.prefixes
self.default_prefixlen = default_prefixlen or self.default_prefixlen
@property
def subnet_pool_id(self):
return self.resource_id
@ -611,6 +616,11 @@ class StatelessSecurityGroupFixture(_fixture.ResourceFixture):
_not_found_exception_tuple: typing.Type[nc_exceptions.NotFound] = (
neutron.NotFound)
def __init__(self, description=None, rules=None):
super().__init__()
self.description = description or self.description
self.rules = rules or self.rules
@property
def security_group_id(self):
return self.resource_id
@ -634,7 +644,8 @@ class StatelessSecurityGroupFixture(_fixture.ResourceFixture):
neutron.create_security_group_rule(
sg['id'], add_cleanup=False, **rule)
return sg
# return the updated SG, including the rules just added
return self.resource_find()
def resource_delete(self):
neutron.delete_security_group(self.security_group_id)

View File

@ -0,0 +1,62 @@
# Copyright (c) 2023 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import testtools
import tobiko
from tobiko.openstack import keystone
from tobiko.openstack import neutron
from tobiko.openstack.stacks._neutron import StatelessSecurityGroupFixture
DESCRIPTION = "Amazing Stateless Sec Group number {i}"
NUM_SEC_GROUPS = 10
@keystone.skip_unless_has_keystone_credentials()
class StatelessSecurityGroupTest(testtools.TestCase):
"""Tests Stateless Security Group creation"""
stack_list = [tobiko.required_fixture(
StatelessSecurityGroupFixture,
fixture_id=str(i),
description=DESCRIPTION.format(i=i))
for i in range(NUM_SEC_GROUPS)]
ssg_fixture_list: list = []
@classmethod
def tearDownClass(cls):
for stack in cls.stack_list:
tobiko.cleanup_fixture(stack.fixture)
@classmethod
def setUpClass(cls):
for stack in cls.stack_list:
cls.ssg_fixture_list.append(tobiko.use_fixture(stack.fixture))
def test_stateless_sec_group_list_find(self):
self.assertEqual(NUM_SEC_GROUPS, len(self.ssg_fixture_list))
for i, ssg_fixture in enumerate(self.ssg_fixture_list):
ssg_name = (f"{StatelessSecurityGroupFixture.__module__}."
f"{StatelessSecurityGroupFixture.__qualname__}-{i}")
self.assertEqual(ssg_name, ssg_fixture.name)
ssg = neutron.list_security_groups(name=ssg_name).unique
self.assertEqual(ssg, ssg_fixture.security_group)
def test_stateless_sec_group_list_parameters(self):
for i, ssg_fixture in enumerate(self.ssg_fixture_list):
self.assertEqual(DESCRIPTION.format(i=i),
ssg_fixture.security_group['description'])

View File

@ -0,0 +1,94 @@
# Copyright (c) 2023 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import testtools
import tobiko
from tobiko.openstack import keystone
from tobiko.openstack import neutron
from tobiko.openstack.stacks._neutron import SubnetPoolFixture
PREFIX = "10.211.{i}.0/24"
DEFAULT_PREFIXLEN = 29
@keystone.skip_unless_has_keystone_credentials()
class SubnetPoolTest(testtools.TestCase):
"""Tests subnet pool creation"""
stack = tobiko.required_fixture(SubnetPoolFixture,
fixture_id="thistest",
prefixes=[PREFIX.format(i=0)],
default_prefixlen=DEFAULT_PREFIXLEN)
@classmethod
def tearDownClass(cls):
# NOTE: the skip at class level does not affect tearDownClass, so
# the following workaround is needed to avoid errors
if keystone.has_keystone_credentials():
tobiko.cleanup_fixture(cls.stack.fixture)
def test_subnet_pool_find(self):
snp = neutron.list_subnet_pools(name=self.stack.name).unique
self.assertEqual(snp, self.stack.subnet_pool)
def test_subnet_pool_parameters(self):
self.assertEqual([PREFIX.format(i=0)],
self.stack.subnet_pool['prefixes'])
self.assertEqual(str(DEFAULT_PREFIXLEN),
self.stack.subnet_pool['default_prefixlen'])
NUM_SUBNET_POOLS = 10
@keystone.skip_unless_has_keystone_credentials()
class SubnetPoolListTest(testtools.TestCase):
"""Tests creation of a list of subnet pools"""
stack_list = [tobiko.required_fixture(SubnetPoolFixture,
fixture_id=str(i),
prefixes=[PREFIX.format(i=i)],
default_prefixlen=DEFAULT_PREFIXLEN)
for i in range(NUM_SUBNET_POOLS)]
snp_fixture_list: list = []
@classmethod
def tearDownClass(cls):
for stack in cls.stack_list:
tobiko.cleanup_fixture(stack.fixture)
@classmethod
def setUpClass(cls):
for stack in cls.stack_list:
cls.snp_fixture_list.append(tobiko.use_fixture(stack.fixture))
def test_subnet_pool_list_find(self):
self.assertEqual(NUM_SUBNET_POOLS, len(self.snp_fixture_list))
for i, snp_fixture in enumerate(self.snp_fixture_list):
snp_name = (f"{SubnetPoolFixture.__module__}."
f"{SubnetPoolFixture.__qualname__}-{i}")
self.assertEqual(snp_name, snp_fixture.name)
snp = neutron.list_subnet_pools(name=snp_name).unique
self.assertEqual(snp, snp_fixture.subnet_pool)
def test_subnet_pool_list_parameters(self):
for i, snp_fixture in enumerate(self.snp_fixture_list):
self.assertEqual([PREFIX.format(i=i)],
snp_fixture.subnet_pool['prefixes'])
self.assertEqual(str(DEFAULT_PREFIXLEN),
snp_fixture.subnet_pool['default_prefixlen'])

View File

@ -48,10 +48,11 @@ class PatchEnvironFixture(tobiko.SharedFixture):
class FixtureManagerPatch(tobiko.FixtureManager, _patch.PatchFixture):
def init_fixture(self, obj, name, fixture_id):
def init_fixture(self, obj, name, fixture_id, **kwargs):
fixture = super().init_fixture(obj=obj,
name=name,
fixture_id=fixture_id)
fixture_id=fixture_id,
**kwargs)
self.addCleanup(tobiko.cleanup_fixture, fixture)
return fixture