Merge "[Tempest] Testing default security group scenarios"
This commit is contained in:
commit
5fc872023c
|
@ -12,6 +12,7 @@
|
|||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import subprocess
|
||||
|
||||
import netaddr
|
||||
from oslo_log import log
|
||||
|
@ -276,3 +277,41 @@ class BaseTempestTestCase(base_api.BaseNetworkTest):
|
|||
nic=None, mtu=None, fragmentation=True):
|
||||
self.assertTrue(self._check_remote_connectivity(
|
||||
source, dest, should_succeed, nic, mtu, fragmentation))
|
||||
|
||||
def ping_ip_address(self, ip_address, should_succeed=True,
|
||||
ping_timeout=None, mtu=None):
|
||||
# the code is taken from tempest/scenario/manager.py in tempest git
|
||||
timeout = ping_timeout or CONF.validation.ping_timeout
|
||||
cmd = ['ping', '-c1', '-w1']
|
||||
|
||||
if mtu:
|
||||
cmd += [
|
||||
# don't fragment
|
||||
'-M', 'do',
|
||||
# ping receives just the size of ICMP payload
|
||||
'-s', str(net_utils.get_ping_payload_size(mtu, 4))
|
||||
]
|
||||
cmd.append(ip_address)
|
||||
|
||||
def ping():
|
||||
proc = subprocess.Popen(cmd,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
proc.communicate()
|
||||
|
||||
return (proc.returncode == 0) == should_succeed
|
||||
|
||||
caller = test_utils.find_test_caller()
|
||||
LOG.debug('%(caller)s begins to ping %(ip)s in %(timeout)s sec and the'
|
||||
' expected result is %(should_succeed)s', {
|
||||
'caller': caller, 'ip': ip_address, 'timeout': timeout,
|
||||
'should_succeed':
|
||||
'reachable' if should_succeed else 'unreachable'
|
||||
})
|
||||
result = test_utils.call_until_true(ping, timeout, 1)
|
||||
LOG.debug('%(caller)s finishes ping %(ip)s in %(timeout)s sec and the '
|
||||
'ping result is %(result)s', {
|
||||
'caller': caller, 'ip': ip_address, 'timeout': timeout,
|
||||
'result': 'expected' if result else 'unexpected'
|
||||
})
|
||||
return result
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
# Copyright 2016 Red Hat, Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from tempest.common import waiters
|
||||
from tempest.lib import decorators
|
||||
|
||||
from neutron.tests.tempest.common import ssh
|
||||
from neutron.tests.tempest import config
|
||||
from neutron.tests.tempest.scenario import base
|
||||
from neutron.tests.tempest.scenario import constants
|
||||
|
||||
CONF = config.CONF
|
||||
|
||||
|
||||
class NetworkDefaultSecGroupTest(base.BaseTempestTestCase):
|
||||
credentials = ['primary', 'admin']
|
||||
required_extensions = ['router', 'security-group']
|
||||
|
||||
@classmethod
|
||||
def resource_setup(cls):
|
||||
super(NetworkDefaultSecGroupTest, cls).resource_setup()
|
||||
# setup basic topology for servers we can log into it
|
||||
cls.network = cls.create_network()
|
||||
cls.subnet = cls.create_subnet(cls.network)
|
||||
router = cls.create_router_by_client()
|
||||
cls.create_router_interface(router['id'], cls.subnet['id'])
|
||||
cls.keypair = cls.create_keypair()
|
||||
|
||||
def create_vm_default_sec_grp(self, num_servers=2):
|
||||
servers, fips, server_ssh_clients = ([], [], [])
|
||||
for i in range(num_servers):
|
||||
servers.append(self.create_server(
|
||||
flavor_ref=CONF.compute.flavor_ref,
|
||||
image_ref=CONF.compute.image_ref,
|
||||
key_name=self.keypair['name'],
|
||||
networks=[{'uuid': self.network['id']}]))
|
||||
for i, server in enumerate(servers):
|
||||
waiters.wait_for_server_status(
|
||||
self.os_primary.servers_client, server['server']['id'],
|
||||
constants.SERVER_STATUS_ACTIVE)
|
||||
port = self.client.list_ports(
|
||||
network_id=self.network['id'], device_id=server['server'][
|
||||
'id'])['ports'][0]
|
||||
fips.append(self.create_and_associate_floatingip(port['id']))
|
||||
server_ssh_clients.append(ssh.Client(
|
||||
fips[i]['floating_ip_address'], CONF.validation.image_ssh_user,
|
||||
pkey=self.keypair['private_key']))
|
||||
return server_ssh_clients, fips
|
||||
|
||||
@decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d764')
|
||||
def test_default_sec_grp_scenarios(self):
|
||||
server_ssh_clients, fips = self.create_vm_default_sec_grp()
|
||||
# Check ssh connectivity when you add sec group rule, enabling ssh
|
||||
self.create_loginable_secgroup_rule(
|
||||
self.os_primary.network_client.list_security_groups()[
|
||||
'security_groups'][0]['id']
|
||||
)
|
||||
self.check_connectivity(fips[0]['floating_ip_address'],
|
||||
CONF.validation.image_ssh_user,
|
||||
self.keypair['private_key'])
|
||||
|
||||
# make sure ICMP connectivity still fails as only ssh rule was added
|
||||
self.ping_ip_address(fips[0]['floating_ip_address'],
|
||||
should_succeed=False)
|
||||
|
||||
# Check ICMP connectivity between VMs without specific rule for that
|
||||
# It should work though the rule is not configured
|
||||
self.check_remote_connectivity(
|
||||
server_ssh_clients[0], fips[1]['fixed_ip_address'])
|
||||
|
||||
# Check ICMP connectivity from VM to external network
|
||||
subnets = self.os_admin.network_client.list_subnets(
|
||||
network_id=CONF.network.public_network_id)['subnets']
|
||||
ext_net_ip = None
|
||||
for subnet in subnets:
|
||||
if subnet['ip_version'] == 4:
|
||||
ext_net_ip = subnet['gateway_ip']
|
||||
break
|
||||
self.assertTrue(ext_net_ip)
|
||||
self.check_remote_connectivity(server_ssh_clients[0], ext_net_ip)
|
Loading…
Reference in New Issue