Merge pull request #54 from trobert2/latest_config_drive

Added tests for windows config drive
This commit is contained in:
Alessandro Pilotti
2014-04-30 15:50:03 +03:00
2 changed files with 448 additions and 0 deletions

View File

@@ -0,0 +1,325 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import os
import sys
import unittest
if sys.platform == 'win32':
from cloudbaseinit.metadata.services.osconfigdrive import windows
from cloudbaseinit.utils.windows import physical_disk
from oslo.config import cfg
CONF = cfg.CONF
@unittest.skipUnless(sys.platform == "win32", "requires Windows")
class TestWindowsConfigDriveManager(unittest.TestCase):
def setUp(self):
self._config_manager = windows.WindowsConfigDriveManager()
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
@mock.patch('os.path.exists')
def _test_get_config_drive_cdrom_mount_point(self, mock_join,
mock_get_os_utils, exists):
mock_osutils = mock.MagicMock()
mock_get_os_utils.return_value = mock_osutils
mock_osutils.get_cdrom_drives.return_value = ['fake drive']
mock_osutils.get_volume_label.return_value = 'config-2'
mock_join.return_value = exists
response = self._config_manager._get_config_drive_cdrom_mount_point()
mock_osutils.get_cdrom_drives.assert_called_once_with()
mock_osutils.get_volume_label.assert_called_once_with('fake drive')
if exists:
self.assertEqual(response, 'fake drive')
else:
self.assertIsNone(response)
def test_get_config_drive_cdrom_mount_point_exists_true(self):
self._test_get_config_drive_cdrom_mount_point(exists=True)
def test_get_config_drive_cdrom_mount_point_exists_false(self):
self._test_get_config_drive_cdrom_mount_point(exists=False)
@mock.patch('ctypes.cast')
@mock.patch('ctypes.POINTER')
@mock.patch('ctypes.wintypes.WORD')
def test_c_char_array_to_c_ushort(self, mock_WORD, mock_POINTER,
mock_cast):
mock_buf = mock.MagicMock()
response = self._config_manager._c_char_array_to_c_ushort(mock_buf,
1)
self.assertEqual(mock_cast.call_count, 2)
mock_POINTER.assert_called_with(mock_WORD)
mock_cast.assert_called_with(mock_buf.__getitem__(), mock_POINTER())
self.assertEqual(response,
mock_cast().contents.value.__lshift__().__add__())
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._c_char_array_to_c_ushort')
def _test_get_iso_disk_size(self, mock_c_char_array_to_c_ushort,
media_type, value, iso_id):
boot_record_off = 0x8000
volume_size_off = 80
block_size_off = 128
mock_phys_disk = mock.MagicMock()
mock_buff = mock.MagicMock()
mock_geom = mock.MagicMock()
mock_phys_disk.get_geometry.return_value = mock_geom
mock_geom.MediaType = media_type
mock_geom.Cylinders = value
mock_geom.TracksPerCylinder = 2
mock_geom.SectorsPerTrack = 2
mock_geom.BytesPerSector = 2
mock_phys_disk.read.return_value = (mock_buff, 'fake value')
mock_buff.__getitem__.return_value = iso_id
mock_c_char_array_to_c_ushort.return_value = 100
disk_size = mock_geom.Cylinders * mock_geom.TracksPerCylinder * \
mock_geom.SectorsPerTrack * mock_geom.BytesPerSector
offset = boot_record_off / mock_geom.BytesPerSector * \
mock_geom.BytesPerSector
buf_off_volume = boot_record_off - offset + volume_size_off
buf_off_block = boot_record_off - offset + block_size_off
response = self._config_manager._get_iso_disk_size(mock_phys_disk)
mock_phys_disk.get_geometry.assert_called_once_with()
if mock_geom.MediaType != physical_disk.Win32_DiskGeometry.FixedMedia:
self.assertIsNone(response)
elif disk_size <= offset + mock_geom.BytesPerSector:
self.assertIsNone(response)
else:
mock_phys_disk.seek.assert_called_once_with(offset)
mock_phys_disk.read.assert_called_once_with(
mock_geom.BytesPerSector)
if iso_id != 'CD001':
self.assertIsNone(response)
else:
mock_c_char_array_to_c_ushort.assert_has_calls(
mock.call(mock_buff, buf_off_volume),
mock.call(mock_buff, buf_off_block))
self.assertEqual(response, 10000)
def test_test_get_iso_disk_size(self):
self._test_get_iso_disk_size(
media_type=physical_disk.Win32_DiskGeometry.FixedMedia,
value=100, iso_id='CD001')
def test_test_get_iso_disk_size_other_media_type(self):
self._test_get_iso_disk_size(media_type="fake media type", value=100,
iso_id='CD001')
def test_test_get_iso_disk_size_other_disk_size_too_small(self):
self._test_get_iso_disk_size(
media_type=physical_disk.Win32_DiskGeometry.FixedMedia, value=0,
iso_id='CD001')
def test_test_get_iso_disk_size_other_id(self):
self._test_get_iso_disk_size(
media_type=physical_disk.Win32_DiskGeometry.FixedMedia,
value=100, iso_id='other id')
def test_write_iso_file(self):
mock_buff = mock.MagicMock()
mock_geom = mock.MagicMock()
mock_geom.BytesPerSector = 2
mock_phys_disk = mock.MagicMock()
mock_phys_disk.read.return_value = (mock_buff, 10)
fake_path = os.path.join('fake', 'path')
mock_phys_disk.get_geometry.return_value = mock_geom
with mock.patch('__builtin__.open', mock.mock_open(),
create=True) as f:
self._config_manager._write_iso_file(mock_phys_disk, fake_path,
10)
f().write.assert_called_once_with(mock_buff)
mock_phys_disk.seek.assert_called_once_with(0)
mock_phys_disk.read.assert_called_once_with(10)
@mock.patch('os.makedirs')
def _test_extract_iso_files(self, mock_makedirs, exit_code):
fake_path = os.path.join('fake', 'path')
fake_target_path = os.path.join(fake_path, 'target')
args = [CONF.bsdtar_path, '-xf', fake_path, '-C', fake_target_path]
mock_os_utils = mock.MagicMock()
mock_os_utils.execute_process.return_value = ('fake out', 'fake err',
exit_code)
if exit_code:
self.assertRaises(Exception,
self._config_manager._extract_iso_files,
mock_os_utils, fake_path, fake_target_path)
else:
self._config_manager._extract_iso_files(mock_os_utils, fake_path,
fake_target_path)
mock_os_utils.execute_process.assert_called_once_with(args, False)
mock_makedirs.assert_called_once_with(fake_target_path)
def test_extract_iso_files(self):
self._test_extract_iso_files(exit_code=None)
def test_extract_iso_files_exception(self):
self._test_extract_iso_files(exit_code=1)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_iso_disk_size')
@mock.patch('cloudbaseinit.utils.windows.physical_disk.PhysicalDisk')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._write_iso_file')
def _test_extract_iso_disk_file(self, mock_write_iso_file,
mock_PhysicalDisk, mock_get_iso_disk_size,
exception):
mock_osutils = mock.MagicMock()
fake_path = os.path.join('fake', 'path')
fake_path_physical = os.path.join(fake_path, 'physical')
mock_osutils.get_physical_disks.return_value = [fake_path_physical]
mock_get_iso_disk_size.return_value = 'fake iso size'
if exception:
mock_PhysicalDisk().open.side_effect = [Exception]
response = self._config_manager._extract_iso_disk_file(
osutils=mock_osutils, iso_file_path=fake_path)
print mock_PhysicalDisk().open.mock_calls
if not exception:
mock_get_iso_disk_size.assert_called_once_with(
mock_PhysicalDisk())
mock_write_iso_file.assert_called_once_with(mock_PhysicalDisk(),
fake_path,
'fake iso size')
self.assertTrue(response)
else:
self.assertFalse(response)
mock_PhysicalDisk().open.assert_called_once_with()
mock_osutils.get_physical_disks.assert_called_once_with()
mock_PhysicalDisk().close.assert_called_once_with()
def test_extract_iso_disk_file_disk_found(self):
self._test_extract_iso_disk_file(exception=False)
def test_extract_iso_disk_file_disk_not_found(self):
self._test_extract_iso_disk_file(exception=True)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_conf_drive_from_raw_hdd')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._get_conf_drive_from_cdrom_drive')
def test_get_config_drive_files(self,
mock_get_conf_drive_from_cdrom_drive,
mock_get_conf_drive_from_raw_hdd):
fake_path = os.path.join('fake', 'path')
mock_get_conf_drive_from_raw_hdd.return_value = False
mock_get_conf_drive_from_cdrom_drive.return_value = True
response = self._config_manager.get_config_drive_files(
target_path=fake_path)
mock_get_conf_drive_from_raw_hdd.assert_called_once_with(fake_path)
mock_get_conf_drive_from_cdrom_drive.assert_called_once_with(
fake_path)
self.assertTrue(response)
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager.'
'_get_config_drive_cdrom_mount_point')
@mock.patch('shutil.copytree')
def _test_get_conf_drive_from_cdrom_drive(self, mock_copytree,
mock_get_config_cdrom_mount,
mount_point):
fake_path = os.path.join('fake', 'path')
mock_get_config_cdrom_mount.return_value = mount_point
response = self._config_manager._get_conf_drive_from_cdrom_drive(
fake_path)
mock_get_config_cdrom_mount.assert_called_once_with()
if mount_point:
mock_copytree.assert_called_once_with(mount_point, fake_path)
self.assertTrue(response)
else:
self.assertFalse(response)
def test_get_conf_drive_from_cdrom_drive_with_mountpoint(self):
self._test_get_conf_drive_from_cdrom_drive(
mount_point='fake mount point')
def test_get_conf_drive_from_cdrom_drive_without_mountpoint(self):
self._test_get_conf_drive_from_cdrom_drive(
mount_point=None)
@mock.patch('os.remove')
@mock.patch('os.path.exists')
@mock.patch('tempfile.gettempdir')
@mock.patch('uuid.uuid4')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._extract_iso_disk_file')
@mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
'WindowsConfigDriveManager._extract_iso_files')
@mock.patch('cloudbaseinit.osutils.factory.get_os_utils')
def _test_get_conf_drive_from_raw_hdd(self, mock_get_os_utils,
mock_extract_iso_files,
mock_extract_iso_disk_file,
mock_uuid4, mock_gettempdir,
mock_exists, mock_remove,
found_drive):
fake_target_path = os.path.join('fake', 'path')
fake_iso_path = os.path.join('fake_dir', 'fake_id' + '.iso')
mock_uuid4.return_value = 'fake_id'
mock_gettempdir.return_value = 'fake_dir'
mock_extract_iso_disk_file.return_value = found_drive
mock_exists.return_value = found_drive
response = self._config_manager._get_conf_drive_from_raw_hdd(
fake_target_path)
mock_get_os_utils.assert_called_once_with()
mock_gettempdir.assert_called_once_with()
mock_extract_iso_disk_file.assert_called_once_with(
mock_get_os_utils(), fake_iso_path)
if found_drive:
mock_extract_iso_files.assert_called_once_with(
mock_get_os_utils(), fake_iso_path, fake_target_path)
mock_exists.assert_called_once_with(fake_iso_path)
mock_remove.assert_called_once_with(fake_iso_path)
self.assertTrue(response)
else:
self.assertFalse(response)
def test_get_conf_drive_from_raw_hdd_found_drive(self):
self._test_get_conf_drive_from_raw_hdd(found_drive=True)
def test_get_conf_drive_from_raw_hdd_no_drive_found(self):
self._test_get_conf_drive_from_raw_hdd(found_drive=False)

View File

@@ -971,6 +971,129 @@ class WindowsUtilsTest(unittest.TestCase):
mock_get_logical_drives.assert_called_with()
self.assertEqual(response, ['drive'])
@mock.patch('cloudbaseinit.osutils.windows.msvcrt')
@mock.patch('cloudbaseinit.osutils.windows.kernel32')
@mock.patch('cloudbaseinit.osutils.windows.setupapi')
@mock.patch('cloudbaseinit.osutils.windows.Win32_STORAGE_DEVICE_NUMBER')
@mock.patch('ctypes.byref')
@mock.patch('ctypes.sizeof')
@mock.patch('ctypes.wintypes.DWORD')
@mock.patch('ctypes.cast')
@mock.patch('ctypes.POINTER')
def _test_get_physical_disks(self, mock_POINTER, mock_cast,
mock_DWORD, mock_sizeof, mock_byref,
mock_sdn, mock_setupapi, mock_kernel32,
mock_msvcrt, handle_disks, last_error,
interface_detail, disk_handle, io_control):
sizeof_calls = [mock.call(
windows_utils.Win32_SP_DEVICE_INTERFACE_DATA),
mock.call(mock_sdn())]
device_interfaces_calls = [mock.call(handle_disks, None, mock_byref(),
0, mock_byref()),
mock.call(handle_disks, None, mock_byref(),
1, mock_byref())]
cast_calls = [mock.call(),
mock.call(mock_msvcrt.malloc(), mock_POINTER()),
mock.call(mock_cast().contents.DevicePath,
wintypes.LPWSTR)]
mock_setup_interface = mock_setupapi.SetupDiGetDeviceInterfaceDetailW
mock_setupapi.SetupDiGetClassDevsW.return_value = handle_disks
mock_kernel32.GetLastError.return_value = last_error
mock_setup_interface.return_value = interface_detail
mock_kernel32.CreateFileW.return_value = disk_handle
mock_kernel32.DeviceIoControl.return_value = io_control
mock_setupapi.SetupDiEnumDeviceInterfaces.side_effect = [True, False]
if handle_disks == self._winutils.INVALID_HANDLE_VALUE \
or last_error != self._winutils.ERROR_INSUFFICIENT_BUFFER \
and not interface_detail \
or disk_handle == self._winutils.INVALID_HANDLE_VALUE \
or not io_control:
self.assertRaises(Exception, self._winutils.get_physical_disks)
else:
response = self._winutils.get_physical_disks()
self.assertEqual(mock_sizeof.call_args_list, sizeof_calls)
self.assertEqual(
mock_setupapi.SetupDiEnumDeviceInterfaces.call_args_list,
device_interfaces_calls)
if not interface_detail:
mock_kernel32.GetLastError.assert_called_once_with()
mock_POINTER.assert_called_with(
windows_utils.Win32_SP_DEVICE_INTERFACE_DETAIL_DATA_W)
mock_msvcrt.malloc.assert_called_with(mock_DWORD())
self.assertEqual(mock_cast.call_args_list, cast_calls)
mock_setup_interface.assert_called_with(handle_disks, mock_byref(),
mock_cast(),mock_DWORD(),
None, None)
mock_kernel32.CreateFileW.assert_called_with(
mock_cast().value, 0, self._winutils.FILE_SHARE_READ, None,
self._winutils.OPEN_EXISTING, 0, 0)
mock_sdn.assert_called_with()
mock_kernel32.DeviceIoControl.assert_called_with(
disk_handle, self._winutils.IOCTL_STORAGE_GET_DEVICE_NUMBER,
None, 0, mock_byref(), mock_sizeof(), mock_byref(), None)
self.assertEqual(response, ["\\\\.\PHYSICALDRIVE1"])
mock_setupapi.SetupDiDestroyDeviceInfoList.assert_called_once_with(
handle_disks)
mock_setupapi.SetupDiGetClassDevsW.assert_called_once_with(
mock_byref(), None, None, self._winutils.DIGCF_PRESENT |
self._winutils.DIGCF_DEVICEINTERFACE)
def test_get_physical_disks(self):
mock_handle_disks = mock.MagicMock()
mock_disk_handle = mock.MagicMock()
self._test_get_physical_disks(
handle_disks=mock_handle_disks,
last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER,
interface_detail='fake interface detail',
disk_handle=mock_disk_handle, io_control=True)
def test_get_physical_disks_other_error_and_no_interface_detail(self):
mock_handle_disks = mock.MagicMock()
mock_disk_handle = mock.MagicMock()
self._test_get_physical_disks(
handle_disks=mock_handle_disks,
last_error='other', interface_detail=None,
disk_handle=mock_disk_handle, io_control=True)
def test_get_physical_disks_invalid_disk_handle(self):
mock_handle_disks = mock.MagicMock()
self._test_get_physical_disks(
handle_disks=mock_handle_disks,
last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER,
interface_detail='fake interface detail',
disk_handle=self._winutils.INVALID_HANDLE_VALUE, io_control=True)
def test_get_physical_disks_io_control(self):
mock_handle_disks = mock.MagicMock()
mock_disk_handle = mock.MagicMock()
self._test_get_physical_disks(
handle_disks=mock_handle_disks,
last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER,
interface_detail='fake interface detail',
disk_handle=mock_disk_handle, io_control=False)
def test_get_physical_disks_handle_disks_invalid(self):
mock_disk_handle = mock.MagicMock()
self._test_get_physical_disks(
handle_disks=self._winutils.INVALID_HANDLE_VALUE ,
last_error=self._winutils.ERROR_INSUFFICIENT_BUFFER,
interface_detail='fake interface detail',
disk_handle=mock_disk_handle, io_control=True)
@mock.patch('win32com.client.Dispatch')
@mock.patch('cloudbaseinit.osutils.windows.WindowsUtils._get_fw_protocol')
def _test_firewall_create_rule(self, mock_get_fw_protocol, mock_Dispatch):