# Copyright 2015 Mirantis Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import ast import ddt from tempest.lib.common.utils import data_utils from tempest.lib import exceptions as tempest_lib_exc import time from manilaclient import config from manilaclient import exceptions from manilaclient.tests.functional import base from manilaclient.tests.functional import utils SECURITY_SERVICE_UPDATE_VERSION = '2.63' CONF = config.CONF @ddt.ddt class ShareNetworksReadWriteTest(base.BaseTestCase): def setUp(self): super().setUp() self.name = data_utils.rand_name('autotest') self.description = 'fake_description' self.neutron_net_id = 'fake_neutron_net_id' self.neutron_subnet_id = 'fake_neutron_subnet_id' self.sn = self.create_share_network( name=self.name, description=self.description, neutron_net_id=self.neutron_net_id, neutron_subnet_id=self.neutron_subnet_id, ) @ddt.data( {'name': data_utils.rand_name('autotest_share_network_name')}, {'description': 'fake_description'}, { 'neutron_net_id': 'fake_neutron_net_id', 'neutron_subnet_id': 'fake_neutron_subnet_id', }, ) def test_create_delete_share_network(self, net_data): share_subnet_support = utils.share_network_subnets_are_supported() share_subnet_fields = ( ['neutron_net_id', 'neutron_subnet_id', 'availability_zone'] if share_subnet_support else [] ) sn = self.create_share_network(cleanup_in_class=False, **net_data) default_subnet = ( utils.get_default_subnet(self.user_client, sn['id']) if share_subnet_support else None ) expected_data = { 'name': 'None', 'description': 'None', 'neutron_net_id': 'None', 'neutron_subnet_id': 'None', } expected_data.update(net_data) share_network_expected_data = [ (k, v) for k, v in expected_data.items() if k not in share_subnet_fields ] share_subnet_expected_data = [ (k, v) for k, v in expected_data.items() if k in share_subnet_fields ] for k, v in share_network_expected_data: self.assertEqual(v, sn[k]) for k, v in share_subnet_expected_data: self.assertEqual(v, default_subnet[k]) self.admin_client.delete_share_network(sn['id']) self.admin_client.wait_for_share_network_deletion(sn['id']) @utils.skip_if_microversion_not_supported('2.51') def test_create_delete_share_network_with_az(self): share_subnet_fields = [ 'neutron_net_id', 'neutron_subnet_id', 'availability_zone', ] az = self.user_client.list_availability_zones()[0] net_data = { 'neutron_net_id': 'fake_neutron_net_id', 'neutron_subnet_id': 'fake_neutron_subnet_id', 'availability_zone': az['Name'], } sn = self.create_share_network(cleanup_in_class=False, **net_data) default_subnet = utils.get_subnet_by_availability_zone_name( self.user_client, sn['id'], az['Name'] ) expected_data = { 'name': 'None', 'description': 'None', 'neutron_net_id': 'None', 'neutron_subnet_id': 'None', 'availability_zone': 'None', } expected_data.update(net_data) share_network_expected_data = [ (k, v) for k, v in expected_data.items() if k not in share_subnet_fields ] share_subnet_expected_data = [ (k, v) for k, v in expected_data.items() if k in share_subnet_fields ] for k, v in share_network_expected_data: self.assertEqual(v, sn[k]) for k, v in share_subnet_expected_data: self.assertEqual(v, default_subnet[k]) self.admin_client.delete_share_network(sn['id']) self.admin_client.wait_for_share_network_deletion(sn['id']) def test_get_share_network_with_neutron_data(self): get = self.admin_client.get_share_network(self.sn['id']) self.assertEqual(self.name, get['name']) self.assertEqual(self.description, get['description']) if not utils.share_network_subnets_are_supported(): self.assertEqual(self.neutron_net_id, get['neutron_net_id']) self.assertEqual(self.neutron_subnet_id, get['neutron_subnet_id']) def _get_expected_update_data(self, net_data, net_creation_data): # NOTE(dviroel): When subnets are supported, the outputs are converted # from string to literal structures in order to process the content of # 'share_network_subnets' field. default_return_value = ( None if utils.share_network_subnets_are_supported() else 'None' ) expected_nn_id = ( default_return_value if net_data.get('neutron_net_id') else net_creation_data.get('neutron_net_id', default_return_value) ) expected_nsn_id = ( default_return_value if net_data.get('neutron_subnet_id') else net_creation_data.get( 'neutron_subnet_id', default_return_value ) ) return expected_nn_id, expected_nsn_id @ddt.data( ({'name': data_utils.rand_name('autotest_share_network_name')}, {}), ({'description': 'fake_description'}, {}), ( { 'neutron_net_id': 'fake_neutron_net_id', 'neutron_subnet_id': 'fake_neutron_subnet_id', }, {}, ), ({'name': '""'}, {}), ({'description': '""'}, {}), ( {'neutron_net_id': '""'}, { 'neutron_net_id': 'fake_nn_id', 'neutron_subnet_id': 'fake_nsn_id', }, ), ( {'neutron_subnet_id': '""'}, { 'neutron_net_id': 'fake_nn_id', 'neutron_subnet_id': 'fake_nsn_id', }, ), ) @ddt.unpack def test_create_update_share_network(self, net_data, net_creation_data): sn = self.create_share_network( cleanup_in_class=False, **net_creation_data ) update = self.admin_client.update_share_network(sn['id'], **net_data) expected_nn_id, expected_nsn_id = self._get_expected_update_data( net_data, net_creation_data ) expected_data = { 'name': 'None', 'description': 'None', 'neutron_net_id': expected_nn_id, 'neutron_subnet_id': expected_nsn_id, } subnet_keys = [] if utils.share_network_subnets_are_supported(): subnet_keys = ['neutron_net_id', 'neutron_subnet_id'] subnet = ast.literal_eval(update['share_network_subnets']) update_values = dict( [(k, v) for k, v in net_data.items() if v != '""'] ) expected_data.update(update_values) for k, v in expected_data.items(): if k in subnet_keys: self.assertEqual(v, subnet[0][k]) else: self.assertEqual(v, update[k]) self.admin_client.delete_share_network(sn['id']) self.admin_client.wait_for_share_network_deletion(sn['id']) @ddt.data(True, False) def test_list_share_networks(self, all_tenants): share_networks = self.admin_client.list_share_networks(all_tenants) self.assertTrue( any(self.sn['id'] == sn['id'] for sn in share_networks) ) for sn in share_networks: self.assertEqual(2, len(sn)) self.assertIn('id', sn) self.assertIn('name', sn) def test_list_share_networks_select_column(self): share_networks = self.admin_client.list_share_networks(columns="id") self.assertTrue(any(s['Id'] is not None for s in share_networks)) self.assertTrue(all('Name' not in s for s in share_networks)) self.assertTrue(all('name' not in s for s in share_networks)) def _list_share_networks_with_filters(self, filters): assert_subnet_fields = utils.share_network_subnets_are_supported() share_subnet_fields = ( ['neutron_subnet_id', 'neutron_net_id'] if assert_subnet_fields else [] ) share_network_filters = [ (k, v) for k, v in filters.items() if k not in share_subnet_fields ] share_network_subnet_filters = [ (k, v) for k, v in filters.items() if k in share_subnet_fields ] share_networks = self.admin_client.list_share_networks(filters=filters) self.assertGreater(len(share_networks), 0) self.assertTrue( any(self.sn['id'] == sn['id'] for sn in share_networks) ) for sn in share_networks: try: share_network = self.admin_client.get_share_network(sn['id']) default_subnet = ( utils.get_default_subnet(self.user_client, sn['id']) if assert_subnet_fields else None ) except tempest_lib_exc.NotFound: # NOTE(vponomaryov): Case when some share network was deleted # between our 'list' and 'get' requests. Skip such case. continue for k, v in share_network_filters: self.assertIn(k, share_network) self.assertEqual(v, share_network[k]) for k, v in share_network_subnet_filters: self.assertIn(k, default_subnet) self.assertEqual(v, default_subnet[k]) def test_list_share_networks_filter_by_project_id(self): project_id = self.admin_client.get_project_id( self.admin_client.tenant_name ) filters = {'project_id': project_id} self._list_share_networks_with_filters(filters) def test_list_share_networks_filter_by_name(self): filters = {'name': self.name} self._list_share_networks_with_filters(filters) def test_list_share_networks_filter_by_description(self): filters = {'description': self.description} self._list_share_networks_with_filters(filters) def test_list_share_networks_filter_by_neutron_net_id(self): filters = {'neutron_net_id': self.neutron_net_id} self._list_share_networks_with_filters(filters) def test_list_share_networks_filter_by_neutron_subnet_id(self): filters = {'neutron_subnet_id': self.neutron_subnet_id} self._list_share_networks_with_filters(filters) @ddt.data('name', 'description') def test_list_share_networks_filter_by_inexact(self, option): self.create_share_network( name=data_utils.rand_name('autotest_inexact'), description='fake_description_inexact', neutron_net_id='fake_neutron_net_id', neutron_subnet_id='fake_neutron_subnet_id', ) filters = {option + '~': 'inexact'} share_networks = self.admin_client.list_share_networks(filters=filters) self.assertGreater(len(share_networks), 0) def test_list_share_networks_by_inexact_unicode_option(self): self.create_share_network( name='网络名称', description='网络描述', neutron_net_id='fake_neutron_net_id', neutron_subnet_id='fake_neutron_subnet_id', ) filters = {'name~': '名称'} share_networks = self.admin_client.list_share_networks(filters=filters) self.assertGreater(len(share_networks), 0) filters = {'description~': '描述'} share_networks = self.admin_client.list_share_networks(filters=filters) self.assertGreater(len(share_networks), 0) def test_share_network_reset_status(self): share_network = self.create_share_network( client=self.user_client, name='cool_net_name', description='fakedescription', neutron_net_id='fake_neutron_net_id', neutron_subnet_id='fake_neutron_subnet_id', ) # Admin operation self.admin_client.share_network_reset_state( share_network['id'], 'error', microversion=SECURITY_SERVICE_UPDATE_VERSION, ) self.user_client.wait_for_resource_status( share_network['id'], 'error', microversion=SECURITY_SERVICE_UPDATE_VERSION, resource_type="share_network", ) def test_share_network_security_service_add(self): share_network = self.create_share_network( client=self.user_client, name='cool_net_name', description='fakedescription', neutron_net_id='fake_neutron_net_id', neutron_subnet_id='fake_neutron_subnet_id', ) new_security_service = self.create_security_service( client=self.user_client ) check_result = ( self.user_client.share_network_security_service_add_check( share_network['id'], security_service_id=new_security_service['id'], ) ) self.assertEqual(check_result['compatible'], 'True') self.user_client.share_network_security_service_add( share_network['id'], new_security_service['id'] ) network_services = ( self.user_client.share_network_security_service_list( share_network['id'] ) ) self.assertEqual(len(network_services), 1) self.assertEqual(network_services[0]['id'], new_security_service['id']) def test_share_network_security_service_update(self): share_network = self.create_share_network( client=self.user_client, name='cool_net_name', description='fakedescription', neutron_net_id='fake_neutron_net_id', neutron_subnet_id='fake_neutron_subnet_id', ) current_name = 'current' new_name = 'new' current_security_service = self.create_security_service( client=self.user_client, name=current_name ) new_security_service = self.create_security_service( client=self.user_client, name=new_name ) check_result = ( self.user_client.share_network_security_service_add_check( share_network['id'], current_security_service['id'] ) ) self.assertEqual(check_result['compatible'], 'True') self.user_client.share_network_security_service_add( share_network['id'], current_security_service['id'] ) network_services = ( self.user_client.share_network_security_service_list( share_network['id'] ) ) self.assertEqual(len(network_services), 1) self.assertEqual(network_services[0]['name'], current_name) check_result = ( self.user_client.share_network_security_service_update_check( share_network['id'], current_security_service['id'], new_security_service['id'], ) ) self.assertEqual(check_result['compatible'], 'True') self.user_client.share_network_security_service_update( share_network['id'], current_security_service['id'], new_security_service['id'], ) network_services = ( self.user_client.share_network_security_service_list( share_network['id'] ) ) self.assertEqual(len(network_services), 1) self.assertEqual(network_services[0]['name'], new_name) def test_share_network_subnet_create_check(self): share_network = self.create_share_network( client=self.user_client, description='fakedescription', ) check_result = self.user_client.share_network_subnet_create_check( share_network['id'], neutron_net_id='fake_neutron_net_id', neutron_subnet_id='fake_neutron_subnet_id', ) self.assertEqual(check_result['compatible'], 'True') @ddt.data( {'neutron_net_id': None, 'neutron_subnet_id': 'fake_subnet_id'}, {'neutron_net_id': 'fake_net_id', 'neutron_subnet_id': None}, {'availability_zone': 'invalid_availability_zone'}, ) def test_check_add_share_network_subnet_with_invalid_params(self, params): self.assertRaises( tempest_lib_exc.CommandFailed, self.user_client.share_network_subnet_create_check, self.sn['id'], **params, ) def test_check_add_share_network_subnet_to_invalid_share_network(self): self.assertRaises( tempest_lib_exc.CommandFailed, self.user_client.share_network_subnet_create_check, 'invalid_share_network', self.neutron_net_id, self.neutron_subnet_id, ) class ShareNetworkSecurityServiceCheckReadWriteTests(base.BaseTestCase): protocol = None def setUp(self): super().setUp() if self.protocol not in CONF.enable_protocols: message = f"{self.protocol} tests are disabled." raise self.skipException(message) self.client = self.get_user_client() if not self.client.share_network: message = "Can run only with DHSS=True mode" raise self.skipException(message) def _wait_for_update_security_service_compatible_result( self, share_network, current_security_service, new_security_service=None, ): compatible_expected_result = 'True' check_is_compatible = 'None' tentatives = 0 # There might be a delay from the time the check is requested until # the backend has performed all checks while check_is_compatible != compatible_expected_result: tentatives += 1 if not new_security_service: check_is_compatible = ( self.user_client.share_network_security_service_add_check( share_network['id'], current_security_service['id'] ) )['compatible'] else: check_is_compatible = ( self.user_client.share_network_security_service_update_check( share_network['id'], current_security_service['id'], new_security_service['id'], ) )['compatible'] if tentatives > 3: timeout_message = ( "Share network security service add/update check did not " "reach 'compatible=True' within 15 seconds." ) raise exceptions.TimeoutException(message=timeout_message) time.sleep(5) def test_check_if_security_service_can_be_added_to_share_network_in_use( self, ): share_network = self.create_share_network( client=self.user_client, description='fakedescription', neutron_net_id='fake_neutron_net_id', neutron_subnet_id='fake_neutron_subnet_id', ) # Create a share so we can be sure that a share server will exist and # the check will be performed in the backends self.create_share( self.protocol, client=self.user_client, share_network=share_network['id'], ) current_security_service = self.create_security_service( client=self.user_client ) check_result = ( self.user_client.share_network_security_service_add_check( share_network['id'], current_security_service['id'] ) ) self.assertEqual(check_result['compatible'], 'None') self._wait_for_update_security_service_compatible_result( share_network, current_security_service ) def test_add_and_update_security_service_when_share_network_is_in_use( self, ): share_network = self.create_share_network( client=self.user_client, name='cool_net_name', description='fakedescription', neutron_net_id='fake_neutron_net_id', neutron_subnet_id='fake_neutron_subnet_id', ) # Create a share so we can be sure that a share server will exist and # the check will be performed in the backends self.create_share( self.protocol, name='fake_share_name', share_network=share_network['id'], client=self.user_client, ) current_security_service = self.create_security_service( client=self.user_client, name='current_security_service' ) new_security_service = self.create_security_service( client=self.user_client, name='new_security_service' ) check_result = ( self.user_client.share_network_security_service_add_check( share_network['id'], current_security_service['id'] ) ) self.assertEqual(check_result['compatible'], 'None') self._wait_for_update_security_service_compatible_result( share_network, current_security_service ) self.user_client.share_network_security_service_add( share_network['id'], current_security_service['id'] ) network_services = ( self.user_client.share_network_security_service_list( share_network['id'] ) ) self.assertEqual(len(network_services), 1) self.assertEqual( network_services[0]['name'], current_security_service['name'] ) self.user_client.wait_for_resource_status( share_network['id'], 'active', microversion=SECURITY_SERVICE_UPDATE_VERSION, resource_type="share_network", ) check_result = ( self.user_client.share_network_security_service_update_check( share_network['id'], current_security_service['id'], new_security_service['id'], ) ) self.assertEqual(check_result['compatible'], 'None') self._wait_for_update_security_service_compatible_result( share_network, current_security_service, new_security_service=new_security_service, ) self.user_client.share_network_security_service_update( share_network['id'], current_security_service['id'], new_security_service['id'], ) network_services = ( self.user_client.share_network_security_service_list( share_network['id'] ) ) self.assertEqual(len(network_services), 1) self.assertEqual( network_services[0]['name'], new_security_service['name'] ) self.user_client.wait_for_resource_status( share_network['id'], 'active', microversion=SECURITY_SERVICE_UPDATE_VERSION, resource_type="share_network", ) base_security_service_check = ShareNetworkSecurityServiceCheckReadWriteTests class ShareNetworkSecServiceCheckRWNFSTest(base_security_service_check): protocol = 'nfs' class ShareNetworkSecServiceCheckRWTestsCIFSTest(base_security_service_check): protocol = 'cifs'