Merge "Adds scenario for IPv6 addresses"

This commit is contained in:
Jenkins
2014-12-11 08:59:59 +00:00
committed by Gerrit Code Review
3 changed files with 176 additions and 10 deletions

View File

@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
import re
import time
@@ -87,7 +88,9 @@ class RemoteClient():
return self.exec_command(cmd)
def ping_host(self, host):
cmd = 'ping -c1 -w1 %s' % host
addr = netaddr.IPAddress(host)
cmd = 'ping6' if addr.version == 6 else 'ping'
cmd += ' -c1 -w1 {0}'.format(host)
return self.exec_command(cmd)
def get_mac_address(self):

View File

@@ -606,22 +606,31 @@ class NetworkScenarioTest(ScenarioTest):
cidr_in_use = self._list_subnets(tenant_id=tenant_id, cidr=cidr)
return len(cidr_in_use) != 0
tenant_cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
ip_version = kwargs.pop('ip_version', 4)
if ip_version == 6:
tenant_cidr = netaddr.IPNetwork(
CONF.network.tenant_network_v6_cidr)
num_bits = CONF.network.tenant_network_v6_mask_bits
else:
tenant_cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
num_bits = CONF.network.tenant_network_mask_bits
result = None
str_cidr = None
# Repeatedly attempt subnet creation with sequential cidr
# blocks until an unallocated block is found.
for subnet_cidr in tenant_cidr.subnet(
CONF.network.tenant_network_mask_bits):
for subnet_cidr in tenant_cidr.subnet(num_bits):
str_cidr = str(subnet_cidr)
if cidr_in_use(str_cidr, tenant_id=network.tenant_id):
continue
subnet = dict(
name=data_utils.rand_name(namestart),
ip_version=4,
network_id=network.id,
tenant_id=network.tenant_id,
cidr=str_cidr,
ip_version=ip_version,
**kwargs
)
try:
@@ -652,12 +661,17 @@ class NetworkScenarioTest(ScenarioTest):
self.addCleanup(self.delete_wrapper, port.delete)
return port
def _get_server_port_id(self, server, ip_addr=None):
def _get_server_port_id_and_ip4(self, server, ip_addr=None):
ports = self._list_ports(device_id=server['id'],
fixed_ip=ip_addr)
self.assertEqual(len(ports), 1,
"Unable to determine which port to target.")
return ports[0]['id']
# it might happen here that this port has more then one ip address
# as in case of dual stack- when this port is created on 2 subnets
for ip46 in ports[0]['fixed_ips']:
ip = ip46['ip_address']
if netaddr.valid_ipv4(ip):
return ports[0]['id'], ip
def _get_network_by_name(self, network_name):
net = self._list_networks(name=network_name)
@@ -673,11 +687,14 @@ class NetworkScenarioTest(ScenarioTest):
if not client:
client = self.network_client
if not port_id:
port_id = self._get_server_port_id(thing)
port_id, ip4 = self._get_server_port_id_and_ip4(thing)
else:
ip4 = None
_, result = client.create_floatingip(
floating_network_id=external_network_id,
port_id=port_id,
tenant_id=thing['tenant_id']
tenant_id=thing['tenant_id'],
fixed_ip_address=ip4
)
floating_ip = net_resources.DeletableFloatingIp(
client=client,
@@ -686,7 +703,7 @@ class NetworkScenarioTest(ScenarioTest):
return floating_ip
def _associate_floating_ip(self, floating_ip, server):
port_id = self._get_server_port_id(server)
port_id, _ = self._get_server_port_id_and_ip4(server)
floating_ip.update(port_id=port_id)
self.assertEqual(port_id, floating_ip.port_id)
return floating_ip

View File

@@ -0,0 +1,146 @@
# Copyright 2014 Cisco Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from tempest import config
from tempest.openstack.common import log as logging
from tempest.scenario import manager
from tempest import test
CONF = config.CONF
LOG = logging.getLogger(__name__)
class TestGettingAddress(manager.NetworkScenarioTest):
"""Create network with 2 subnets: IPv4 and IPv6 in a given address mode
Boot 2 VMs on this network
Allocate and assign 2 FIP4
Check that vNIC of server matches port data from OpenStack DB
Ping4 tenant IPv4 of one VM from another one
Will do the same with ping6 when available in VM
"""
@classmethod
def resource_setup(cls):
# Create no network resources for these tests.
cls.set_network_resources()
super(TestGettingAddress, cls).resource_setup()
@classmethod
def check_preconditions(cls):
if not (CONF.network_feature_enabled.ipv6
and CONF.network_feature_enabled.ipv6_subnet_attributes):
cls.enabled = False
raise cls.skipException('IPv6 or its attributes not supported')
if not (CONF.network.tenant_networks_reachable
or CONF.network.public_network_id):
msg = ('Either tenant_networks_reachable must be "true", or '
'public_network_id must be defined.')
cls.enabled = False
raise cls.skipException(msg)
super(TestGettingAddress, cls).check_preconditions()
def setUp(self):
super(TestGettingAddress, self).setUp()
self.keypair = self.create_keypair()
self.sec_grp = self._create_security_group(tenant_id=self.tenant_id)
self.srv_kwargs = {
'key_name': self.keypair['name'],
'security_groups': [self.sec_grp]}
def prepare_network(self, address6_mode):
"""Creates network with
one IPv6 subnet in the given mode and
one IPv4 subnet
Creates router with ports on both subnets
"""
net = self._create_network(tenant_id=self.tenant_id)
sub4 = self._create_subnet(network=net,
namestart='sub4',
ip_version=4,)
# since https://bugs.launchpad.net/neutron/+bug/1394112 we need
# to specify gateway_ip manually
net_range = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
gateway_ip = (netaddr.IPAddress(net_range) + 1).format()
sub6 = self._create_subnet(network=net,
namestart='sub6',
ip_version=6,
gateway_ip=gateway_ip,
ipv6_ra_mode=address6_mode,
ipv6_address_mode=address6_mode)
router = self._get_router(tenant_id=self.tenant_id)
sub4.add_to_router(router_id=router['id'])
sub6.add_to_router(router_id=router['id'])
self.addCleanup(sub4.delete)
self.addCleanup(sub6.delete)
@staticmethod
def define_server_ips(srv):
for net_name, nics in srv['addresses'].iteritems():
for nic in nics:
if nic['version'] == 6:
srv['accessIPv6'] = nic['addr']
else:
srv['accessIPv4'] = nic['addr']
def prepare_server(self):
username = CONF.compute.image_ssh_user
srv = self.create_server(create_kwargs=self.srv_kwargs)
fip = self.create_floating_ip(thing=srv)
self.define_server_ips(srv=srv)
ssh = self.get_remote_client(
server_or_ip=fip.floating_ip_address,
username=username)
return ssh, srv
def _prepare_and_test(self, address6_mode):
self.prepare_network(address6_mode=address6_mode)
ssh1, srv1 = self.prepare_server()
ssh2, srv2 = self.prepare_server()
result = ssh1.get_ip_list()
self.assertIn(srv1['accessIPv4'], result)
# v6 should be configured since the image supports it
self.assertIn(srv1['accessIPv6'], result)
result = ssh2.get_ip_list()
self.assertIn(srv2['accessIPv4'], result)
# v6 should be configured since the image supports it
self.assertIn(srv2['accessIPv6'], result)
result = ssh1.ping_host(srv2['accessIPv4'])
self.assertIn('0% packet loss', result)
result = ssh2.ping_host(srv1['accessIPv4'])
self.assertIn('0% packet loss', result)
# Some VM (like cirros) may not have ping6 utility
result = ssh1.exec_command('whereis ping6')
is_ping6 = False if result == 'ping6:\n' else True
if is_ping6:
result = ssh1.ping_host(srv2['accessIPv6'])
self.assertIn('0% packet loss', result)
result = ssh2.ping_host(srv1['accessIPv6'])
self.assertIn('0% packet loss', result)
else:
LOG.warning('Ping6 is not available, skipping')
@test.services('compute', 'network')
def test_slaac_from_os(self):
self._prepare_and_test(address6_mode='slaac')
@test.services('compute', 'network')
def test_dhcp6_stateless_from_os(self):
self._prepare_and_test(address6_mode='dhcpv6-stateless')