fullstack: test for IPv6 east-west traffic

This patch validates east west IPv6 traffic through legacy router.

Partial-Bug: #1583028
Change-Id: Id18065340d49dfd389d88cc625f5a80be8da9a50
This commit is contained in:
sridhargaddam 2016-05-26 17:39:58 +00:00
parent 6a04f417ac
commit 45d363241d
3 changed files with 55 additions and 24 deletions

View File

@ -13,8 +13,10 @@
# under the License.
#
import functools
import netaddr
import fixtures
from neutron_lib import constants
from neutronclient.common import exceptions
from neutron.extensions import portbindings
@ -68,14 +70,19 @@ class ClientFixture(fixtures.Fixture):
return self._create_resource(resource_type, spec)
def create_subnet(self, tenant_id, network_id,
cidr, gateway_ip=None, ip_version=4,
name=None, enable_dhcp=True):
cidr, gateway_ip=None, name=None, enable_dhcp=True,
ipv6_address_mode='slaac', ipv6_ra_mode='slaac'):
resource_type = 'subnet'
name = name or base.get_rand_name(prefix=resource_type)
ip_version = netaddr.IPNetwork(cidr).version
spec = {'tenant_id': tenant_id, 'network_id': network_id, 'name': name,
'cidr': cidr, 'ip_version': ip_version,
'enable_dhcp': enable_dhcp}
'cidr': cidr, 'enable_dhcp': enable_dhcp,
'ip_version': ip_version}
if ip_version == constants.IP_VERSION_6:
spec['ipv6_address_mode'] = ipv6_address_mode
spec['ipv6_ra_mode'] = ipv6_ra_mode
if gateway_ip:
spec['gateway_ip'] = gateway_ip

View File

@ -14,6 +14,8 @@
import netaddr
from neutron_lib import constants
from neutron.agent.linux import utils
from neutron.tests.common import machine_fixtures
from neutron.tests.common import net_helpers
@ -47,8 +49,17 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
self.bridge, self.namespace, mac_address,
self.neutron_port['id'])).port
self._ip = self.neutron_port['fixed_ips'][0]['ip_address']
subnet_id = self.neutron_port['fixed_ips'][0]['subnet_id']
for fixed_ip in self.neutron_port['fixed_ips']:
self._configure_ipaddress(fixed_ip)
def _configure_ipaddress(self, fixed_ip):
if (netaddr.IPAddress(fixed_ip['ip_address']).version ==
constants.IP_VERSION_6):
# v6Address/default_route is auto-configured.
self._ipv6 = fixed_ip['ip_address']
else:
self._ip = fixed_ip['ip_address']
subnet_id = fixed_ip['subnet_id']
subnet = self.safe_client.client.show_subnet(subnet_id)
prefixlen = netaddr.IPNetwork(subnet['subnet']['cidr']).prefixlen
self._ip_cidr = '%s/%s' % (self._ip, prefixlen)
@ -60,6 +71,10 @@ class FakeFullstackMachine(machine_fixtures.FakeMachineBase):
if self.gateway_ip:
net_helpers.set_namespace_gateway(self.port, self.gateway_ip)
@property
def ipv6(self):
return self._ipv6
@property
def ip(self):
return self._ip

View File

@ -13,7 +13,9 @@
# under the License.
import functools
import netaddr
from neutron_lib import constants
from oslo_utils import uuidutils
from neutron.agent.l3 import agent as l3_agent
@ -47,19 +49,24 @@ class TestL3Agent(base.BaseFullStackTestCase):
return port['port']['status'] == 'ACTIVE'
utils.wait_until_true(lambda: is_port_status_active(), sleep=1)
def _create_net_subnet_and_vm(self, tenant_id, cidr, host, router):
def _create_net_subnet_and_vm(self, tenant_id, subnet_cidrs, host, router):
network = self.safe_client.create_network(tenant_id)
for cidr in subnet_cidrs:
# For IPv6 subnets, enable_dhcp should be set to true.
enable_dhcp = (netaddr.IPNetwork(cidr).version ==
constants.IP_VERSION_6)
subnet = self.safe_client.create_subnet(
tenant_id, network['id'], cidr, enable_dhcp=False)
tenant_id, network['id'], cidr, enable_dhcp=enable_dhcp)
router_interface_info = self.safe_client.add_router_interface(
router['id'], subnet['id'])
self.block_until_port_status_active(
router_interface_info['port_id'])
vm = self.useFixture(
machine.FakeFullstackMachine(
host, network['id'], tenant_id, self.safe_client))
vm.block_until_boot()
router_interface_info = self.safe_client.add_router_interface(
router['id'], subnet['id'])
self.block_until_port_status_active(router_interface_info['port_id'])
return vm
@ -101,13 +108,15 @@ class TestLegacyL3Agent(TestL3Agent):
router = self.safe_client.create_router(tenant_id)
vm1 = self._create_net_subnet_and_vm(
tenant_id, '20.0.0.0/24',
tenant_id, ['20.0.0.0/24', '2001:db8:aaaa::/64'],
self.environment.hosts[0], router)
vm2 = self._create_net_subnet_and_vm(
tenant_id, '21.0.0.0/24',
tenant_id, ['21.0.0.0/24', '2001:db8:bbbb::/64'],
self.environment.hosts[1], router)
vm1.block_until_ping(vm2.ip)
# Verify ping6 from vm2 to vm1 IPv6 Address
vm2.block_until_ping(vm1.ipv6)
def test_snat_and_floatingip(self):
# This function creates external network and boots an extrenal vm
@ -125,7 +134,7 @@ class TestLegacyL3Agent(TestL3Agent):
router = self.safe_client.create_router(tenant_id,
external_network=ext_net['id'])
vm = self._create_net_subnet_and_vm(
tenant_id, '20.0.0.0/24',
tenant_id, ['20.0.0.0/24'],
self.environment.hosts[1], router)
# ping external vm to test snat