# Copyright (c) 2015 Cloudbase Solutions SRL
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import os

import ddt
import mock
from oslo_concurrency import processutils
from oslo_config import cfg

from manila import exception
from manila.share import configuration
from manila.share.drivers import service_instance as generic_service_instance
from manila.share.drivers.windows import service_instance
from manila.share.drivers.windows import windows_utils
from manila import test

CONF = cfg.CONF
CONF.import_opt('driver_handles_share_servers', 'manila.share.driver')
CONF.register_opts(generic_service_instance.common_opts)

# Short aliases: the manager under test and the generic manager whose
# methods get patched in the tests below.
serv_mgr_cls = service_instance.WindowsServiceInstanceManager
generic_serv_mgr_cls = generic_service_instance.ServiceInstanceManager


@ddt.ddt
class WindowsServiceInstanceManagerTestCase(test.TestCase):
    """Unit tests for WindowsServiceInstanceManager."""

    # Server dict handed to the manager methods under test.
    _FAKE_SERVER = {'ip': mock.sentinel.ip,
                    'instance_id': mock.sentinel.instance_id}

    @mock.patch.object(windows_utils, 'WindowsUtils')
    @mock.patch.object(serv_mgr_cls, '_check_auth_mode')
    def setUp(self, mock_check_auth, mock_utils_cls):
        """Build the manager with WindowsUtils and the auth check patched."""
        self.flags(service_instance_user=mock.sentinel.username)
        self._remote_execute = mock.Mock()

        # The decorator patches are only in effect for the duration of
        # setUp, i.e. while the manager is being constructed.
        self._mgr = serv_mgr_cls(
            remote_execute=self._remote_execute,
            driver_config=configuration.Configuration(None))
        # Kept so tests can assert on the utils calls; presumably this is
        # the instance the manager created from the patched class.
        self._windows_utils = mock_utils_cls.return_value
        super(WindowsServiceInstanceManagerTestCase, self).setUp()

    @ddt.data({},
              {'use_cert_auth': False},
              {'use_cert_auth': False, 'valid_pass_complexity': False},
              {'certs_exist': False})
    @mock.patch('os.path.exists')
    @mock.patch.object(serv_mgr_cls, '_check_password_complexity')
    @ddt.unpack
    def test_check_auth_mode(self, mock_check_complexity, mock_path_exists,
                             use_cert_auth=True, certs_exist=True,
                             valid_pass_complexity=True):
        """_check_auth_mode must raise when the auth setup is unusable."""
        mgr = self._mgr
        self.flags(service_instance_password=mock.sentinel.password)
        mock_check_complexity.return_value = valid_pass_complexity
        mock_path_exists.return_value = certs_exist
        mgr._cert_pem_path = mock.sentinel.cert_path
        mgr._cert_key_pem_path = mock.sentinel.key_path
        mgr._use_cert_auth = use_cert_auth

        # Valid means: acceptable password complexity and, when cert auth
        # is requested, the certificate files exist.
        auth_is_valid = (valid_pass_complexity
                         and (certs_exist or not use_cert_auth))

        if auth_is_valid:
            mgr._check_auth_mode()
        else:
            self.assertRaises(exception.ServiceInstanceException,
                              mgr._check_auth_mode)

        if not use_cert_auth:
            mock_check_complexity.assert_called_once_with(
                mock.sentinel.password)

    @ddt.data(False, True)
    def test_get_auth_info(self, use_cert_auth):
        """_get_auth_info includes the cert paths only for cert auth."""
        mgr = self._mgr
        mgr._use_cert_auth = use_cert_auth
        mgr._cert_pem_path = mock.sentinel.cert_path
        mgr._cert_key_pem_path = mock.sentinel.key_path

        if use_cert_auth:
            expected = {'use_cert_auth': use_cert_auth,
                        'cert_pem_path': mock.sentinel.cert_path,
                        'cert_key_pem_path': mock.sentinel.key_path}
        else:
            expected = {'use_cert_auth': use_cert_auth}

        self.assertEqual(expected, mgr._get_auth_info())

    @mock.patch.object(serv_mgr_cls, '_get_auth_info')
    @mock.patch.object(generic_serv_mgr_cls, 'get_common_server')
    def test_common_server(self, mock_generic_get_server, mock_get_auth):
        """get_common_server adds the auth info to backend_details."""
        auth_info = {'fake_auth_info': mock.sentinel.auth_info}
        mock_get_auth.return_value = auth_info
        mock_generic_get_server.return_value = {'backend_details': {}}
        expected = {'backend_details': auth_info}

        actual = self._mgr.get_common_server()

        mock_generic_get_server.assert_called_once_with()
        self.assertEqual(expected, actual)

    @mock.patch.object(serv_mgr_cls, '_get_auth_info')
    @mock.patch.object(generic_serv_mgr_cls, '_get_new_instance_details')
    def test_get_new_instance_details(self, mock_generic_get_details,
                                      mock_get_auth):
        """New instance details are the generic ones plus the auth info."""
        generic_details = {
            'fake_server_details': mock.sentinel.server_details}
        auth_info = {'fake_auth_info': mock.sentinel.auth_info}
        mock_generic_get_details.return_value = generic_details
        mock_get_auth.return_value = auth_info

        # Build the expectation from a copy taken before the call.
        expected = dict(generic_details)
        expected.update(auth_info)

        actual = self._mgr._get_new_instance_details(
            server=mock.sentinel.server)

        mock_generic_get_details.assert_called_once_with(mock.sentinel.server)
        self.assertEqual(expected, actual)

    @ddt.data(('abAB01', True),
              ('abcdef', False),
              ('aA0', False))
    @ddt.unpack
    def test_check_password_complexity(self, password, expected_result):
        """_check_password_complexity returns the expected verdict."""
        self.assertEqual(
            expected_result,
            self._mgr._check_password_complexity(password))

    @ddt.data(None, Exception)
    def test_server_connection(self, side_effect):
        """The server counts as available only if remote execute works."""
        server = self._FAKE_SERVER
        self._remote_execute.side_effect = side_effect
        should_be_available = side_effect is None

        is_available = self._mgr._test_server_connection(server)

        self.assertEqual(should_be_available, is_available)
        self._remote_execute.assert_called_once_with(server, "whoami",
                                                     retry=False)

    @ddt.data(False, True)
    def test_get_service_instance_create_kwargs(self, use_cert_auth):
        """Create kwargs carry cert user data or the admin password."""
        self.flags(service_instance_password=mock.sentinel.admin_pass)
        self._mgr._use_cert_auth = use_cert_auth

        if not use_cert_auth:
            expected = {'meta': {'admin_pass': mock.sentinel.admin_pass}}
        else:
            mock_cert_data = 'mock_cert_data'
            # Replace open in the module under test so the certificate
            # read yields the fake data.
            self.mock_object(service_instance, 'open',
                             mock.mock_open(read_data=mock_cert_data))
            expected = {'user_data': mock_cert_data}

        self.assertEqual(
            expected, self._mgr._get_service_instance_create_kwargs())

    @mock.patch.object(generic_serv_mgr_cls, 'set_up_service_instance')
    @mock.patch.object(serv_mgr_cls, 'get_valid_security_service')
    @mock.patch.object(serv_mgr_cls, '_setup_security_service')
    def test_set_up_service_instance(self, mock_setup_security_service,
                                     mock_get_valid_security_service,
                                     mock_generic_setup_serv_inst):
        """set_up_service_instance applies the valid security service."""
        context = mock.sentinel.context
        generic_instance = {'instance_details': None}
        network_info = {
            'security_services': mock.sentinel.security_services}
        mock_get_valid_security_service.return_value = (
            mock.sentinel.security_service)
        mock_generic_setup_serv_inst.return_value = generic_instance

        actual = self._mgr.set_up_service_instance(context, network_info)

        mock_generic_setup_serv_inst.assert_called_once_with(
            context, network_info)
        mock_get_valid_security_service.assert_called_once_with(
            mock.sentinel.security_services)
        mock_setup_security_service.assert_called_once_with(
            generic_instance, mock.sentinel.security_service)

        expected = dict(generic_instance, joined_domain=True)
        self.assertEqual(expected, actual)

    @mock.patch.object(serv_mgr_cls,
                       '_run_cloudbase_init_plugin_after_reboot')
    @mock.patch.object(serv_mgr_cls, '_join_domain')
    def test_setup_security_service(self, mock_join_domain,
                                    mock_run_cbsinit_plugin):
        """_setup_security_service configures DNS and joins the domain."""
        utils = self._windows_utils
        server = self._FAKE_SERVER
        security_service = {'domain': mock.sentinel.domain,
                            'user': mock.sentinel.admin_username,
                            'password': mock.sentinel.admin_password,
                            'dns_ip': mock.sentinel.dns_ip}
        utils.get_interface_index_by_ip.return_value = (
            mock.sentinel.interface_index)

        self._mgr._setup_security_service(server, security_service)

        utils.set_dns_client_search_list.assert_called_once_with(
            server, [security_service['domain']])
        utils.get_interface_index_by_ip.assert_called_once_with(
            server, server['ip'])
        utils.set_dns_client_server_addresses.assert_called_once_with(
            server,
            mock.sentinel.interface_index,
            [security_service['dns_ip']])
        mock_run_cbsinit_plugin.assert_called_once_with(
            server,
            plugin_name=self._mgr._CBS_INIT_WINRM_PLUGIN)
        mock_join_domain.assert_called_once_with(
            server,
            mock.sentinel.domain,
            mock.sentinel.admin_username,
            mock.sentinel.admin_password)

    @ddt.data(
        {'join_domain_side_eff': Exception},
        {'server_available': False,
         'expected_exception': exception.ServiceInstanceException},
        {'join_domain_side_eff': processutils.ProcessExecutionError,
         'expected_exception': processutils.ProcessExecutionError},
        {'domain_mismatch': True,
         'expected_exception': exception.ServiceInstanceException})
    @mock.patch.object(generic_serv_mgr_cls, 'reboot_server')
    @mock.patch.object(generic_serv_mgr_cls, 'wait_for_instance_to_be_active')
    @mock.patch.object(generic_serv_mgr_cls, '_check_server_availability')
    @ddt.unpack
    def test_join_domain(self, mock_check_avail,
                         mock_wait_instance_active,
                         mock_reboot_server,
                         expected_exception=None,
                         server_available=True,
                         domain_mismatch=False,
                         join_domain_side_eff=None):
        """_join_domain reboots, waits and verifies the joined domain."""
        utils = self._windows_utils
        server = self._FAKE_SERVER

        current_domain = mock.sentinel.domain
        if domain_mismatch:
            current_domain = None
        utils.get_current_domain.return_value = current_domain
        utils.join_domain.side_effect = join_domain_side_eff
        mock_check_avail.return_value = server_available

        join_args = (server,
                     mock.sentinel.domain,
                     mock.sentinel.admin_username,
                     mock.sentinel.admin_password)

        if not expected_exception:
            self._mgr._join_domain(*join_args)
        else:
            self.assertRaises(expected_exception,
                              self._mgr._join_domain,
                              *join_args)

        utils.join_domain.assert_called_once_with(*join_args)

        # A ProcessExecutionError from the join is expected to propagate,
        # so none of the follow-up steps are checked in that case.
        if join_domain_side_eff == processutils.ProcessExecutionError:
            return

        mock_reboot_server.assert_called_once_with(server, soft_reboot=True)
        mock_wait_instance_active.assert_called_once_with(
            server['instance_id'],
            timeout=self._mgr.max_time_to_build_instance)
        mock_check_avail.assert_called_once_with(server)
        if server_available:
            utils.get_current_domain.assert_called_once_with(server)

    @ddt.data([],
              [{'type': 'active_directory'}],
              [{'type': 'active_directory'}] * 2,
              [{'type': mock.sentinel.invalid_type}])
    def test_get_valid_security_service(self, security_services):
        """Only a single active_directory service is considered valid."""
        actual = self._mgr.get_valid_security_service(security_services)

        is_single_ad = (
            len(security_services) == 1
            and security_services[0]['type'] == 'active_directory')
        expected = security_services[0] if is_single_ad else None

        self.assertEqual(expected, actual)

    @mock.patch.object(serv_mgr_cls, '_get_cbs_init_reg_section')
    def test_run_cloudbase_init_plugin_after_reboot(self,
                                                    mock_get_cbs_init_reg):
        """The plugin is flagged in the registry to run after reboot."""
        # Shadow the class-level server with one that only carries the
        # instance id.
        self._FAKE_SERVER = {'instance_id': mock.sentinel.instance_id}
        server = self._FAKE_SERVER
        mock_get_cbs_init_reg.return_value = mock.sentinel.cbs_init_reg_sect

        path_params = {'cbs_init': mock.sentinel.cbs_init_reg_sect,
                       'instance_id': server['instance_id']}
        expected_path = "%(cbs_init)s\\%(instance_id)s\\Plugins" % path_params

        self._mgr._run_cloudbase_init_plugin_after_reboot(
            server=server,
            plugin_name=mock.sentinel.plugin_name)

        mock_get_cbs_init_reg.assert_called_once_with(server)
        self._windows_utils.set_win_reg_value.assert_called_once_with(
            server,
            path=expected_path,
            key=mock.sentinel.plugin_name,
            value=self._mgr._CBS_INIT_RUN_PLUGIN_AFTER_REBOOT)

    @ddt.data(
        {},
        {'exec_errors': [
            processutils.ProcessExecutionError(stderr='Cannot find path'),
            processutils.ProcessExecutionError(stderr='Cannot find path')],
         'expected_exception': exception.ServiceInstanceException},
        {'exec_errors': [processutils.ProcessExecutionError(stderr='')],
         'expected_exception': processutils.ProcessExecutionError},
        {'exec_errors': [
            processutils.ProcessExecutionError(stderr='Cannot find path'),
            None]}
    )
    @ddt.unpack
    def test_get_cbs_init_reg_section(self, exec_errors=None,
                                      expected_exception=None):
        """The registry section lookup falls back to Wow6432Node."""
        utils = self._windows_utils
        utils.get_win_reg_value.side_effect = exec_errors
        utils.normalize_path.return_value = (
            mock.sentinel.normalized_section_path)

        if not expected_exception:
            section = self._mgr._get_cbs_init_reg_section(
                mock.sentinel.server)
            self.assertEqual(mock.sentinel.normalized_section_path, section)
        else:
            self.assertRaises(expected_exception,
                              self._mgr._get_cbs_init_reg_section,
                              mock.sentinel.server)

        base_path = 'hklm:\\SOFTWARE'
        cbs_section = 'Cloudbase Solutions\\Cloudbase-Init'

        # The second location is only expected to be probed after the
        # first lookup failed with a 'Cannot find path' error.
        first_lookup_missed = (
            exec_errors and 'Cannot find path' in exec_errors[0].stderr)
        upper_sections = ['', 'Wow6432Node'] if first_lookup_missed else ['']

        expected_calls = []
        for upper_section in upper_sections:
            section_path = os.path.join(base_path, upper_section, cbs_section)
            expected_calls.append(mock.call(section_path))

        utils.normalize_path.assert_has_calls(expected_calls)