Merge "[fwaas] Remove secgroup creation related methods"
This commit is contained in:
commit
b02a11f3f7
|
@ -455,176 +455,6 @@ class NetworkScenarioTest(ScenarioTest):
|
|||
floating_ip['id'])
|
||||
return floating_ip
|
||||
|
||||
def _create_security_group(self, security_group_rules_client=None,
|
||||
tenant_id=None,
|
||||
namestart='secgroup-smoke',
|
||||
security_groups_client=None):
|
||||
if security_group_rules_client is None:
|
||||
security_group_rules_client = self.security_group_rules_client
|
||||
if security_groups_client is None:
|
||||
security_groups_client = self.security_groups_client
|
||||
if tenant_id is None:
|
||||
tenant_id = security_groups_client.tenant_id
|
||||
secgroup = self._create_empty_security_group(
|
||||
namestart=namestart, client=security_groups_client,
|
||||
tenant_id=tenant_id)
|
||||
|
||||
# Add rules to the security group
|
||||
rules = self._create_loginable_secgroup_rule(
|
||||
security_group_rules_client=security_group_rules_client,
|
||||
secgroup=secgroup,
|
||||
security_groups_client=security_groups_client)
|
||||
for rule in rules:
|
||||
self.assertEqual(tenant_id, rule['tenant_id'])
|
||||
self.assertEqual(secgroup['id'], rule['security_group_id'])
|
||||
return secgroup
|
||||
|
||||
def _create_empty_security_group(self, client=None, tenant_id=None,
|
||||
namestart='secgroup-smoke'):
|
||||
"""Create a security group without rules.
|
||||
|
||||
Default rules will be created:
|
||||
- IPv4 egress to any
|
||||
- IPv6 egress to any
|
||||
|
||||
:param tenant_id: secgroup will be created in this tenant
|
||||
:returns: the created security group
|
||||
"""
|
||||
if client is None:
|
||||
client = self.security_groups_client
|
||||
if not tenant_id:
|
||||
tenant_id = client.tenant_id
|
||||
sg_name = data_utils.rand_name(namestart)
|
||||
sg_desc = sg_name + " description"
|
||||
sg_dict = dict(name=sg_name,
|
||||
description=sg_desc)
|
||||
sg_dict['tenant_id'] = tenant_id
|
||||
result = client.create_security_group(**sg_dict)
|
||||
|
||||
secgroup = result['security_group']
|
||||
self.assertEqual(secgroup['name'], sg_name)
|
||||
self.assertEqual(tenant_id, secgroup['tenant_id'])
|
||||
self.assertEqual(secgroup['description'], sg_desc)
|
||||
|
||||
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
||||
client.delete_security_group, secgroup['id'])
|
||||
return secgroup
|
||||
|
||||
def _default_security_group(self, client=None, tenant_id=None):
|
||||
"""Get default secgroup for given tenant_id.
|
||||
|
||||
:returns: default secgroup for given tenant
|
||||
"""
|
||||
if client is None:
|
||||
client = self.security_groups_client
|
||||
if not tenant_id:
|
||||
tenant_id = client.tenant_id
|
||||
sgs = [
|
||||
sg for sg in list(client.list_security_groups().values())[0]
|
||||
if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
|
||||
]
|
||||
msg = "No default security group for tenant %s." % (tenant_id)
|
||||
self.assertGreater(len(sgs), 0, msg)
|
||||
return sgs[0]
|
||||
|
||||
def _create_security_group_rule(self, secgroup=None,
|
||||
sec_group_rules_client=None,
|
||||
tenant_id=None,
|
||||
security_groups_client=None, **kwargs):
|
||||
"""Create a rule from a dictionary of rule parameters.
|
||||
|
||||
Create a rule in a secgroup. if secgroup not defined will search for
|
||||
default secgroup in tenant_id.
|
||||
|
||||
:param secgroup: the security group.
|
||||
:param tenant_id: if secgroup not passed -- the tenant in which to
|
||||
search for default secgroup
|
||||
:param kwargs: a dictionary containing rule parameters:
|
||||
for example, to allow incoming ssh:
|
||||
rule = {
|
||||
direction: 'ingress'
|
||||
protocol:'tcp',
|
||||
port_range_min: 22,
|
||||
port_range_max: 22
|
||||
}
|
||||
"""
|
||||
if sec_group_rules_client is None:
|
||||
sec_group_rules_client = self.security_group_rules_client
|
||||
if security_groups_client is None:
|
||||
security_groups_client = self.security_groups_client
|
||||
if not tenant_id:
|
||||
tenant_id = security_groups_client.tenant_id
|
||||
if secgroup is None:
|
||||
secgroup = self._default_security_group(
|
||||
client=security_groups_client, tenant_id=tenant_id)
|
||||
|
||||
ruleset = dict(security_group_id=secgroup['id'],
|
||||
tenant_id=secgroup['tenant_id'])
|
||||
ruleset.update(kwargs)
|
||||
|
||||
sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
|
||||
sg_rule = sg_rule['security_group_rule']
|
||||
|
||||
self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
|
||||
self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
|
||||
|
||||
return sg_rule
|
||||
|
||||
def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
|
||||
secgroup=None,
|
||||
security_groups_client=None):
|
||||
"""Create loginable security group rule
|
||||
|
||||
This function will create:
|
||||
1. egress and ingress tcp port 22 allow rule in order to allow ssh
|
||||
access for ipv4.
|
||||
2. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
|
||||
3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
|
||||
"""
|
||||
|
||||
if security_group_rules_client is None:
|
||||
security_group_rules_client = self.security_group_rules_client
|
||||
if security_groups_client is None:
|
||||
security_groups_client = self.security_groups_client
|
||||
rules = []
|
||||
rulesets = [
|
||||
dict(
|
||||
# ssh
|
||||
protocol='tcp',
|
||||
port_range_min=22,
|
||||
port_range_max=22,
|
||||
),
|
||||
dict(
|
||||
# ping
|
||||
protocol='icmp',
|
||||
),
|
||||
dict(
|
||||
# ipv6-icmp for ping6
|
||||
protocol='icmp',
|
||||
ethertype='IPv6',
|
||||
)
|
||||
]
|
||||
sec_group_rules_client = security_group_rules_client
|
||||
for ruleset in rulesets:
|
||||
for r_direction in ['ingress', 'egress']:
|
||||
ruleset['direction'] = r_direction
|
||||
try:
|
||||
sg_rule = self._create_security_group_rule(
|
||||
sec_group_rules_client=sec_group_rules_client,
|
||||
secgroup=secgroup,
|
||||
security_groups_client=security_groups_client,
|
||||
**ruleset)
|
||||
except lib_exc.Conflict as ex:
|
||||
# if rule already exist - skip rule and continue
|
||||
msg = 'Security group rule already exists'
|
||||
if msg not in ex._error_string:
|
||||
raise ex
|
||||
else:
|
||||
self.assertEqual(r_direction, sg_rule['direction'])
|
||||
rules.append(sg_rule)
|
||||
|
||||
return rules
|
||||
|
||||
def _create_router(self, client=None, tenant_id=None,
|
||||
namestart='router-smoke'):
|
||||
if not client:
|
||||
|
|
|
@ -196,7 +196,7 @@ class TestFWaaS_v2(base.FWaaSScenarioTest_V2):
|
|||
resp['router_portid_2'] = router_portid_2
|
||||
|
||||
# Create a VM on each of the network and assign it a floating IP.
|
||||
security_group = self._create_security_group()
|
||||
security_group = self.create_security_group()
|
||||
server1, private_key1, server_fixed_ip_1, server_floating_ip_1 = (
|
||||
self._create_test_server(network1, security_group))
|
||||
server2, private_key2, server_fixed_ip_2, server_floating_ip_2 = (
|
||||
|
|
Loading…
Reference in New Issue