diff --git a/tobiko/openstack/stacks/_fixture.py b/tobiko/openstack/stacks/_fixture.py new file mode 100644 index 000000000..40e61275a --- /dev/null +++ b/tobiko/openstack/stacks/_fixture.py @@ -0,0 +1,120 @@ +# Copyright 2023 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from __future__ import absolute_import + +import abc +import typing + +from oslo_log import log + +import tobiko +from tobiko import config + + +LOG = log.getLogger(__name__) + + +class ResourceFixture(tobiko.SharedFixture, abc.ABC): + """Base class for fixtures intended to manage Openstack resources not + created using Heat, but with openstacksdk or other component clients (such + as neutronclient, novaclient, manilaclient, etc). + Those resources will be shared by multiple tests and are not cleaned up + when the tobiko execution ends. + + Child classes must define the following common attributes: + _resource: the type of resource that the child class actually manages + (server, network, loadbalancer, manila share, etc) initialized to None. + _not_found_exception_tuple: (tuple of) type of exceptions that the + resource_find method could raise in case of not finding the resource. + + Child classes must define any other attributes required by the + resource_create, resource_delete and resource_find methods. Examples: + prefixes and default_prefixlen are needed for subnet_pools; description and + rules are needed for secutiry_groups; etc. + + Child classes must define the resource_create, resource_delete and + resource_find methods. In case of resource_create and resource_find, they + should return an object with the type defined for self._resouce. + + Child classes may optionally implement simple properties to access to + resource_id and resource using a more representative name (these properties + will simply return self.resource_id or self.resource, respectively). + """ + + name: typing.Optional[str] = None + _resource: typing.Optional[object] = None + _not_found_exception_tuple: typing.Type[Exception] = (Exception) + + def __init__(self): + self.name = self.fixture_name + super().__init__() + + @property + def resource_id(self): + if self.resource: + return self._resource['id'] + + @property + def resource(self): + if not self._resource: + try: + self._resource = self.resource_find() + except self._not_found_exception_tuple: + LOG.debug("%r not found.", self.name) + self._resource = None + return self._resource + + @abc.abstractmethod + def resource_create(self): + pass + + @abc.abstractmethod + def resource_find(self): + pass + + @abc.abstractmethod + def resource_delete(self): + pass + + def setup_fixture(self): + if config.get_bool_env('TOBIKO_PREVENT_CREATE'): + LOG.debug("%r should have been already created: %r", + self.name, + self.resource) + else: + self.try_create_resource() + + if self.resource: + tobiko.addme_to_shared_resource(__name__, self.name) + + def try_create_resource(self): + if not self.resource: + self._resource = self.resource_create() + + def cleanup_fixture(self): + tests_using_resource = tobiko.removeme_from_shared_resource(__name__, + self.name) + if len(tests_using_resource) == 0: + self._cleanup_resource() + else: + LOG.info(f'{self.name} ResourceFixture not deleted because some ' + f'tests are still using it: {tests_using_resource}') + + def _cleanup_resource(self): + resource_id = self.resource_id + if resource_id: + LOG.debug('Deleting %r (%r)...', self.name, resource_id) + self.resource_delete() + LOG.debug('%r (%r) deleted.', self.name, resource_id) + self._resource = None diff --git a/tobiko/openstack/stacks/_neutron.py b/tobiko/openstack/stacks/_neutron.py index 56ecb3aa7..f39f255a7 100644 --- a/tobiko/openstack/stacks/_neutron.py +++ b/tobiko/openstack/stacks/_neutron.py @@ -19,6 +19,7 @@ import json import typing import netaddr +from neutronclient.common import exceptions as nc_exceptions from oslo_concurrency import lockutils from oslo_log import log @@ -27,6 +28,7 @@ from tobiko import config from tobiko.openstack import heat from tobiko.openstack import neutron from tobiko.openstack.stacks import _hot +from tobiko.openstack.stacks import _fixture from tobiko.shell import ip from tobiko.shell import sh from tobiko.shell import ssh @@ -283,7 +285,7 @@ class RouterNoSnatStackFixture(RouterStackFixture): @neutron.skip_if_missing_networking_extensions('subnet_allocation') -class SubnetPoolFixture(tobiko.SharedFixture): +class SubnetPoolFixture(_fixture.ResourceFixture): """Neutron Subnet Pool Fixture. A subnet pool is a dependency of network fixtures with either IPv4 or @@ -294,84 +296,35 @@ class SubnetPoolFixture(tobiko.SharedFixture): located """ - name: typing.Optional[str] = None + _resource: typing.Optional[neutron.SubnetPoolType] = None prefixes: list = [CONF.tobiko.neutron.ipv4_cidr] default_prefixlen: int = CONF.tobiko.neutron.ipv4_prefixlen - _subnet_pool: typing.Optional[neutron.SubnetPoolType] = None - - def __init__(self, - name: typing.Optional[str] = None, - prefixes: typing.Optional[list] = None, - default_prefixlen: typing.Optional[int] = None): - self.name = name or self.fixture_name - if prefixes: - self.prefixes = prefixes - if default_prefixlen: - self.default_prefixlen = default_prefixlen - super().__init__() - - @property - def ip_version(self): - valid_versions = (4, 6) - for valid_version in valid_versions: - if len(self.prefixes) > 0 and all( - netaddr.IPNetwork(prefix).version == valid_version - for prefix in self.prefixes): - return valid_version - # return None when neither IPv4 nor IPv6 (or when both) - return None - - def setup_fixture(self): - if config.get_bool_env('TOBIKO_PREVENT_CREATE'): - LOG.debug("SubnetPoolFixture should have been already created: %r", - self.subnet_pool) - else: - self.try_create_subnet_pool() - - if self.subnet_pool: - tobiko.addme_to_shared_resource(__name__, self.name) - - @lockutils.synchronized( - 'create_subnet_pool', external=True, lock_path=tobiko.LOCK_DIR) - def try_create_subnet_pool(self): - if not self.subnet_pool: - self._subnet_pool = neutron.create_subnet_pool( - name=self.name, prefixes=self.prefixes, - default_prefixlen=self.default_prefixlen, add_cleanup=False) - - def cleanup_fixture(self): - n_tests_using_resource = len(tobiko.removeme_from_shared_resource( - __name__, self.name)) - if n_tests_using_resource == 0: - self._cleanup_subnet_pool() - else: - LOG.info('Subnet Pool %r not deleted because %d tests ' - 'are using it still.', - self.name, n_tests_using_resource) - - def _cleanup_subnet_pool(self): - sp_id = self.subnet_pool_id - if sp_id: - self._subnet_pool = None - LOG.debug('Deleting Subnet Pool %r (%r)...', - self.name, sp_id) - neutron.delete_subnet_pool(sp_id) - LOG.debug('Subnet Pool %r (%r) deleted.', self.name, sp_id) + _not_found_exception_tuple: typing.Type[ + neutron.NoSuchSubnetPool] = (neutron.NoSuchSubnetPool) @property def subnet_pool_id(self): - if self.subnet_pool: - return self._subnet_pool['id'] + return self.resource_id @property def subnet_pool(self): - if not self._subnet_pool: - try: - self._subnet_pool = neutron.find_subnet_pool(name=self.name) - except neutron.NoSuchSubnetPool: - LOG.debug("Subnet Pool %r not found.", self.name) - self._subnet_pool = None - return self._subnet_pool + return self.resource + + @lockutils.synchronized( + 'create_subnet_pool', external=True, lock_path=tobiko.LOCK_DIR) + def try_create_resource(self): + super().try_create_resource() + + def resource_create(self): + return neutron.create_subnet_pool( + name=self.name, prefixes=self.prefixes, + default_prefixlen=self.default_prefixlen, add_cleanup=False) + + def resource_delete(self): + neutron.delete_subnet_pool(self.subnet_pool_id) + + def resource_find(self): + return neutron.find_subnet_pool(name=self.name) class SubnetPoolIPv6Fixture(SubnetPoolFixture): @@ -382,12 +335,14 @@ class SubnetPoolIPv6Fixture(SubnetPoolFixture): @neutron.skip_if_missing_networking_extensions('port-security') class NetworkBaseStackFixture(heat.HeatStackFixture): """Heat stack for creating internal network with a router to external""" - subnet_pools_ipv4_stack = (tobiko.required_fixture(SubnetPoolFixture) - if bool(CONF.tobiko.neutron.ipv4_cidr) - else None) - subnet_pools_ipv6_stack = (tobiko.required_fixture(SubnetPoolIPv6Fixture) - if bool(CONF.tobiko.neutron.ipv6_cidr) - else None) + subnet_pools_ipv4_stack: typing.Optional[tobiko.RequiredFixture] = ( + tobiko.required_fixture(SubnetPoolFixture) + if bool(CONF.tobiko.neutron.ipv4_cidr) + else None) + subnet_pools_ipv6_stack: typing.Optional[tobiko.RequiredFixture] = ( + tobiko.required_fixture(SubnetPoolIPv6Fixture) + if bool(CONF.tobiko.neutron.ipv6_cidr) + else None) #: Heat template file template = _hot.heat_template_file('neutron/network.yaml') @@ -629,14 +584,14 @@ class SecurityGroupsFixture(heat.HeatStackFixture): @neutron.skip_if_missing_networking_extensions('stateful-security-group') -class StatelessSecurityGroupFixture(tobiko.SharedFixture): +class StatelessSecurityGroupFixture(_fixture.ResourceFixture): """Neutron Stateless Security Group Fixture. This SG will by default allow SSH and ICMP to the instance and also ingress traffic from the metadata service as it can't rely on conntrack. """ - name: typing.Optional[str] = None + _resource: typing.Optional[neutron.SecurityGroupType] = None description: typing.Optional[str] = "" rules = [ { @@ -653,77 +608,44 @@ class StatelessSecurityGroupFixture(tobiko.SharedFixture): 'direction': 'ingress' } ] - _security_group: typing.Optional[neutron.SecurityGroupType] = None - - def __init__(self, - name: typing.Optional[str] = None, - description: typing.Optional[str] = None, - rules: typing.Optional[list] = None): - self.name = name or self.fixture_name - if description: - self.description = description - if rules: - self.rules = rules - super(StatelessSecurityGroupFixture, self).__init__() - - def setup_fixture(self): - if config.get_bool_env('TOBIKO_PREVENT_CREATE'): - LOG.debug("StatelessSecurityGroupFixture should have been already " - "created: %r", self.security_group) - else: - self.try_create_security_group() - - if self.security_group: - tobiko.addme_to_shared_resource(__name__, self.name) - - @lockutils.synchronized( - 'create_security_group', external=True, lock_path=tobiko.LOCK_DIR) - def try_create_security_group(self): - if not self.security_group: - self._security_group = neutron.create_security_group( - name=self.name, description=self.description, - add_cleanup=False, stateful=False) - # add rules once the SG was created - for rule in self.rules: - neutron.create_security_group_rule( - self._security_group['id'], - add_cleanup=False, - **rule) - - def cleanup_fixture(self): - n_tests_using_stack = len(tobiko.removeme_from_shared_resource( - __name__, self.name)) - if n_tests_using_stack == 0: - self._cleanup_security_group() - else: - LOG.info('Security Group %r not deleted because %d tests ' - 'are using it still.', - self.name, n_tests_using_stack) - - def _cleanup_security_group(self): - sg_id = self.security_group_id - if sg_id: - self._security_group = None - LOG.debug('Deleting Security Group %r (%r)...', - self.name, sg_id) - neutron.delete_security_group(sg_id) - LOG.debug('Security Group %r (%r) deleted.', self.name, sg_id) + _not_found_exception_tuple: typing.Type[nc_exceptions.NotFound] = ( + neutron.NotFound) @property def security_group_id(self): - if self.security_group: - return self._security_group['id'] + return self.resource_id @property def security_group(self): - if not self._security_group: - sgs = neutron.list_security_groups(name=self.name) - if len(sgs) == 0: - LOG.debug("Security group %r not found.", self.name) - self._security_group = None - else: - self._security_group = sgs.unique - return self._security_group + return self.resource + + @lockutils.synchronized( + 'create_security_group', external=True, lock_path=tobiko.LOCK_DIR) + def try_create_resource(self): + super().try_create_resource() + + def resource_create(self): + sg = neutron.create_security_group( + name=self.name, description=self.description, + add_cleanup=False, stateful=False) + + # add rules once the SG was created + for rule in self.rules: + neutron.create_security_group_rule( + sg['id'], add_cleanup=False, **rule) + + return sg + + def resource_delete(self): + neutron.delete_security_group(self.security_group_id) + + def resource_find(self): + sgs = neutron.list_security_groups(name=self.name) + if len(sgs) == 0: + LOG.debug("Security group %r not found.", self.name) + return None + else: + return sgs.unique def list_external_networks(name: str = None) -> \ diff --git a/tobiko/tests/scenario/neutron/test_security_groups.py b/tobiko/tests/scenario/neutron/test_security_groups.py index 2582e5bcf..87c56cac5 100644 --- a/tobiko/tests/scenario/neutron/test_security_groups.py +++ b/tobiko/tests/scenario/neutron/test_security_groups.py @@ -278,8 +278,8 @@ class CirrosServerWithStatelessSecurityGroupFixture( """Heat stack for testing a floating IP instance with port security""" #: Resources stack with security group to allow ping Nova servers - security_groups_stack = tobiko.required_fixture( - stacks.StatelessSecurityGroupFixture) + security_groups_stack: tobiko.RequiredFixture = \ + tobiko.required_fixture(stacks.StatelessSecurityGroupFixture) @property def security_groups(self) -> typing.List[str]: