# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import importlib
import itertools
import os
import unittest

# Prefer the standard-library mock; fall back to the standalone ``mock``
# package when ``unittest.mock`` cannot be imported.
try:
    import unittest.mock as mock
except ImportError:
    import mock

from cloudbaseinit import conf as cloudbaseinit_conf
from cloudbaseinit import exception
from cloudbaseinit.tests import testutils


CONF = cloudbaseinit_conf.CONF

# Module-level ``mock_open`` handle. It is created here because it is
# referenced from a decorator (``mock.patch("builtins.open", new=OPEN)``),
# which is evaluated while the test class body is being defined.
OPEN = mock.mock_open()


class TestWindowsConfigDriveManager(unittest.TestCase):
    """Unit tests for ``WindowsConfigDriveManager``.

    The module under test is imported inside ``setUp`` while ``disk``,
    ``ctypes`` and ``winioctlcon`` are replaced by mocks in
    ``sys.modules``. Methods named ``_test_*`` are parameterised helpers
    driven by the public ``test_*`` methods.
    """

    def setUp(self):
        """Import the module under test with mocked deps and build a manager.

        Besides the ``sys.modules`` patch, the module-level
        ``osutils_factory``, ``disk.Disk``, ``tempfile`` and ``uuid`` names
        are overwritten with mocks so the temp dir and uuid are fixed to
        ``"tempdir"`` and ``"uuid"``.
        """
        module_path = "cloudbaseinit.metadata.services.osconfigdrive.windows"
        mock_ctypes = mock.MagicMock()
        mock_ctypes.wintypes = mock.MagicMock()
        self._module_patcher = mock.patch.dict(
            'sys.modules',
            {'disk': mock.Mock(),
             'ctypes': mock_ctypes,
             'winioctlcon': mock.Mock()})
        self._module_patcher.start()
        self.addCleanup(self._module_patcher.stop)
        self.conf_module = importlib.import_module(module_path)

        self.conf_module.osutils_factory = mock.Mock()
        self.conf_module.disk.Disk = mock.MagicMock()
        self.conf_module.tempfile = mock.Mock()
        self.mock_gettempdir = self.conf_module.tempfile.gettempdir
        self.mock_gettempdir.return_value = "tempdir"
        self.conf_module.uuid = mock.Mock()
        self.mock_uuid4 = self.conf_module.uuid.uuid4
        self.mock_uuid4.return_value = "uuid"
        self._config_manager = self.conf_module.WindowsConfigDriveManager()
        # Presumably the manager creates ``target_path`` on construction;
        # remove the directory once the test is done -- TODO confirm.
        self.addCleanup(os.rmdir, self._config_manager.target_path)
        self.osutils = mock.Mock()
        self._config_manager._osutils = self.osutils
        self.snatcher = testutils.LogSnatcher(module_path)
        self._fake_label = 'config-2'
        self._fake_metadata_file = 'fake_metadata_file'

    @mock.patch('os.path.exists')
    def _test_check_for_config_drive(self, mock_exists, exists=True,
                                     label="config-2", fail=False):
        """Run ``_check_for_config_drive`` on ``C:\\`` and check the outcome.

        :param mock_exists: patched ``os.path.exists`` (injected).
        :param exists: value returned by the patched ``os.path.exists``.
        :param label: value returned by ``osutils.get_volume_label``.
        :param fail: whether the check is expected to return ``False``.
        """
        drive = "C:\\"
        self.osutils.get_volume_label.return_value = label
        mock_exists.return_value = exists

        with self.snatcher:
            response = self._config_manager._check_for_config_drive(
                drive, self._fake_label, self._fake_metadata_file)

        self.osutils.get_volume_label.assert_called_once_with(drive)
        if exists and not fail:
            self.assertEqual(["Config Drive found on C:\\"],
                             self.snatcher.output)
        if not label:
            self.assertEqual(["Looking for a Config Drive with label "
                              "'config-2' on 'C:\\'. Found mismatching "
                              "label 'None'."],
                             self.snatcher.output)
        self.assertEqual(not fail, response)

    def test_check_for_config_drive_exists(self):
        self._test_check_for_config_drive()

    def test_check_for_config_drive_exists_upper_label(self):
        self._test_check_for_config_drive(label="CONFIG-2")

    def test_check_for_config_drive_missing(self):
        self._test_check_for_config_drive(exists=False, fail=True)

    def test_check_for_config_drive_wrong_label(self):
        self._test_check_for_config_drive(label="config-3", fail=True)

    def test_check_for_config_drive_empty_label(self):
        self._test_check_for_config_drive(label=None, exists=False,
                                          fail=True)

    def _test_get_iso_file_size(self, fixed=True, small=False,
                                found_iso=True):
        """Run ``_get_iso_file_size`` against a mocked device.

        :param fixed: value of the device's ``fixed`` attribute.
        :param small: when set, the device size is exactly
            ``OFFSET_BLOCK_SIZE + PEEK_SIZE`` instead of one byte more.
        :param found_iso: when unset, the first read returns a value
            other than ``ISO_ID``.

        ``None`` is expected whenever any of the three conditions is not
        met; otherwise the result must be 100 * 512.
        """
        device = mock.Mock()
        device.fixed = fixed
        device.size = (self.conf_module.OFFSET_BLOCK_SIZE +
                       self.conf_module.PEEK_SIZE + int(not small))
        iso_id = self.conf_module.ISO_ID
        if not found_iso:
            iso_id = b"pwned"
        # Every mocked seek reports a position one byte short of the
        # requested offset, so each expected read carries ``skip=1``.
        iso_off = self.conf_module.OFFSET_ISO_ID - 1
        volume_off = self.conf_module.OFFSET_VOLUME_SIZE - 1
        block_off = self.conf_module.OFFSET_BLOCK_SIZE - 1
        volume_bytes = b'd\x00'  # 100
        block_bytes = b'\x00\x02'  # 512
        device.seek.side_effect = [iso_off, volume_off, block_off]
        device.read.side_effect = [iso_id, volume_bytes, block_bytes]

        response = self._config_manager._get_iso_file_size(device)
        if not fixed or small or not found_iso:
            self.assertIsNone(response)
            return

        seek_calls = [
            mock.call(self.conf_module.OFFSET_ISO_ID),
            mock.call(self.conf_module.OFFSET_VOLUME_SIZE),
            mock.call(self.conf_module.OFFSET_BLOCK_SIZE)]
        read_calls = [
            mock.call(len(iso_id),
                      skip=self.conf_module.OFFSET_ISO_ID - iso_off),
            mock.call(self.conf_module.PEEK_SIZE,
                      skip=self.conf_module.OFFSET_VOLUME_SIZE - volume_off),
            mock.call(self.conf_module.PEEK_SIZE,
                      skip=self.conf_module.OFFSET_BLOCK_SIZE - block_off)]
        device.seek.assert_has_calls(seek_calls)
        device.read.assert_has_calls(read_calls)
        self.assertEqual(100 * 512, response)

    def test_get_iso_file_size_not_fixed(self):
        self._test_get_iso_file_size(fixed=False)

    def test_get_iso_file_size_small(self):
        self._test_get_iso_file_size(small=True)

    def test_get_iso_file_size_not_found(self):
        self._test_get_iso_file_size(found_iso=False)

    def test_get_iso_file_size(self):
        self._test_get_iso_file_size()

    @mock.patch("builtins.open", new=OPEN)
    def test_write_iso_file(self):
        """``_write_iso_file`` copies the device in sector-sized chunks."""
        file_path = "fake\\path"
        file_size = 100 * 512
        sector_size = self.conf_module.MAX_SECTOR_SIZE
        offsets = list(range(0, file_size, sector_size))
        remain = file_size % sector_size
        # Full sectors, followed by one short chunk when the size is not
        # a multiple of the sector size.
        reads = ([b"\x00" * sector_size] *
                 (len(offsets) - int(bool(remain))) +
                 ([b"\x00" * remain] if remain else []))

        device = mock.Mock()
        device_seek_calls = [mock.call(off) for off in offsets]
        device_read_calls = [
            mock.call(min(sector_size, file_size - off), skip=0)
            for off in offsets]
        stream_write_calls = [mock.call(read) for read in reads]
        device.seek.side_effect = offsets
        device.read.side_effect = reads

        self._config_manager._write_iso_file(device, file_path, file_size)
        device.seek.assert_has_calls(device_seek_calls)
        device.read.assert_has_calls(device_read_calls)
        OPEN.return_value.write.assert_has_calls(stream_write_calls)

    @mock.patch('os.path.exists')
    def _test_extract_files_from_iso(self, os_path_exists, exit_code,
                                     enforce_os_path_exists=True):
        """Run ``_extract_files_from_iso`` with a mocked bsdtar execution.

        :param os_path_exists: patched ``os.path.exists`` (injected).
        :param exit_code: exit code reported by ``execute_process``; a
            non-zero value must raise ``CloudbaseInitException``.
        :param enforce_os_path_exists: value stored on the
            ``os.path.exists`` mock (see the review note below).
        """
        fake_path = os.path.join('fake', 'path')
        fake_target_path = os.path.join(fake_path, 'target')
        self._config_manager.target_path = fake_target_path
        # NOTE(review): ``return_code`` is not a Mock configuration
        # attribute, so this only sets an arbitrary attribute and the
        # patched os.path.exists() keeps returning a (truthy) MagicMock.
        # ``return_value`` was probably intended -- confirm against
        # _extract_files_from_iso before changing it, since that may
        # alter which branch the "bsdtar does not exist" case exercises.
        os_path_exists.return_code = enforce_os_path_exists
        args = [CONF.bsdtar_path, '-xf', fake_path, '-C', fake_target_path]

        self.osutils.execute_process.return_value = ('fake out', 'fake err',
                                                     exit_code)
        if exit_code:
            self.assertRaises(exception.CloudbaseInitException,
                              self._config_manager._extract_files_from_iso,
                              fake_path)
        else:
            self._config_manager._extract_files_from_iso(fake_path)

        self.osutils.execute_process.assert_called_once_with(args, False)

    def test_extract_files_from_iso(self):
        self._test_extract_files_from_iso(exit_code=0)

    def test_extract_files_from_iso_fail(self):
        self._test_extract_files_from_iso(exit_code=1)

    def test_extract_files_from_iso_fail_bsdtar_does_not_exist(self):
        self._test_extract_files_from_iso(exit_code=1,
                                          enforce_os_path_exists=False)

    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager._extract_files_from_iso')
    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager._write_iso_file')
    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager._get_iso_file_size')
    def _test_extract_iso_from_devices(self, mock_get_iso_file_size,
                                       mock_write_iso_file,
                                       mock_extract_files_from_iso,
                                       found=True):
        """Run ``_extract_iso_from_devices`` over four mocked devices.

        :param found: whether the third device reports an ISO size and
            the call is therefore expected to return ``True``.
        """
        # For every device (mock) in the list of available devices:
        # first - skip (no size)
        # second - error (throws Exception)
        # third - extract (is ok)
        # fourth - unreachable (already found ok device)
        size = 100 * 512
        devices = [mock.MagicMock() for _ in range(4)]
        devices[1].__enter__.side_effect = [Exception]
        rest = [size] if found else [None]
        mock_get_iso_file_size.side_effect = [None] + rest * 2
        file_path = os.path.join("tempdir", "uuid.iso")

        with self.snatcher:
            response = self._config_manager._extract_iso_from_devices(devices)
        self.mock_gettempdir.assert_called_once_with()
        mock_get_iso_file_size.assert_has_calls([
            mock.call(devices[0]), mock.call(devices[2])])
        expected_log = [
            "ISO extraction failed on %(device)s with %(error)r" %
            {"device": devices[1], "error": Exception()}]
        if found:
            mock_write_iso_file.assert_called_once_with(devices[2],
                                                        file_path, size)
            mock_extract_files_from_iso.assert_called_once_with(file_path)
            expected_log.append("ISO9660 disk found on %s" % devices[2])
        self.assertEqual(expected_log, self.snatcher.output)
        self.assertEqual(found, response)

    def test_extract_iso_from_devices_not_found(self):
        self._test_extract_iso_from_devices(found=False)

    def test_extract_iso_from_devices(self):
        self._test_extract_iso_from_devices()

    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager.'
                '_check_for_config_drive')
    @mock.patch('shutil.copytree')
    @mock.patch('os.rmdir')
    def _test_get_config_drive_from_cdrom_drive(self, mock_os_rmdir,
                                                mock_copytree,
                                                mock_check_for_config_drive,
                                                found=True):
        """Run ``_get_config_drive_from_cdrom_drive`` over four drives.

        :param found: whether the third drive passes the config drive
            check; when unset every drive is rejected.
        """
        drives = ["C:\\", "M:\\", "I:\\", "N:\\"]
        self.osutils.get_cdrom_drives.return_value = drives
        checks = [False, False, True, False]
        if not found:
            checks[2] = False
        mock_check_for_config_drive.side_effect = checks

        response = self._config_manager._get_config_drive_from_cdrom_drive(
            self._fake_label, self._fake_metadata_file
        )

        self.osutils.get_cdrom_drives.assert_called_once_with()
        # Only the first three drives are expected to be checked when
        # the third one matches; all four otherwise.
        idx = 3 if found else 4
        check_calls = [
            mock.call(drive, self._fake_label,
                      self._fake_metadata_file) for drive in drives[:idx]]
        mock_check_for_config_drive.assert_has_calls(check_calls)
        if found:
            mock_os_rmdir.assert_called_once_with(
                self._config_manager.target_path)
            mock_copytree.assert_called_once_with(
                drives[2], self._config_manager.target_path)

        self.assertEqual(found, response)

    def test_get_config_drive_from_cdrom_drive_not_found(self):
        self._test_get_config_drive_from_cdrom_drive(found=False)

    def test_get_config_drive_from_cdrom_drive(self):
        self._test_get_config_drive_from_cdrom_drive()

    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager.'
                '_extract_iso_from_devices')
    @mock.patch("builtins.map")
    def test_get_config_drive_from_raw_hdd(self, mock_map,
                                           mock_extract_iso_from_devices):
        """Physical disks are mapped to ``Disk`` and handed to extraction."""
        Disk = self.conf_module.disk.Disk
        paths = [mock.Mock() for _ in range(3)]
        self.osutils.get_physical_disks.return_value = paths
        mock_extract_iso_from_devices.return_value = True

        response = self._config_manager._get_config_drive_from_raw_hdd(
            self._fake_label, self._fake_metadata_file)

        mock_map.assert_called_once_with(Disk, paths)
        self.osutils.get_physical_disks.assert_called_once_with()
        mock_extract_iso_from_devices.assert_called_once_with(
            mock_map.return_value)
        self.assertTrue(response)

    @mock.patch('cloudbaseinit.utils.windows.vfat.copy_from_vfat_drive')
    @mock.patch('cloudbaseinit.utils.windows.vfat.is_vfat_drive')
    def test_get_config_drive_from_vfat(self, mock_is_vfat_drive,
                                        mock_copy_from_vfat_drive):
        """The first disk reported as VFAT is copied to ``target_path``."""
        self.osutils.get_physical_disks.return_value = (
            mock.sentinel.drive1,
            mock.sentinel.drive2,
        )
        mock_is_vfat_drive.side_effect = (None, True)

        with testutils.LogSnatcher('cloudbaseinit.metadata.services.'
                                   'osconfigdrive.windows') as snatcher:
            response = self._config_manager._get_config_drive_from_vfat(
                self._fake_label, self._fake_metadata_file)

        self.assertTrue(response)
        self.osutils.get_physical_disks.assert_called_once_with()

        expected_is_vfat_calls = [
            mock.call(self.osutils, mock.sentinel.drive1, self._fake_label),
            mock.call(self.osutils, mock.sentinel.drive2, self._fake_label),
        ]
        self.assertEqual(expected_is_vfat_calls, mock_is_vfat_drive.mock_calls)
        mock_copy_from_vfat_drive.assert_called_once_with(
            self.osutils,
            mock.sentinel.drive2,
            self._config_manager.target_path)

        expected_logging = [
            'Config Drive found on disk %r' % mock.sentinel.drive2,
        ]
        self.assertEqual(expected_logging, snatcher.output)

    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager.'
                '_extract_iso_from_devices')
    def _test_get_config_drive_from_partition(self,
                                              mock_extract_iso_from_devices,
                                              found=True):
        """Run ``_get_config_drive_from_partition`` over three disks.

        :param found: whether extraction succeeds on the second disk.
        """
        paths = [mock.Mock() for _ in range(3)]
        self.osutils.get_physical_disks.return_value = paths
        disks = list(map(self.conf_module.disk.Disk, paths))
        mock_extract_iso_from_devices.side_effect = [False, found, found]
        # Two disks are expected to be tried when the second succeeds,
        # all three otherwise.
        idx = 3 - int(found)
        extract_calls = [mock.call(disk.partitions())
                         for disk in disks[:idx]]

        response = self._config_manager._get_config_drive_from_partition(
            self._fake_label, self._fake_metadata_file
        )
        self.osutils.get_physical_disks.assert_called_once_with()
        mock_extract_iso_from_devices.assert_has_calls(extract_calls)
        self.assertEqual(found, response)

    def test_get_config_drive_from_partition_not_found(self):
        self._test_get_config_drive_from_partition(found=False)

    def test_get_config_drive_from_partition(self):
        self._test_get_config_drive_from_partition()

    @mock.patch('os.rmdir')
    @mock.patch('shutil.copytree')
    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager.'
                '_check_for_config_drive')
    def _test_get_config_drive_from_volume(self, mock_check_for_config_drive,
                                           mock_copytree, mock_os_rmdir,
                                           found=True):
        """Run ``_get_config_drive_from_volume`` over three volumes.

        :param found: whether the second volume passes the config drive
            check.
        """
        volumes = [mock.Mock() for _ in range(3)]
        self.osutils.get_volumes.return_value = volumes
        checks = [False, found, found]
        mock_check_for_config_drive.side_effect = checks
        # Two volumes are expected to be checked when the second
        # matches, all three otherwise.
        idx = 3 - int(found)
        check_calls = [
            mock.call(volume, self._fake_label,
                      self._fake_metadata_file) for volume in volumes[:idx]]

        response = self._config_manager._get_config_drive_from_volume(
            self._fake_label, self._fake_metadata_file
        )
        self.osutils.get_volumes.assert_called_once_with()
        mock_check_for_config_drive.assert_has_calls(check_calls)
        if found:
            mock_os_rmdir.assert_called_once_with(
                self._config_manager.target_path)
            mock_copytree.assert_called_once_with(
                volumes[1], self._config_manager.target_path)
        self.assertEqual(found, response)

    def test_get_config_drive_from_volume_not_found(self):
        self._test_get_config_drive_from_volume(found=False)

    def test_get_config_drive_from_volume(self):
        self._test_get_config_drive_from_volume()

    def _test__get_config_drive_files(self, cd_type, cd_location,
                                      func, found=True):
        """Run ``_get_config_drive_files`` for one type/location pair.

        :param cd_type: config drive type passed through to the manager.
        :param cd_location: config drive location passed through.
        :param func: mock of the handler expected to be dispatched to,
            or ``None`` when no handler is expected.
        :param found: when unset, the response must be falsy.
        """
        response = self._config_manager._get_config_drive_files(
            self._fake_label, self._fake_metadata_file,
            cd_type, cd_location)
        if found:
            # Nothing is asserted when ``found`` is set but no handler
            # mock was supplied.
            if func:
                func.assert_called_once_with(self._fake_label,
                                             self._fake_metadata_file)
                self.assertEqual(func.return_value, response)
        else:
            self.assertFalse(response)

    def test__get_config_drive_files_not_found(self):
        self._test__get_config_drive_files(None, None, None, found=False)

    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager.'
                '_get_config_drive_from_cdrom_drive')
    def test__get_config_drive_files_cdrom_iso(self, func):
        self._test__get_config_drive_files(
            "iso", "cdrom", func)

    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager.'
                '_get_config_drive_from_cdrom_drive')
    def test__get_config_drive_files_cdrom_iso_failed(self, func):
        func.side_effect = Exception
        self._test__get_config_drive_files(
            "iso", "cdrom", func, found=False)

    def test__get_config_drive_files_cdrom_vfat(self):
        self._test__get_config_drive_files(
            "vfat", "cdrom", None)

    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager.'
                '_get_config_drive_from_raw_hdd')
    def test__get_config_drive_files_hdd_iso(self, func):
        self._test__get_config_drive_files(
            "iso", "hdd", func)

    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager.'
                '_get_config_drive_from_vfat')
    def test__get_config_drive_files_hdd_vfat(self, func):
        self._test__get_config_drive_files(
            "vfat", "hdd", func)

    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager.'
                '_get_config_drive_from_partition')
    def test__get_config_drive_files_partition_iso(self, func):
        self._test__get_config_drive_files(
            "iso", "partition", func)

    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager.'
                '_get_config_drive_from_volume')
    def test__get_config_drive_files_partition_vfat(self, func):
        self._test__get_config_drive_files(
            "vfat", "partition", func)

    @mock.patch('cloudbaseinit.metadata.services.osconfigdrive.windows.'
                'WindowsConfigDriveManager.'
                '_get_config_drive_files')
    def _test_get_config_drive_files(self, mock_get_config_drive_files,
                                     found=True):
        """Run ``get_config_drive_files`` over type/location combinations.

        :param found: when set, both types are searched and only the
            last combination succeeds; when unset, the list of types is
            empty so no combination is tried.
        """
        check_types = ["iso", "vfat"] if found else []
        check_locations = ["cdrom", "hdd", "partition"]
        product = list(itertools.product(check_types, check_locations,
                                         [self._fake_label]))
        product_calls = [mock.call(self._fake_label, self._fake_metadata_file,
                                   cd_type, cd_location)
                         for cd_type, cd_location, _ in product]
        # Every combination fails except the last one.
        mock_get_config_drive_files.side_effect = (
            [False] * (len(product_calls) - 1) + [True])
        expected_log = [("Looking for Config Drive %(type)s in %(location)s"
                         " with expected label %(label)s") %
                        {"type": cd_type, "location": cd_location,
                         "label": self._fake_label}
                        for cd_type, cd_location, _ in product]

        with self.snatcher:
            response = self._config_manager.get_config_drive_files(
                self._fake_label, self._fake_metadata_file,
                check_types, check_locations)

        mock_get_config_drive_files.assert_has_calls(product_calls)
        self.assertEqual(expected_log, self.snatcher.output)
        self.assertEqual(found, response)

    def test_get_config_drive_files_not_found(self):
        self._test_get_config_drive_files(found=False)

    def test_get_config_drive_files(self):
        self._test_get_config_drive_files()