Merge "Remove nova-network from scenario tests"

This commit is contained in:
Zuul 2021-03-24 19:22:50 +00:00 committed by Gerrit Code Review
commit f0a2967f3d
1 changed files with 153 additions and 222 deletions

View File

@ -96,10 +96,6 @@ class ScenarioTest(tempest.test.BaseTestCase):
"""Compute and Compute security groups client"""
cls.compute_images_client = cls.os_primary.compute_images_client
cls.keypairs_client = cls.os_primary.keypairs_client
cls.compute_security_groups_client = (
cls.os_primary.compute_security_groups_client)
cls.compute_security_group_rules_client = (
cls.os_primary.compute_security_group_rules_client)
cls.servers_client = cls.os_primary.servers_client
cls.interface_client = cls.os_primary.interfaces_client
@ -546,69 +542,163 @@ class ScenarioTest(tempest.test.BaseTestCase):
self.addCleanup(self.cleanup_volume_type, volume_type)
return volume_type
def create_loginable_secgroup_rule(self, secgroup_id=None, rulesets=None):
"""Create loginable security group rule by compute clients.
This function will create by default the following rules:
1. tcp port 22 allow rule in order to allow ssh access for ipv4
2. ipv4 icmp allow rule in order to allow icmpv4
"""
_client = self.compute_security_groups_client
_client_rules = self.compute_security_group_rules_client
if secgroup_id is None:
sgs = _client.list_security_groups()['security_groups']
for sg in sgs:
if sg['name'] == 'default':
secgroup_id = sg['id']
# These rules are intended to permit inbound ssh and icmp
# traffic from all sources, so no group_id is provided.
# Setting a group_id would only permit traffic from ports
# belonging to the same security group.
if not rulesets:
rulesets = [
{
# ssh
'ip_protocol': 'tcp',
'from_port': 22,
'to_port': 22,
'cidr': '0.0.0.0/0',
},
{
# ping
'ip_protocol': 'icmp',
'from_port': -1,
'to_port': -1,
'cidr': '0.0.0.0/0',
}
]
rules = list()
for ruleset in rulesets:
sg_rule = _client_rules.create_security_group_rule(
parent_group_id=secgroup_id, **ruleset)['security_group_rule']
rules.append(sg_rule)
return rules
def create_security_group(self, **kwargs):
"""Create security group and add rules to security group"""
if not kwargs.get('name'):
kwargs['name'] = data_utils.rand_name(self.__class__.__name__)
if not kwargs.get('description'):
kwargs['description'] = kwargs['name'] + " description"
secgroup = self.compute_security_groups_client.create_security_group(
**kwargs)['security_group']
self.assertEqual(secgroup['name'], kwargs['name'])
self.assertEqual(secgroup['description'], kwargs['description'])
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.compute_security_groups_client.delete_security_group,
secgroup['id'])
def create_security_group(self, security_group_rules_client=None,
project_id=None,
namestart='secgroup-smoke',
security_groups_client=None):
if security_group_rules_client is None:
security_group_rules_client = self.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
if project_id is None:
project_id = security_groups_client.project_id
secgroup = self.create_empty_security_group(
namestart=namestart, client=security_groups_client,
project_id=project_id)
# Add rules to the security group
self.create_loginable_secgroup_rule(secgroup['id'])
rules = self.create_loginable_secgroup_rule(
security_group_rules_client=security_group_rules_client,
secgroup=secgroup,
security_groups_client=security_groups_client)
for rule in rules:
self.assertEqual(project_id, rule['project_id'])
self.assertEqual(secgroup['id'], rule['security_group_id'])
return secgroup
def create_empty_security_group(self, client=None, project_id=None,
namestart='secgroup-smoke'):
"""Create a security group without rules.
Default rules will be created:
- IPv4 egress to any
- IPv6 egress to any
:param project_id: secgroup will be created in this project
:returns: the created security group
"""
if client is None:
client = self.security_groups_client
if not project_id:
project_id = client.project_id
sg_name = data_utils.rand_name(namestart)
sg_desc = sg_name + " description"
sg_dict = dict(name=sg_name,
description=sg_desc)
sg_dict['project_id'] = project_id
result = client.create_security_group(**sg_dict)
secgroup = result['security_group']
self.assertEqual(secgroup['name'], sg_name)
self.assertEqual(project_id, secgroup['project_id'])
self.assertEqual(secgroup['description'], sg_desc)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_security_group, secgroup['id'])
return secgroup
def create_security_group_rule(self, secgroup=None,
sec_group_rules_client=None,
project_id=None,
security_groups_client=None, **kwargs):
"""Create a rule from a dictionary of rule parameters.
Create a rule in a secgroup. if secgroup not defined will search for
default secgroup in project_id.
:param secgroup: the security group.
:param project_id: if secgroup not passed -- the tenant in which to
search for default secgroup
:param kwargs: a dictionary containing rule parameters:
for example, to allow incoming ssh:
rule = {
direction: 'ingress'
protocol:'tcp',
port_range_min: 22,
port_range_max: 22
}
"""
if sec_group_rules_client is None:
sec_group_rules_client = self.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
if not project_id:
project_id = security_groups_client.project_id
if secgroup is None:
# Get default secgroup for project_id
default_secgroups = security_groups_client.list_security_groups(
name='default', project_id=project_id)['security_groups']
msg = "No default security group for project %s." % (project_id)
self.assertNotEmpty(default_secgroups, msg)
secgroup = default_secgroups[0]
ruleset = dict(security_group_id=secgroup['id'],
project_id=secgroup['project_id'])
ruleset.update(kwargs)
sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
sg_rule = sg_rule['security_group_rule']
self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
return sg_rule
def create_loginable_secgroup_rule(self, security_group_rules_client=None,
secgroup=None,
security_groups_client=None):
"""Create loginable security group rule by neutron clients by default.
This function will create:
1. egress and ingress tcp port 22 allow rule in order to allow ssh
access for ipv4.
2. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
"""
if security_group_rules_client is None:
security_group_rules_client = self.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
rules = []
rulesets = [
dict(
# ssh
protocol='tcp',
port_range_min=22,
port_range_max=22,
),
dict(
# ping
protocol='icmp',
),
dict(
# ipv6-icmp for ping6
protocol='icmp',
ethertype='IPv6',
)
]
sec_group_rules_client = security_group_rules_client
for ruleset in rulesets:
for r_direction in ['ingress', 'egress']:
ruleset['direction'] = r_direction
try:
sg_rule = self.create_security_group_rule(
sec_group_rules_client=sec_group_rules_client,
secgroup=secgroup,
security_groups_client=security_groups_client,
**ruleset)
except lib_exc.Conflict as ex:
# if rule already exist - skip rule and continue
msg = 'Security group rule already exists'
if msg not in ex._error_string:
raise ex
else:
self.assertEqual(r_direction, sg_rule['direction'])
rules.append(sg_rule)
return rules
def get_remote_client(self, ip_address, username=None, private_key=None,
server=None):
"""Get a SSH client to a remote server
@ -1381,165 +1471,6 @@ class NetworkScenarioTest(ScenarioTest):
self.log_console_output()
self.fail(msg)
def create_security_group(self, security_group_rules_client=None,
project_id=None,
namestart='secgroup-smoke',
security_groups_client=None):
if security_group_rules_client is None:
security_group_rules_client = self.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
if project_id is None:
project_id = security_groups_client.project_id
secgroup = self.create_empty_security_group(
namestart=namestart, client=security_groups_client,
project_id=project_id)
# Add rules to the security group
rules = self.create_loginable_secgroup_rule(
security_group_rules_client=security_group_rules_client,
secgroup=secgroup,
security_groups_client=security_groups_client)
for rule in rules:
self.assertEqual(project_id, rule['project_id'])
self.assertEqual(secgroup['id'], rule['security_group_id'])
return secgroup
def create_empty_security_group(self, client=None, project_id=None,
namestart='secgroup-smoke'):
"""Create a security group without rules.
Default rules will be created:
- IPv4 egress to any
- IPv6 egress to any
:param project_id: secgroup will be created in this project
:returns: the created security group
"""
if client is None:
client = self.security_groups_client
if not project_id:
project_id = client.project_id
sg_name = data_utils.rand_name(namestart)
sg_desc = sg_name + " description"
sg_dict = dict(name=sg_name,
description=sg_desc)
sg_dict['project_id'] = project_id
result = client.create_security_group(**sg_dict)
secgroup = result['security_group']
self.assertEqual(secgroup['name'], sg_name)
self.assertEqual(project_id, secgroup['project_id'])
self.assertEqual(secgroup['description'], sg_desc)
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
client.delete_security_group, secgroup['id'])
return secgroup
def create_security_group_rule(self, secgroup=None,
sec_group_rules_client=None,
project_id=None,
security_groups_client=None, **kwargs):
"""Create a rule from a dictionary of rule parameters.
Create a rule in a secgroup. if secgroup not defined will search for
default secgroup in project_id.
:param secgroup: the security group.
:param project_id: if secgroup not passed -- the tenant in which to
search for default secgroup
:param kwargs: a dictionary containing rule parameters:
for example, to allow incoming ssh:
rule = {
direction: 'ingress'
protocol:'tcp',
port_range_min: 22,
port_range_max: 22
}
"""
if sec_group_rules_client is None:
sec_group_rules_client = self.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
if not project_id:
project_id = security_groups_client.project_id
if secgroup is None:
# Get default secgroup for project_id
default_secgroups = security_groups_client.list_security_groups(
name='default', project_id=project_id)['security_groups']
msg = "No default security group for project %s." % (project_id)
self.assertNotEmpty(default_secgroups, msg)
secgroup = default_secgroups[0]
ruleset = dict(security_group_id=secgroup['id'],
project_id=secgroup['project_id'])
ruleset.update(kwargs)
sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
sg_rule = sg_rule['security_group_rule']
self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
return sg_rule
def create_loginable_secgroup_rule(self, security_group_rules_client=None,
secgroup=None,
security_groups_client=None):
"""Create loginable security group rule by neutron clients by default.
This function will create:
1. egress and ingress tcp port 22 allow rule in order to allow ssh
access for ipv4.
2. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
"""
if security_group_rules_client is None:
security_group_rules_client = self.security_group_rules_client
if security_groups_client is None:
security_groups_client = self.security_groups_client
rules = []
rulesets = [
dict(
# ssh
protocol='tcp',
port_range_min=22,
port_range_max=22,
),
dict(
# ping
protocol='icmp',
),
dict(
# ipv6-icmp for ping6
protocol='icmp',
ethertype='IPv6',
)
]
sec_group_rules_client = security_group_rules_client
for ruleset in rulesets:
for r_direction in ['ingress', 'egress']:
ruleset['direction'] = r_direction
try:
sg_rule = self.create_security_group_rule(
sec_group_rules_client=sec_group_rules_client,
secgroup=secgroup,
security_groups_client=security_groups_client,
**ruleset)
except lib_exc.Conflict as ex:
# if rule already exist - skip rule and continue
msg = 'Security group rule already exists'
if msg not in ex._error_string:
raise ex
else:
self.assertEqual(r_direction, sg_rule['direction'])
rules.append(sg_rule)
return rules
def get_router(self, client=None, project_id=None, **kwargs):
"""Retrieve a router for the given tenant id.