Improve neutron client integration
Change-Id: Ic85cd58edef496ba16a18fb75daf33e50effeb7a
This commit is contained in:
parent
d9103b4d5c
commit
476ccd52df
|
@ -0,0 +1,53 @@
|
|||
# Copyright 2019 Red Hat
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from __future__ import absolute_import
|
||||
|
||||
|
||||
import tobiko
|
||||
|
||||
|
||||
def find_resource(obj, resources, resource_type, properties=None):
|
||||
resources = list(find_resources(obj, resources, properties=properties))
|
||||
count = len(resources)
|
||||
if count == 0:
|
||||
raise ResourceNotFound(obj=obj,
|
||||
resource_type=resource_type,
|
||||
properties=properties)
|
||||
if count > 1:
|
||||
resource_ids = [r['id'] for r in resources]
|
||||
raise MultipleResourcesFound(obj=obj,
|
||||
resource_type=resource_type,
|
||||
properties=properties,
|
||||
count=len(resources),
|
||||
resource_ids=resource_ids)
|
||||
return resources[0]
|
||||
|
||||
|
||||
def find_resources(obj, resources, properties=None):
|
||||
properties = properties or ('id', 'name')
|
||||
for resource in resources:
|
||||
for property_name in properties:
|
||||
if obj == resource[property_name]:
|
||||
yield resource
|
||||
break
|
||||
|
||||
|
||||
class ResourceNotFound(tobiko.TobikoException):
|
||||
message = ("No such {resource_type} found for {obj!r} in "
|
||||
"properties {properties!r}")
|
||||
|
||||
|
||||
class MultipleResourcesFound(tobiko.TobikoException):
|
||||
message = ("{count} {resource_type}d found for {obj!r} in "
|
||||
"properties {properties!r}: {resource_ids}")
|
|
@ -17,10 +17,14 @@ from tobiko.openstack.neutron import _client
|
|||
from tobiko.openstack.neutron import _extension
|
||||
|
||||
|
||||
neutron_client = _client.neutron_client
|
||||
get_neutron_client = _client.get_neutron_client
|
||||
NeutronClientFixture = _client.NeutronClientFixture
|
||||
find_network = _client.find_network
|
||||
list_network = _client.list_network
|
||||
list_networks = _client.list_networks
|
||||
find_subnet = _client.find_subnet
|
||||
list_subnets = _client.list_subnets
|
||||
list_subnet_cidrs = _client.list_subnet_cidrs
|
||||
show_network = _client.show_network
|
||||
show_router = _client.show_router
|
||||
show_subnet = _client.show_subnet
|
||||
|
|
|
@ -13,10 +13,12 @@
|
|||
# under the License.
|
||||
from __future__ import absolute_import
|
||||
|
||||
import netaddr
|
||||
from neutronclient.v2_0 import client as neutronclient
|
||||
|
||||
import tobiko
|
||||
from tobiko.openstack import _client
|
||||
from tobiko.openstack.neutron import _exceptions
|
||||
from tobiko.openstack import _find
|
||||
|
||||
|
||||
class NeutronClientFixture(_client.OpenstackClientFixture):
|
||||
|
@ -28,47 +30,70 @@ class NeutronClientFixture(_client.OpenstackClientFixture):
|
|||
CLIENTS = _client.OpenstackClientManager(init_client=NeutronClientFixture)
|
||||
|
||||
|
||||
def neutron_client(obj):
|
||||
if not obj:
|
||||
return get_neutron_client()
|
||||
|
||||
if isinstance(obj, neutronclient.Client):
|
||||
return obj
|
||||
|
||||
fixture = tobiko.setup_fixture(obj)
|
||||
if isinstance(fixture, NeutronClientFixture):
|
||||
return fixture.client
|
||||
|
||||
message = "Object {!r} is not a NeutronClientFixture".format(obj)
|
||||
raise TypeError(message)
|
||||
|
||||
|
||||
def get_neutron_client(session=None, shared=True, init_client=None,
|
||||
manager=None):
|
||||
manager = manager or CLIENTS
|
||||
client = manager.get_client(session=session, shared=shared,
|
||||
init_client=init_client)
|
||||
client.setUp()
|
||||
tobiko.setup_fixture(client)
|
||||
return client.client
|
||||
|
||||
|
||||
def find_network(network, session=None, **params):
|
||||
networks = [n
|
||||
for n in list_network(session=session, **params)
|
||||
if network in (n['name'], n['id'])]
|
||||
|
||||
if not networks:
|
||||
raise _exceptions.NoSuchNetwork(network=network)
|
||||
|
||||
elif len(networks) > 1:
|
||||
network_ids = [n['id'] for n in networks]
|
||||
raise _exceptions.MoreNetworksFound(
|
||||
network=network,
|
||||
netowrk_ids=(', '.join(network_ids)))
|
||||
|
||||
return networks[0]
|
||||
def find_network(obj, properties=None, client=None, **params):
|
||||
"""Look for the unique network matching some property values"""
|
||||
return _find.find_resource(
|
||||
obj=obj, resource_type='network', properties=properties,
|
||||
resources=list_networks(client=client, **params))
|
||||
|
||||
|
||||
def list_network(session=None, **params):
|
||||
return get_neutron_client(session=session).list_networks(**params)[
|
||||
'networks']
|
||||
def find_subnet(obj, properties=None, client=None, **params):
|
||||
"""Look for the unique subnet matching some property values"""
|
||||
return _find.find_resource(
|
||||
obj=obj, resource_type='subnet', properties=properties,
|
||||
resources=list_subnets(client=client, **params))
|
||||
|
||||
|
||||
def show_network(network, session=None, **params):
|
||||
return get_neutron_client(session=session).show_network(
|
||||
network, **params)['network']
|
||||
def list_networks(show=False, client=None, **params):
|
||||
networks = neutron_client(client).list_networks(**params)['networks']
|
||||
if show:
|
||||
networks = [show_network(n['id'], client=client) for n in networks]
|
||||
return networks
|
||||
|
||||
|
||||
def show_router(router, session=None, **params):
|
||||
return get_neutron_client(session=session).show_router(
|
||||
router, **params)['router']
|
||||
def list_subnets(show=False, client=None, **params):
|
||||
subnets = neutron_client(client).list_subnets(**params)['subnets']
|
||||
if show:
|
||||
subnets = [show_subnet(s['id'], client=client) for s in subnets]
|
||||
return subnets
|
||||
|
||||
|
||||
def show_subnet(subnet, session=None, **params):
|
||||
return get_neutron_client(session=session).show_subnet(
|
||||
subnet, **params)['subnet']
|
||||
def list_subnet_cidrs(client=None, **params):
|
||||
return [netaddr.IPNetwork(subnet['cidr'])
|
||||
for subnet in list_subnets(client=client, **params)]
|
||||
|
||||
|
||||
def show_network(network, client=None, **params):
|
||||
return neutron_client(client).show_network(network, **params)['network']
|
||||
|
||||
|
||||
def show_router(router, client=None, **params):
|
||||
return neutron_client(client).show_router(router, **params)['router']
|
||||
|
||||
|
||||
def show_subnet(subnet, client=None, **params):
|
||||
return neutron_client(client).show_subnet(subnet, **params)['subnet']
|
||||
|
|
|
@ -20,4 +20,16 @@ def register_tobiko_options(conf):
|
|||
conf.register_opts(
|
||||
group=cfg.OptGroup('neutron'),
|
||||
opts=[cfg.StrOpt('floating_network',
|
||||
help="Network for creating floating IPs")])
|
||||
help="Network for creating floating IPs"),
|
||||
cfg.StrOpt('ipv4_cidr',
|
||||
default='10.100.0.0/16',
|
||||
help="The CIDR block to allocate IPv4 subnets from"),
|
||||
cfg.IntOpt('ipv4_prefixlen',
|
||||
default=24,
|
||||
help="The mask bits for IPv4 subnets"),
|
||||
cfg.StrOpt('ipv6_cidr',
|
||||
default='2003::/48',
|
||||
help="The CIDR block to allocate IPv6 subnets from"),
|
||||
cfg.IntOpt('ipv6_prefixlen',
|
||||
default=64,
|
||||
help="The mask bits for IPv6 subnets")])
|
||||
|
|
|
@ -74,12 +74,12 @@ class NetworkStackFixture(heat.HeatStackFixture):
|
|||
return neutron.show_subnet(self.ipv4_subnet_id)
|
||||
|
||||
@property
|
||||
def gateway_details(self):
|
||||
return neutron.show_router(self.gateway_id)
|
||||
def ipv6_subnet_details(self):
|
||||
return neutron.show_subnet(self.ipv6_subnet_id)
|
||||
|
||||
@property
|
||||
def gateway_network_id(self):
|
||||
return neutron.find_network(self.gateway_network)['id']
|
||||
def gateway_details(self):
|
||||
return neutron.show_router(self.gateway_id)
|
||||
|
||||
@property
|
||||
def gateway_network_details(self):
|
||||
|
|
|
@ -183,6 +183,11 @@ outputs:
|
|||
value: {get_resource: gateway}
|
||||
condition: has_gateway
|
||||
|
||||
gateway_network_id:
|
||||
description: Gateway network ID
|
||||
value: {get_attr: [gateway, external_gateway_info, network_id]}
|
||||
condition: has_gateway
|
||||
|
||||
mtu:
|
||||
description: Network MTU value (integer)
|
||||
value: {get_attr: [network, mtu]}
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
# Copyright (c) 2019 Red Hat, Inc.
|
||||
#
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from __future__ import absolute_import
|
||||
|
||||
import netaddr
|
||||
import testtools
|
||||
|
||||
import tobiko
|
||||
from tobiko import config
|
||||
from tobiko.openstack import neutron
|
||||
from tobiko.openstack import stacks
|
||||
|
||||
|
||||
CONF = config.CONF
|
||||
|
||||
|
||||
class NeutronApiTestCase(testtools.TestCase):
|
||||
"""Tests network creation"""
|
||||
|
||||
#: Stack of resources with a network with a gateway router
|
||||
stack = tobiko.required_setup_fixture(stacks.NetworkStackFixture)
|
||||
|
||||
def test_find_network_with_id(self):
|
||||
network = neutron.find_network(self.stack.network_id)
|
||||
self.assertEqual(self.stack.network_id, network['id'])
|
||||
|
||||
def test_find_floating_network(self):
|
||||
floating_network = CONF.tobiko.neutron.floating_network
|
||||
if not floating_network:
|
||||
tobiko.skip('floating_network not configured')
|
||||
network = neutron.find_network(floating_network)
|
||||
self.assertIn(floating_network, [network['name'], network['id']])
|
||||
self.assertEqual(self.stack.gateway_network_id, network['id'])
|
||||
|
||||
def test_list_networks(self):
|
||||
networks = neutron.list_networks()
|
||||
network_ids = {n['id'] for n in networks}
|
||||
self.assertIn(self.stack.network_id, network_ids)
|
||||
|
||||
def test_list_subnets(self):
|
||||
subnets = neutron.list_subnets()
|
||||
subnets_ids = {s['id'] for s in subnets}
|
||||
if self.stack.has_ipv4:
|
||||
self.assertIn(self.stack.ipv4_subnet_id, subnets_ids)
|
||||
if self.stack.has_ipv6:
|
||||
self.assertIn(self.stack.ipv6_subnet_id, subnets_ids)
|
||||
|
||||
def test_list_subnet_cidrs(self):
|
||||
subnets_cidrs = neutron.list_subnet_cidrs()
|
||||
if self.stack.has_ipv4:
|
||||
cidr = netaddr.IPNetwork(self.stack.ipv4_subnet_details['cidr'])
|
||||
self.assertIn(cidr, subnets_cidrs)
|
||||
if self.stack.has_ipv6:
|
||||
cidr = netaddr.IPNetwork(self.stack.ipv6_subnet_details['cidr'])
|
||||
self.assertIn(cidr, subnets_cidrs)
|
||||
|
||||
def test_show_network(self):
|
||||
network = neutron.show_network(self.stack.network_id)
|
||||
self.assertEqual(self.stack.network_id, network['id'])
|
||||
self.assertEqual(self.stack.port_security_enabled,
|
||||
network['port_security_enabled'])
|
||||
if self.stack.has_ipv4:
|
||||
self.assertIn(self.stack.ipv4_subnet_id, network['subnets'])
|
||||
else:
|
||||
self.assertNotIn(self.stack.ipv4_subnet_id, network['subnets'])
|
||||
if self.stack.has_ipv6:
|
||||
self.assertIn(self.stack.ipv6_subnet_id, network['subnets'])
|
||||
else:
|
||||
self.assertNotIn(self.stack.ipv6_subnet_id, network['subnets'])
|
||||
|
||||
def test_show_router(self):
|
||||
if not self.stack.has_gateway:
|
||||
tobiko.skip("Stack {stack} has no gateway router",
|
||||
stack=self.stack.stack_name)
|
||||
router = neutron.show_router(self.stack.gateway_id)
|
||||
self.assertEqual(self.stack.gateway_id, router['id'])
|
||||
|
||||
def test_show_ipv4_subnet(self):
|
||||
if not self.stack.has_ipv4:
|
||||
tobiko.skip("Stack {stack} has no IPv4 subnet",
|
||||
stack=self.stack.stack_name)
|
||||
subnet = neutron.show_subnet(self.stack.ipv4_subnet_id)
|
||||
self.assertEqual(self.stack.ipv4_subnet_id, subnet['id'])
|
||||
self.assertEqual(str(self.stack.ipv4_cidr), subnet['cidr'])
|
||||
|
||||
def test_show_ipv6_subnet(self):
|
||||
if not self.stack.has_ipv6:
|
||||
tobiko.skip("Stack {stack} has no IPv6 subnet",
|
||||
stack=self.stack.stack_name)
|
||||
subnet = neutron.show_subnet(self.stack.ipv6_subnet_id)
|
||||
self.assertEqual(self.stack.ipv6_subnet_id, subnet['id'])
|
||||
self.assertEqual(str(self.stack.ipv6_cidr), subnet['cidr'])
|
|
@ -15,6 +15,8 @@
|
|||
# under the License.
|
||||
from __future__ import absolute_import
|
||||
|
||||
import unittest
|
||||
|
||||
import netaddr
|
||||
import testtools
|
||||
|
||||
|
@ -43,15 +45,49 @@ class NetworkTestCase(testtools.TestCase):
|
|||
self.assertEqual(self.stack.network_details['mtu'],
|
||||
self.stack.outputs.mtu)
|
||||
|
||||
@unittest.skip('Feature not implemented')
|
||||
def test_ipv4_subnet_cidr(self):
|
||||
self.assertEqual(self.stack.ipv4_cidr,
|
||||
if not self.stack.has_ipv4:
|
||||
tobiko.skip('Stack {!s} has no ipv4 subnet', self.stack.stack_name)
|
||||
|
||||
self.assertEqual(str(self.stack.ipv4_cidr),
|
||||
self.stack.ipv4_subnet_details['cidr'])
|
||||
|
||||
subnet = neutron.find_subnet(str(self.stack.ipv4_cidr),
|
||||
properties=['cidr'])
|
||||
self.assertEqual(neutron.show_subnet(self.stack.ipv4_subnet_id),
|
||||
subnet)
|
||||
|
||||
@unittest.skip('Feature not implemented')
|
||||
def test_ipv6_subnet_cidr(self):
|
||||
if not self.stack.has_ipv6:
|
||||
tobiko.skip('Stack {!s} has no ipv4 subnet', self.stack.stack_name)
|
||||
self.assertEqual(str(self.stack.ipv6_cidr),
|
||||
self.stack.ipv6_subnet_details['cidr'])
|
||||
|
||||
subnet = neutron.find_subnet(str(self.stack.ipv6_cidr),
|
||||
properties=['cidr'])
|
||||
self.assertEqual(neutron.show_subnet(self.stack.ipv6_subnet_id),
|
||||
subnet)
|
||||
|
||||
def test_ipv4_subnet_gateway_ip(self):
|
||||
if not self.stack.has_ipv4 or self.stack.has_gateway:
|
||||
tobiko.skip('Stack {!s} has no IPv4 gateway',
|
||||
self.stack.stack_name)
|
||||
self.assertEqual(str(netaddr.IPNetwork(self.stack.ipv4_cidr).ip + 1),
|
||||
self.stack.ipv4_subnet_details['gateway_ip'])
|
||||
|
||||
def test_ipv6_subnet_gateway_ip(self):
|
||||
if not self.stack.has_ipv6 or self.stack.has_gateway:
|
||||
tobiko.skip('Stack {!s} has no IPv6 gateway',
|
||||
self.stack.stack_name)
|
||||
self.assertEqual(str(netaddr.IPNetwork(self.stack.ipv6_cidr).ip + 1),
|
||||
self.stack.ipv6_subnet_details['gateway_ip'])
|
||||
|
||||
def test_gateway_network(self):
|
||||
if not self.stack.has_gateway:
|
||||
tobiko.skip('Stack {!s} has no gateway',
|
||||
self.stack.stack_name)
|
||||
self.assertEqual(
|
||||
self.stack.gateway_network_id,
|
||||
self.stack.gateway_details['external_gateway_info']['network_id'])
|
||||
|
|
|
@ -45,3 +45,24 @@ class GetNeutronClientTest(openstack.OpenstackTest):
|
|||
def test_get_neutron_client_with_session(self):
|
||||
session = keystone.get_keystone_session()
|
||||
self.test_get_neutron_client(session=session)
|
||||
|
||||
|
||||
class NeutronClientTest(openstack.OpenstackTest):
|
||||
|
||||
def test_neutron_client_with_none(self):
|
||||
default_client = neutron.get_neutron_client()
|
||||
client = neutron.neutron_client(None)
|
||||
self.assertIsInstance(client, neutronclient.Client)
|
||||
self.assertIs(default_client, client)
|
||||
|
||||
def test_neutron_client_with_client(self):
|
||||
default_client = neutron.get_neutron_client()
|
||||
client = neutron.neutron_client(default_client)
|
||||
self.assertIsInstance(client, neutronclient.Client)
|
||||
self.assertIs(default_client, client)
|
||||
|
||||
def test_neutron_client_with_fixture(self):
|
||||
fixture = neutron.NeutronClientFixture()
|
||||
client = neutron.neutron_client(fixture)
|
||||
self.assertIsInstance(client, neutronclient.Client)
|
||||
self.assertIs(client, fixture.client)
|
||||
|
|
Loading…
Reference in New Issue