diff --git a/cloudbaseinit/tests/metadata/services/osconfigdrive/test_windows.py b/cloudbaseinit/tests/metadata/services/osconfigdrive/test_windows.py new file mode 100644 index 00000000..4cc62a00 --- /dev/null +++ b/cloudbaseinit/tests/metadata/services/osconfigdrive/test_windows.py @@ -0,0 +1,325 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2014 Cloudbase Solutions Srl +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +import os +import sys +import unittest + +if sys.platform == 'win32': + from cloudbaseinit.metadata.services.osconfigdrive import windows + from cloudbaseinit.utils.windows import physical_disk +from oslo.config import cfg + +CONF = cfg.CONF + + +@unittest.skipUnless(sys.platform == "win32", "requires Windows") +class TestWindowsConfigDriveManager(unittest.TestCase): + + def setUp(self): + self._config_manager = windows.WindowsConfigDriveManager() + + @mock.patch('cloudbaseinit.osutils.factory.get_os_utils') + @mock.patch('os.path.exists') + def _test_get_config_drive_cdrom_mount_point(self, mock_join, + mock_get_os_utils, exists): + mock_osutils = mock.MagicMock() + mock_get_os_utils.return_value = mock_osutils + mock_osutils.get_cdrom_drives.return_value = ['fake drive'] + mock_osutils.get_volume_label.return_value = 'config-2' + mock_join.return_value = exists + + response = self._config_manager._get_config_drive_cdrom_mount_point() + + mock_osutils.get_cdrom_drives.assert_called_once_with() + mock_osutils.get_volume_label.assert_called_once_with('fake drive') + + if exists: + self.assertEqual(response, 'fake drive') + else: + self.assertIsNone(response) + + def test_get_config_drive_cdrom_mount_point_exists_true(self): + self._test_get_config_drive_cdrom_mount_point(exists=True) + + def test_get_config_drive_cdrom_mount_point_exists_false(self): + self._test_get_config_drive_cdrom_mount_point(exists=False) + + @mock.patch('ctypes.cast') + @mock.patch('ctypes.POINTER') + @mock.patch('ctypes.wintypes.WORD') + def test_c_char_array_to_c_ushort(self, mock_WORD, mock_POINTER, + mock_cast): + mock_buf = mock.MagicMock() + + response = self._config_manager._c_char_array_to_c_ushort(mock_buf, + 1) + + self.assertEqual(mock_cast.call_count, 2) + mock_POINTER.assert_called_with(mock_WORD) + mock_cast.assert_called_with(mock_buf.__getitem__(), mock_POINTER()) + self.assertEqual(response, + mock_cast().contents.value.__lshift__().__add__()) + + @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' + 'WindowsConfigDriveManager._c_char_array_to_c_ushort') + def _test_get_iso_disk_size(self, mock_c_char_array_to_c_ushort, + media_type, value, iso_id): + + boot_record_off = 0x8000 + volume_size_off = 80 + block_size_off = 128 + + mock_phys_disk = mock.MagicMock() + mock_buff = mock.MagicMock() + mock_geom = mock.MagicMock() + + mock_phys_disk.get_geometry.return_value = mock_geom + mock_geom.MediaType = media_type + mock_geom.Cylinders = value + mock_geom.TracksPerCylinder = 2 + mock_geom.SectorsPerTrack = 2 + mock_geom.BytesPerSector = 2 + mock_phys_disk.read.return_value = (mock_buff, 'fake value') + mock_buff.__getitem__.return_value = iso_id + mock_c_char_array_to_c_ushort.return_value = 100 + + disk_size = mock_geom.Cylinders * mock_geom.TracksPerCylinder * \ + mock_geom.SectorsPerTrack * mock_geom.BytesPerSector + + offset = boot_record_off / mock_geom.BytesPerSector * \ + mock_geom.BytesPerSector + + buf_off_volume = boot_record_off - offset + volume_size_off + buf_off_block = boot_record_off - offset + block_size_off + + response = self._config_manager._get_iso_disk_size(mock_phys_disk) + mock_phys_disk.get_geometry.assert_called_once_with() + if mock_geom.MediaType != physical_disk.Win32_DiskGeometry.FixedMedia: + self.assertIsNone(response) + elif disk_size <= offset + mock_geom.BytesPerSector: + self.assertIsNone(response) + else: + mock_phys_disk.seek.assert_called_once_with(offset) + mock_phys_disk.read.assert_called_once_with( + mock_geom.BytesPerSector) + if iso_id != 'CD001': + self.assertIsNone(response) + else: + mock_c_char_array_to_c_ushort.assert_has_calls( + mock.call(mock_buff, buf_off_volume), + mock.call(mock_buff, buf_off_block)) + self.assertEqual(response, 10000) + + def test_test_get_iso_disk_size(self): + self._test_get_iso_disk_size( + media_type=physical_disk.Win32_DiskGeometry.FixedMedia, + value=100, iso_id='CD001') + + def test_test_get_iso_disk_size_other_media_type(self): + self._test_get_iso_disk_size(media_type="fake media type", value=100, + iso_id='CD001') + + def test_test_get_iso_disk_size_other_disk_size_too_small(self): + self._test_get_iso_disk_size( + media_type=physical_disk.Win32_DiskGeometry.FixedMedia, value=0, + iso_id='CD001') + + def test_test_get_iso_disk_size_other_id(self): + self._test_get_iso_disk_size( + media_type=physical_disk.Win32_DiskGeometry.FixedMedia, + value=100, iso_id='other id') + + def test_write_iso_file(self): + mock_buff = mock.MagicMock() + mock_geom = mock.MagicMock() + mock_geom.BytesPerSector = 2 + + mock_phys_disk = mock.MagicMock() + mock_phys_disk.read.return_value = (mock_buff, 10) + + fake_path = os.path.join('fake', 'path') + + mock_phys_disk.get_geometry.return_value = mock_geom + with mock.patch('__builtin__.open', mock.mock_open(), + create=True) as f: + self._config_manager._write_iso_file(mock_phys_disk, fake_path, + 10) + f().write.assert_called_once_with(mock_buff) + mock_phys_disk.seek.assert_called_once_with(0) + mock_phys_disk.read.assert_called_once_with(10) + + @mock.patch('os.makedirs') + def _test_extract_iso_files(self, mock_makedirs, exit_code): + fake_path = os.path.join('fake', 'path') + fake_target_path = os.path.join(fake_path, 'target') + args = [CONF.bsdtar_path, '-xf', fake_path, '-C', fake_target_path] + mock_os_utils = mock.MagicMock() + + mock_os_utils.execute_process.return_value = ('fake out', 'fake err', + exit_code) + if exit_code: + self.assertRaises(Exception, + self._config_manager._extract_iso_files, + mock_os_utils, fake_path, fake_target_path) + else: + self._config_manager._extract_iso_files(mock_os_utils, fake_path, + fake_target_path) + + mock_os_utils.execute_process.assert_called_once_with(args, False) + mock_makedirs.assert_called_once_with(fake_target_path) + + def test_extract_iso_files(self): + self._test_extract_iso_files(exit_code=None) + + def test_extract_iso_files_exception(self): + self._test_extract_iso_files(exit_code=1) + + @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' + 'WindowsConfigDriveManager._get_iso_disk_size') + @mock.patch('cloudbaseinit.utils.windows.physical_disk.PhysicalDisk') + @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' + 'WindowsConfigDriveManager._write_iso_file') + def _test_extract_iso_disk_file(self, mock_write_iso_file, + mock_PhysicalDisk, mock_get_iso_disk_size, + exception): + mock_osutils = mock.MagicMock() + fake_path = os.path.join('fake', 'path') + fake_path_physical = os.path.join(fake_path, 'physical') + mock_osutils.get_physical_disks.return_value = [fake_path_physical] + mock_get_iso_disk_size.return_value = 'fake iso size' + + if exception: + mock_PhysicalDisk().open.side_effect = [Exception] + + response = self._config_manager._extract_iso_disk_file( + osutils=mock_osutils, iso_file_path=fake_path) + print mock_PhysicalDisk().open.mock_calls + + if not exception: + mock_get_iso_disk_size.assert_called_once_with( + mock_PhysicalDisk()) + mock_write_iso_file.assert_called_once_with(mock_PhysicalDisk(), + fake_path, + 'fake iso size') + self.assertTrue(response) + else: + self.assertFalse(response) + + mock_PhysicalDisk().open.assert_called_once_with() + mock_osutils.get_physical_disks.assert_called_once_with() + mock_PhysicalDisk().close.assert_called_once_with() + + def test_extract_iso_disk_file_disk_found(self): + self._test_extract_iso_disk_file(exception=False) + + def test_extract_iso_disk_file_disk_not_found(self): + self._test_extract_iso_disk_file(exception=True) + + @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' + 'WindowsConfigDriveManager._get_conf_drive_from_raw_hdd') + @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' + 'WindowsConfigDriveManager._get_conf_drive_from_cdrom_drive') + def test_get_config_drive_files(self, + mock_get_conf_drive_from_cdrom_drive, + mock_get_conf_drive_from_raw_hdd): + + fake_path = os.path.join('fake', 'path') + mock_get_conf_drive_from_raw_hdd.return_value = False + mock_get_conf_drive_from_cdrom_drive.return_value = True + + response = self._config_manager.get_config_drive_files( + target_path=fake_path) + + mock_get_conf_drive_from_raw_hdd.assert_called_once_with(fake_path) + mock_get_conf_drive_from_cdrom_drive.assert_called_once_with( + fake_path) + self.assertTrue(response) + + @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' + 'WindowsConfigDriveManager.' + '_get_config_drive_cdrom_mount_point') + @mock.patch('shutil.copytree') + def _test_get_conf_drive_from_cdrom_drive(self, mock_copytree, + mock_get_config_cdrom_mount, + mount_point): + fake_path = os.path.join('fake', 'path') + mock_get_config_cdrom_mount.return_value = mount_point + + response = self._config_manager._get_conf_drive_from_cdrom_drive( + fake_path) + + mock_get_config_cdrom_mount.assert_called_once_with() + + if mount_point: + mock_copytree.assert_called_once_with(mount_point, fake_path) + self.assertTrue(response) + else: + self.assertFalse(response) + + def test_get_conf_drive_from_cdrom_drive_with_mountpoint(self): + self._test_get_conf_drive_from_cdrom_drive( + mount_point='fake mount point') + + def test_get_conf_drive_from_cdrom_drive_without_mountpoint(self): + self._test_get_conf_drive_from_cdrom_drive( + mount_point=None) + + @mock.patch('os.remove') + @mock.patch('os.path.exists') + @mock.patch('tempfile.gettempdir') + @mock.patch('uuid.uuid4') + @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' + 'WindowsConfigDriveManager._extract_iso_disk_file') + @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.' + 'WindowsConfigDriveManager._extract_iso_files') + @mock.patch('cloudbaseinit.osutils.factory.get_os_utils') + def _test_get_conf_drive_from_raw_hdd(self, mock_get_os_utils, + mock_extract_iso_files, + mock_extract_iso_disk_file, + mock_uuid4, mock_gettempdir, + mock_exists, mock_remove, + found_drive): + fake_target_path = os.path.join('fake', 'path') + fake_iso_path = os.path.join('fake_dir', 'fake_id' + '.iso') + + mock_uuid4.return_value = 'fake_id' + mock_gettempdir.return_value = 'fake_dir' + mock_extract_iso_disk_file.return_value = found_drive + mock_exists.return_value = found_drive + + response = self._config_manager._get_conf_drive_from_raw_hdd( + fake_target_path) + + mock_get_os_utils.assert_called_once_with() + mock_gettempdir.assert_called_once_with() + mock_extract_iso_disk_file.assert_called_once_with( + mock_get_os_utils(), fake_iso_path) + if found_drive: + mock_extract_iso_files.assert_called_once_with( + mock_get_os_utils(), fake_iso_path, fake_target_path) + mock_exists.assert_called_once_with(fake_iso_path) + mock_remove.assert_called_once_with(fake_iso_path) + self.assertTrue(response) + else: + self.assertFalse(response) + + def test_get_conf_drive_from_raw_hdd_found_drive(self): + self._test_get_conf_drive_from_raw_hdd(found_drive=True) + + def test_get_conf_drive_from_raw_hdd_no_drive_found(self): + self._test_get_conf_drive_from_raw_hdd(found_drive=False) diff --git a/cloudbaseinit/tests/osutils/test_windows.py b/cloudbaseinit/tests/osutils/test_windows.py index 4a55cc8f..9cb19577 100644 --- a/cloudbaseinit/tests/osutils/test_windows.py +++ b/cloudbaseinit/tests/osutils/test_windows.py @@ -971,6 +971,129 @@ class WindowsUtilsTest(unittest.TestCase): mock_get_logical_drives.assert_called_with() self.assertEqual(response, ['drive']) + @mock.patch('cloudbaseinit.osutils.windows.msvcrt') + @mock.patch('cloudbaseinit.osutils.windows.kernel32') + @mock.patch('cloudbaseinit.osutils.windows.setupapi') + @mock.patch('cloudbaseinit.osutils.windows.Win32_STORAGE_DEVICE_NUMBER') + @mock.patch('ctypes.byref') + @mock.patch('ctypes.sizeof') + @mock.patch('ctypes.wintypes.DWORD') + @mock.patch('ctypes.cast') + @mock.patch('ctypes.POINTER') + def _test_get_physical_disks(self, mock_POINTER, mock_cast, + mock_DWORD, mock_sizeof, mock_byref, + mock_sdn, mock_setupapi, mock_kernel32, + mock_msvcrt, handle_disks, last_error, + interface_detail, disk_handle, io_control): + + sizeof_calls = [mock.call( + windows_utils.Win32_SP_DEVICE_INTERFACE_DATA), + mock.call(mock_sdn())] + device_interfaces_calls = [mock.call(handle_disks, None, mock_byref(), + 0, mock_byref()), + mock.call(handle_disks, None, mock_byref(), + 1, mock_byref())] + cast_calls = [mock.call(), + mock.call(mock_msvcrt.malloc(), mock_POINTER()), + mock.call(mock_cast().contents.DevicePath, + wintypes.LPWSTR)] + + mock_setup_interface = mock_setupapi.SetupDiGetDeviceInterfaceDetailW + + mock_setupapi.SetupDiGetClassDevsW.return_value = handle_disks + mock_kernel32.GetLastError.return_value = last_error + mock_setup_interface.return_value = interface_detail + mock_kernel32.CreateFileW.return_value = disk_handle + mock_kernel32.DeviceIoControl.return_value = io_control + + mock_setupapi.SetupDiEnumDeviceInterfaces.side_effect = [True, False] + + if handle_disks == self._winutils.INVALID_HANDLE_VALUE \ + or last_error != self._winutils.ERROR_INSUFFICIENT_BUFFER \ + and not interface_detail \ + or disk_handle == self._winutils.INVALID_HANDLE_VALUE \ + or not io_control: + + self.assertRaises(Exception, self._winutils.get_physical_disks) + + else: + response = self._winutils.get_physical_disks() + self.assertEqual(mock_sizeof.call_args_list, sizeof_calls) + self.assertEqual( + mock_setupapi.SetupDiEnumDeviceInterfaces.call_args_list, + device_interfaces_calls) + if not interface_detail: + mock_kernel32.GetLastError.assert_called_once_with() + + mock_POINTER.assert_called_with( + windows_utils.Win32_SP_DEVICE_INTERFACE_DETAIL_DATA_W) + mock_msvcrt.malloc.assert_called_with(mock_DWORD()) + + self.assertEqual(mock_cast.call_args_list, cast_calls) + + mock_setup_interface.assert_called_with(handle_disks, mock_byref(), + mock_cast(),mock_DWORD(), + None, None) + mock_kernel32.CreateFileW.assert_called_with( + mock_cast().value, 0, self._winutils.FILE_SHARE_READ, None, + self._winutils.OPEN_EXISTING, 0, 0) + mock_sdn.assert_called_with() + + mock_kernel32.DeviceIoControl.assert_called_with( + disk_handle, self._winutils.IOCTL_STORAGE_GET_DEVICE_NUMBER, + None, 0, mock_byref(), mock_sizeof(), mock_byref(), None) + self.assertEqual(response, ["\\\\.\PHYSICALDRIVE1"]) + mock_setupapi.SetupDiDestroyDeviceInfoList.assert_called_once_with( + handle_disks) + + mock_setupapi.SetupDiGetClassDevsW.assert_called_once_with( + mock_byref(), None, None, self._winutils.DIGCF_PRESENT | + self._winutils.DIGCF_DEVICEINTERFACE) + + + + def test_get_physical_disks(self): + mock_handle_disks = mock.MagicMock() + mock_disk_handle = mock.MagicMock() + self._test_get_physical_disks( + handle_disks=mock_handle_disks, + last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER, + interface_detail='fake interface detail', + disk_handle=mock_disk_handle, io_control=True) + + def test_get_physical_disks_other_error_and_no_interface_detail(self): + mock_handle_disks = mock.MagicMock() + mock_disk_handle = mock.MagicMock() + self._test_get_physical_disks( + handle_disks=mock_handle_disks, + last_error='other', interface_detail=None, + disk_handle=mock_disk_handle, io_control=True) + + def test_get_physical_disks_invalid_disk_handle(self): + mock_handle_disks = mock.MagicMock() + self._test_get_physical_disks( + handle_disks=mock_handle_disks, + last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER, + interface_detail='fake interface detail', + disk_handle=self._winutils.INVALID_HANDLE_VALUE, io_control=True) + + def test_get_physical_disks_io_control(self): + mock_handle_disks = mock.MagicMock() + mock_disk_handle = mock.MagicMock() + self._test_get_physical_disks( + handle_disks=mock_handle_disks, + last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER, + interface_detail='fake interface detail', + disk_handle=mock_disk_handle, io_control=False) + + def test_get_physical_disks_handle_disks_invalid(self): + mock_disk_handle = mock.MagicMock() + self._test_get_physical_disks( + handle_disks=self._winutils.INVALID_HANDLE_VALUE , + last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER, + interface_detail='fake interface detail', + disk_handle=mock_disk_handle, io_control=True) + @mock.patch('win32com.client.Dispatch') @mock.patch('cloudbaseinit.osutils.windows.WindowsUtils._get_fw_protocol') def _test_firewall_create_rule(self, mock_get_fw_protocol, mock_Dispatch):