Adds new functional tests for share-network-subnets

This patch adds functional test for share-network-subnets and
validates the new API version 2.51

Change-Id: I60bdb493ad5766f28408a0c877f960922fe44ad0
Partially-implements: bp share-network-multiple-subnets
This commit is contained in:
Douglas Viroel 2019-08-06 19:40:37 -03:00
parent 0d1d29f561
commit b7e27e7719
14 changed files with 774 additions and 28 deletions

View File

@ -30,7 +30,7 @@ ShareGroup = [
help="The minimum api microversion is configured to be the "
"value of the minimum microversion supported by Manila."),
cfg.StrOpt("max_api_microversion",
default="2.50",
default="2.51",
help="The maximum api microversion is configured to be the "
"value of the latest microversion supported by Manila."),
cfg.StrOpt("region",

View File

@ -376,9 +376,9 @@ class SharesClient(rest_client.RestClient):
raise share_exceptions.InvalidResource(
message=six.text_type(kwargs))
def _is_resource_deleted(self, func, res_id):
def _is_resource_deleted(self, func, res_id, **kwargs):
try:
res = func(res_id)
res = func(res_id, **kwargs)
except exceptions.NotFound:
return True
if res.get('status') in ['error_deleting', 'error']:

View File

@ -226,6 +226,13 @@ class SharesV2Client(shares_client.SharesClient):
elif "message_id" in kwargs:
return self._is_resource_deleted(
self.get_message, kwargs.get("message_id"))
elif "share_network_subnet_id" in kwargs:
subnet_kwargs = {
"sn_id": kwargs["extra_params"]["share_network_id"]}
return self._is_resource_deleted(
self.get_subnet, kwargs.get("share_network_subnet_id"),
**subnet_kwargs
)
else:
return super(SharesV2Client, self).is_resource_deleted(
*args, **kwargs)
@ -1417,7 +1424,8 @@ class SharesV2Client(shares_client.SharesClient):
###############
def manage_share_server(self, host, share_network_id, identifier,
driver_options=None, version=LATEST_MICROVERSION):
driver_options=None, version=LATEST_MICROVERSION,
share_network_subnet_id=None):
body = {
'share_server': {
'host': host,
@ -1426,6 +1434,9 @@ class SharesV2Client(shares_client.SharesClient):
'driver_options': driver_options if driver_options else {},
}
}
if share_network_subnet_id:
body['share_server']['share_network_subnet_id'] = (
share_network_subnet_id)
body = json.dumps(body)
resp, body = self.post('share-servers/manage', body,
@ -1975,3 +1986,42 @@ class SharesV2Client(shares_client.SharesClient):
resp, body = self.get(uri, version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
###############
def create_subnet(
self, share_network_id, availability_zone=None,
neutron_net_id=None, neutron_subnet_id=None):
body = {'share_network_id': share_network_id}
if availability_zone:
body['availability_zone'] = availability_zone
if neutron_net_id:
body['neutron_net_id'] = neutron_net_id
if neutron_subnet_id:
body['neutron_subnet_id'] = neutron_subnet_id
body = json.dumps({"share-network-subnet": body})
url = '/share-networks/%s/subnets' % share_network_id
resp, body = self.post(url, body, version=LATEST_MICROVERSION)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def get_subnet(self, share_network_subnet_id, share_network_id):
url = ('share-networks/%(network)s/subnets/%(subnet)s' % {
'network': share_network_id,
'subnet': share_network_subnet_id}
)
resp, body = self.get(url)
self.expected_success(200, resp.status)
return self._parse_resp(body)
def delete_subnet(self, share_network_id, share_network_subnet_id):
url = ('share-networks/%(network)s/subnets/%(subnet)s' % {
'network': share_network_id,
'subnet': share_network_subnet_id}
)
resp, body = self.delete(url)
self.expected_success(202, resp.status)
return body
###############

View File

@ -195,11 +195,14 @@ class MigrationBase(base.BaseSharesAdminTest):
old_share_network = self.shares_v2_client.get_share_network(
old_share_network_id)
share_net_info = (
utils.share_network_get_default_subnet(old_share_network)
if utils.share_network_subnets_are_supported()
else old_share_network)
new_share_network = self.create_share_network(
cleanup_in_class=True,
neutron_net_id=old_share_network['neutron_net_id'],
neutron_subnet_id=old_share_network['neutron_subnet_id'])
neutron_net_id=share_net_info['neutron_net_id'],
neutron_subnet_id=share_net_info['neutron_subnet_id'])
return new_share_network['id']

View File

@ -53,6 +53,10 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
cls.share_network["name"],
cls.share_network["id"],
]
cls.share_net_info = (
utils.share_network_get_default_subnet(cls.share_network)
if utils.share_network_subnets_are_supported()
else cls.share_network)
# Date should be like '2014-13-12T11:10:09.000000'
cls.date_re = re.compile("^([0-9]{4}-[0-9]{2}-[0-9]{2}[A-Z]{1}"
@ -70,6 +74,7 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
"updated_at",
"project_id",
]
for server in servers:
# All expected keys are present
for key in keys:
@ -171,6 +176,8 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
if utils.is_microversion_ge(CONF.share.max_api_microversion, "2.49"):
keys.append("is_auto_deletable")
keys.append("identifier")
if utils.is_microversion_ge(CONF.share.max_api_microversion, "2.51"):
keys.append("share_network_subnet_id")
# all expected keys are present
for key in keys:
self.assertIn(key, server.keys())
@ -211,8 +218,8 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
# TODO(vponomaryov): attach security-services too. If any exist from
# donor share-network.
new_sn = self.create_share_network(
neutron_net_id=self.share_network['neutron_net_id'],
neutron_subnet_id=self.share_network['neutron_subnet_id'])
neutron_net_id=self.share_net_info['neutron_net_id'],
neutron_subnet_id=self.share_net_info['neutron_subnet_id'])
# Create server with share
self.create_share(share_type_id=self.share_type_id,
@ -274,8 +281,8 @@ class ShareServersAdminTest(base.BaseSharesAdminTest):
# Get network and subnet from existing share_network and reuse it
# to be able to delete share_server after test ends.
new_sn = self.create_share_network(
neutron_net_id=self.share_network['neutron_net_id'],
neutron_subnet_id=self.share_network['neutron_subnet_id'])
neutron_net_id=self.share_net_info['neutron_net_id'],
neutron_subnet_id=self.share_net_info['neutron_subnet_id'])
share = self.create_share(
share_type_id=self.share_type_id,
share_network_id=new_sn['id']

View File

@ -13,12 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from tempest import config
from tempest.lib.common.utils import data_utils
import testtools
from testtools import testcase as tc
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
CONF = config.CONF
@ -30,6 +32,7 @@ CONF = config.CONF
@testtools.skipUnless(
CONF.share.run_manage_unmanage_tests,
'Manage/unmanage tests are disabled.')
@ddt.ddt
class ManageShareServersTest(base.BaseSharesAdminTest):
@classmethod
@ -51,23 +54,44 @@ class ManageShareServersTest(base.BaseSharesAdminTest):
"This test is not suitable for pre-existing "
"share_network.")
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_manage_share_server(self):
@ddt.data(True, False)
def test_manage_share_server(self, add_subnet_field):
# Starting from v2.51 share network spans to multiple subnets.
if add_subnet_field and not utils.is_microversion_supported('2.51'):
msg = ("Manage share server with share network subnet is "
"supported starting from microversion '2.51'.")
raise self.skipException(msg)
# create a new share network to make sure that a new share server
# will be created
original_share_network = self.shares_v2_client.get_share_network(
self.shares_v2_client.share_network_id
)
share_net_info = (
utils.share_network_get_default_subnet(original_share_network)
if utils.share_network_subnets_are_supported()
else original_share_network)
share_network = self.create_share_network(
neutron_net_id=original_share_network['neutron_net_id'],
neutron_subnet_id=original_share_network['neutron_subnet_id'],
neutron_net_id=share_net_info['neutron_net_id'],
neutron_subnet_id=share_net_info['neutron_subnet_id'],
cleanup_in_class=True
)
az = params = None
if add_subnet_field:
# Get a compatible availability zone
az = self.get_availability_zones_matching_share_type(
self.share_type['share_type'])[0]
az_subnet = self.shares_v2_client.create_subnet(
share_network['id'],
neutron_net_id=share_network['neutron_net_id'],
neutron_subnet_id=share_network['neutron_subnet_id'],
availability_zone=az
)
params = {'share_network_subnet_id': az_subnet['id']}
# create share
share = self.create_share(
share_type_id=self.share_type['share_type']['id'],
share_network_id=share_network['id']
share_network_id=share_network['id'], availability_zone=az
)
share = self.shares_v2_client.get_share(share['id'])
el = self.shares_v2_client.list_share_export_locations(share['id'])
@ -88,6 +112,8 @@ class ManageShareServersTest(base.BaseSharesAdminTest):
"is_auto_deletable",
"identifier",
]
if add_subnet_field:
keys.append('share_network_subnet_id')
# all expected keys are present
for key in keys:
self.assertIn(key, share_server)
@ -95,6 +121,9 @@ class ManageShareServersTest(base.BaseSharesAdminTest):
# check that the share server is initially auto-deletable
self.assertIs(True, share_server["is_auto_deletable"])
self.assertIsNotNone(share_server["identifier"])
if add_subnet_field:
self.assertEqual(az_subnet["id"],
share_server["share_network_subnet_id"])
self._unmanage_share_and_wait(share)
@ -107,7 +136,8 @@ class ManageShareServersTest(base.BaseSharesAdminTest):
# unmanage share server and manage it again
self._unmanage_share_server_and_wait(share_server)
managed_share_server = self._manage_share_server(share_server)
managed_share_server = self._manage_share_server(share_server,
fields=params)
managed_share = self._manage_share(
share,
name="managed share that had ID %s" % share['id'],
@ -138,3 +168,8 @@ class ManageShareServersTest(base.BaseSharesAdminTest):
# delete share server
self._delete_share_server_and_wait(managed_share_server['id'])
if add_subnet_field:
# delete the created subnet
self.shares_v2_client.delete_subnet(share_network['id'],
az_subnet['id'])

View File

@ -23,6 +23,7 @@ from testtools import testcase as tc
from manila_tempest_tests.common import constants
from manila_tempest_tests import share_exceptions
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
CONF = config.CONF
@ -53,11 +54,15 @@ class ManageShareServersNegativeTest(base.BaseSharesAdminTest):
extra_specs=cls.extra_specs)
cls.original_share_network = cls.shares_v2_client.get_share_network(
cls.shares_v2_client.share_network_id)
cls.share_net_info = (
utils.share_network_get_default_subnet(cls.original_share_network)
if utils.share_network_subnets_are_supported() else
cls.original_share_network)
def _create_share_with_new_share_network(self):
share_network = self.create_share_network(
neutron_net_id=self.original_share_network['neutron_net_id'],
neutron_subnet_id=self.original_share_network['neutron_subnet_id'],
neutron_net_id=self.share_net_info['neutron_net_id'],
neutron_subnet_id=self.share_net_info['neutron_subnet_id'],
cleanup_in_class=True
)
share = self.create_share(
@ -69,6 +74,7 @@ class ManageShareServersNegativeTest(base.BaseSharesAdminTest):
@ddt.data(
('host', 'invalid_host'),
('share_network_id', 'invalid_share_network_id'),
('share_network_subnet_id', 'invalid_share_network_subnet_id'),
)
@ddt.unpack
@testtools.skipIf(CONF.share.share_network_id != "",

View File

@ -361,8 +361,13 @@ class BaseSharesTest(test.BaseTestCase):
# Try get suitable share-network
share_networks = sc.list_share_networks_with_detail()
for sn in share_networks:
if (sn["neutron_net_id"] is None and
sn["neutron_subnet_id"] is None and
net_info = (
utils.share_network_get_default_subnet(sn)
if utils.share_network_subnets_are_supported() else sn)
if net_info is None:
continue
if(net_info["neutron_net_id"] is None and
net_info["neutron_subnet_id"] is None and
sn["name"] and search_word in sn["name"]):
share_network_id = sn["id"]
break
@ -401,8 +406,14 @@ class BaseSharesTest(test.BaseTestCase):
# Try get suitable share-network
share_networks = sc.list_share_networks_with_detail()
for sn in share_networks:
if (net_id == sn["neutron_net_id"] and
subnet_id == sn["neutron_subnet_id"] and
net_info = (
utils.share_network_get_default_subnet(sn)
if utils.share_network_subnets_are_supported()
else sn)
if net_info is None:
continue
if (net_id == net_info["neutron_net_id"] and
subnet_id == net_info["neutron_subnet_id"] and
sn["name"] and search_word in sn["name"]):
share_network_id = sn["id"]
break
@ -836,6 +847,26 @@ class BaseSharesTest(test.BaseTestCase):
cls.method_resources.insert(0, resource)
return share_network
@classmethod
def create_share_network_subnet(cls, client=None,
cleanup_in_class=False, **kwargs):
if client is None:
client = cls.shares_v2_client
share_network_subnet = client.create_subnet(**kwargs)
resource = {
"type": "share-network-subnet",
"id": share_network_subnet["id"],
"extra_params": {
"share_network_id": share_network_subnet["share_network_id"]
},
"client": client,
}
if cleanup_in_class:
cls.class_resources.insert(0, resource)
else:
cls.method_resources.insert(0, resource)
return share_network_subnet
@classmethod
def create_security_service(cls, ss_type="ldap", client=None,
cleanup_in_class=False, **kwargs):
@ -990,6 +1021,12 @@ class BaseSharesTest(test.BaseTestCase):
elif res["type"] is "share_replica":
client.delete_share_replica(res_id)
client.wait_for_resource_deletion(replica_id=res_id)
elif res["type"] is "share_network_subnet":
sn_id = res["extra_params"]["share_network_id"]
client.delete_subnet(sn_id, res_id)
client.wait_for_resource_deletion(
share_network_subnet_id=res_id,
sn_id=sn_id)
else:
LOG.warning("Provided unsupported resource type for "
"cleanup '%s'. Skipping.", res["type"])
@ -1005,6 +1042,14 @@ class BaseSharesTest(test.BaseTestCase):
}
return data
@classmethod
def generate_subnet_data(self):
data = {
"neutron_net_id": data_utils.rand_name("net-id"),
"neutron_subnet_id": data_utils.rand_name("subnet-id"),
}
return data
@classmethod
def generate_security_service_data(self, set_ou=False):
data = {
@ -1192,10 +1237,12 @@ class BaseSharesAdminTest(BaseSharesTest):
def _manage_share_server(self, share_server, fields=None):
params = fields or {}
subnet_id = params.get('share_network_subnet_id', None)
managed_share_server = self.shares_v2_client.manage_share_server(
params.get('host', share_server['host']),
params.get('share_network_id', share_server['share_network_id']),
params.get('identifier', share_server['identifier']),
share_network_subnet_id=subnet_id,
)
self.shares_v2_client.wait_for_share_server_status(
managed_share_server['id'],

View File

@ -21,6 +21,7 @@ import testtools
from testtools import testcase as tc
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
CONF = config.CONF
LOG = log.getLogger(__name__)
@ -32,6 +33,9 @@ class SecServicesMappingNegativeTest(base.BaseSharesMixedTest):
def resource_setup(cls):
super(SecServicesMappingNegativeTest, cls).resource_setup()
cls.sn = cls.create_share_network(cleanup_in_class=True)
cls.share_net_info = (
utils.share_network_get_default_subnet(cls.sn)
if utils.share_network_subnets_are_supported() else cls.sn)
cls.ss = cls.create_security_service(cleanup_in_class=True)
cls.cl = cls.shares_client
# create share type

View File

@ -0,0 +1,256 @@
# Copyright 2019 NetApp Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from tempest import config
import testtools
from testtools import testcase as tc
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
CONF = config.CONF
@base.skip_if_microversion_lt("2.51")
@ddt.ddt
class ShareNetworkSubnetsTest(base.BaseSharesMixedTest):
@classmethod
def resource_setup(cls):
super(ShareNetworkSubnetsTest, cls).resource_setup()
# create share_type
cls.extra_specs = {
'driver_handles_share_servers': CONF.share.multitenancy_enabled,
}
cls.share_type = cls._create_share_type(specs=cls.extra_specs)
cls.share_type_id = cls.share_type['id']
# create share_network
cls.share_network = cls.create_share_network()
cls.share_network_id = cls.share_network['id']
@tc.attr(base.TAG_POSITIVE, base.TAG_API)
def test_create_delete_subnet(self):
share_network = self.shares_v2_client.create_share_network()
share_network = self.shares_v2_client.get_share_network(
share_network['id']
)
default_subnet = share_network['share_network_subnets'][0]
az = self.shares_v2_client.list_availability_zones()[0]
az_name = az['name']
# Generate subnet data
data = self.generate_subnet_data()
data['share_network_id'] = share_network['id']
data['availability_zone'] = az_name
# create a new share network subnet
created = self.create_share_network_subnet(**data)
data['share_network_name'] = share_network['name']
# verify keys
keys = [
"share_network_name", "id", "network_type", "cidr",
"ip_version", "neutron_net_id", "neutron_subnet_id", "created_at",
"updated_at", "segmentation_id", "availability_zone", "gateway",
"share_network_id", "mtu"
]
# Default subnet was created during share network creation
self.assertIsNone(default_subnet['availability_zone'])
# Match new subnet content
self.assertDictContainsSubset(data, created)
self.assertEqual(sorted(keys), sorted(list(created.keys())))
# Delete the subnets
self.shares_v2_client.delete_subnet(share_network['id'], created['id'])
@tc.attr(base.TAG_POSITIVE, base.TAG_API)
def test_show_share_network_subnet(self):
share_network = self.create_share_network()
az = self.shares_v2_client.list_availability_zones()[0]
az_name = az['name']
# Generate subnet data
data = self.generate_subnet_data()
data['share_network_id'] = share_network['id']
data['availability_zone'] = az_name
# Create the share network subnet
created = self.create_share_network_subnet(**data)
# Shows the share network subnet
shown = self.shares_v2_client.get_subnet(created['id'],
share_network['id'])
# Asserts
self.assertDictContainsSubset(data, shown)
# Deletes the created subnet
self.shares_v2_client.delete_subnet(share_network['id'],
created['id'])
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
@testtools.skipIf(
not CONF.share.multitenancy_enabled, "Only for multitenancy.")
@testtools.skipIf(CONF.share.share_network_id != "",
"This test is not suitable for pre-existing "
"share_network.")
def test_create_share_on_subnet_with_availability_zone(self):
compatible_azs = self.get_availability_zones_matching_share_type(
self.share_type)
if len(compatible_azs) < 2:
msg = ("This test needs at least two compatible storage "
"availability zones.")
raise self.skipException(msg)
original_share_network = self.shares_v2_client.get_share_network(
self.shares_v2_client.share_network_id
)
share_net_info = (
utils.share_network_get_default_subnet(original_share_network))
share_network = self.create_share_network(
neutron_net_id=share_net_info['neutron_net_id'],
neutron_subnet_id=share_net_info['neutron_subnet_id'],
)
share_network = self.shares_v2_client.get_share_network(
share_network['id']
)
default_subnet = share_network['share_network_subnets'][0]
availability_zone = compatible_azs[0]
data = {
"neutron_net_id": share_net_info['neutron_net_id'],
"neutron_subnet_id": share_net_info['neutron_subnet_id'],
'share_network_id': share_network['id'],
'availability_zone': availability_zone,
}
# Create a new share network subnet
subnet = self.create_share_network_subnet(**data)
# Create a new share in the select availability zone
# The 'status' of the share returned by the create API must be
share = self.create_share(
share_type_id=self.share_type_id,
share_network_id=share_network['id'],
availability_zone=availability_zone)
# Set and have value either 'creating' or
# 'available' (if share creation is really fast as in
# case of Dummy driver).
self.assertIn(share['status'], ('creating', 'available'))
share = self.admin_shares_v2_client.get_share(share['id'])
share_server = self.admin_shares_v2_client.show_share_server(
share['share_server_id']
)
# Default subnet was created during share network creation
self.assertIsNone(default_subnet['availability_zone'])
# Match new subnet content
self.assertDictContainsSubset(data, subnet)
# Match share server subnet
self.assertEqual(subnet['id'],
share_server['share_network_subnet_id'])
# Delete share
self.shares_v2_client.delete_share(share['id'])
self.shares_v2_client.wait_for_resource_deletion(share_id=share['id'])
# Delete the subnets
self.shares_v2_client.delete_subnet(share_network['id'], subnet['id'])
@tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
@testtools.skipIf(
not CONF.share.multitenancy_enabled, "Only for multitenancy.")
@testtools.skipIf(CONF.share.share_network_id != "",
"This test is not suitable for pre-existing "
"share_network.")
@ddt.data(True, False)
def test_create_share_on_share_network_with_multiple_subnets(
self, create_share_with_az):
compatible_azs = self.get_availability_zones_matching_share_type(
self.share_type)
if len(compatible_azs) < 2:
msg = ("This test needs at least two compatible storage "
"availability zones.")
raise self.skipException(msg)
original_share_network = self.shares_v2_client.get_share_network(
self.shares_v2_client.share_network_id
)
share_net_info = (
utils.share_network_get_default_subnet(original_share_network))
share_network = self.create_share_network(
neutron_net_id=share_net_info['neutron_net_id'],
neutron_subnet_id=share_net_info['neutron_subnet_id'],
)
share_network = self.shares_v2_client.get_share_network(
share_network['id']
)
default_subnet = share_network['share_network_subnets'][0]
# Save one availability zone to remain associated with default subnet
destination_az = compatible_azs.pop()
if not create_share_with_az:
destination_az = None
new_subnets = []
data = {
"neutron_net_id": share_net_info['neutron_net_id'],
"neutron_subnet_id": share_net_info['neutron_subnet_id'],
'share_network_id': share_network['id'],
}
for availability_zone in compatible_azs:
# update availability zone
data['availability_zone'] = availability_zone
# create a new share network subnet
subnet = self.create_share_network_subnet(**data)
new_subnets.append(subnet)
# Create a new share in the selected availability zone
share = self.create_share(
share_type_id=self.share_type_id,
share_network_id=share_network['id'],
availability_zone=destination_az)
# The 'status' of the share returned by the create API must be
# set and have value either 'creating' or 'available' (if share
# creation is really fast as in case of Dummy driver).
self.assertIn(share['status'], ('creating', 'available'))
share = self.admin_shares_v2_client.get_share(share['id'])
share_server = self.admin_shares_v2_client.show_share_server(
share['share_server_id']
)
# If no availability zone was provided during share creation, it is
# expected that the Scheduler selects one of the compatible backends to
# place the share. The destination availability zone may or may not
# have an specific share network subnet.
expected_subnet_id = (
next((subnet['id'] for subnet in new_subnets
if subnet['availability_zone'] == share['availability_zone']),
default_subnet['id']))
# Default subnet was created during share network creation
self.assertIsNone(default_subnet['availability_zone'])
# Match share server subnet
self.assertEqual(expected_subnet_id,
share_server['share_network_subnet_id'])
if create_share_with_az:
self.assertEqual(destination_az,
share['availability_zone'])
# Delete share
self.shares_v2_client.delete_share(share['id'])
self.shares_v2_client.wait_for_resource_deletion(share_id=share['id'])
# Delete the subnets
for subnet in new_subnets:
self.shares_v2_client.delete_subnet(share_network['id'],
subnet['id'])

View File

@ -0,0 +1,270 @@
# Copyright 2019 NetApp Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from tempest import config
from tempest.lib import exceptions as lib_exc
import testtools
from testtools import testcase as tc
from manila_tempest_tests.common import constants
from manila_tempest_tests.tests.api import base
from manila_tempest_tests import utils
CONF = config.CONF
@base.skip_if_microversion_lt("2.51")
@ddt.ddt
class ShareNetworkSubnetsNegativeTest(base.BaseSharesAdminTest):
@classmethod
def resource_setup(cls):
super(ShareNetworkSubnetsNegativeTest, cls).resource_setup()
# Create a new share network which will be used in the tests
cls.share_network = cls.shares_v2_client.create_share_network(
cleanup_in_class=True)
cls.share_network_id = cls.share_network['id']
cls.share_type = cls._create_share_type()
cls.az = cls.shares_v2_client.list_availability_zones()[0]
cls.az_name = cls.az['name']
@tc.attr(base.TAG_NEGATIVE, base.TAG_API)
def test_add_share_network_subnet_share_network_not_found(self):
data = self.generate_subnet_data()
self.assertRaises(lib_exc.NotFound,
self.shares_v2_client.create_subnet,
'fake_inexistent_id',
**data)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API)
def test_add_share_network_subnet_az_not_found(self):
data = {'availability_zone': 'non-existent-az'}
self.assertRaises(lib_exc.BadRequest,
self.shares_v2_client.create_subnet,
self.share_network_id, **data)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API)
@ddt.data(True, False)
def test_add_share_network_subnet_in_same_az_exists(self, is_default):
share_network = self.shares_v2_client.create_share_network()
data = {}
if not is_default:
azs = self.get_availability_zones_matching_share_type(
self.share_type)
data['availability_zone'] = azs[0]
self.shares_v2_client.create_subnet(
share_network['id'], **data)
self.assertRaises(lib_exc.Conflict,
self.shares_v2_client.create_subnet,
share_network['id'], **data)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API)
def test_add_share_network_subnet_missing_parameters(self):
# Generate subnet data
data = self.generate_subnet_data()
data['availability_zone'] = self.az_name
data.pop('neutron_net_id')
self.assertRaises(lib_exc.BadRequest,
self.shares_v2_client.create_subnet,
self.share_network_id, **data)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API)
def test_show_subnet_share_network_not_found(self):
self.assertRaises(lib_exc.NotFound,
self.shares_v2_client.get_subnet,
'fake-subnet',
'fake-sn')
@tc.attr(base.TAG_NEGATIVE, base.TAG_API)
def test_show_subnet_not_found(self):
self.assertRaises(lib_exc.NotFound,
self.shares_v2_client.get_subnet,
'fake-subnet',
self.share_network_id)
@tc.attr(base.TAG_NEGATIVE, base.TAG_API)
def test_get_deleted_subnet(self):
# Generate subnet data
data = self.generate_subnet_data()
data['share_network_id'] = self.share_network_id
az = self.shares_v2_client.list_availability_zones()[0]
data['availability_zone'] = az['name']
subnet = self.create_share_network_subnet(**data)
# Make sure that the created subnet contains the data
self.assertDictContainsSubset(data, subnet)
# Delete the given subnet
self.shares_v2_client.delete_subnet(self.share_network_id,
subnet['id'])
share_network = self.shares_v2_client.get_share_network(
self.share_network_id
)
self.assertIsNotNone(share_network)
self.assertRaises(lib_exc.NotFound,
self.shares_v2_client.get_subnet,
subnet['id'],
self.share_network['id'])
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
@testtools.skipIf(not CONF.share.multitenancy_enabled,
'Can run only with drivers that do handle share servers '
'creation. Skipping.')
@testtools.skipIf(not CONF.share.run_manage_unmanage_tests,
'Can run only with manage/unmanage tests enabled.')
def test_delete_contains_unmanaged_share_servers(self):
# Get a compatible availability zone
az = self.get_availability_zones_matching_share_type(
self.share_type)[0]
share_network = self.shares_v2_client.get_share_network(
self.shares_v2_client.share_network_id
)
share_network_id = share_network['id']
subnet = utils.share_network_get_default_subnet(share_network)
# Generate subnet data
data = {'neutron_net_id': subnet['neutron_net_id'],
'neutron_subnet_id': subnet['neutron_subnet_id'],
'share_network_id': share_network_id,
'availability_zone': az}
# Create a new subnet in the desired az
subnet = self.create_share_network_subnet(**data)
args = {'share_network_id': share_network_id,
'share_type_id': self.share_type['id'],
'availability_zone': az}
# Create a share into the share network
share = self.shares_v2_client.create_share(**args)
self.shares_v2_client.wait_for_share_status(
share['id'], constants.STATUS_AVAILABLE)
share = self.shares_v2_client.get_share(share['id'])
# Gets the export locations to be used in the future
el = self.shares_v2_client.list_share_export_locations(share['id'])
share['export_locations'] = el
# Unmanages the share to make the share server become is_auto
# deletable=False
self._unmanage_share_and_wait(share)
# Assert that the user cannot delete a subnet that contains share
# servers which may have unmanaged stuff
self.assertRaises(lib_exc.Conflict,
self.shares_v2_client.delete_subnet,
share_network_id,
subnet['id'])
# Manages the share again to start cleaning up the test stuff
managed_share = self.shares_v2_client.manage_share(
service_host=share['host'],
export_path=share['export_locations'][0],
protocol=share['share_proto'],
share_type_id=self.share_type['id'],
name='share_to_be_deleted',
description='share managed to be deleted',
share_server_id=share['share_server_id']
)
# Do some necessary cleanup
self.shares_v2_client.wait_for_share_status(
managed_share['id'], constants.STATUS_AVAILABLE)
self.shares_client.delete_share(managed_share['id'])
self.shares_v2_client.wait_for_resource_deletion(
share_id=managed_share["id"])
self._delete_share_server_and_wait(share['share_server_id'])
self.shares_v2_client.delete_subnet(share_network_id,
subnet['id'])
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
@testtools.skipIf(not CONF.share.multitenancy_enabled,
'Can run only with drivers that do handle share servers '
'creation. Skipping.')
def test_delete_contains_shares(self):
# Get a compatible availability zone
az = self.get_availability_zones_matching_share_type(
self.share_type)[0]
share_network = self.shares_v2_client.get_share_network(
self.shares_v2_client.share_network_id
)
share_network_id = share_network['id']
subnet = utils.share_network_get_default_subnet(share_network)
# Generate subnet data
data = {'neutron_net_id': subnet['neutron_net_id'],
'neutron_subnet_id': subnet['neutron_subnet_id'],
'share_network_id': share_network_id,
'availability_zone': az}
# Create a new subnet in the desired az
subnet = self.create_share_network_subnet(**data)
args = {'share_network_id': share_network_id,
'share_type_id': self.share_type['id'],
'availability_zone': az}
# Create a share into the share network
share = self.shares_v2_client.create_share(**args)
self.shares_v2_client.wait_for_share_status(
share['id'], constants.STATUS_AVAILABLE)
share = self.admin_shares_v2_client.get_share(share['id'])
share_server = self.admin_shares_v2_client.show_share_server(
share['share_server_id']
)
# Match share server subnet
self.assertEqual(subnet['id'],
share_server['share_network_subnet_id'])
# Assert that the user cannot delete a subnet that contain shares
self.assertRaises(lib_exc.Conflict,
self.shares_v2_client.delete_subnet,
share_network_id,
subnet['id'])
# Assert that the user cannot delete a share-network that contain
# shares
self.assertRaises(lib_exc.Conflict,
self.shares_v2_client.delete_share_network,
share_network_id)
# Cleanups
self.shares_client.delete_share(share['id'])
self.shares_v2_client.wait_for_resource_deletion(share_id=share["id"])
self._delete_share_server_and_wait(share['share_server_id'])
self.shares_v2_client.delete_subnet(share_network_id,
subnet['id'])
@tc.attr(base.TAG_NEGATIVE, base.TAG_API)
def test_delete_subnet_share_network_not_found(self):
self.assertRaises(lib_exc.NotFound,
self.shares_v2_client.delete_subnet,
'fake-sn',
'fake-subnet')
@tc.attr(base.TAG_NEGATIVE, base.TAG_API)
def test_delete_subnet_not_found(self):
self.assertRaises(lib_exc.NotFound,
self.shares_v2_client.delete_subnet,
self.share_network_id,
'fake-subnet')

View File

@ -76,6 +76,19 @@ class ShareNetworkListMixin(object):
if utils.is_microversion_supported('2.20'):
keys.append('mtu')
# In v2.51 and beyond, share-network does not have
# network parameters anymore.
if utils.is_microversion_supported('2.51'):
subnet_keys = [
"network_type", "cidr", "ip_version", "neutron_net_id",
"neutron_subnet_id", "segmentation_id", "gateway", "mtu"
]
keys = list(set(keys) - set(subnet_keys))
keys.append('share_network_subnets')
for sn in listed:
[self.assertIn(key, list(subnet.keys())) for key in subnet_keys
for subnet in sn['share_network_subnets']]
[self.assertIn(key, sn.keys()) for sn in listed for key in keys]
@tc.attr(base.TAG_POSITIVE, base.TAG_API)
@ -279,11 +292,14 @@ class ShareNetworksTest(base.BaseSharesMixedTest, ShareNetworkListMixin):
cleanup_in_class=False)
share_net_details = self.shares_v2_client.get_share_network(
self.shares_v2_client.share_network_id)
share_net_info = (
utils.share_network_get_default_subnet(share_net_details)
if utils.share_network_subnets_are_supported()
else share_net_details)
subnet_details = subnet_client.show_subnet(
share_net_details['neutron_subnet_id'])
share_net_info['neutron_subnet_id'])
self.assertEqual(subnet_details['subnet']['gateway_ip'],
share_net_details['gateway'])
share_net_info['gateway'])
@testtools.skipUnless(CONF.share.create_networks_when_multitenancy_enabled,
"Only for setups with network creation.")
@ -299,8 +315,12 @@ class ShareNetworksTest(base.BaseSharesMixedTest, ShareNetworkListMixin):
cleanup_in_class=False)
share_net_details = self.shares_v2_client.get_share_network(
self.shares_v2_client.share_network_id)
share_net_info = (
utils.share_network_get_default_subnet(share_net_details)
if utils.share_network_subnets_are_supported()
else share_net_details)
network_details = network_client.show_network(
share_net_details['neutron_net_id'])
share_net_info['neutron_net_id'])
self.assertEqual(network_details['network']['mtu'],
share_net_details['mtu'])
share_net_info['mtu'])

View File

@ -141,3 +141,40 @@ class ShareNetworksNegativeTest(base.BaseSharesMixedTest):
params=filters))
self.assertEqual(0, len(share_networks))
@base.skip_if_microversion_lt("2.51")
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_delete_share_network_contains_more_than_one_subnet(self):
share_network = self.create_share_network()
az = self.shares_v2_client.list_availability_zones()[0]
az_name = az['name']
# Generate subnet data
data = self.generate_subnet_data()
data['share_network_id'] = share_network['id']
data['availability_zone'] = az_name
# create share network
subnet = self.create_share_network_subnet(**data)
# Try to delete the share network
self.assertRaises(
lib_exc.Conflict,
self.shares_client.delete_share_network,
share_network['id']
)
self.shares_v2_client.delete_subnet(share_network['id'], subnet['id'])
share_network = self.shares_v2_client.get_share_network(
share_network['id'])
default_subnet = share_network['share_network_subnets'][0]
self.assertIsNone(default_subnet['availability_zone'])
@base.skip_if_microversion_lt("2.51")
@tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
def test_create_share_network_inexistent_az(self):
self.assertRaises(
lib_exc.BadRequest,
self.shares_v2_client.create_share_network,
availability_zone='inexistent-availability-zone',
)

View File

@ -22,6 +22,7 @@ from tempest import config
import testtools
CONF = config.CONF
SHARE_NETWORK_SUBNETS_MICROVERSION = '2.51'
def get_microversion_as_tuple(microversion_str):
@ -177,3 +178,13 @@ def skip_if_manage_not_supported_for_version(
raise testtools.TestCase.skipException(
"Share manage tests with multitenancy are disabled for "
"microversion < 2.49")
def share_network_subnets_are_supported():
return is_microversion_supported(SHARE_NETWORK_SUBNETS_MICROVERSION)
def share_network_get_default_subnet(share_network):
return next((
subnet for subnet in share_network.get('share_network_subnets', [])
if subnet['availability_zone'] is None), None)