Add functional tests for OSC

In this patch we create the base for osc functional testing
and add tests for 'openstack share' commands that are currently

Change-Id: I3732229e4a61182a80f6f35ad7050075db84f6ce
Partially-implements: bp openstack-client-support
This commit is contained in:
Maari Tamm 2020-02-13 07:52:08 +00:00
parent 3beec44af2
commit 134f07af6d
3 changed files with 223 additions and 0 deletions

View File

@ -0,0 +1,154 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
import time
from tempest.lib.cli import base
from tempest.lib.common.utils import data_utils
from manilaclient import config
CONF = config.CONF
class OSCClientTestBase(base.ClientTestBase):
"""Base class for OSC manila functional tests"""
def _get_clients(self):
return base.CLIClient(
username=CONF.admin_username or CONF.username,
password=CONF.admin_password or CONF.password,
tenant_name=CONF.admin_tenant_name or CONF.tenant_name,
uri=CONF.admin_auth_url or CONF.auth_url,
project_domain_name=(CONF.admin_project_domain_name or
CONF.project_domain_name or None),
project_domain_id=(CONF.admin_project_domain_id or
CONF.project_domain_id or None),
user_domain_name=(CONF.admin_user_domain_name or
CONF.user_domain_name or None),
user_domain_id=(CONF.admin_user_domain_id or
CONF.user_domain_id or None)
def _get_property_from_output(self, output):
"""Creates a dictionary from the given output"""
obj = {}
items = self.parser.listing(output)
for item in items:
obj[item['Field']] = six.text_type(item['Value'])
return obj
def _wait_for_object_status(self, object_name, object_id, status,
"""Waits for a object to reach a given status."""
start_time = time.time()
while time.time() - start_time < timeout:
if status == self.openstack(
'%(obj)s show -c status -f value %(id)s'
% {'obj': object_name,
'id': object_id}).rstrip():
else:"%s %s did not reach status %s after %d seconds."
% (object_name, object_id, status, timeout))
def check_object_deleted(self, object_name, object_id,
"""Check that object deleted successfully"""
cmd = '%s list -c ID -f value' % object_name
start_time = time.time()
while time.time() - start_time < timeout:
if object_id not in self.openstack(cmd):
else:"%s %s not deleted after %d seconds."
% (object_name, object_id, timeout))
def openstack(self, action, flags='', params='', fail_ok=False,
"""Executes openstack command for given action"""
if '--os-share-api-version' not in flags:
flags = (
flags + '--os-share-api-version %s'
% CONF.max_api_microversion)
return self.clients.openstack(action, flags=flags, params=params,
def listing_result(self, object_name, command):
"""Returns output for the given command as list of dictionaries"""
output = self.openstack(object_name, params=command)
result = self.parser.listing(output)
return result
def dict_result(self, object_name, command):
"""Returns output for the given command as dictionary"""
output = self.openstack(object_name, params=command)
result_dict = self._get_property_from_output(output)
return result_dict
def create_share(self, share_protocol=None, size=None, name=None,
snapshot_id=None, properties=None, share_network=None,
description=None, public=False, share_type=None,
availability_zone=None, share_group=None,
name = name or data_utils.rand_name('autotest_share_name')
# share_type = dhss_false until we have implemented
# share network commands for osc
share_type = share_type or 'dhss_false'
cmd = ('create '
'%(protocol)s %(size)s %(name)s %(desc)s %(public)s %(stype)s'
% {'protocol': share_protocol or 'NFS',
'size': size or '1',
'name': '--name %s' % name,
'desc': '--description %s' % description,
'public': '--public %s' % public,
'stype': '--share-type %s' % share_type})
if snapshot_id:
cmd = cmd + ' --snapshot-id %s' % snapshot_id
if properties:
for key, value in properties.items():
cmd = (cmd + ' --property %(key)s=%(value)s'
% {'key': key, 'value': value})
if share_network:
cmd = cmd + ' --share-network %s' % share_network
if availability_zone:
cmd = cmd + ' --availability-zone %s' % availability_zone
if share_group:
cmd = cmd + ' --share-group %s' % share_group
share_object = self.dict_result('share', cmd)
'share', share_object['id'], 'available')
if add_cleanup:
self.openstack, 'share delete %s' % share_object['id']
return share_object

View File

@ -0,0 +1,69 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from manilaclient.tests.functional.osc import base
class SharesCLITest(base.OSCClientTestBase):
def test_openstack_share_create(self):
share_name = 'test_create_share'
share = self.create_share(name=share_name)
self.assertEqual(share['share_proto'], 'NFS')
self.assertEqual(share['size'], '1')
self.assertEqual(share['name'], share_name)
shares_list = self.listing_result('share', 'list')
self.assertIn(share['id'], [item['ID'] for item in shares_list])
def test_openstack_share_list(self):
share = self.create_share()
shares_list = self.listing_result('share', 'list')
self.assertTableStruct(shares_list, [
'Share Proto',
'Is Public',
'Share Type Name',
'Availability Zone'
self.assertIn(share['id'], [item['ID'] for item in shares_list])
def test_openstack_share_show(self):
share = self.create_share()
result = self.dict_result('share', 'show %s' % share['id'])
self.assertEqual(share['id'], result['id'])
listing_result = self.listing_result('share', 'show %s' % share['id'])
self.assertTableStruct(listing_result, [
def test_openstack_share_delete(self):
share = self.create_share(add_cleanup=False)
shares_list = self.listing_result('share', 'list')
self.assertIn(share['id'], [item['ID'] for item in shares_list])
self.openstack('share delete %s' % share['id'])
self.check_object_deleted('share', share['id'])
shares_list_after_delete = self.listing_result('share', 'list')
share['id'], [item['ID'] for item in shares_list_after_delete])