# Copyright 2012 OpenStack Foundation # Copyright 2013 Hewlett-Packard Development Company, L.P. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import collections import re from tempest.api.network import common as net_common from tempest.common import debug from tempest.common.utils import data_utils from tempest import config from tempest.openstack.common import log as logging from tempest.scenario import manager from tempest import test CONF = config.CONF LOG = logging.getLogger(__name__) Floating_IP_tuple = collections.namedtuple('Floating_IP_tuple', ['floating_ip', 'server']) class TestNetworkBasicOps(manager.NetworkScenarioTest): """ This smoke test suite assumes that Nova has been configured to boot VM's with Neutron-managed networking, and attempts to verify network connectivity as follows: There are presumed to be two types of networks: tenant and public. A tenant network may or may not be reachable from the Tempest host. A public network is assumed to be reachable from the Tempest host, and it should be possible to associate a public ('floating') IP address with a tenant ('fixed') IP address to facilitate external connectivity to a potentially unroutable tenant IP address. This test suite can be configured to test network connectivity to a VM via a tenant network, a public network, or both. If both networking types are to be evaluated, tests that need to be executed remotely on the VM (via ssh) will only be run against one of the networks (to minimize test execution time). Determine which types of networks to test as follows: * Configure tenant network checks (via the 'tenant_networks_reachable' key) if the Tempest host should have direct connectivity to tenant networks. This is likely to be the case if Tempest is running on the same host as a single-node devstack installation with IP namespaces disabled. * Configure checks for a public network if a public network has been configured prior to the test suite being run and if the Tempest host should have connectivity to that public network. Checking connectivity for a public network requires that a value be provided for 'public_network_id'. A value can optionally be provided for 'public_router_id' if tenants will use a shared router to access a public network (as is likely to be the case when IP namespaces are not enabled). If a value is not provided for 'public_router_id', a router will be created for each tenant and use the network identified by 'public_network_id' as its gateway. """ @classmethod def check_preconditions(cls): super(TestNetworkBasicOps, cls).check_preconditions() if not (CONF.network.tenant_networks_reachable or CONF.network.public_network_id): msg = ('Either tenant_networks_reachable must be "true", or ' 'public_network_id must be defined.') cls.enabled = False raise cls.skipException(msg) @classmethod def setUpClass(cls): super(TestNetworkBasicOps, cls).setUpClass() for ext in ['router', 'security-group']: if not test.is_extension_enabled(ext, 'network'): msg = "%s extension not enabled." % ext raise cls.skipException(msg) cls.check_preconditions() def cleanup_wrapper(self, resource): self.cleanup_resource(resource, self.__class__.__name__) def setUp(self): super(TestNetworkBasicOps, self).setUp() self.security_group = \ self._create_security_group_neutron(tenant_id=self.tenant_id) self.addCleanup(self.cleanup_wrapper, self.security_group) self.network, self.subnet, self.router = self._create_networks() for r in [self.network, self.router, self.subnet]: self.addCleanup(self.cleanup_wrapper, r) self.check_networks() self.servers = {} name = data_utils.rand_name('server-smoke') serv_dict = self._create_server(name, self.network) self.servers[serv_dict['server']] = serv_dict['keypair'] self._check_tenant_network_connectivity() self._create_and_associate_floating_ips() def check_networks(self): """ Checks that we see the newly created network/subnet/router via checking the result of list_[networks,routers,subnets] """ seen_nets = self._list_networks() seen_names = [n['name'] for n in seen_nets] seen_ids = [n['id'] for n in seen_nets] self.assertIn(self.network.name, seen_names) self.assertIn(self.network.id, seen_ids) seen_subnets = self._list_subnets() seen_net_ids = [n['network_id'] for n in seen_subnets] seen_subnet_ids = [n['id'] for n in seen_subnets] self.assertIn(self.network.id, seen_net_ids) self.assertIn(self.subnet.id, seen_subnet_ids) seen_routers = self._list_routers() seen_router_ids = [n['id'] for n in seen_routers] seen_router_names = [n['name'] for n in seen_routers] self.assertIn(self.router.name, seen_router_names) self.assertIn(self.router.id, seen_router_ids) def _create_server(self, name, network): keypair = self.create_keypair(name='keypair-%s' % name) self.addCleanup(self.cleanup_wrapper, keypair) security_groups = [self.security_group.name] create_kwargs = { 'nics': [ {'net-id': network.id}, ], 'key_name': keypair.name, 'security_groups': security_groups, } server = self.create_server(name=name, create_kwargs=create_kwargs) self.addCleanup(self.cleanup_wrapper, server) return dict(server=server, keypair=keypair) def _check_tenant_network_connectivity(self): if not CONF.network.tenant_networks_reachable: msg = 'Tenant networks not configured to be reachable.' LOG.info(msg) return # The target login is assumed to have been configured for # key-based authentication by cloud-init. ssh_login = CONF.compute.image_ssh_user try: for server, key in self.servers.iteritems(): for net_name, ip_addresses in server.networks.iteritems(): for ip_address in ip_addresses: self._check_vm_connectivity(ip_address, ssh_login, key.private_key) except Exception: LOG.exception('Tenant connectivity check failed') self._log_console_output(servers=self.servers.keys()) debug.log_net_debug() raise def _create_and_associate_floating_ips(self): public_network_id = CONF.network.public_network_id for server in self.servers.keys(): floating_ip = self._create_floating_ip(server, public_network_id) self.floating_ip_tuple = Floating_IP_tuple(floating_ip, server) self.addCleanup(self.cleanup_wrapper, floating_ip) def _check_public_network_connectivity(self, should_connect=True, msg=None): # The target login is assumed to have been configured for # key-based authentication by cloud-init. ssh_login = CONF.compute.image_ssh_user LOG.debug('checking network connections') floating_ip, server = self.floating_ip_tuple ip_address = floating_ip.floating_ip_address private_key = None if should_connect: private_key = self.servers[server].private_key try: self._check_vm_connectivity(ip_address, ssh_login, private_key, should_connect=should_connect) except Exception: ex_msg = 'Public network connectivity check failed' if msg: ex_msg += ": " + msg LOG.exception(ex_msg) self._log_console_output(servers=self.servers.keys()) debug.log_net_debug() raise def _disassociate_floating_ips(self): floating_ip, server = self.floating_ip_tuple self._disassociate_floating_ip(floating_ip) self.floating_ip_tuple = Floating_IP_tuple( floating_ip, None) def _reassociate_floating_ips(self): floating_ip, server = self.floating_ip_tuple name = data_utils.rand_name('new_server-smoke-') # create a new server for the floating ip serv_dict = self._create_server(name, self.network) self.servers[serv_dict['server']] = serv_dict['keypair'] self._associate_floating_ip(floating_ip, serv_dict['server']) self.floating_ip_tuple = Floating_IP_tuple( floating_ip, serv_dict['server']) def _create_new_network(self): self.new_net = self._create_network(self.tenant_id) self.addCleanup(self.cleanup_wrapper, self.new_net) self.new_subnet = self._create_subnet( network=self.new_net, gateway_ip=None) self.addCleanup(self.cleanup_wrapper, self.new_subnet) def _hotplug_server(self): old_floating_ip, server = self.floating_ip_tuple ip_address = old_floating_ip.floating_ip_address private_key = self.servers[server].private_key ssh_client = self.get_remote_client(ip_address, private_key=private_key) old_nic_list = self._get_server_nics(ssh_client) # get a port from a list of one item port_list = self._list_ports(device_id=server.id) self.assertEqual(1, len(port_list)) old_port = port_list[0] self.compute_client.servers.interface_attach(server=server, net_id=self.new_net.id, port_id=None, fixed_ip=None) # move server to the head of the cleanup list self.addCleanup(self.cleanup_wrapper, server) def check_ports(): port_list = [port for port in self._list_ports(device_id=server.id) if port != old_port] return len(port_list) == 1 test.call_until_true(check_ports, 60, 1) new_port_list = [p for p in self._list_ports(device_id=server.id) if p != old_port] self.assertEqual(1, len(new_port_list)) new_port = new_port_list[0] new_port = net_common.DeletablePort(client=self.network_client, **new_port) new_nic_list = self._get_server_nics(ssh_client) diff_list = [n for n in new_nic_list if n not in old_nic_list] self.assertEqual(1, len(diff_list)) num, new_nic = diff_list[0] ssh_client.assign_static_ip(nic=new_nic, addr=new_port.fixed_ips[0]['ip_address']) ssh_client.turn_nic_on(nic=new_nic) def _get_server_nics(self, ssh_client): reg = re.compile(r'(?P\d+): (?P\w+):') ipatxt = ssh_client.get_ip_list() return reg.findall(ipatxt) def _check_network_internal_connectivity(self, network): """ via ssh check VM internal connectivity: - ping internal gateway and DHCP port, implying in-tenant connectivity pinging both, because L3 and DHCP agents might be on different nodes """ floating_ip, server = self.floating_ip_tuple # get internal ports' ips: # get all network ports in the new network internal_ips = (p['fixed_ips'][0]['ip_address'] for p in self._list_ports(tenant_id=server.tenant_id, network_id=network.id) if p['device_owner'].startswith('network')) self._check_server_connectivity(floating_ip, internal_ips) def _check_network_external_connectivity(self): """ ping public network default gateway to imply external connectivity """ if not CONF.network.public_network_id: msg = 'public network not defined.' LOG.info(msg) return subnet = self.network_client.list_subnets( network_id=CONF.network.public_network_id)['subnets'] self.assertEqual(1, len(subnet), "Found %d subnets" % len(subnet)) external_ips = [subnet[0]['gateway_ip']] self._check_server_connectivity(self.floating_ip_tuple.floating_ip, external_ips) def _check_server_connectivity(self, floating_ip, address_list): ip_address = floating_ip.floating_ip_address private_key = self.servers[self.floating_ip_tuple.server].private_key ssh_source = self._ssh_to_server(ip_address, private_key) for remote_ip in address_list: try: self.assertTrue(self._check_remote_connectivity(ssh_source, remote_ip), "Timed out waiting for %s to become " "reachable" % remote_ip) except Exception: LOG.exception("Unable to access {dest} via ssh to " "floating-ip {src}".format(dest=remote_ip, src=floating_ip)) debug.log_ip_ns() raise @test.attr(type='smoke') @test.services('compute', 'network') def test_network_basic_ops(self): """ For a freshly-booted VM with an IP address ("port") on a given network: - the Tempest host can ping the IP address. This implies, but does not guarantee (see the ssh check that follows), that the VM has been assigned the correct IP address and has connectivity to the Tempest host. - the Tempest host can perform key-based authentication to an ssh server hosted at the IP address. This check guarantees that the IP address is associated with the target VM. - the Tempest host can ssh into the VM via the IP address and successfully execute the following: - ping an external IP address, implying external connectivity. - ping an external hostname, implying that dns is correctly configured. - ping an internal IP address, implying connectivity to another VM on the same network. - detach the floating-ip from the VM and verify that it becomes unreachable - associate detached floating ip to a new VM and verify connectivity. VMs are created with unique keypair so connectivity also asserts that floating IP is associated with the new VM instead of the old one """ self._check_public_network_connectivity(should_connect=True) self._check_network_internal_connectivity(network=self.network) self._check_network_external_connectivity() self._disassociate_floating_ips() self._check_public_network_connectivity(should_connect=False, msg="after disassociate " "floating ip") self._reassociate_floating_ips() self._check_public_network_connectivity(should_connect=True, msg="after re-associate " "floating ip") @test.attr(type='smoke') @test.services('compute', 'network') def test_hotplug_nic(self): """ 1. create a new network, with no gateway (to prevent overwriting VM's gateway) 2. connect VM to new network 3. set static ip and bring new nic up 4. check VM can ping new network dhcp port """ self._check_public_network_connectivity(should_connect=True) self._create_new_network() self._hotplug_server() self._check_network_internal_connectivity(network=self.new_net)