Support Neutron security groups in scenario testing

The methods
NetworkScenarioTest._create_security_group()
and
NetworkScenarioTest.create_loginable_secgroup_rule()
use the nova client by default.

Added helper functions "*_neutron" that use the neutron (network)
client.
Moved old implementation into "*_nova" for old usage.

Adds methods to scenario/manager to allow for generic creation of
neutron secgroup and rules as existing methods only create a secgroup
with login rules (incoming ping and ssh).

Change-Id: I6de4bf9f7deed215cf61c87c3ae3f4240982523e
This commit is contained in:
Yair Fried
2013-10-10 13:18:16 +03:00
parent e60b945d08
commit eb69f3fc9e
9 changed files with 193 additions and 24 deletions

View File

@@ -94,3 +94,18 @@ class DeletablePort(DeletableResource):
def delete(self):
self.client.delete_port(self.id)
class DeletableSecurityGroup(DeletableResource):
def delete(self):
self.client.delete_security_group(self.id)
class DeletableSecurityGroupRule(DeletableResource):
def __repr__(self):
return '<%s id="%s">' % (self.__class__.__name__, self.id)
def delete(self):
self.client.delete_security_group_rule(self.id)

View File

@@ -360,7 +360,8 @@ class OfficialClientTest(tempest.test.BaseTestCase):
to become %s" % (thing_id, log_status)
raise exceptions.TimeoutException(message)
def create_loginable_secgroup_rule(self, client=None, secgroup_id=None):
def _create_loginable_secgroup_rule_nova(self, client=None,
secgroup_id=None):
if client is None:
client = self.compute_client
if secgroup_id is None:
@@ -389,10 +390,13 @@ class OfficialClientTest(tempest.test.BaseTestCase):
'cidr': '0.0.0.0/0',
}
]
rules = list()
for ruleset in rulesets:
sg_rule = client.security_group_rules.create(secgroup_id,
**ruleset)
self.set_resource(sg_rule.id, sg_rule)
rules.append(sg_rule)
return rules
def create_server(self, client=None, name=None, image=None, flavor=None,
create_kwargs={}):
@@ -506,22 +510,6 @@ class NetworkScenarioTest(OfficialClientTest):
cls.config.identity.password,
cls.config.identity.tenant_name).tenant_id
def _create_security_group(self, client=None, namestart='secgroup-smoke-'):
if client is None:
client = self.compute_client
# Create security group
sg_name = data_utils.rand_name(namestart)
sg_desc = sg_name + " description"
secgroup = client.security_groups.create(sg_name, sg_desc)
self.assertEqual(secgroup.name, sg_name)
self.assertEqual(secgroup.description, sg_desc)
self.set_resource(sg_name, secgroup)
# Add rules to the security group
self.create_loginable_secgroup_rule(client, secgroup.id)
return secgroup
def _create_network(self, tenant_id, namestart='network-smoke-'):
name = data_utils.rand_name(namestart)
body = dict(
@@ -648,6 +636,171 @@ class NetworkScenarioTest(OfficialClientTest):
'Auth failure in connecting to %s@%s via ssh' %
(username, ip_address))
def _create_security_group_nova(self, client=None,
namestart='secgroup-smoke-',
tenant_id=None):
if client is None:
client = self.compute_client
# Create security group
sg_name = data_utils.rand_name(namestart)
sg_desc = sg_name + " description"
secgroup = client.security_groups.create(sg_name, sg_desc)
self.assertEqual(secgroup.name, sg_name)
self.assertEqual(secgroup.description, sg_desc)
self.set_resource(sg_name, secgroup)
# Add rules to the security group
self._create_loginable_secgroup_rule_nova(client, secgroup.id)
return secgroup
def _create_security_group_neutron(self, tenant_id, client=None,
namestart='secgroup-smoke-'):
if client is None:
client = self.network_client
secgroup = self._create_empty_security_group(namestart=namestart,
client=client,
tenant_id=tenant_id)
# Add rules to the security group
rules = self._create_loginable_secgroup_rule_neutron(secgroup=secgroup)
for rule in rules:
self.assertEqual(tenant_id, rule.tenant_id)
self.assertEqual(secgroup.id, rule.security_group_id)
return secgroup
def _create_empty_security_group(self, tenant_id, client=None,
namestart='secgroup-smoke-'):
"""Create a security group without rules.
Default rules will be created:
- IPv4 egress to any
- IPv6 egress to any
:param tenant_id: secgroup will be created in this tenant
:returns: DeletableSecurityGroup -- containing the secgroup created
"""
if client is None:
client = self.network_client
sg_name = data_utils.rand_name(namestart)
sg_desc = sg_name + " description"
sg_dict = dict(name=sg_name,
description=sg_desc)
sg_dict['tenant_id'] = tenant_id
body = dict(security_group=sg_dict)
result = client.create_security_group(body=body)
secgroup = net_common.DeletableSecurityGroup(
client=client,
**result['security_group']
)
self.assertEqual(secgroup.name, sg_name)
self.assertEqual(tenant_id, secgroup.tenant_id)
self.assertEqual(secgroup.description, sg_desc)
self.set_resource(sg_name, secgroup)
return secgroup
def _default_security_group(self, tenant_id, client=None):
"""Get default secgroup for given tenant_id.
:returns: DeletableSecurityGroup -- default secgroup for given tenant
"""
if client is None:
client = self.network_client
sgs = [
sg for sg in client.list_security_groups().values()[0]
if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
]
msg = "No default security group for tenant %s." % (tenant_id)
self.assertTrue(len(sgs) > 0, msg)
if len(sgs) > 1:
msg = "Found %d default security groups" % len(sgs)
raise exc.NeutronClientNoUniqueMatch(msg=msg)
return net_common.DeletableSecurityGroup(client=client,
**sgs[0])
def _create_security_group_rule(self, client=None, secgroup=None,
tenant_id=None, **kwargs):
"""Create a rule from a dictionary of rule parameters.
Create a rule in a secgroup. if secgroup not defined will search for
default secgroup in tenant_id.
:param secgroup: type DeletableSecurityGroup.
:param secgroup_id: search for secgroup by id
default -- choose default secgroup for given tenant_id
:param tenant_id: if secgroup not passed -- the tenant in which to
search for default secgroup
:param kwargs: a dictionary containing rule parameters:
for example, to allow incoming ssh:
rule = {
direction: 'ingress'
protocol:'tcp',
port_range_min: 22,
port_range_max: 22
}
"""
if client is None:
client = self.network_client
if secgroup is None:
secgroup = self._default_security_group(tenant_id)
ruleset = dict(security_group_id=secgroup.id,
tenant_id=secgroup.tenant_id,
)
ruleset.update(kwargs)
body = dict(security_group_rule=dict(ruleset))
sg_rule = client.create_security_group_rule(body=body)
sg_rule = net_common.DeletableSecurityGroupRule(
client=client,
**sg_rule['security_group_rule']
)
self.set_resource(sg_rule.id, sg_rule)
self.assertEqual(secgroup.tenant_id, sg_rule.tenant_id)
self.assertEqual(secgroup.id, sg_rule.security_group_id)
return sg_rule
def _create_loginable_secgroup_rule_neutron(self, client=None,
secgroup=None):
"""These rules are intended to permit inbound ssh and icmp
traffic from all sources, so no group_id is provided.
Setting a group_id would only permit traffic from ports
belonging to the same security group.
"""
if client is None:
client = self.network_client
rules = []
rulesets = [
dict(
# ssh
protocol='tcp',
port_range_min=22,
port_range_max=22,
),
dict(
# ping
protocol='icmp',
)
]
for ruleset in rulesets:
for r_direction in ['ingress', 'egress']:
ruleset['direction'] = r_direction
try:
sg_rule = self._create_security_group_rule(
client=client, secgroup=secgroup, **ruleset)
except exc.NeutronClientException as ex:
# if rule already exist - skip rule and continue
if not (ex.status_code is 409 and 'Security group rule'
' already exists' in ex.message):
raise ex
else:
self.assertEqual(r_direction, sg_rule.direction)
rules.append(sg_rule)
return rules
class OrchestrationScenarioTest(OfficialClientTest):
"""

View File

@@ -85,7 +85,7 @@ class TestLargeOpsScenario(manager.NetworkScenarioTest):
name = data_utils.rand_name('scenario-server-')
client = self.compute_client
flavor_id = self.config.compute.flavor_ref
secgroup = self._create_security_group()
secgroup = self._create_security_group_nova()
self.servers = client.servers.create(
name=name, image=self.image,
flavor=flavor_id,

View File

@@ -161,7 +161,7 @@ class TestMinimumBasicScenario(manager.OfficialClientTest):
self.nova_floating_ip_create()
self.nova_floating_ip_add()
self.create_loginable_secgroup_rule()
self._create_loginable_secgroup_rule_nova()
self.ssh_to_server()
self.check_partitions()

View File

@@ -164,7 +164,8 @@ class TestNetworkBasicOps(manager.NetworkScenarioTest):
name=data_utils.rand_name('keypair-smoke-'))
def _create_security_groups(self):
self.security_groups[self.tenant_id] = self._create_security_group()
self.security_groups[self.tenant_id] =\
self._create_security_group_neutron(tenant_id=self.tenant_id)
def _create_networks(self):
network = self._create_network(self.tenant_id)

View File

@@ -50,7 +50,7 @@ class TestServerBasicOps(manager.OfficialClientTest):
self.set_resource('secgroup', self.secgroup)
# Add rules to the security group
self.create_loginable_secgroup_rule(secgroup_id=self.secgroup.id)
self._create_loginable_secgroup_rule_nova(secgroup_id=self.secgroup.id)
def boot_instance(self):
create_kwargs = {

View File

@@ -65,7 +65,7 @@ class TestSnapshotPattern(manager.OfficialClientTest):
def test_snapshot_pattern(self):
# prepare for booting a instance
self._add_keypair()
self.create_loginable_secgroup_rule()
self._create_loginable_secgroup_rule_nova()
# boot a instance and create a timestamp file in it
server = self._boot_image(self.config.compute.image_ref)

View File

@@ -146,7 +146,7 @@ class TestStampPattern(manager.OfficialClientTest):
def test_stamp_pattern(self):
# prepare for booting a instance
self._add_keypair()
self.create_loginable_secgroup_rule()
self._create_loginable_secgroup_rule_nova()
# boot an instance and create a timestamp file in it
volume = self._create_volume()

View File

@@ -120,7 +120,7 @@ class TestVolumeBootPattern(manager.OfficialClientTest):
@services('compute', 'volume', 'image')
def test_volume_boot_pattern(self):
keypair = self.create_keypair()
self.create_loginable_secgroup_rule()
self._create_loginable_secgroup_rule_nova()
# create an instance from volume
volume_origin = self._create_volume_from_image()