Update Neutron resource search

Change-Id: I14a431a8fa6bca7bdef4d2f60e71856716246a04
This commit is contained in:
Federico Ressi 2019-07-26 14:53:54 +02:00
parent 211e1a8c27
commit 937def3541
9 changed files with 72 additions and 128 deletions

View File

@ -15,7 +15,6 @@ from __future__ import absolute_import
from tobiko.common import _asserts from tobiko.common import _asserts
from tobiko.common import _exception from tobiko.common import _exception
from tobiko.common import _find
from tobiko.common import _fixture from tobiko.common import _fixture
from tobiko.common.managers import testcase as testcase_manager from tobiko.common.managers import testcase as testcase_manager
from tobiko.common.managers import loader as loader_manager from tobiko.common.managers import loader as loader_manager
@ -30,9 +29,6 @@ TobikoException = _exception.TobikoException
check_valid_type = _exception.check_valid_type check_valid_type = _exception.check_valid_type
exc_info = _exception.exc_info exc_info = _exception.exc_info
find_by_attributes = _find.find_by_attributes
find_by_items = _find.find_by_items
is_fixture = _fixture.is_fixture is_fixture = _fixture.is_fixture
get_fixture = _fixture.get_fixture get_fixture = _fixture.get_fixture
fixture_property = _fixture.fixture_property fixture_property = _fixture.fixture_property

View File

@ -1,44 +0,0 @@
# Copyright 2019 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
def find_by_attributes(objects, exclude=False, **attributes):
exclude = bool(exclude)
if attributes:
selection = []
for obj in objects:
for key, value in attributes.items():
matching = value == getattr(obj, key)
if matching is exclude:
break
else:
selection.append(obj)
objects = selection
return objects
def find_by_items(mappings, exclude=False, **items):
exclude = bool(exclude)
if items:
selection = []
for mapping in mappings:
for key, value in items.items():
matching = value == mapping[key]
if matching is exclude:
break
else:
selection.append(mapping)
mappings = selection
return mappings

View File

@ -30,10 +30,10 @@ list_ports = _client.list_ports
list_subnets = _client.list_subnets list_subnets = _client.list_subnets
list_subnet_cidrs = _client.list_subnet_cidrs list_subnet_cidrs = _client.list_subnet_cidrs
list_agents = _client.list_agents list_agents = _client.list_agents
show_network = _client.show_network get_network = _client.get_network
show_router = _client.show_router get_router = _client.get_router
show_port = _client.show_port get_port = _client.get_port
show_subnet = _client.show_subnet get_subnet = _client.get_subnet
new_ipv4_cidr = _cidr.new_ipv4_cidr new_ipv4_cidr = _cidr.new_ipv4_cidr
new_ipv6_cidr = _cidr.new_ipv6_cidr new_ipv6_cidr = _cidr.new_ipv6_cidr

View File

@ -34,9 +34,9 @@ class NetworkingAgentFixture(tobiko.SharedFixture):
self.agents = _client.list_agents(client=self.client) self.agents = _client.list_agents(client=self.client)
def get_networking_agents(**params): def get_networking_agents(**attributes):
agents = tobiko.setup_fixture(NetworkingAgentFixture).agents agents = tobiko.setup_fixture(NetworkingAgentFixture).agents
return tobiko.find_by_items(agents, **params) return agents.with_items(**attributes)
def missing_networking_agents(count=1, **params): def missing_networking_agents(count=1, **params):

View File

@ -20,7 +20,6 @@ from neutronclient.v2_0 import client as neutronclient
import tobiko import tobiko
from tobiko.openstack import _client from tobiko.openstack import _client
from tobiko.openstack import _find
class NeutronClientFixture(_client.OpenstackClientFixture): class NeutronClientFixture(_client.OpenstackClientFixture):
@ -62,85 +61,86 @@ def get_neutron_client(session=None, shared=True, init_client=None,
return client.client return client.client
def find_network(obj, properties=None, client=None, check_found=True, _RAISE_ERROR = object()
check_unique=True, **params):
def find_network(client=None, unique=False, default=_RAISE_ERROR,
**attributes):
"""Look for a network matching some property values""" """Look for a network matching some property values"""
resources = list_networks(client=client, **params) networks = list_networks(client=client, **attributes)
return _find.find_resource(obj=obj, if default is _RAISE_ERROR or networks:
resources=resources, if unique:
properties=properties, return networks.first
check_found=check_found, else:
check_unique=check_unique) return networks.unique
else:
return default
def find_port(obj, properties=None, client=None, check_found=True, def find_port(client=None, unique=False, default=_RAISE_ERROR, **attributes):
check_unique=True, **params):
"""Look for a port matching some property values""" """Look for a port matching some property values"""
resources = list_ports(client=client, **params) ports = list_ports(client=client, **attributes)
return _find.find_resource(obj=obj, if default is _RAISE_ERROR or ports:
resources=resources, if unique:
properties=properties, return ports.first
check_found=check_found, else:
check_unique=check_unique) return ports.unique
else:
return default
def find_subnet(obj, properties=None, client=None, check_found=True, def find_subnet(client=None, unique=False, default=_RAISE_ERROR, **attributes):
check_unique=False, **params):
"""Look for a subnet matching some property values""" """Look for a subnet matching some property values"""
resources = list_subnets(client=client, **params) subnets = list_subnets(client=client, **attributes)
return _find.find_resource(obj=obj, if default is _RAISE_ERROR or subnets:
resources=resources, if unique:
properties=properties, return subnets.first
check_found=check_found, else:
check_unique=check_unique) return subnets.unique
else:
return default
def list_networks(show=False, client=None, **params): def list_networks(client=None, **params):
networks = neutron_client(client).list_networks(**params)['networks'] networks = neutron_client(client).list_networks(**params)['networks']
if show: return tobiko.select(networks)
networks = [show_network(n['id'], client=client) for n in networks]
return networks
def list_ports(show=False, client=None, **params): def list_ports(client=None, **params):
ports = neutron_client(client).list_ports(**params)['ports'] ports = neutron_client(client).list_ports(**params)['ports']
if show: return tobiko.select(ports)
ports = [show_port(p['id'], client=client) for p in ports]
return ports
def list_subnets(show=False, client=None, **params): def list_subnets(client=None, **params):
subnets = neutron_client(client).list_subnets(**params) subnets = neutron_client(client).list_subnets(**params)
if isinstance(subnets, collections.Mapping): if isinstance(subnets, collections.Mapping):
subnets = subnets['subnets'] subnets = subnets['subnets']
if show: return tobiko.select(subnets)
subnets = [show_subnet(s['id'], client=client) for s in subnets]
return subnets
def list_agents(client=None, **params): def list_agents(client=None, **params):
agents = neutron_client(client).list_agents(**params) agents = neutron_client(client).list_agents(**params)
if isinstance(agents, collections.Mapping): if isinstance(agents, collections.Mapping):
agents = agents['agents'] agents = agents['agents']
return agents return tobiko.select(agents)
def list_subnet_cidrs(client=None, **params): def list_subnet_cidrs(client=None, **params):
return [netaddr.IPNetwork(subnet['cidr']) return tobiko.select(netaddr.IPNetwork(subnet['cidr'])
for subnet in list_subnets(client=client, **params)] for subnet in list_subnets(client=client, **params))
def show_network(network, client=None, **params): def get_network(network, client=None, **params):
return neutron_client(client).show_network(network, **params)['network'] return neutron_client(client).show_network(network, **params)['network']
def show_port(port, client=None, **params): def get_port(port, client=None, **params):
return neutron_client(client).show_port(port, **params)['port'] return neutron_client(client).show_port(port, **params)['port']
def show_router(router, client=None, **params): def get_router(router, client=None, **params):
return neutron_client(client).show_router(router, **params)['router'] return neutron_client(client).show_router(router, **params)['router']
def show_subnet(subnet, client=None, **params): def get_subnet(subnet, client=None, **params):
return neutron_client(client).show_subnet(subnet, **params)['subnet'] return neutron_client(client).show_subnet(subnet, **params)['subnet']

View File

@ -98,11 +98,11 @@ class NetworkStackFixture(heat.HeatStackFixture):
@property @property
def network_details(self): def network_details(self):
return neutron.show_network(self.network_id) return neutron.get_network(self.network_id)
@property @property
def ipv4_subnet_details(self): def ipv4_subnet_details(self):
return neutron.show_subnet(self.ipv4_subnet_id) return neutron.get_subnet(self.ipv4_subnet_id)
@property @property
def ipv4_subnet_cidr(self): def ipv4_subnet_cidr(self):
@ -110,7 +110,7 @@ class NetworkStackFixture(heat.HeatStackFixture):
@property @property
def ipv6_subnet_details(self): def ipv6_subnet_details(self):
return neutron.show_subnet(self.ipv6_subnet_id) return neutron.get_subnet(self.ipv6_subnet_id)
@property @property
def ipv6_subnet_cidr(self): def ipv6_subnet_cidr(self):
@ -118,29 +118,25 @@ class NetworkStackFixture(heat.HeatStackFixture):
@property @property
def gateway_details(self): def gateway_details(self):
return neutron.show_router(self.gateway_id) return neutron.get_router(self.gateway_id)
@property @property
def ipv4_gateway_port_details(self): def ipv4_gateway_port_details(self):
return neutron.find_port( return neutron.find_port(
[{'subnet_id': self.ipv4_subnet_id, fixed_ips='subnet_id=' + self.ipv4_subnet_id,
'ip_address': self.ipv4_subnet_details['gateway_ip']}],
properties=['fixed_ips'],
device_id=self.gateway_id, device_id=self.gateway_id,
network_id=self.network_id) network_id=self.network_id)
@property @property
def ipv6_gateway_port_details(self): def ipv6_gateway_port_details(self):
return neutron.find_port( return neutron.find_port(
[{'subnet_id': self.ipv6_subnet_id, fixed_ips='subnet_id=' + self.ipv6_subnet_id,
'ip_address': self.ipv6_subnet_details['gateway_ip']}],
properties=['fixed_ips'],
device_id=self.gateway_id, device_id=self.gateway_id,
network_id=self.network_id) network_id=self.network_id)
@property @property
def gateway_network_details(self): def gateway_network_details(self):
return neutron.show_network(self.gateway_network_id) return neutron.get_network(self.gateway_network_id)
@neutron.skip_if_missing_networking_extensions('net-mtu-writable') @neutron.skip_if_missing_networking_extensions('net-mtu-writable')

View File

@ -44,18 +44,14 @@ class NetworkTestCase(testtools.TestCase):
if not self.stack.has_ipv4: if not self.stack.has_ipv4:
tobiko.skip('Stack {!s} has no ipv4 subnet', self.stack.stack_name) tobiko.skip('Stack {!s} has no ipv4 subnet', self.stack.stack_name)
subnet = neutron.find_subnet(str(self.stack.ipv4_subnet_cidr), subnet = neutron.find_subnet(cidr=str(self.stack.ipv4_subnet_cidr))
properties=['cidr']) self.assertEqual(neutron.get_subnet(self.stack.ipv4_subnet_id), subnet)
self.assertEqual(neutron.show_subnet(self.stack.ipv4_subnet_id),
subnet)
def test_ipv6_subnet_cidr(self): def test_ipv6_subnet_cidr(self):
if not self.stack.has_ipv6: if not self.stack.has_ipv6:
tobiko.skip('Stack {!s} has no ipv6 subnet', self.stack.stack_name) tobiko.skip('Stack {!s} has no ipv6 subnet', self.stack.stack_name)
subnet = neutron.find_subnet(str(self.stack.ipv6_subnet_cidr), subnet = neutron.find_subnet(cidr=str(self.stack.ipv6_subnet_cidr))
properties=['cidr']) self.assertEqual(neutron.get_subnet(self.stack.ipv6_subnet_id), subnet)
self.assertEqual(neutron.show_subnet(self.stack.ipv6_subnet_id),
subnet)
def test_gateway_network(self): def test_gateway_network(self):
if not self.stack.has_gateway: if not self.stack.has_gateway:

View File

@ -34,14 +34,14 @@ class NeutronApiTestCase(testtools.TestCase):
stack = tobiko.required_setup_fixture(stacks.NetworkStackFixture) stack = tobiko.required_setup_fixture(stacks.NetworkStackFixture)
def test_find_network_with_id(self): def test_find_network_with_id(self):
network = neutron.find_network(self.stack.network_id) network = neutron.find_network(id=self.stack.network_id)
self.assertEqual(self.stack.network_id, network['id']) self.assertEqual(self.stack.network_id, network['id'])
def test_find_floating_network(self): def test_find_floating_network(self):
floating_network = CONF.tobiko.neutron.floating_network floating_network = CONF.tobiko.neutron.floating_network
if not floating_network: if not floating_network:
tobiko.skip('floating_network not configured') tobiko.skip('floating_network not configured')
network = neutron.find_network(floating_network) network = neutron.find_network(name=floating_network)
self.assertIn(floating_network, [network['name'], network['id']]) self.assertIn(floating_network, [network['name'], network['id']])
self.assertEqual(self.stack.gateway_network_id, network['id']) self.assertEqual(self.stack.gateway_network_id, network['id'])
@ -67,8 +67,8 @@ class NeutronApiTestCase(testtools.TestCase):
cidr = netaddr.IPNetwork(self.stack.ipv6_subnet_details['cidr']) cidr = netaddr.IPNetwork(self.stack.ipv6_subnet_details['cidr'])
self.assertIn(cidr, subnets_cidrs) self.assertIn(cidr, subnets_cidrs)
def test_show_network(self): def test_get_network(self):
network = neutron.show_network(self.stack.network_id) network = neutron.get_network(self.stack.network_id)
self.assertEqual(self.stack.network_id, network['id']) self.assertEqual(self.stack.network_id, network['id'])
self.assertEqual(self.stack.port_security_enabled, self.assertEqual(self.stack.port_security_enabled,
network['port_security_enabled']) network['port_security_enabled'])
@ -81,26 +81,26 @@ class NeutronApiTestCase(testtools.TestCase):
else: else:
self.assertNotIn(self.stack.ipv6_subnet_id, network['subnets']) self.assertNotIn(self.stack.ipv6_subnet_id, network['subnets'])
def test_show_router(self): def test_get_router(self):
if not self.stack.has_gateway: if not self.stack.has_gateway:
tobiko.skip("Stack {stack} has no gateway router", tobiko.skip("Stack {stack} has no gateway router",
stack=self.stack.stack_name) stack=self.stack.stack_name)
router = neutron.show_router(self.stack.gateway_id) router = neutron.get_router(self.stack.gateway_id)
self.assertEqual(self.stack.gateway_id, router['id']) self.assertEqual(self.stack.gateway_id, router['id'])
def test_show_ipv4_subnet(self): def test_get_ipv4_subnet(self):
if not self.stack.has_ipv4: if not self.stack.has_ipv4:
tobiko.skip("Stack {stack} has no IPv4 subnet", tobiko.skip("Stack {stack} has no IPv4 subnet",
stack=self.stack.stack_name) stack=self.stack.stack_name)
subnet = neutron.show_subnet(self.stack.ipv4_subnet_id) subnet = neutron.get_subnet(self.stack.ipv4_subnet_id)
self.assertEqual(self.stack.ipv4_subnet_id, subnet['id']) self.assertEqual(self.stack.ipv4_subnet_id, subnet['id'])
self.assertEqual(self.stack.ipv4_subnet_details, subnet) self.assertEqual(self.stack.ipv4_subnet_details, subnet)
def test_show_ipv6_subnet(self): def test_get_ipv6_subnet(self):
if not self.stack.has_ipv6: if not self.stack.has_ipv6:
tobiko.skip("Stack {stack} has no IPv6 subnet", tobiko.skip("Stack {stack} has no IPv6 subnet",
stack=self.stack.stack_name) stack=self.stack.stack_name)
subnet = neutron.show_subnet(self.stack.ipv6_subnet_id) subnet = neutron.get_subnet(self.stack.ipv6_subnet_id)
self.assertEqual(self.stack.ipv6_subnet_id, subnet['id']) self.assertEqual(self.stack.ipv6_subnet_id, subnet['id'])
self.assertEqual(self.stack.ipv6_subnet_details, subnet) self.assertEqual(self.stack.ipv6_subnet_details, subnet)

View File

@ -39,5 +39,5 @@ class ServerStackResourcesTest(testtools.TestCase):
port = self.stack.resources.port port = self.stack.resources.port
self.assertEqual('OS::Neutron::Port', port.resource_type) self.assertEqual('OS::Neutron::Port', port.resource_type)
# Verify actual port status (is alive, etc.) # Verify actual port status (is alive, etc.)
port_data = neutron.show_port(port.physical_resource_id) port_data = neutron.get_port(port.physical_resource_id)
self.assertEqual(port.physical_resource_id, port_data['id']) self.assertEqual(port.physical_resource_id, port_data['id'])