Merge "[sfc] Remove unused sec groups methods"
commit
36a231e229
|
@ -541,176 +541,6 @@ class NetworkScenarioTest(ScenarioTest):
|
||||||
self._log_net_info(e)
|
self._log_net_info(e)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def _create_security_group(self, security_group_rules_client=None,
|
|
||||||
tenant_id=None,
|
|
||||||
namestart='secgroup-smoke',
|
|
||||||
security_groups_client=None):
|
|
||||||
if security_group_rules_client is None:
|
|
||||||
security_group_rules_client = self.security_group_rules_client
|
|
||||||
if security_groups_client is None:
|
|
||||||
security_groups_client = self.security_groups_client
|
|
||||||
if tenant_id is None:
|
|
||||||
tenant_id = security_groups_client.tenant_id
|
|
||||||
secgroup = self._create_empty_security_group(
|
|
||||||
namestart=namestart, client=security_groups_client,
|
|
||||||
tenant_id=tenant_id)
|
|
||||||
|
|
||||||
# Add rules to the security group
|
|
||||||
rules = self._create_loginable_secgroup_rule(
|
|
||||||
security_group_rules_client=security_group_rules_client,
|
|
||||||
secgroup=secgroup,
|
|
||||||
security_groups_client=security_groups_client)
|
|
||||||
for rule in rules:
|
|
||||||
self.assertEqual(tenant_id, rule['tenant_id'])
|
|
||||||
self.assertEqual(secgroup['id'], rule['security_group_id'])
|
|
||||||
return secgroup
|
|
||||||
|
|
||||||
def _create_empty_security_group(self, client=None, tenant_id=None,
|
|
||||||
namestart='secgroup-smoke'):
|
|
||||||
"""Create a security group without rules.
|
|
||||||
|
|
||||||
Default rules will be created:
|
|
||||||
- IPv4 egress to any
|
|
||||||
- IPv6 egress to any
|
|
||||||
|
|
||||||
:param tenant_id: secgroup will be created in this tenant
|
|
||||||
:returns: the created security group
|
|
||||||
"""
|
|
||||||
if client is None:
|
|
||||||
client = self.security_groups_client
|
|
||||||
if not tenant_id:
|
|
||||||
tenant_id = client.tenant_id
|
|
||||||
sg_name = data_utils.rand_name(namestart)
|
|
||||||
sg_desc = sg_name + " description"
|
|
||||||
sg_dict = dict(name=sg_name,
|
|
||||||
description=sg_desc)
|
|
||||||
sg_dict['tenant_id'] = tenant_id
|
|
||||||
result = client.create_security_group(**sg_dict)
|
|
||||||
|
|
||||||
secgroup = result['security_group']
|
|
||||||
self.assertEqual(secgroup['name'], sg_name)
|
|
||||||
self.assertEqual(tenant_id, secgroup['tenant_id'])
|
|
||||||
self.assertEqual(secgroup['description'], sg_desc)
|
|
||||||
|
|
||||||
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
|
|
||||||
client.delete_security_group, secgroup['id'])
|
|
||||||
return secgroup
|
|
||||||
|
|
||||||
def _default_security_group(self, client=None, tenant_id=None):
|
|
||||||
"""Get default secgroup for given tenant_id.
|
|
||||||
|
|
||||||
:returns: default secgroup for given tenant
|
|
||||||
"""
|
|
||||||
if client is None:
|
|
||||||
client = self.security_groups_client
|
|
||||||
if not tenant_id:
|
|
||||||
tenant_id = client.tenant_id
|
|
||||||
sgs = [
|
|
||||||
sg for sg in list(client.list_security_groups().values())[0]
|
|
||||||
if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
|
|
||||||
]
|
|
||||||
msg = "No default security group for tenant %s." % (tenant_id)
|
|
||||||
self.assertNotEmpty(sgs, msg)
|
|
||||||
return sgs[0]
|
|
||||||
|
|
||||||
def _create_security_group_rule(self, secgroup=None,
|
|
||||||
sec_group_rules_client=None,
|
|
||||||
tenant_id=None,
|
|
||||||
security_groups_client=None, **kwargs):
|
|
||||||
"""Create a rule from a dictionary of rule parameters.
|
|
||||||
|
|
||||||
Create a rule in a secgroup. if secgroup not defined will search for
|
|
||||||
default secgroup in tenant_id.
|
|
||||||
|
|
||||||
:param secgroup: the security group.
|
|
||||||
:param tenant_id: if secgroup not passed -- the tenant in which to
|
|
||||||
search for default secgroup
|
|
||||||
:param kwargs: a dictionary containing rule parameters:
|
|
||||||
for example, to allow incoming ssh:
|
|
||||||
rule = {
|
|
||||||
direction: 'ingress'
|
|
||||||
protocol:'tcp',
|
|
||||||
port_range_min: 22,
|
|
||||||
port_range_max: 22
|
|
||||||
}
|
|
||||||
"""
|
|
||||||
if sec_group_rules_client is None:
|
|
||||||
sec_group_rules_client = self.security_group_rules_client
|
|
||||||
if security_groups_client is None:
|
|
||||||
security_groups_client = self.security_groups_client
|
|
||||||
if not tenant_id:
|
|
||||||
tenant_id = security_groups_client.tenant_id
|
|
||||||
if secgroup is None:
|
|
||||||
secgroup = self._default_security_group(
|
|
||||||
client=security_groups_client, tenant_id=tenant_id)
|
|
||||||
|
|
||||||
ruleset = dict(security_group_id=secgroup['id'],
|
|
||||||
tenant_id=secgroup['tenant_id'])
|
|
||||||
ruleset.update(kwargs)
|
|
||||||
|
|
||||||
sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
|
|
||||||
sg_rule = sg_rule['security_group_rule']
|
|
||||||
|
|
||||||
self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
|
|
||||||
self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
|
|
||||||
|
|
||||||
return sg_rule
|
|
||||||
|
|
||||||
def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
|
|
||||||
secgroup=None,
|
|
||||||
security_groups_client=None):
|
|
||||||
"""Create loginable security group rule
|
|
||||||
|
|
||||||
This function will create:
|
|
||||||
1. egress and ingress tcp port 22 allow rule in order to allow ssh
|
|
||||||
access for ipv4.
|
|
||||||
2. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
|
|
||||||
3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
|
|
||||||
"""
|
|
||||||
|
|
||||||
if security_group_rules_client is None:
|
|
||||||
security_group_rules_client = self.security_group_rules_client
|
|
||||||
if security_groups_client is None:
|
|
||||||
security_groups_client = self.security_groups_client
|
|
||||||
rules = []
|
|
||||||
rulesets = [
|
|
||||||
dict(
|
|
||||||
# ssh
|
|
||||||
protocol='tcp',
|
|
||||||
port_range_min=22,
|
|
||||||
port_range_max=22,
|
|
||||||
),
|
|
||||||
dict(
|
|
||||||
# ping
|
|
||||||
protocol='icmp',
|
|
||||||
),
|
|
||||||
dict(
|
|
||||||
# ipv6-icmp for ping6
|
|
||||||
protocol='icmp',
|
|
||||||
ethertype='IPv6',
|
|
||||||
)
|
|
||||||
]
|
|
||||||
sec_group_rules_client = security_group_rules_client
|
|
||||||
for ruleset in rulesets:
|
|
||||||
for r_direction in ['ingress', 'egress']:
|
|
||||||
ruleset['direction'] = r_direction
|
|
||||||
try:
|
|
||||||
sg_rule = self._create_security_group_rule(
|
|
||||||
sec_group_rules_client=sec_group_rules_client,
|
|
||||||
secgroup=secgroup,
|
|
||||||
security_groups_client=security_groups_client,
|
|
||||||
**ruleset)
|
|
||||||
except lib_exc.Conflict as ex:
|
|
||||||
# if rule already exist - skip rule and continue
|
|
||||||
msg = 'Security group rule already exists'
|
|
||||||
if msg not in ex._error_string:
|
|
||||||
raise ex
|
|
||||||
else:
|
|
||||||
self.assertEqual(r_direction, sg_rule['direction'])
|
|
||||||
rules.append(sg_rule)
|
|
||||||
|
|
||||||
return rules
|
|
||||||
|
|
||||||
def _get_router(self, client=None, tenant_id=None):
|
def _get_router(self, client=None, tenant_id=None):
|
||||||
"""Retrieve a router for the given tenant id.
|
"""Retrieve a router for the given tenant id.
|
||||||
|
|
||||||
|
|