Remove secgroups related methods
As tempest.scenario.manager was announced stable interface in Tempest 27.0.0[1]
it can be now reused in plugins.
Barbican tempest plugin still uses nova-network old interface for floating ip
related methods, this patch removes them and use tempests's ones which
already use neutron's interface.[2]
Replaced/Removed methods:
* _default_security_group
* _create_security_group
* _create_loginable_secgroup_rule
* _create_security_group_rule
Etherpad concerning this effort:
https://etherpad.opendev.org/p/tempest-scenario-manager-cleanup
[1] https://docs.openstack.org/releasenotes/tempest/v27.0.0.html#release-notes-27-0-0
[2] bbc9dd34f6
Change-Id: I7605fe11caa8ae5725e0c0583c623ebad73d40da
This commit is contained in:
parent
ddfeb25152
commit
29541dcf25
@ -20,7 +20,6 @@ from tempest.common import image as common_image
|
||||
from tempest.common import waiters
|
||||
from tempest import config
|
||||
from tempest.lib.common.utils import data_utils
|
||||
from tempest.lib.common.utils import test_utils
|
||||
from tempest.lib import exceptions as lib_exc
|
||||
from tempest.scenario import manager
|
||||
|
||||
@ -52,12 +51,6 @@ class ScenarioTest(manager.NetworkScenarioTest):
|
||||
# Compute image client
|
||||
cls.compute_images_client = cls.os_primary.compute_images_client
|
||||
cls.keypairs_client = cls.os_primary.keypairs_client
|
||||
# Nova security groups client
|
||||
cls.compute_security_groups_client = (
|
||||
cls.os_primary.compute_security_groups_client)
|
||||
cls.compute_security_group_rules_client = (
|
||||
cls.os_primary.compute_security_group_rules_client)
|
||||
cls.servers_client = cls.os_primary.servers_client
|
||||
# Neutron network client
|
||||
cls.networks_client = cls.os_primary.networks_client
|
||||
cls.ports_client = cls.os_primary.ports_client
|
||||
@ -125,117 +118,3 @@ class ScenarioTest(manager.NetworkScenarioTest):
|
||||
str(failed_stores))
|
||||
|
||||
return image['id']
|
||||
|
||||
def _default_security_group(self, client=None, tenant_id=None):
|
||||
"""Get default secgroup for given tenant_id.
|
||||
|
||||
:returns: default secgroup for given tenant
|
||||
"""
|
||||
if client is None:
|
||||
client = self.security_groups_client
|
||||
if not tenant_id:
|
||||
tenant_id = client.tenant_id
|
||||
sgs = [
|
||||
sg for sg in list(client.list_security_groups().values())[0]
|
||||
if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
|
||||
]
|
||||
msg = "No default security group for tenant %s." % (tenant_id)
|
||||
self.assertGreater(len(sgs), 0, msg)
|
||||
return sgs[0]
|
||||
|
||||
def _create_security_group(self):
|
||||
# Create security group
|
||||
sg_name = data_utils.rand_name(self.__class__.__name__)
|
||||
sg_desc = sg_name + " description"
|
||||
secgroup = self.compute_security_groups_client.create_security_group(
|
||||
name=sg_name, description=sg_desc)['security_group']
|
||||
self.assertEqual(secgroup['name'], sg_name)
|
||||
self.assertEqual(secgroup['description'], sg_desc)
|
||||
self.addCleanup(
|
||||
test_utils.call_and_ignore_notfound_exc,
|
||||
self.compute_security_groups_client.delete_security_group,
|
||||
secgroup['id'])
|
||||
|
||||
# Add rules to the security group
|
||||
self._create_loginable_secgroup_rule(secgroup['id'])
|
||||
|
||||
return secgroup
|
||||
|
||||
def _create_loginable_secgroup_rule(self, secgroup_id=None):
|
||||
_client = self.compute_security_groups_client
|
||||
_client_rules = self.compute_security_group_rules_client
|
||||
if secgroup_id is None:
|
||||
sgs = _client.list_security_groups()['security_groups']
|
||||
for sg in sgs:
|
||||
if sg['name'] == 'default':
|
||||
secgroup_id = sg['id']
|
||||
|
||||
# These rules are intended to permit inbound ssh and icmp
|
||||
# traffic from all sources, so no group_id is provided.
|
||||
# Setting a group_id would only permit traffic from ports
|
||||
# belonging to the same security group.
|
||||
rulesets = [
|
||||
{
|
||||
# ssh
|
||||
'ip_protocol': 'tcp',
|
||||
'from_port': 22,
|
||||
'to_port': 22,
|
||||
'cidr': '0.0.0.0/0',
|
||||
},
|
||||
{
|
||||
# ping
|
||||
'ip_protocol': 'icmp',
|
||||
'from_port': -1,
|
||||
'to_port': -1,
|
||||
'cidr': '0.0.0.0/0',
|
||||
}
|
||||
]
|
||||
rules = list()
|
||||
for ruleset in rulesets:
|
||||
sg_rule = _client_rules.create_security_group_rule(
|
||||
parent_group_id=secgroup_id, **ruleset)['security_group_rule']
|
||||
rules.append(sg_rule)
|
||||
return rules
|
||||
|
||||
def _create_security_group_rule(self, secgroup=None,
|
||||
sec_group_rules_client=None,
|
||||
tenant_id=None,
|
||||
security_groups_client=None, **kwargs):
|
||||
"""Create a rule from a dictionary of rule parameters.
|
||||
|
||||
Create a rule in a secgroup. if secgroup not defined will search for
|
||||
default secgroup in tenant_id.
|
||||
|
||||
:param secgroup: the security group.
|
||||
:param tenant_id: if secgroup not passed -- the tenant in which to
|
||||
search for default secgroup
|
||||
:param kwargs: a dictionary containing rule parameters:
|
||||
for example, to allow incoming ssh:
|
||||
rule = {
|
||||
direction: 'ingress'
|
||||
protocol:'tcp',
|
||||
port_range_min: 22,
|
||||
port_range_max: 22
|
||||
}
|
||||
"""
|
||||
if sec_group_rules_client is None:
|
||||
sec_group_rules_client = self.security_group_rules_client
|
||||
if security_groups_client is None:
|
||||
security_groups_client = self.security_groups_client
|
||||
if not tenant_id:
|
||||
tenant_id = security_groups_client.tenant_id
|
||||
if secgroup is None:
|
||||
secgroup = self._default_security_group(
|
||||
client=security_groups_client, tenant_id=tenant_id)
|
||||
|
||||
ruleset = dict(security_group_id=secgroup['id'],
|
||||
tenant_id=secgroup['tenant_id'])
|
||||
ruleset.update(kwargs)
|
||||
|
||||
sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
|
||||
sg_rule = sg_rule['security_group_rule']
|
||||
|
||||
self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
|
||||
self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
|
||||
|
||||
return sg_rule
|
||||
|
@ -63,7 +63,7 @@ class EphemeralStorageEncryptionTest(barbican_manager.BarbicanScenarioTest):
|
||||
client_test_path = '/tmp/ephemeral_disk_encryption_test'
|
||||
img_uuid = self.sign_and_upload_image()
|
||||
keypair = self.create_keypair()
|
||||
security_group = self._create_security_group()
|
||||
security_group = self.create_security_group()
|
||||
instance = self.create_server(
|
||||
name='signed_img_server',
|
||||
image_id=img_uuid,
|
||||
|
@ -97,7 +97,7 @@ class VolumeEncryptionTest(barbican_manager.BarbicanScenarioTest):
|
||||
img_uuid = self.sign_and_upload_image()
|
||||
LOG.info("Creating keypair and security group")
|
||||
keypair = self.create_keypair()
|
||||
security_group = self._create_security_group()
|
||||
security_group = self.create_security_group()
|
||||
server = self.create_server(
|
||||
name='signed_img_server',
|
||||
image_id=img_uuid,
|
||||
@ -122,7 +122,7 @@ class VolumeEncryptionTest(barbican_manager.BarbicanScenarioTest):
|
||||
img_uuid = self.sign_and_upload_image()
|
||||
LOG.info("Creating keypair and security group")
|
||||
keypair = self.create_keypair()
|
||||
security_group = self._create_security_group()
|
||||
security_group = self.create_security_group()
|
||||
server = self.create_server(
|
||||
name='signed_img_server',
|
||||
image_id=img_uuid,
|
||||
|
Loading…
Reference in New Issue
Block a user