[Gate Fix] Fix AttributeError in ServerActionsRbacTest

This PS fixes an AttributeError being raised in ServerActionsRbacTest
[0] due to a recent Tempest change [1]. Besides, the volume
creation was being done manually in Patrole, when the volume
should have been created using Tempest's base helper for creating
a volume.

This same issue applies with `test_create_image_volume_backed` in
the same class [2] due to a recent Tempest change [3], and besides
that, the previous way of spinning up a volume-backed server did
not leverage Tempest's helpers, which this PS corrects.

[0] http://logs.openstack.org/06/466806/8/check/gate-tempest-dsvm-patrole-admin-ubuntu-xenial/c186718/console.html#_2017-09-22_01_07_10_242418
[1] https://review.openstack.org/#/c/498888/
[2] http://logs.openstack.org/31/506531/3/check/gate-tempest-dsvm-patrole-member-ubuntu-xenial/01bfbc6/console.html#_2017-09-22_07_08_39_483859
[3] https://review.openstack.org/#/c/498886/

Change-Id: I6f93dc6961844e2154d348790cad3fde77b9d218
This commit is contained in:
Felipe Monteiro 2017-09-22 07:06:26 +01:00
parent bb3c257f77
commit a63f854f63
3 changed files with 38 additions and 91 deletions

View File

@ -19,6 +19,7 @@ import testtools
from tempest.common import waiters
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest import test
@ -38,27 +39,11 @@ class ServerActionsRbacTest(rbac_base.BaseV2ComputeRbacTest):
@classmethod
def resource_setup(cls):
super(ServerActionsRbacTest, cls).resource_setup()
# Create test server
cls.server_id = cls.create_test_server(wait_until='ACTIVE')['id']
cls.flavor_ref = CONF.compute.flavor_ref
cls.flavor_ref_alt = CONF.compute.flavor_ref_alt
cls.image_ref = CONF.compute.image_ref
# Create a volume
volume_name = data_utils.rand_name(cls.__name__ + '-volume')
name_field = 'name'
if not CONF.volume_feature_enabled.api_v2:
name_field = 'display_name'
params = {name_field: volume_name,
'imageRef': CONF.compute.image_ref,
'size': CONF.volume.volume_size}
volume = cls.volumes_client.create_volume(**params)['volume']
waiters.wait_for_volume_resource_status(cls.volumes_client,
volume['id'], 'available')
cls.volumes.append(volume)
cls.volume_id = volume['id']
def setUp(self):
super(ServerActionsRbacTest, self).setUp()
try:
@ -73,57 +58,6 @@ class ServerActionsRbacTest(rbac_base.BaseV2ComputeRbacTest):
# Rebuilding the server in case something happened during a test
self.__class__.server_id = self.rebuild_server(self.server_id)
@classmethod
def resource_cleanup(cls):
# If a test case creates an image from a server that is created with
# a volume, a volume snapshot will automatically be created by default.
# We need to delete the volume snapshot.
try:
body = cls.snapshots_extensions_client.list_snapshots()
volume_snapshots = body['snapshots']
except Exception:
LOG.info("Cannot retrieve snapshots for cleanup.")
else:
for snapshot in volume_snapshots:
if snapshot['volumeId'] == cls.volume_id:
# Wait for snapshot status to become 'available' before
# deletion
waiters.wait_for_volume_resource_status(
cls.snapshots_extensions_client, snapshot['id'],
'available')
cls.snapshots_extensions_client.delete_snapshot(
snapshot['id'])
for snapshot in volume_snapshots:
if snapshot['volumeId'] == cls.volume_id:
cls.snapshots_extensions_client.wait_for_resource_deletion(
snapshot['id'])
super(ServerActionsRbacTest, cls).resource_cleanup()
def _create_test_server_with_volume(self, volume_id):
# Create a server with the volume created earlier
server_name = data_utils.rand_name(self.__class__.__name__ + "-server")
bd_map_v2 = [{'uuid': volume_id,
'source_type': 'volume',
'destination_type': 'volume',
'boot_index': 0,
'delete_on_termination': True}]
device_mapping = {'block_device_mapping_v2': bd_map_v2}
# Since the server is booted from volume, the imageRef does not need
# to be specified.
server = self.servers_client.create_server(
name=server_name, imageRef='',
flavorRef=CONF.compute.flavor_ref,
**device_mapping)['server']
waiters.wait_for_server_status(
self.os_admin.servers_client, server['id'], 'ACTIVE')
self.servers.append(server)
return server
def _test_start_server(self):
self.servers_client.start_server(self.server_id)
waiters.wait_for_server_status(
@ -275,21 +209,30 @@ class ServerActionsRbacTest(rbac_base.BaseV2ComputeRbacTest):
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
# This function will also call show image
self.create_image_from_server(self.server_id,
wait_until='ACTIVE')
self.create_image_from_server(self.server_id, wait_until='ACTIVE')
@test.services('image', 'volume')
@rbac_rule_validation.action(
service="nova",
rule="os_compute_api:servers:create_image:allow_volume_backed")
@decorators.idempotent_id('8b869f73-49b3-4cc4-a0ce-ef64f8e1d6f9')
def test_create_image_volume_backed(self):
server = self._create_test_server_with_volume(self.volume_id)
def test_create_image_from_volume_backed_server(self):
# volume_backed=True creates a volume and create server will be
# requested with 'block_device_mapping_v2' with necessary values for
# this test.
server = self.create_test_server(volume_backed=True,
wait_until='ACTIVE')
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
# This function will also call show image
self.create_image_from_server(server['id'],
wait_until='ACTIVE')
# This function will also call show image.
image = self.create_image_from_server(server['id'],
wait_until='ACTIVE',
wait_for_server=False)
self.addCleanup(self.compute_images_client.wait_for_resource_deletion,
image['id'])
self.addCleanup(
test_utils.call_and_ignore_notfound_exc,
self.compute_images_client.delete_image, image['id'])
class ServerActionsV214RbacTest(rbac_base.BaseV2ComputeRbacTest):

View File

@ -18,6 +18,7 @@ from oslo_log import log
from tempest.common import waiters
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
from tempest import test
@ -98,8 +99,16 @@ class ComputeServersRbacTest(base.BaseV2ComputeRbacTest):
self.rbac_utils.switch_role(self, toggle_rbac_role=True)
# Use image_id='' to avoid using the default image in tempest.conf.
self.create_test_server(name=server_name, image_id='',
**device_mapping)
server = self.create_test_server(name=server_name, image_id='',
**device_mapping)
# Delete the server and wait for the volume to become available to
# avoid clean up errors.
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
waiters.wait_for_volume_resource_status,
self.volumes_client, volume_id, 'available')
self.addCleanup(waiters.wait_for_server_termination,
self.servers_client, server['id'])
self.addCleanup(self.delete_server, server['id'])
@test.services('network')
@rbac_rule_validation.action(

View File

@ -26,23 +26,23 @@ CONF = config.CONF
LOG = logging.getLogger(__name__)
class BaseIdentityV2RbacTest(base.BaseIdentityV2Test):
class BaseIdentityRbacTest(base.BaseIdentityTest):
@classmethod
def skip_checks(cls):
super(BaseIdentityV2RbacTest, cls).skip_checks()
super(BaseIdentityRbacTest, cls).skip_checks()
if not CONF.patrole.enable_rbac:
raise cls.skipException(
"%s skipped as RBAC testing not enabled" % cls.__name__)
@classmethod
def setup_clients(cls):
super(BaseIdentityV2RbacTest, cls).setup_clients()
super(BaseIdentityRbacTest, cls).setup_clients()
cls.rbac_utils = rbac_utils.RbacUtils(cls)
@classmethod
def resource_setup(cls):
super(BaseIdentityV2RbacTest, cls).resource_setup()
super(BaseIdentityRbacTest, cls).resource_setup()
cls.endpoints = []
cls.roles = []
cls.services = []
@ -66,7 +66,7 @@ class BaseIdentityV2RbacTest(base.BaseIdentityV2Test):
test_utils.call_and_ignore_notfound_exc(
cls.users_client.delete_user, user['id'])
super(BaseIdentityV2RbacTest, cls).resource_cleanup()
super(BaseIdentityRbacTest, cls).resource_cleanup()
@classmethod
def setup_test_endpoint(cls, service=None):
@ -87,10 +87,6 @@ class BaseIdentityV2RbacTest(base.BaseIdentityV2Test):
params['publicurl'] = url
elif cls.identity_version == 'v3':
params['url'] = url
else:
LOG.debug("Keystone version is invalid."
" Please enter a valid version number.")
raise KeyError
endpoint = cls.endpoints_client.create_endpoint(**params)['endpoint']
cls.endpoints.append(endpoint)
@ -122,10 +118,6 @@ class BaseIdentityV2RbacTest(base.BaseIdentityV2Test):
service = service['OS-KSADM:service']
elif cls.identity_version == 'v3':
service = service['service']
else:
LOG.debug("Keystone version is invalid."
" Please enter a valid version number.")
raise KeyError
cls.services.append(service)
@ -147,7 +139,7 @@ class BaseIdentityV2RbacTest(base.BaseIdentityV2Test):
return user
class BaseIdentityV2AdminRbacTest(BaseIdentityV2RbacTest):
class BaseIdentityV2AdminRbacTest(BaseIdentityRbacTest):
"""Base test class for the Identity v2 admin API.
Keystone's v2 API is split into two APIs: an admin and non-admin API. RBAC
@ -156,6 +148,8 @@ class BaseIdentityV2AdminRbacTest(BaseIdentityV2RbacTest):
request object has ``context_is_admin``. For more details, see the
implementation of ``assert_admin`` in ``keystone.common.wsgi``.
"""
identity_version = 'v2'
credentials = ['primary']
@classmethod
def skip_checks(cls):
@ -213,9 +207,10 @@ class BaseIdentityV2AdminRbacTest(BaseIdentityV2RbacTest):
return token_id
class BaseIdentityV3RbacTest(BaseIdentityV2RbacTest):
class BaseIdentityV3RbacTest(BaseIdentityRbacTest):
identity_version = 'v3'
credentials = ['primary']
@classmethod
def setup_clients(cls):