292 lines
10 KiB
Python
292 lines
10 KiB
Python
# Copyright 2015 Mirantis Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from collections import OrderedDict
|
|
import random
|
|
import re
|
|
|
|
from netaddr import ip
|
|
from tempest import config
|
|
from tempest.lib.common.utils import data_utils
|
|
import testtools
|
|
|
|
from manila_tempest_tests import utils
|
|
|
|
# Shortcut to tempest's ``config.CONF`` object used throughout this module.
CONF = config.CONF
# Microversion checked by share_network_subnets_are_supported().
SHARE_NETWORK_SUBNETS_MICROVERSION = '2.51'
# Microversion checked by share_replica_quotas_are_supported().
SHARE_REPLICA_QUOTAS_MICROVERSION = '2.53'
# Header dict handed back by get_extra_headers() when the requested
# microversion is lower than the graduation version.
EXPERIMENTAL = {'X-OpenStack-Manila-API-Experimental': 'True'}
|
|
|
|
|
|
def deduplicate(items):
    """Drop repeated items from an iterable, keeping first-seen order.

    Handy for building the argument list of ddt.data when the same value
    may be supplied more than once through different constants.

    :param items: iterable of hashable items.
    :return: new list holding each distinct item once, in the order of
        its first occurrence.
    """
    seen = set()
    unique = []
    for item in items:
        if item in seen:
            continue
        seen.add(item)
        unique.append(item)
    return unique
|
|
|
|
|
|
def get_microversion_as_tuple(microversion_str):
    """Transform a string-like microversion into a two-value tuple of ints.

    Tuples of integers compare numerically, component by component, which
    plain strings do not (for example '2.9' versus '2.10'), so they are
    the form used for microversion comparisons.

    :param microversion_str: microversion in 'x.y' form, where x is a
        positive integer and y is a non-negative integer, both written
        without leading zeros.
    :return: tuple ``(x, y)`` of integers.
    :raises ValueError: if the string does not fit the 'x.y' template.
    """
    # '\Z' anchors at the very end of the string. A plain '$' would also
    # match just before a trailing newline and let values such as
    # '2.5\n' slip through.
    regex = r"^([1-9]\d*)\.([1-9]\d*|0)\Z"
    match = re.match(regex, microversion_str)
    if not match:
        raise ValueError(
            "Microversion does not fit template 'x.y' - %s" % microversion_str)
    return int(match.group(1)), int(match.group(2))
|
|
|
|
|
|
def is_microversion_gt(left, right):
    """Return True if the left microversion is greater than the right one."""
    lhs = get_microversion_as_tuple(left)
    rhs = get_microversion_as_tuple(right)
    return lhs > rhs
|
|
|
|
|
|
def is_microversion_ge(left, right):
    """Return True if left microversion is greater than or equal to right."""
    lhs = get_microversion_as_tuple(left)
    rhs = get_microversion_as_tuple(right)
    return lhs >= rhs
|
|
|
|
|
|
def is_microversion_eq(left, right):
    """Return True if the left microversion is equal to the right one."""
    lhs = get_microversion_as_tuple(left)
    rhs = get_microversion_as_tuple(right)
    return lhs == rhs
|
|
|
|
|
|
def is_microversion_ne(left, right):
    """Return True if the left microversion differs from the right one."""
    lhs = get_microversion_as_tuple(left)
    rhs = get_microversion_as_tuple(right)
    return lhs != rhs
|
|
|
|
|
|
def is_microversion_le(left, right):
    """Return True if left microversion is less than or equal to right."""
    lhs = get_microversion_as_tuple(left)
    rhs = get_microversion_as_tuple(right)
    return lhs <= rhs
|
|
|
|
|
|
def is_microversion_lt(left, right):
    """Return True if the left microversion is less than the right one."""
    lhs = get_microversion_as_tuple(left)
    rhs = get_microversion_as_tuple(right)
    return lhs < rhs
|
|
|
|
|
|
def is_microversion_supported(microversion):
    """Check whether a microversion lies within the configured range.

    :param microversion: microversion string in 'x.y' form.
    :return: True if the value is between CONF.share.min_api_microversion
        and CONF.share.max_api_microversion, both bounds included.
    """
    lowest = get_microversion_as_tuple(CONF.share.min_api_microversion)
    requested = get_microversion_as_tuple(microversion)
    highest = get_microversion_as_tuple(CONF.share.max_api_microversion)
    return lowest <= requested and requested <= highest
|
|
|
|
|
|
def skip_if_microversion_not_supported(microversion):
    """Decorator for tests that need a specific microversion.

    :param microversion: microversion string in 'x.y' form.
    :return: the result of testtools.skip() when the microversion is not
        supported, otherwise a decorator that returns the decorated
        function unchanged.
    """
    if is_microversion_supported(microversion):
        return lambda func: func
    message = "Skipped. Test requires microversion '%s'." % microversion
    return testtools.skip(message)
|
|
|
|
|
|
def skip_if_is_microversion_ge(left, right):
    """Skip if the left microversion is greater than or equal to right.

    :param left: microversion string to check.
    :param right: microversion string acting as the exclusive upper bound.
    :return: the result of testtools.skip() when left >= right, otherwise
        a decorator that returns the decorated function unchanged.
    """
    if not is_microversion_ge(left, right):
        return lambda func: func
    message = "Skipped. Test requires microversion < than '%s'." % right
    return testtools.skip(message)
|
|
|
|
|
|
def check_skip_if_microversion_not_supported(microversion):
    """Callable check for tests that need a specific microversion.

    :param microversion: microversion string in 'x.y' form.
    :raises testtools.TestCase.skipException: if the microversion is not
        supported.
    """
    if is_microversion_supported(microversion):
        return
    raise testtools.TestCase.skipException(
        "Skipped. Test requires microversion '%s'." % microversion)
|
|
|
|
|
|
def rand_ip(network=False):
    """Generate a random address from the TEST-NET-3 range (203.0.113.x).

    That range is reserved solely for documentation and example source
    code, so values taken from it should not conflict with anything in
    real-world testing.

    :param network: when true, return a network in 'address/prefix' form
        with a random prefix length between 24 and 32 instead of a bare
        address.
    :return: string with either an address or a network.
    """
    host = '203.0.113.%d' % random.randint(0, 255)
    if not network:
        return host

    prefix_length = random.randint(24, 32)
    # IPNetwork supplies the network address for the chosen prefix.
    cidr = ip.IPNetwork('%s/%d' % (host, prefix_length))
    return '%s/%d' % (cidr.network, prefix_length)
|
|
|
|
|
|
def rand_ipv6_ip(network=False):
    """Generate a random address from the IPv6 documentation range.

    The address starts with 2001:0DB8 (range 2001:DB8::/32), followed by
    six random 16-bit groups written in hexadecimal.

    :param network: when true, return a network in 'address/prefix' form
        with a random prefix length between 32 and 128 instead of a bare
        address.
    :return: string with either an address or a network.
    """
    suffix = ":".join(
        format(random.randrange(0, 16 ** 4), "x") for _ in range(6))
    host = "2001:0DB8:" + suffix
    if not network:
        return host

    prefix_length = random.randint(32, 128)
    # IPNetwork supplies the network address for the chosen prefix.
    cidr = ip.IPNetwork("%s/%d" % (host, prefix_length))
    return "%s/%d" % (cidr.network, prefix_length)
|
|
|
|
|
|
def generate_share_network_data():
    """Build a dict of randomly named share network attributes.

    :return: dict with 'name', 'description', 'neutron_net_id' and
        'neutron_subnet_id' keys, each holding a value produced by
        data_utils.rand_name().
    """
    prefixes = (
        ("name", "sn-name"),
        ("description", "sn-desc"),
        ("neutron_net_id", "net-id"),
        ("neutron_subnet_id", "subnet-id"),
    )
    return {key: data_utils.rand_name(prefix) for key, prefix in prefixes}
|
|
|
|
|
|
def generate_subnet_data():
    """Build a dict of randomly named share network subnet attributes.

    :return: dict with 'neutron_net_id' and 'neutron_subnet_id' keys, each
        holding a value produced by data_utils.rand_name().
    """
    prefixes = (
        ("neutron_net_id", "net-id"),
        ("neutron_subnet_id", "subnet-id"),
    )
    return {key: data_utils.rand_name(prefix) for key, prefix in prefixes}
|
|
|
|
|
|
def generate_security_service_data(set_ou=False):
    """Build a dict of random security service attributes.

    :param set_ou: when true, also include a randomly named 'ou' entry.
    :return: dict with 'name', 'description', 'dns_ip', 'server',
        'domain', 'user' and 'password' keys, plus 'ou' if requested.
    """
    service = {
        "name": data_utils.rand_name("ss-name"),
        "description": data_utils.rand_name("ss-desc"),
    }
    # NOTE(review): rand_ip is reached through the imported ``utils``
    # module rather than called directly; presumably it is the same
    # helper as rand_ip() in this file - confirm.
    service.update((key, utils.rand_ip()) for key in ("dns_ip", "server"))
    named_fields = (
        ("domain", "ss-domain"),
        ("user", "ss-user"),
        ("password", "ss-password"),
    )
    service.update(
        (key, data_utils.rand_name(prefix)) for key, prefix in named_fields)

    if set_ou:
        service["ou"] = data_utils.rand_name("ss-ou")
    return service
|
|
|
|
|
|
def choose_matching_backend(share, pools, share_type):
    """Pick the first pool, other than the share's host, matching a type.

    :param share: dict whose 'host' value names the pool to exclude.
    :param pools: iterable of dicts with 'name' and 'capabilities' keys.
    :param share_type: dict with an 'extra_specs' dict.
    :return: the first pool whose name differs from the share host and
        whose capabilities contain every extra spec of the share type, or
        None if there is no such pool.
    """
    # Extra specs may carry booleans as strings; convert 'true'/'false'
    # (in any letter case) to real booleans before comparing.
    booleans = {'true': True, 'false': False}
    wanted = {
        key: booleans.get(str(value).lower(), value)
        for key, value in share_type['extra_specs'].items()
    }

    for pool in pools:
        if pool['name'] == share['host']:
            continue
        capabilities = pool['capabilities'].items()
        if all(spec in capabilities for spec in wanted.items()):
            return pool
    return None
|
|
|
|
|
|
def get_configured_extra_specs(variation=None):
    """Retrieve essential extra specs according to the tempest config.

    :param variation: None to follow the tempest configuration;
        'opposite_driver_modes' to follow it except for an inverted
        driver mode; 'invalid' to invert both the driver mode and the
        snapshot support, which suits negative tests.
    :return: dict containing the essential extra specs. The
        'create_share_from_snapshot_support' key is only set when
        variation is neither 'invalid' nor 'opposite_driver_modes'.
    """
    specs = {'storage_protocol': CONF.share.capability_storage_protocol}

    if variation == 'invalid':
        specs.update({
            'driver_handles_share_servers': (
                not CONF.share.multitenancy_enabled),
            'snapshot_support': (
                not CONF.share.capability_snapshot_support),
        })
        return specs

    if variation == 'opposite_driver_modes':
        specs.update({
            'driver_handles_share_servers': (
                not CONF.share.multitenancy_enabled),
            'snapshot_support': CONF.share.capability_snapshot_support,
        })
        return specs

    # Any other variation: mirror the configuration as is.
    specs.update({
        'driver_handles_share_servers': CONF.share.multitenancy_enabled,
        'snapshot_support': CONF.share.capability_snapshot_support,
        'create_share_from_snapshot_support': (
            CONF.share.capability_create_share_from_snapshot_support),
    })
    return specs
|
|
|
|
|
|
def get_access_rule_data_from_config(protocol):
    """Get the first available access type/to combination from config.

    The rule kinds are probed in a fixed order (ip, user, cert, cephx)
    and the first one enabled for the protocol wins. Do not use this
    method in tests that need depth and breadth in the access types and
    access recipients.

    :param protocol: share protocol to look up in the configuration.
    :return: tuple ``(access_type, access_to)``.
    :raises testtools.TestCase.skipException: if no rule kind is enabled
        for the protocol.
    """
    share_conf = CONF.share

    if protocol in share_conf.enable_ip_rules_for_protocols:
        return "ip", rand_ip()
    if protocol in share_conf.enable_user_rules_for_protocols:
        return "user", share_conf.username_for_user_rules
    if protocol in share_conf.enable_cert_rules_for_protocols:
        return "cert", "client3.com"
    if protocol in share_conf.enable_cephx_rules_for_protocols:
        return "cephx", data_utils.rand_name("cephx-id")

    raise testtools.TestCase.skipException(
        "Unrecognized protocol and access rules configuration.")
|
|
|
|
|
|
def replication_with_multitenancy_support():
    """Tell whether subnets are supported and multitenancy is enabled.

    :return: False when share network subnets are not supported,
        otherwise the value of CONF.share.multitenancy_enabled.
    """
    if not share_network_subnets_are_supported():
        return False
    return CONF.share.multitenancy_enabled
|
|
|
|
|
|
def skip_if_manage_not_supported_for_version(version=None):
    """Skip share manage tests that cannot run with multitenancy.

    :param version: microversion string in 'x.y' form; defaults to
        CONF.share.max_api_microversion, read when the function is called.
    :raises testtools.TestCase.skipException: if the version is lower
        than 2.49 and CONF.share.multitenancy_enabled is set.
    """
    # Resolve the default at call time. A default written in the
    # signature is evaluated once, when the module is imported, so it
    # would force a configuration read at import and ignore any later
    # change of the option.
    if version is None:
        version = CONF.share.max_api_microversion

    if (is_microversion_lt(version, "2.49")
            and CONF.share.multitenancy_enabled):
        raise testtools.TestCase.skipException(
            "Share manage tests with multitenancy are disabled for "
            "microversion < 2.49")
|
|
|
|
|
|
def share_network_subnets_are_supported():
    """Tell whether the share network subnets microversion is supported.

    :return: True if SHARE_NETWORK_SUBNETS_MICROVERSION lies within the
        configured microversion range.
    """
    required = SHARE_NETWORK_SUBNETS_MICROVERSION
    return is_microversion_supported(required)
|
|
|
|
|
|
def share_replica_quotas_are_supported():
    """Tell whether the share replica quotas microversion is supported.

    :return: True if SHARE_REPLICA_QUOTAS_MICROVERSION lies within the
        configured microversion range.
    """
    required = SHARE_REPLICA_QUOTAS_MICROVERSION
    return is_microversion_supported(required)
|
|
|
|
|
|
def share_network_get_default_subnet(share_network):
    """Find the subnet of a share network that has no availability zone.

    :param share_network: dict that may hold a 'share_network_subnets'
        list of subnet dicts.
    :return: the first subnet whose 'availability_zone' is None, or None
        when there is no such subnet (or no subnets at all).
    """
    for subnet in share_network.get('share_network_subnets', []):
        if subnet['availability_zone'] is None:
            return subnet
    return None
|
|
|
|
|
|
def get_extra_headers(request_version, graduation_version):
    """Work out whether a request needs the experimental API header.

    :param request_version: microversion string used for the request.
    :param graduation_version: microversion string compared against the
        request version.
    :return: tuple ``(headers, extra_headers)``; ``(EXPERIMENTAL, True)``
        when the request version is lower than the graduation version,
        ``(None, False)`` otherwise.
    """
    if is_microversion_lt(request_version, graduation_version):
        return EXPERIMENTAL, True
    return None, False
|