Merge "Create ResourceFixture: common class to manage OSP resources"

This commit is contained in:
Zuul 2023-09-29 12:04:52 +00:00 committed by Gerrit Code Review
commit 6d89125d3e
3 changed files with 188 additions and 146 deletions

View File

@ -0,0 +1,120 @@
# Copyright 2023 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import abc
import typing
from oslo_log import log
import tobiko
from tobiko import config
LOG = log.getLogger(__name__)
class ResourceFixture(tobiko.SharedFixture, abc.ABC):
"""Base class for fixtures intended to manage Openstack resources not
created using Heat, but with openstacksdk or other component clients (such
as neutronclient, novaclient, manilaclient, etc).
Those resources will be shared by multiple tests and are not cleaned up
when the tobiko execution ends.
Child classes must define the following common attributes:
_resource: the type of resource that the child class actually manages
(server, network, loadbalancer, manila share, etc) initialized to None.
_not_found_exception_tuple: (tuple of) type of exceptions that the
resource_find method could raise in case of not finding the resource.
Child classes must define any other attributes required by the
resource_create, resource_delete and resource_find methods. Examples:
prefixes and default_prefixlen are needed for subnet_pools; description and
rules are needed for secutiry_groups; etc.
Child classes must define the resource_create, resource_delete and
resource_find methods. In case of resource_create and resource_find, they
should return an object with the type defined for self._resouce.
Child classes may optionally implement simple properties to access to
resource_id and resource using a more representative name (these properties
will simply return self.resource_id or self.resource, respectively).
"""
name: typing.Optional[str] = None
_resource: typing.Optional[object] = None
_not_found_exception_tuple: typing.Type[Exception] = (Exception)
def __init__(self):
self.name = self.fixture_name
super().__init__()
@property
def resource_id(self):
if self.resource:
return self._resource['id']
@property
def resource(self):
if not self._resource:
try:
self._resource = self.resource_find()
except self._not_found_exception_tuple:
LOG.debug("%r not found.", self.name)
self._resource = None
return self._resource
@abc.abstractmethod
def resource_create(self):
pass
@abc.abstractmethod
def resource_find(self):
pass
@abc.abstractmethod
def resource_delete(self):
pass
def setup_fixture(self):
if config.get_bool_env('TOBIKO_PREVENT_CREATE'):
LOG.debug("%r should have been already created: %r",
self.name,
self.resource)
else:
self.try_create_resource()
if self.resource:
tobiko.addme_to_shared_resource(__name__, self.name)
def try_create_resource(self):
if not self.resource:
self._resource = self.resource_create()
def cleanup_fixture(self):
tests_using_resource = tobiko.removeme_from_shared_resource(__name__,
self.name)
if len(tests_using_resource) == 0:
self._cleanup_resource()
else:
LOG.info(f'{self.name} ResourceFixture not deleted because some '
f'tests are still using it: {tests_using_resource}')
def _cleanup_resource(self):
resource_id = self.resource_id
if resource_id:
LOG.debug('Deleting %r (%r)...', self.name, resource_id)
self.resource_delete()
LOG.debug('%r (%r) deleted.', self.name, resource_id)
self._resource = None

View File

@ -19,6 +19,7 @@ import json
import typing import typing
import netaddr import netaddr
from neutronclient.common import exceptions as nc_exceptions
from oslo_concurrency import lockutils from oslo_concurrency import lockutils
from oslo_log import log from oslo_log import log
@ -27,6 +28,7 @@ from tobiko import config
from tobiko.openstack import heat from tobiko.openstack import heat
from tobiko.openstack import neutron from tobiko.openstack import neutron
from tobiko.openstack.stacks import _hot from tobiko.openstack.stacks import _hot
from tobiko.openstack.stacks import _fixture
from tobiko.shell import ip from tobiko.shell import ip
from tobiko.shell import sh from tobiko.shell import sh
from tobiko.shell import ssh from tobiko.shell import ssh
@ -283,7 +285,7 @@ class RouterNoSnatStackFixture(RouterStackFixture):
@neutron.skip_if_missing_networking_extensions('subnet_allocation') @neutron.skip_if_missing_networking_extensions('subnet_allocation')
class SubnetPoolFixture(tobiko.SharedFixture): class SubnetPoolFixture(_fixture.ResourceFixture):
"""Neutron Subnet Pool Fixture. """Neutron Subnet Pool Fixture.
A subnet pool is a dependency of network fixtures with either IPv4 or A subnet pool is a dependency of network fixtures with either IPv4 or
@ -294,84 +296,35 @@ class SubnetPoolFixture(tobiko.SharedFixture):
located located
""" """
name: typing.Optional[str] = None _resource: typing.Optional[neutron.SubnetPoolType] = None
prefixes: list = [CONF.tobiko.neutron.ipv4_cidr] prefixes: list = [CONF.tobiko.neutron.ipv4_cidr]
default_prefixlen: int = CONF.tobiko.neutron.ipv4_prefixlen default_prefixlen: int = CONF.tobiko.neutron.ipv4_prefixlen
_subnet_pool: typing.Optional[neutron.SubnetPoolType] = None _not_found_exception_tuple: typing.Type[
neutron.NoSuchSubnetPool] = (neutron.NoSuchSubnetPool)
def __init__(self,
name: typing.Optional[str] = None,
prefixes: typing.Optional[list] = None,
default_prefixlen: typing.Optional[int] = None):
self.name = name or self.fixture_name
if prefixes:
self.prefixes = prefixes
if default_prefixlen:
self.default_prefixlen = default_prefixlen
super().__init__()
@property
def ip_version(self):
valid_versions = (4, 6)
for valid_version in valid_versions:
if len(self.prefixes) > 0 and all(
netaddr.IPNetwork(prefix).version == valid_version
for prefix in self.prefixes):
return valid_version
# return None when neither IPv4 nor IPv6 (or when both)
return None
def setup_fixture(self):
if config.get_bool_env('TOBIKO_PREVENT_CREATE'):
LOG.debug("SubnetPoolFixture should have been already created: %r",
self.subnet_pool)
else:
self.try_create_subnet_pool()
if self.subnet_pool:
tobiko.addme_to_shared_resource(__name__, self.name)
@lockutils.synchronized(
'create_subnet_pool', external=True, lock_path=tobiko.LOCK_DIR)
def try_create_subnet_pool(self):
if not self.subnet_pool:
self._subnet_pool = neutron.create_subnet_pool(
name=self.name, prefixes=self.prefixes,
default_prefixlen=self.default_prefixlen, add_cleanup=False)
def cleanup_fixture(self):
n_tests_using_resource = len(tobiko.removeme_from_shared_resource(
__name__, self.name))
if n_tests_using_resource == 0:
self._cleanup_subnet_pool()
else:
LOG.info('Subnet Pool %r not deleted because %d tests '
'are using it still.',
self.name, n_tests_using_resource)
def _cleanup_subnet_pool(self):
sp_id = self.subnet_pool_id
if sp_id:
self._subnet_pool = None
LOG.debug('Deleting Subnet Pool %r (%r)...',
self.name, sp_id)
neutron.delete_subnet_pool(sp_id)
LOG.debug('Subnet Pool %r (%r) deleted.', self.name, sp_id)
@property @property
def subnet_pool_id(self): def subnet_pool_id(self):
if self.subnet_pool: return self.resource_id
return self._subnet_pool['id']
@property @property
def subnet_pool(self): def subnet_pool(self):
if not self._subnet_pool: return self.resource
try:
self._subnet_pool = neutron.find_subnet_pool(name=self.name) @lockutils.synchronized(
except neutron.NoSuchSubnetPool: 'create_subnet_pool', external=True, lock_path=tobiko.LOCK_DIR)
LOG.debug("Subnet Pool %r not found.", self.name) def try_create_resource(self):
self._subnet_pool = None super().try_create_resource()
return self._subnet_pool
def resource_create(self):
return neutron.create_subnet_pool(
name=self.name, prefixes=self.prefixes,
default_prefixlen=self.default_prefixlen, add_cleanup=False)
def resource_delete(self):
neutron.delete_subnet_pool(self.subnet_pool_id)
def resource_find(self):
return neutron.find_subnet_pool(name=self.name)
class SubnetPoolIPv6Fixture(SubnetPoolFixture): class SubnetPoolIPv6Fixture(SubnetPoolFixture):
@ -382,12 +335,14 @@ class SubnetPoolIPv6Fixture(SubnetPoolFixture):
@neutron.skip_if_missing_networking_extensions('port-security') @neutron.skip_if_missing_networking_extensions('port-security')
class NetworkBaseStackFixture(heat.HeatStackFixture): class NetworkBaseStackFixture(heat.HeatStackFixture):
"""Heat stack for creating internal network with a router to external""" """Heat stack for creating internal network with a router to external"""
subnet_pools_ipv4_stack = (tobiko.required_fixture(SubnetPoolFixture) subnet_pools_ipv4_stack: typing.Optional[tobiko.RequiredFixture] = (
if bool(CONF.tobiko.neutron.ipv4_cidr) tobiko.required_fixture(SubnetPoolFixture)
else None) if bool(CONF.tobiko.neutron.ipv4_cidr)
subnet_pools_ipv6_stack = (tobiko.required_fixture(SubnetPoolIPv6Fixture) else None)
if bool(CONF.tobiko.neutron.ipv6_cidr) subnet_pools_ipv6_stack: typing.Optional[tobiko.RequiredFixture] = (
else None) tobiko.required_fixture(SubnetPoolIPv6Fixture)
if bool(CONF.tobiko.neutron.ipv6_cidr)
else None)
#: Heat template file #: Heat template file
template = _hot.heat_template_file('neutron/network.yaml') template = _hot.heat_template_file('neutron/network.yaml')
@ -629,14 +584,14 @@ class SecurityGroupsFixture(heat.HeatStackFixture):
@neutron.skip_if_missing_networking_extensions('stateful-security-group') @neutron.skip_if_missing_networking_extensions('stateful-security-group')
class StatelessSecurityGroupFixture(tobiko.SharedFixture): class StatelessSecurityGroupFixture(_fixture.ResourceFixture):
"""Neutron Stateless Security Group Fixture. """Neutron Stateless Security Group Fixture.
This SG will by default allow SSH and ICMP to the instance and also This SG will by default allow SSH and ICMP to the instance and also
ingress traffic from the metadata service as it can't rely on conntrack. ingress traffic from the metadata service as it can't rely on conntrack.
""" """
name: typing.Optional[str] = None _resource: typing.Optional[neutron.SecurityGroupType] = None
description: typing.Optional[str] = "" description: typing.Optional[str] = ""
rules = [ rules = [
{ {
@ -653,77 +608,44 @@ class StatelessSecurityGroupFixture(tobiko.SharedFixture):
'direction': 'ingress' 'direction': 'ingress'
} }
] ]
_security_group: typing.Optional[neutron.SecurityGroupType] = None _not_found_exception_tuple: typing.Type[nc_exceptions.NotFound] = (
neutron.NotFound)
def __init__(self,
name: typing.Optional[str] = None,
description: typing.Optional[str] = None,
rules: typing.Optional[list] = None):
self.name = name or self.fixture_name
if description:
self.description = description
if rules:
self.rules = rules
super(StatelessSecurityGroupFixture, self).__init__()
def setup_fixture(self):
if config.get_bool_env('TOBIKO_PREVENT_CREATE'):
LOG.debug("StatelessSecurityGroupFixture should have been already "
"created: %r", self.security_group)
else:
self.try_create_security_group()
if self.security_group:
tobiko.addme_to_shared_resource(__name__, self.name)
@lockutils.synchronized(
'create_security_group', external=True, lock_path=tobiko.LOCK_DIR)
def try_create_security_group(self):
if not self.security_group:
self._security_group = neutron.create_security_group(
name=self.name, description=self.description,
add_cleanup=False, stateful=False)
# add rules once the SG was created
for rule in self.rules:
neutron.create_security_group_rule(
self._security_group['id'],
add_cleanup=False,
**rule)
def cleanup_fixture(self):
n_tests_using_stack = len(tobiko.removeme_from_shared_resource(
__name__, self.name))
if n_tests_using_stack == 0:
self._cleanup_security_group()
else:
LOG.info('Security Group %r not deleted because %d tests '
'are using it still.',
self.name, n_tests_using_stack)
def _cleanup_security_group(self):
sg_id = self.security_group_id
if sg_id:
self._security_group = None
LOG.debug('Deleting Security Group %r (%r)...',
self.name, sg_id)
neutron.delete_security_group(sg_id)
LOG.debug('Security Group %r (%r) deleted.', self.name, sg_id)
@property @property
def security_group_id(self): def security_group_id(self):
if self.security_group: return self.resource_id
return self._security_group['id']
@property @property
def security_group(self): def security_group(self):
if not self._security_group: return self.resource
sgs = neutron.list_security_groups(name=self.name)
if len(sgs) == 0: @lockutils.synchronized(
LOG.debug("Security group %r not found.", self.name) 'create_security_group', external=True, lock_path=tobiko.LOCK_DIR)
self._security_group = None def try_create_resource(self):
else: super().try_create_resource()
self._security_group = sgs.unique
return self._security_group def resource_create(self):
sg = neutron.create_security_group(
name=self.name, description=self.description,
add_cleanup=False, stateful=False)
# add rules once the SG was created
for rule in self.rules:
neutron.create_security_group_rule(
sg['id'], add_cleanup=False, **rule)
return sg
def resource_delete(self):
neutron.delete_security_group(self.security_group_id)
def resource_find(self):
sgs = neutron.list_security_groups(name=self.name)
if len(sgs) == 0:
LOG.debug("Security group %r not found.", self.name)
return None
else:
return sgs.unique
def list_external_networks(name: str = None) -> \ def list_external_networks(name: str = None) -> \

View File

@ -278,8 +278,8 @@ class CirrosServerWithStatelessSecurityGroupFixture(
"""Heat stack for testing a floating IP instance with port security""" """Heat stack for testing a floating IP instance with port security"""
#: Resources stack with security group to allow ping Nova servers #: Resources stack with security group to allow ping Nova servers
security_groups_stack = tobiko.required_fixture( security_groups_stack: tobiko.RequiredFixture = \
stacks.StatelessSecurityGroupFixture) tobiko.required_fixture(stacks.StatelessSecurityGroupFixture)
@property @property
def security_groups(self) -> typing.List[str]: def security_groups(self) -> typing.List[str]: