Remove neutron OO wrappers

There are object-oriented wrappers around neutron objects that add
needless complexity to the code and make it more difficult to
read, understand and debug.  This patch will remove those wrappers
(which are all contained in tempest/scenario/network_resources.py).
The tests that previously used those wrappers have been updated to
client results directly.

Co-Authored-by: Joshua White <joshua.l.white@intel.com>
Change-Id: I3879ae24e47590d12fb09a4bb4697f139f57caf7
This commit is contained in:
Steve Heyman 2016-05-24 09:28:08 -05:00
parent 9d0f53a1a7
commit 33735f2685
6 changed files with 154 additions and 340 deletions

View File

@ -29,7 +29,6 @@ from tempest import config
from tempest import exceptions
from tempest.lib.common.utils import test_utils
from tempest.lib import exceptions as lib_exc
from tempest.scenario import network_resources
import tempest.test
CONF = config.CONF
@ -704,12 +703,12 @@ class NetworkScenarioTest(ScenarioTest):
tenant_id = networks_client.tenant_id
name = data_utils.rand_name(namestart)
result = networks_client.create_network(name=name, tenant_id=tenant_id)
network = network_resources.DeletableNetwork(
networks_client=networks_client, routers_client=routers_client,
**result['network'])
self.assertEqual(network.name, name)
network = result['network']
self.assertEqual(network['name'], name)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
network.delete)
self.networks_client.delete_network,
network['id'])
return network
def _list_networks(self, *args, **kwargs):
@ -779,13 +778,13 @@ class NetworkScenarioTest(ScenarioTest):
# blocks until an unallocated block is found.
for subnet_cidr in tenant_cidr.subnet(num_bits):
str_cidr = str(subnet_cidr)
if cidr_in_use(str_cidr, tenant_id=network.tenant_id):
if cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
continue
subnet = dict(
name=data_utils.rand_name(namestart),
network_id=network.id,
tenant_id=network.tenant_id,
network_id=network['id'],
tenant_id=network['tenant_id'],
cidr=str_cidr,
ip_version=ip_version,
**kwargs
@ -798,11 +797,13 @@ class NetworkScenarioTest(ScenarioTest):
if not is_overlapping_cidr:
raise
self.assertIsNotNone(result, 'Unable to allocate tenant network')
subnet = network_resources.DeletableSubnet(
subnets_client=subnets_client,
routers_client=routers_client, **result['subnet'])
self.assertEqual(subnet.cidr, str_cidr)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, subnet.delete)
subnet = result['subnet']
self.assertEqual(subnet['cidr'], str_cidr)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
subnets_client.delete_subnet, subnet['id'])
return subnet
def _create_port(self, network_id, client=None, namestart='port-quotatest',
@ -815,9 +816,9 @@ class NetworkScenarioTest(ScenarioTest):
network_id=network_id,
**kwargs)
self.assertIsNotNone(result, 'Unable to allocate port')
port = network_resources.DeletablePort(ports_client=client,
**result['port'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc, port.delete)
port = result['port']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_port, port['id'])
return port
def _get_server_port_id_and_ip4(self, server, ip_addr=None):
@ -852,7 +853,7 @@ class NetworkScenarioTest(ScenarioTest):
net = self._list_networks(name=network_name)
self.assertNotEqual(len(net), 0,
"Unable to get network by name: %s" % network_name)
return network_resources.AttributeDict(net[0])
return net[0]
def create_floating_ip(self, thing, external_network_id=None,
port_id=None, client=None):
@ -871,44 +872,51 @@ class NetworkScenarioTest(ScenarioTest):
tenant_id=thing['tenant_id'],
fixed_ip_address=ip4
)
floating_ip = network_resources.DeletableFloatingIp(
client=client,
**result['floatingip'])
floating_ip = result['floatingip']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
floating_ip.delete)
self.floating_ips_client.delete_floatingip,
floating_ip['id'])
return floating_ip
def _associate_floating_ip(self, floating_ip, server):
port_id, _ = self._get_server_port_id_and_ip4(server)
floating_ip.update(port_id=port_id)
self.assertEqual(port_id, floating_ip.port_id)
kwargs = dict(port_id=port_id)
floating_ip = self.floating_ips_client.update_floatingip(
floating_ip['id'], **kwargs)['floatingip']
self.assertEqual(port_id, floating_ip['port_id'])
return floating_ip
def _disassociate_floating_ip(self, floating_ip):
""":param floating_ip: type DeletableFloatingIp"""
floating_ip.update(port_id=None)
self.assertIsNone(floating_ip.port_id)
""":param floating_ip: floating_ips_client.create_floatingip"""
kwargs = dict(port_id=None)
floating_ip = self.floating_ips_client.update_floatingip(
floating_ip['id'], **kwargs)['floatingip']
self.assertIsNone(floating_ip['port_id'])
return floating_ip
def check_floating_ip_status(self, floating_ip, status):
"""Verifies floatingip reaches the given status
:param floating_ip: network_resources.DeletableFloatingIp floating
IP to check status
:param dict floating_ip: floating IP dict to check status
:param status: target status
:raises: AssertionError if status doesn't match
"""
floatingip_id = floating_ip['id']
def refresh():
floating_ip.refresh()
return status == floating_ip.status
result = (self.floating_ips_client.
show_floatingip(floatingip_id)['floatingip'])
return status == result['status']
tempest.test.call_until_true(refresh,
CONF.network.build_timeout,
CONF.network.build_interval)
self.assertEqual(status, floating_ip.status,
floating_ip = self.floating_ips_client.show_floatingip(
floatingip_id)['floatingip']
self.assertEqual(status, floating_ip['status'],
message="FloatingIP: {fp} is at status: {cst}. "
"failed to reach status: {st}"
.format(fp=floating_ip, cst=floating_ip.status,
.format(fp=floating_ip, cst=floating_ip['status'],
st=status))
LOG.info("FloatingIP: {fp} is at status: {st}"
.format(fp=floating_ip, st=status))
@ -981,8 +989,8 @@ class NetworkScenarioTest(ScenarioTest):
secgroup=secgroup,
security_groups_client=security_groups_client)
for rule in rules:
self.assertEqual(tenant_id, rule.tenant_id)
self.assertEqual(secgroup.id, rule.security_group_id)
self.assertEqual(tenant_id, rule['tenant_id'])
self.assertEqual(secgroup['id'], rule['security_group_id'])
return secgroup
def _create_empty_security_group(self, client=None, tenant_id=None,
@ -994,7 +1002,7 @@ class NetworkScenarioTest(ScenarioTest):
- IPv6 egress to any
:param tenant_id: secgroup will be created in this tenant
:returns: DeletableSecurityGroup -- containing the secgroup created
:returns: the created security group
"""
if client is None:
client = self.security_groups_client
@ -1006,15 +1014,14 @@ class NetworkScenarioTest(ScenarioTest):
description=sg_desc)
sg_dict['tenant_id'] = tenant_id
result = client.create_security_group(**sg_dict)
secgroup = network_resources.DeletableSecurityGroup(
client=client, routers_client=self.routers_client,
**result['security_group']
)
self.assertEqual(secgroup.name, sg_name)
self.assertEqual(tenant_id, secgroup.tenant_id)
self.assertEqual(secgroup.description, sg_desc)
secgroup = result['security_group']
self.assertEqual(secgroup['name'], sg_name)
self.assertEqual(tenant_id, secgroup['tenant_id'])
self.assertEqual(secgroup['description'], sg_desc)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
secgroup.delete)
client.delete_security_group, secgroup['id'])
return secgroup
def _default_security_group(self, client=None, tenant_id=None):
@ -1032,8 +1039,7 @@ class NetworkScenarioTest(ScenarioTest):
]
msg = "No default security group for tenant %s." % (tenant_id)
self.assertTrue(len(sgs) > 0, msg)
return network_resources.DeletableSecurityGroup(client=client,
**sgs[0])
return sgs[0]
def _create_security_group_rule(self, secgroup=None,
sec_group_rules_client=None,
@ -1044,7 +1050,7 @@ class NetworkScenarioTest(ScenarioTest):
Create a rule in a secgroup. if secgroup not defined will search for
default secgroup in tenant_id.
:param secgroup: type DeletableSecurityGroup.
:param secgroup: the security group.
:param tenant_id: if secgroup not passed -- the tenant in which to
search for default secgroup
:param kwargs: a dictionary containing rule parameters:
@ -1066,17 +1072,15 @@ class NetworkScenarioTest(ScenarioTest):
secgroup = self._default_security_group(
client=security_groups_client, tenant_id=tenant_id)
ruleset = dict(security_group_id=secgroup.id,
tenant_id=secgroup.tenant_id)
ruleset = dict(security_group_id=secgroup['id'],
tenant_id=secgroup['tenant_id'])
ruleset.update(kwargs)
sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
sg_rule = network_resources.DeletableSecurityGroupRule(
client=sec_group_rules_client,
**sg_rule['security_group_rule']
)
self.assertEqual(secgroup.tenant_id, sg_rule.tenant_id)
self.assertEqual(secgroup.id, sg_rule.security_group_id)
sg_rule = sg_rule['security_group_rule']
self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
return sg_rule
@ -1130,7 +1134,7 @@ class NetworkScenarioTest(ScenarioTest):
if msg not in ex._error_string:
raise ex
else:
self.assertEqual(r_direction, sg_rule.direction)
self.assertEqual(r_direction, sg_rule['direction'])
rules.append(sg_rule)
return rules
@ -1152,10 +1156,11 @@ class NetworkScenarioTest(ScenarioTest):
network_id = CONF.network.public_network_id
if router_id:
body = client.show_router(router_id)
return network_resources.AttributeDict(**body['router'])
return body['router']
elif network_id:
router = self._create_router(client, tenant_id)
router.set_gateway(network_id)
kwargs = {'external_gateway_info': dict(network_id=network_id)}
router = client.update_router(router['id'], **kwargs)['router']
return router
else:
raise Exception("Neither of 'public_router_id' or "
@ -1171,15 +1176,18 @@ class NetworkScenarioTest(ScenarioTest):
result = client.create_router(name=name,
admin_state_up=True,
tenant_id=tenant_id)
router = network_resources.DeletableRouter(routers_client=client,
**result['router'])
self.assertEqual(router.name, name)
self.addCleanup(test_utils.call_and_ignore_notfound_exc, router.delete)
router = result['router']
self.assertEqual(router['name'], name)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_router,
router['id'])
return router
def _update_router_admin_state(self, router, admin_state_up):
router.update(admin_state_up=admin_state_up)
self.assertEqual(admin_state_up, router.admin_state_up)
kwargs = dict(admin_state_up=admin_state_up)
router = self.routers_client.update_router(
router['id'], **kwargs)['router']
self.assertEqual(admin_state_up, router['admin_state_up'])
def create_networks(self, networks_client=None,
routers_client=None, subnets_client=None,
@ -1212,7 +1220,6 @@ class NetworkScenarioTest(ScenarioTest):
tenant_id=tenant_id)
router = self._get_router(client=routers_client,
tenant_id=tenant_id)
subnet_kwargs = dict(network=network,
subnets_client=subnets_client,
routers_client=routers_client)
@ -1220,7 +1227,17 @@ class NetworkScenarioTest(ScenarioTest):
if dns_nameservers is not None:
subnet_kwargs['dns_nameservers'] = dns_nameservers
subnet = self._create_subnet(**subnet_kwargs)
subnet.add_to_router(router.id)
if not routers_client:
routers_client = self.routers_client
router_id = router['id']
routers_client.add_router_interface(router_id,
subnet_id=subnet['id'])
# save a cleanup job to remove this association between
# router and subnet
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
routers_client.remove_router_interface, router_id,
subnet_id=subnet['id'])
return network, subnet, router

View File

@ -1,220 +0,0 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import time
import six
from tempest import exceptions
from tempest.lib.common.utils import misc
class AttributeDict(dict):
"""Provide attribute access (dict.key) to dictionary values."""
def __getattr__(self, name):
"""Allow attribute access for all keys in the dict."""
if name in self:
return self[name]
return super(AttributeDict, self).__getattribute__(name)
@six.add_metaclass(abc.ABCMeta)
class DeletableResource(AttributeDict):
"""Support deletion of neutron resources (networks, subnets)
via a delete() method, as is supported by keystone and nova resources.
"""
def __init__(self, *args, **kwargs):
self.client = kwargs.pop('client', None)
self.networks_client = kwargs.pop('networks_client', None)
self.routers_client = kwargs.pop('routers_client', None)
self.subnets_client = kwargs.pop('subnets_client', None)
self.ports_client = kwargs.pop('ports_client', None)
super(DeletableResource, self).__init__(*args, **kwargs)
def __str__(self):
return '<%s id="%s" name="%s">' % (self.__class__.__name__,
self.id, self.name)
@abc.abstractmethod
def delete(self):
return
@abc.abstractmethod
def refresh(self):
return
def __hash__(self):
return hash(self.id)
def wait_for_status(self, status):
if not hasattr(self, 'status'):
return
def helper_get():
self.refresh()
return self
return self.wait_for_resource_status(helper_get, status)
def wait_for_resource_status(self, fetch, status):
"""Waits for a network resource to reach a status
@param fetch: the callable to be used to query the resource status
@type fetch: callable that takes no parameters and returns the resource
@param status: the status that the resource has to reach
@type status: String
"""
interval = self.build_interval
timeout = self.build_timeout
start_time = time.time()
while time.time() - start_time <= timeout:
resource = fetch()
if resource['status'] == status:
return
time.sleep(interval)
# At this point, the wait has timed out
message = 'Resource %s' % (str(resource))
message += ' failed to reach status %s' % status
message += ' (current: %s)' % resource['status']
message += ' within the required time %s' % timeout
caller = misc.find_test_caller()
if caller:
message = '(%s) %s' % (caller, message)
raise exceptions.TimeoutException(message)
class DeletableNetwork(DeletableResource):
def delete(self):
self.networks_client.delete_network(self.id)
class DeletableSubnet(DeletableResource):
def __init__(self, *args, **kwargs):
super(DeletableSubnet, self).__init__(*args, **kwargs)
self._router_ids = set()
def update(self, *args, **kwargs):
result = self.subnets_client.update_subnet(self.id,
*args,
**kwargs)
return super(DeletableSubnet, self).update(**result['subnet'])
def add_to_router(self, router_id):
self._router_ids.add(router_id)
self.routers_client.add_router_interface(router_id,
subnet_id=self.id)
def delete(self):
for router_id in self._router_ids.copy():
self.routers_client.remove_router_interface(router_id,
subnet_id=self.id)
self._router_ids.remove(router_id)
self.subnets_client.delete_subnet(self.id)
class DeletableRouter(DeletableResource):
def set_gateway(self, network_id):
return self.update(external_gateway_info=dict(network_id=network_id))
def unset_gateway(self):
return self.update(external_gateway_info=dict())
def update(self, *args, **kwargs):
result = self.routers_client.update_router(self.id,
*args,
**kwargs)
return super(DeletableRouter, self).update(**result['router'])
def delete(self):
self.unset_gateway()
self.routers_client.delete_router(self.id)
class DeletableFloatingIp(DeletableResource):
def refresh(self, *args, **kwargs):
result = self.client.show_floatingip(self.id,
*args,
**kwargs)
super(DeletableFloatingIp, self).update(**result['floatingip'])
def update(self, *args, **kwargs):
result = self.client.update_floatingip(self.id,
*args,
**kwargs)
super(DeletableFloatingIp, self).update(**result['floatingip'])
def __repr__(self):
return '<%s addr="%s">' % (self.__class__.__name__,
self.floating_ip_address)
def __str__(self):
return '<"FloatingIP" addr="%s" id="%s">' % (self.floating_ip_address,
self.id)
def delete(self):
self.client.delete_floatingip(self.id)
class DeletablePort(DeletableResource):
def delete(self):
self.ports_client.delete_port(self.id)
class DeletableSecurityGroup(DeletableResource):
def delete(self):
self.client.delete_security_group(self.id)
class DeletableSecurityGroupRule(DeletableResource):
def __repr__(self):
return '<%s id="%s">' % (self.__class__.__name__, self.id)
def delete(self):
self.client.delete_security_group_rule(self.id)
class DeletablePool(DeletableResource):
def delete(self):
self.client.delete_pool(self.id)
class DeletableMember(DeletableResource):
def delete(self):
self.client.delete_member(self.id)
class DeletableVip(DeletableResource):
def delete(self):
self.client.delete_vip(self.id)
def refresh(self):
result = self.client.show_vip(self.id)
super(DeletableVip, self).update(**result['vip'])

View File

@ -58,7 +58,7 @@ class TestNetworkAdvancedServerOps(manager.NetworkScenarioTest):
server_name = data_utils.rand_name('server-smoke')
server = self.create_server(
name=server_name,
networks=[{'uuid': network.id}],
networks=[{'uuid': network['id']}],
key_name=keypair['name'],
security_groups=[{'name': security_group['name']}],
wait_until='ACTIVE')
@ -78,7 +78,7 @@ class TestNetworkAdvancedServerOps(manager.NetworkScenarioTest):
server, username, private_key,
should_connect=should_connect,
servers_for_debug=[server])
floating_ip_addr = floating_ip.floating_ip_address
floating_ip_addr = floating_ip['floating_ip_address']
# Check FloatingIP status before checking the connectivity
self.check_floating_ip_status(floating_ip, 'ACTIVE')
self.check_public_network_connectivity(floating_ip_addr, username,

View File

@ -25,7 +25,6 @@ from tempest import config
from tempest import exceptions
from tempest.lib.common.utils import test_utils
from tempest.scenario import manager
from tempest.scenario import network_resources
from tempest import test
CONF = config.CONF
@ -114,7 +113,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
self.port_id = None
if boot_with_port:
# create a port on the network and boot with that
self.port_id = self._create_port(self.network['id']).id
self.port_id = self._create_port(self.network['id'])['id']
self.ports.append({'port': self.port_id})
name = data_utils.rand_name('server-smoke')
@ -133,30 +132,30 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
seen_nets = self._list_networks()
seen_names = [n['name'] for n in seen_nets]
seen_ids = [n['id'] for n in seen_nets]
self.assertIn(self.network.name, seen_names)
self.assertIn(self.network.id, seen_ids)
self.assertIn(self.network['name'], seen_names)
self.assertIn(self.network['id'], seen_ids)
if self.subnet:
seen_subnets = self._list_subnets()
seen_net_ids = [n['network_id'] for n in seen_subnets]
seen_subnet_ids = [n['id'] for n in seen_subnets]
self.assertIn(self.network.id, seen_net_ids)
self.assertIn(self.subnet.id, seen_subnet_ids)
self.assertIn(self.network['id'], seen_net_ids)
self.assertIn(self.subnet['id'], seen_subnet_ids)
if self.router:
seen_routers = self._list_routers()
seen_router_ids = [n['id'] for n in seen_routers]
seen_router_names = [n['name'] for n in seen_routers]
self.assertIn(self.router.name,
self.assertIn(self.router['name'],
seen_router_names)
self.assertIn(self.router.id,
self.assertIn(self.router['id'],
seen_router_ids)
def _create_server(self, name, network, port_id=None):
keypair = self.create_keypair()
self.keypairs[keypair['name']] = keypair
security_groups = [{'name': self.security_group['name']}]
network = {'uuid': network.id}
network = {'uuid': network['id']}
if port_id is not None:
network['port'] = port_id
@ -198,7 +197,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
"""
ssh_login = CONF.validation.image_ssh_user
floating_ip, server = self.floating_ip_tuple
ip_address = floating_ip.floating_ip_address
ip_address = floating_ip['floating_ip_address']
private_key = None
floatingip_status = 'DOWN'
if should_connect:
@ -239,7 +238,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
def _hotplug_server(self):
old_floating_ip, server = self.floating_ip_tuple
ip_address = old_floating_ip.floating_ip_address
ip_address = old_floating_ip['floating_ip_address']
private_key = self._get_server_key(server)
ssh_client = self.get_remote_client(
ip_address, private_key=private_key)
@ -250,7 +249,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
old_port = port_list[0]
interface = self.interface_client.create_interface(
server_id=server['id'],
net_id=self.new_net.id)['interfaceAttachment']
net_id=self.new_net['id'])['interfaceAttachment']
self.addCleanup(self.ports_client.wait_for_resource_deletion,
interface['port_id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
@ -270,9 +269,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
"Old port: %s. Number of new ports: %d" % (
CONF.network.build_timeout, old_port,
len(self.new_port_list)))
new_port = network_resources.DeletablePort(
ports_client=self.ports_client,
**self.new_port_list[0])
new_port = self.new_port_list[0]
def check_new_nic():
new_nic_list = self._get_server_nics(ssh_client)
@ -287,7 +284,8 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
num, new_nic = self.diff_list[0]
ssh_client.assign_static_ip(nic=new_nic,
addr=new_port.fixed_ips[0]['ip_address'])
addr=new_port['fixed_ips'][0][
'ip_address'])
ssh_client.set_nic_state(nic=new_nic)
def _get_server_nics(self, ssh_client):
@ -307,7 +305,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
# get all network ports in the new network
internal_ips = (p['fixed_ips'][0]['ip_address'] for p in
self._list_ports(tenant_id=server['tenant_id'],
network_id=network.id)
network_id=network['id'])
if p['device_owner'].startswith('network'))
self._check_server_connectivity(floating_ip,
@ -335,7 +333,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
def _check_server_connectivity(self, floating_ip, address_list,
should_connect=True):
ip_address = floating_ip.floating_ip_address
ip_address = floating_ip['floating_ip_address']
private_key = self._get_server_key(self.floating_ip_tuple.server)
ssh_source = self.get_remote_client(
ip_address, private_key=private_key)
@ -451,7 +449,13 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
self._create_server(name, self.new_net)
self._check_network_internal_connectivity(network=self.new_net,
should_connect=False)
self.new_subnet.add_to_router(self.router.id)
router_id = self.router['id']
self.routers_client.add_router_interface(
router_id, subnet_id=self.new_subnet['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.routers_client.remove_router_interface,
router_id, subnet_id=self.new_subnet['id'])
self._check_network_internal_connectivity(network=self.new_net,
should_connect=True)
@ -553,7 +557,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
self.check_public_network_connectivity(should_connect=True)
floating_ip, server = self.floating_ip_tuple
ip_address = floating_ip.floating_ip_address
ip_address = floating_ip['floating_ip_address']
private_key = self._get_server_key(server)
ssh_client = self.get_remote_client(
ip_address, private_key=private_key)
@ -568,9 +572,11 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
act_serv=servers,
trgt_serv=dns_servers))
self.subnet.update(dns_nameservers=[alt_dns_server])
self.subnet = self.subnets_client.update_subnet(
self.subnet['id'], dns_nameservers=[alt_dns_server])['subnet']
# asserts that Neutron DB has updated the nameservers
self.assertEqual([alt_dns_server], self.subnet.dns_nameservers,
self.assertEqual([alt_dns_server], self.subnet['dns_nameservers'],
"Failed to update subnet's nameservers")
def check_new_dns_server():
@ -695,7 +701,8 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
# NOTE(kevinbenton): we have to use the admin credentials to check
# for the distributed flag because self.router only has a project view.
admin = self.admin_manager.routers_client.show_router(self.router.id)
admin = self.admin_manager.routers_client.show_router(
self.router['id'])
if admin['router'].get('distributed', False):
msg = "Rescheduling test does not apply to distributed routers."
raise self.skipException(msg)
@ -704,16 +711,16 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
# remove resource from agents
hosting_agents = set(a["id"] for a in
list_hosts(self.router.id)['agents'])
list_hosts(self.router['id'])['agents'])
no_migration = agent_list_alive == hosting_agents
LOG.info("Router will be assigned to {mig} hosting agent".
format(mig="the same" if no_migration else "a new"))
for hosting_agent in hosting_agents:
unschedule_router(hosting_agent, self.router.id)
unschedule_router(hosting_agent, self.router['id'])
self.assertNotIn(hosting_agent,
[a["id"] for a in
list_hosts(self.router.id)['agents']],
list_hosts(self.router['id'])['agents']],
'unscheduling router failed')
# verify resource is un-functional
@ -730,7 +737,7 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
router_id=self.router['id'])
self.assertEqual(
target_agent,
list_hosts(self.router.id)['agents'][0]['id'],
list_hosts(self.router['id'])['agents'][0]['id'],
"Router failed to reschedule. Hosting agent doesn't match "
"target agent")
@ -776,12 +783,12 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
network_id=self.new_net["id"])
spoof_port = new_ports[0]
private_key = self._get_server_key(server)
ssh_client = self.get_remote_client(fip.floating_ip_address,
ssh_client = self.get_remote_client(fip['floating_ip_address'],
private_key=private_key)
spoof_nic = ssh_client.get_nic_name_by_mac(spoof_port["mac_address"])
name = data_utils.rand_name('peer')
peer = self._create_server(name, self.new_net)
peer_address = peer['addresses'][self.new_net.name][0]['addr']
peer_address = peer['addresses'][self.new_net['name']][0]['addr']
self._check_remote_connectivity(ssh_client, dest=peer_address,
nic=spoof_nic, should_succeed=True)
ssh_client.set_mac_address(spoof_nic, spoof_mac)

View File

@ -17,6 +17,7 @@ import functools
import six
from tempest import config
from tempest.lib.common.utils import test_utils
from tempest.scenario import manager
from tempest import test
@ -82,8 +83,12 @@ class TestGettingAddress(manager.NetworkScenarioTest):
ip_version=4)
router = self._get_router(tenant_id=self.tenant_id)
sub4.add_to_router(router_id=router['id'])
self.addCleanup(sub4.delete)
self.routers_client.add_router_interface(router['id'],
subnet_id=sub4['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.routers_client.remove_router_interface,
router['id'], subnet_id=sub4['id'])
self.subnets_v6 = []
for _ in range(n_subnets6):
@ -94,10 +99,14 @@ class TestGettingAddress(manager.NetworkScenarioTest):
ipv6_ra_mode=address6_mode,
ipv6_address_mode=address6_mode)
sub6.add_to_router(router_id=router['id'])
self.addCleanup(sub6.delete)
self.subnets_v6.append(sub6)
self.routers_client.add_router_interface(router['id'],
subnet_id=sub6['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.routers_client.remove_router_interface,
router['id'], subnet_id=sub6['id'])
self.subnets_v6.append(sub6)
return [self.network, self.network_v6] if dualnet else [self.network]
@staticmethod
@ -119,12 +128,12 @@ class TestGettingAddress(manager.NetworkScenarioTest):
srv = self.create_server(
key_name=self.keypair['name'],
security_groups=[{'name': self.sec_grp['name']}],
networks=[{'uuid': n.id} for n in networks],
networks=[{'uuid': n['id']} for n in networks],
wait_until='ACTIVE')
fip = self.create_floating_ip(thing=srv)
ips = self.define_server_ips(srv=srv)
ssh = self.get_remote_client(
ip_address=fip.floating_ip_address,
ip_address=fip['floating_ip_address'],
username=username)
return ssh, ips, srv["id"]
@ -139,7 +148,7 @@ class TestGettingAddress(manager.NetworkScenarioTest):
"""
ports = [p["mac_address"] for p in
self._list_ports(device_id=sid,
network_id=self.network_v6.id)]
network_id=self.network_v6['id'])]
self.assertEqual(1, len(ports),
message=("Multiple IPv6 ports found on network %s. "
"ports: %s")
@ -190,11 +199,11 @@ class TestGettingAddress(manager.NetworkScenarioTest):
self._check_connectivity(sshv4_1,
ips_from_api_2['6'][i])
self._check_connectivity(sshv4_1,
self.subnets_v6[i].gateway_ip)
self.subnets_v6[i]['gateway_ip'])
self._check_connectivity(sshv4_2,
ips_from_api_1['6'][i])
self._check_connectivity(sshv4_2,
self.subnets_v6[i].gateway_ip)
self.subnets_v6[i]['gateway_ip'])
def _check_connectivity(self, source, dest):
self.assertTrue(

View File

@ -227,22 +227,23 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
seen_names = [n['name'] for n in seen_nets]
seen_ids = [n['id'] for n in seen_nets]
self.assertIn(tenant.network.name, seen_names)
self.assertIn(tenant.network.id, seen_ids)
self.assertIn(tenant.network['name'], seen_names)
self.assertIn(tenant.network['id'], seen_ids)
seen_subnets = [(n['id'], n['cidr'], n['network_id'])
for n in self._list_subnets()]
mysubnet = (tenant.subnet.id, tenant.subnet.cidr, tenant.network.id)
mysubnet = (tenant.subnet['id'], tenant.subnet['cidr'],
tenant.network['id'])
self.assertIn(mysubnet, seen_subnets)
seen_routers = self._list_routers()
seen_router_ids = [n['id'] for n in seen_routers]
seen_router_names = [n['name'] for n in seen_routers]
self.assertIn(tenant.router.name, seen_router_names)
self.assertIn(tenant.router.id, seen_router_ids)
self.assertIn(tenant.router['name'], seen_router_names)
self.assertIn(tenant.router['id'], seen_router_ids)
myport = (tenant.router.id, tenant.subnet.id)
myport = (tenant.router['id'], tenant.subnet['id'])
router_ports = [(i['device_id'], i['fixed_ips'][0]['subnet_id']) for i
in self._list_ports()
if self._is_router_port(i)]
@ -270,7 +271,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
kwargs["scheduler_hints"] = {'different_host': self.servers}
server = self.create_server(
name=name,
networks=[{'uuid': tenant.network.id}],
networks=[{'uuid': tenant.network["id"]}],
key_name=tenant.keypair['name'],
security_groups=security_groups_names,
wait_until='ACTIVE',
@ -353,10 +354,10 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
def _get_server_ip(self, server, floating=False):
"""returns the ip (floating/internal) of a server"""
if floating:
server_ip = self.floating_ips[server['id']].floating_ip_address
server_ip = self.floating_ips[server['id']]['floating_ip_address']
else:
server_ip = None
network_name = self.tenants[server['tenant_id']].network.name
network_name = self.tenants[server['tenant_id']].network['name']
if network_name in server['addresses']:
server_ip = server['addresses'][network_name][0]['addr']
return server_ip
@ -364,7 +365,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
def _connect_to_access_point(self, tenant):
"""create ssh connection to tenant access point"""
access_point_ssh = \
self.floating_ips[tenant.access_point['id']].floating_ip_address
self.floating_ips[tenant.access_point['id']]['floating_ip_address']
private_key = tenant.keypair['private_key']
access_point_ssh = self.get_remote_client(
access_point_ssh, private_key=private_key)
@ -388,7 +389,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
def _test_in_tenant_allow(self, tenant):
ruleset = dict(
protocol='icmp',
remote_group_id=tenant.security_groups['default'].id,
remote_group_id=tenant.security_groups['default']['id'],
direction='ingress'
)
self._create_security_group_rule(
@ -464,7 +465,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
for port in port_list if port['fixed_ips']
]
server_ip = self._get_server_ip(tenant.access_point)
subnet_id = tenant.subnet.id
subnet_id = tenant.subnet['id']
self.assertIn((subnet_id, server_ip, mac_addr), port_detail_list)
@test.idempotent_id('e79f879e-debb-440c-a7e4-efeda05b6848')
@ -545,7 +546,7 @@ class TestSecurityGroupsBasicOps(manager.NetworkScenarioTest):
# update port with new security group and check connectivity
self.ports_client.update_port(port_id, security_groups=[
new_tenant.security_groups['new_sg'].id])
new_tenant.security_groups['new_sg']['id']])
self._check_connectivity(
access_point=access_point_ssh,
ip=self._get_server_ip(server))