manila/manila_tempest_tests/tests/api/admin/test_share_manage.py

172 lines
6.4 KiB
Python

# Copyright 2015 Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from tempest import config # noqa
from tempest import test # noqa
from tempest_lib.common.utils import data_utils # noqa
from tempest_lib import exceptions as lib_exc # noqa
import testtools # noqa
from manila_tempest_tests.tests.api import base
CONF = config.CONF
class ManageNFSShareTest(base.BaseSharesAdminTest):
protocol = 'nfs'
# NOTE(vponomaryov): be careful running these tests using generic driver
# because cinder volumes will stay attached to service Nova VM and
# won't be deleted.
@classmethod
@testtools.skipIf(
CONF.share.multitenancy_enabled,
"Only for driver_handles_share_servers = False driver mode.")
@testtools.skipUnless(
CONF.share.run_manage_unmanage_tests,
"Manage/unmanage tests are disabled.")
def resource_setup(cls):
super(ManageNFSShareTest, cls).resource_setup()
if cls.protocol not in CONF.share.enable_protocols:
message = "%s tests are disabled" % cls.protocol
raise cls.skipException(message)
# Create share types
cls.st_name = data_utils.rand_name("manage-st-name")
cls.st_name_invalid = data_utils.rand_name("manage-st-name-invalid")
cls.extra_specs = {
'storage_protocol': CONF.share.capability_storage_protocol,
'driver_handles_share_servers': False,
'snapshot_support': six.text_type(
CONF.share.capability_snapshot_support),
}
cls.extra_specs_invalid = {
'storage_protocol': CONF.share.capability_storage_protocol,
'driver_handles_share_servers': True,
'snapshot_support': six.text_type(
CONF.share.capability_snapshot_support),
}
cls.st = cls.create_share_type(
name=cls.st_name,
cleanup_in_class=True,
extra_specs=cls.extra_specs)
cls.st_invalid = cls.create_share_type(
name=cls.st_name_invalid,
cleanup_in_class=True,
extra_specs=cls.extra_specs_invalid)
creation_data = {'kwargs': {
'share_type_id': cls.st['share_type']['id'],
'share_protocol': cls.protocol,
}}
# Create two shares in parallel
cls.shares = cls.create_shares([creation_data, creation_data])
# Load all share data (host, etc.)
cls.share1 = cls.shares_client.get_share(cls.shares[0]['id'])
cls.share2 = cls.shares_client.get_share(cls.shares[1]['id'])
# Unmanage shares from manila
cls.shares_client.unmanage_share(cls.share1['id'])
cls.shares_client.wait_for_resource_deletion(share_id=cls.share1['id'])
cls.shares_client.unmanage_share(cls.share2['id'])
cls.shares_client.wait_for_resource_deletion(share_id=cls.share2['id'])
@test.attr(type=["gate", "smoke"])
def test_manage(self):
name = "Name for 'managed' share that had ID %s" % self.share1["id"]
description = "Description for 'managed' share"
# Manage share
share = self.shares_client.manage_share(
service_host=self.share1['host'],
export_path=self.share1['export_locations'][0],
protocol=self.share1['share_proto'],
share_type_id=self.st['share_type']['id'],
name=name,
description=description,
)
# Add managed share to cleanup queue
self.method_resources.insert(
0, {'type': 'share', 'id': share['id'],
'client': self.shares_client})
# Wait for success
self.shares_client.wait_for_share_status(share['id'], 'available')
# Verify data of managed share
get = self.shares_v2_client.get_share(share['id'], version="2.5")
self.assertEqual(name, get['name'])
self.assertEqual(description, get['description'])
self.assertEqual(self.share1['host'], get['host'])
self.assertEqual(self.share1['share_proto'], get['share_proto'])
self.assertEqual(self.st['share_type']['name'], get['share_type'])
get = self.shares_v2_client.get_share(share['id'], version="2.6")
self.assertEqual(self.st['share_type']['id'], get['share_type'])
# Delete share
self.shares_client.delete_share(share['id'])
self.shares_client.wait_for_resource_deletion(share_id=share['id'])
self.assertRaises(lib_exc.NotFound,
self.shares_client.get_share,
share['id'])
@test.attr(type=["gate", "smoke"])
def test_manage_retry(self):
# Manage share with invalid parameters
share = None
parameters = [(self.st_invalid['share_type']['id'], 'manage_error'),
(self.st['share_type']['id'], 'available')]
for share_type_id, status in parameters:
share = self.shares_client.manage_share(
service_host=self.share2['host'],
export_path=self.share2['export_locations'][0],
protocol=self.share2['share_proto'],
share_type_id=share_type_id)
# Add managed share to cleanup queue
self.method_resources.insert(
0, {'type': 'share', 'id': share['id'],
'client': self.shares_client})
# Wait for success
self.shares_client.wait_for_share_status(share['id'], status)
# Delete share
self.shares_client.delete_share(share['id'])
self.shares_client.wait_for_resource_deletion(share_id=share['id'])
self.assertRaises(lib_exc.NotFound,
self.shares_client.get_share,
share['id'])
class ManageCIFSShareTest(ManageNFSShareTest):
protocol = 'cifs'
class ManageGLUSTERFSShareTest(ManageNFSShareTest):
protocol = 'glusterfs'
class ManageHDFSShareTest(ManageNFSShareTest):
protocol = 'hdfs'