diff --git a/ironic_python_agent/efi_utils.py b/ironic_python_agent/efi_utils.py new file mode 100644 index 000000000..3e91d2b8b --- /dev/null +++ b/ironic_python_agent/efi_utils.py @@ -0,0 +1,242 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import re +import shutil +import tempfile + +from oslo_concurrency import processutils +from oslo_log import log + +from ironic_python_agent import errors +from ironic_python_agent import partition_utils +from ironic_python_agent import utils + + +LOG = log.getLogger(__name__) + + +def manage_uefi(device, efi_system_part_uuid=None): + """Manage the device looking for valid efi bootloaders to update the nvram. + + This method checks for valid efi bootloaders in the device, if they exists + it updates the nvram using the efibootmgr. + + :param device: the device to be checked. + :param efi_system_part_uuid: efi partition uuid. + :raises: DeviceNotFound if the efi partition cannot be found. + :return: True - if it founds any efi bootloader and the nvram was updated + using the efibootmgr. + False - if no efi bootloader is found. + """ + efi_partition_mount_point = None + efi_mounted = False + LOG.debug('Attempting UEFI loader autodetection and NVRAM record setup.') + try: + # Force UEFI to rescan the device. + utils.rescan_device(device) + + local_path = tempfile.mkdtemp() + # Trust the contents on the disk in the event of a whole disk image. + efi_partition = utils.get_efi_part_on_device(device) + if not efi_partition and efi_system_part_uuid: + # _get_partition returns + and we only need the + # partition number + partition = partition_utils.get_partition( + device, uuid=efi_system_part_uuid) + try: + efi_partition = int(partition.replace(device, "")) + except ValueError: + # NVMe Devices get a partitioning scheme that is different from + # traditional block devices like SCSI/SATA + efi_partition = int(partition.replace(device + 'p', "")) + + if not efi_partition: + # NOTE(dtantsur): we cannot have a valid EFI deployment without an + # EFI partition at all. This code path is easily hit when using an + # image that is not UEFI compatible (which sadly applies to most + # cloud images out there, with a nice exception of Ubuntu). + raise errors.DeviceNotFound( + "No EFI partition could be detected on device %s and " + "EFI partition UUID has not been recorded during deployment " + "(which is often the case for whole disk images). " + "Are you using a UEFI-compatible image?" % device) + + efi_partition_mount_point = os.path.join(local_path, "boot/efi") + if not os.path.exists(efi_partition_mount_point): + os.makedirs(efi_partition_mount_point) + + # The mount needs the device with the partition, in case the + # device ends with a digit we add a `p` and the partition number we + # found, otherwise we just join the device and the partition number + if device[-1].isdigit(): + efi_device_part = '{}p{}'.format(device, efi_partition) + utils.execute('mount', efi_device_part, efi_partition_mount_point) + else: + efi_device_part = '{}{}'.format(device, efi_partition) + utils.execute('mount', efi_device_part, efi_partition_mount_point) + efi_mounted = True + + valid_efi_bootloaders = _get_efi_bootloaders(efi_partition_mount_point) + if valid_efi_bootloaders: + _run_efibootmgr(valid_efi_bootloaders, device, efi_partition, + efi_partition_mount_point) + return True + else: + # NOTE(dtantsur): if we have an empty EFI partition, try to use + # grub-install to populate it. + LOG.warning('Empty EFI partition detected.') + return False + + except processutils.ProcessExecutionError as e: + error_msg = ('Could not verify uefi on device %(dev)s' + 'failed with %(err)s.' % {'dev': device, 'err': e}) + LOG.error(error_msg) + raise errors.CommandExecutionError(error_msg) + finally: + LOG.debug('Executing _manage_uefi clean-up.') + umount_warn_msg = "Unable to umount %(local_path)s. Error: %(error)s" + + try: + if efi_mounted: + utils.execute('umount', efi_partition_mount_point, + attempts=3, delay_on_retry=True) + except processutils.ProcessExecutionError as e: + error_msg = ('Umounting efi system partition failed. ' + 'Attempted 3 times. Error: %s' % e) + LOG.error(error_msg) + raise errors.CommandExecutionError(error_msg) + + else: + # If umounting the binds succeed then we can try to delete it + try: + utils.execute('sync') + except processutils.ProcessExecutionError as e: + LOG.warning(umount_warn_msg, {'path': local_path, 'error': e}) + else: + # After everything is umounted we can then remove the + # temporary directory + shutil.rmtree(local_path) + + +# NOTE(TheJulia): Do not add bootia32.csv to this list. That is 32bit +# EFI booting and never really became popular. +BOOTLOADERS_EFI = [ + 'bootx64.csv', # Used by GRUB2 shim loader (Ubuntu, Red Hat) + 'boot.csv', # Used by rEFInd, Centos7 Grub2 + 'bootia32.efi', + 'bootx64.efi', # x86_64 Default + 'bootia64.efi', + 'bootarm.efi', + 'bootaa64.efi', # Arm64 Default + 'bootriscv32.efi', + 'bootriscv64.efi', + 'bootriscv128.efi', + 'grubaa64.efi', + 'winload.efi' +] + + +def _get_efi_bootloaders(location): + """Get all valid efi bootloaders in a given location + + :param location: the location where it should start looking for the + efi files. + :return: a list of relative paths to valid efi bootloaders or reference + files. + """ + # Let's find all files with .efi or .EFI extension + LOG.debug('Looking for all efi files on %s', location) + valid_bootloaders = [] + for root, dirs, files in os.walk(location): + efi_files = [f for f in files if f.lower() in BOOTLOADERS_EFI] + LOG.debug('efi files found in %(location)s : %(efi_files)s', + {'location': location, 'efi_files': str(efi_files)}) + for name in efi_files: + efi_f = os.path.join(root, name) + LOG.debug('Checking if %s is executable', efi_f) + if os.access(efi_f, os.X_OK): + v_bl = efi_f.split(location)[-1][1:] + LOG.debug('%s is a valid bootloader', v_bl) + valid_bootloaders.append(v_bl) + if 'csv' in efi_f.lower(): + v_bl = efi_f.split(location)[-1][1:] + LOG.debug('%s is a pointer to a bootloader', v_bl) + # The CSV files are intended to be authortative as + # to the bootloader and the label to be used. Since + # we found one, we're going to point directly to it. + # centos7 did ship with 2, but with the same contents. + # TODO(TheJulia): Perhaps we extend this to make a list + # of CSVs instead and only return those?! But then the + # question is which is right/first/preferred. + return [v_bl] + return valid_bootloaders + + +def _run_efibootmgr(valid_efi_bootloaders, device, efi_partition, + mount_point): + """Executes efibootmgr and removes duplicate entries. + + :param valid_efi_bootloaders: the list of valid efi bootloaders + :param device: the device to be used + :param efi_partition: the efi partition on the device + :param mount_point: The mountpoint for the EFI partition so we can + read contents of files if necessary to perform + proper bootloader injection operations. + """ + + # Before updating let's get information about the bootorder + LOG.debug("Getting information about boot order.") + original_efi_output = utils.execute('efibootmgr', '-v') + # NOTE(TheJulia): regex used to identify entries in the efibootmgr + # output on stdout. + entry_label = re.compile(r'Boot([0-9a-f-A-F]+)\*?\s(.*).*$') + label_id = 1 + for v_bl in valid_efi_bootloaders: + if 'csv' in v_bl.lower(): + LOG.debug('A CSV file has been identified as a bootloader hint. ' + 'File: %s', v_bl) + # These files are always UTF-16 encoded, sometimes have a header. + # Positive bonus is python silently drops the FEFF header. + with open(mount_point + '/' + v_bl, 'r', encoding='utf-16') as csv: + contents = str(csv.read()) + csv_contents = contents.split(',', maxsplit=3) + csv_filename = v_bl.split('/')[-1] + v_efi_bl_path = v_bl.replace(csv_filename, str(csv_contents[0])) + v_efi_bl_path = '\\' + v_efi_bl_path.replace('/', '\\') + label = csv_contents[1] + else: + v_efi_bl_path = '\\' + v_bl.replace('/', '\\') + label = 'ironic' + str(label_id) + + # Iterate through standard out, and look for duplicates + for line in original_efi_output[0].split('\n'): + match = entry_label.match(line) + # Look for the base label in the string if a line match + # occurs, so we can identify if we need to eliminate the + # entry. + if match and label in match.group(2): + boot_num = match.group(1) + LOG.debug("Found bootnum %s matching label", boot_num) + utils.execute('efibootmgr', '-b', boot_num, '-B') + + LOG.debug("Adding loader %(path)s on partition %(part)s of device " + " %(dev)s", {'path': v_efi_bl_path, 'part': efi_partition, + 'dev': device}) + # Update the nvram using efibootmgr + # https://linux.die.net/man/8/efibootmgr + utils.execute('efibootmgr', '-v', '-c', '-d', device, + '-p', efi_partition, '-w', '-L', label, + '-l', v_efi_bl_path) + # Increment the ID in case the loop runs again. + label_id += 1 diff --git a/ironic_python_agent/extensions/image.py b/ironic_python_agent/extensions/image.py index 47a18b3fc..2d44df7e4 100644 --- a/ironic_python_agent/extensions/image.py +++ b/ironic_python_agent/extensions/image.py @@ -15,9 +15,7 @@ import os import re -import shlex import shutil -import stat import tempfile from ironic_lib import utils as ilib_utils @@ -25,9 +23,11 @@ from oslo_concurrency import processutils from oslo_config import cfg from oslo_log import log +from ironic_python_agent import efi_utils from ironic_python_agent import errors from ironic_python_agent.extensions import base from ironic_python_agent import hardware +from ironic_python_agent import partition_utils from ironic_python_agent import raid_utils from ironic_python_agent import utils @@ -37,112 +37,6 @@ CONF = cfg.CONF BIND_MOUNTS = ('/dev', '/proc', '/run') -# NOTE(TheJulia): Do not add bootia32.csv to this list. That is 32bit -# EFI booting and never really became popular. -BOOTLOADERS_EFI = [ - 'bootx64.csv', # Used by GRUB2 shim loader (Ubuntu, Red Hat) - 'boot.csv', # Used by rEFInd, Centos7 Grub2 - 'bootia32.efi', - 'bootx64.efi', # x86_64 Default - 'bootia64.efi', - 'bootarm.efi', - 'bootaa64.efi', # Arm64 Default - 'bootriscv32.efi', - 'bootriscv64.efi', - 'bootriscv128.efi', - 'grubaa64.efi', - 'winload.efi' -] - - -def _get_partition(device, uuid): - """Find the partition of a given device.""" - LOG.debug("Find the partition %(uuid)s on device %(dev)s", - {'dev': device, 'uuid': uuid}) - - try: - utils.rescan_device(device) - lsblk = utils.execute( - 'lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', device) - report = lsblk[0] - for line in report.split('\n'): - part = {} - # Split into KEY=VAL pairs - vals = shlex.split(line) - for key, val in (v.split('=', 1) for v in vals): - part[key] = val.strip() - # Ignore non partition - if part.get('TYPE') not in ['md', 'part']: - # NOTE(TheJulia): This technically creates an edge failure - # case where a filesystem on a whole block device sans - # partitioning would behave differently. - continue - - if part.get('UUID') == uuid: - LOG.debug("Partition %(uuid)s found on device " - "%(dev)s", {'uuid': uuid, 'dev': device}) - return '/dev/' + part.get('KNAME') - if part.get('PARTUUID') == uuid: - LOG.debug("Partition %(uuid)s found on device " - "%(dev)s", {'uuid': uuid, 'dev': device}) - return '/dev/' + part.get('KNAME') - if part.get('LABEL') == uuid: - LOG.debug("Partition %(uuid)s found on device " - "%(dev)s", {'uuid': uuid, 'dev': device}) - return '/dev/' + part.get('KNAME') - else: - # NOTE(TheJulia): We may want to consider moving towards using - # findfs in the future, if we're comfortable with the execution - # and interaction. There is value in either way though. - # NOTE(rg): alternative: blkid -l -t UUID=/PARTUUID= - try: - findfs, stderr = utils.execute('findfs', 'UUID=%s' % uuid) - return findfs.strip() - except processutils.ProcessExecutionError as e: - LOG.debug('First fallback detection attempt for locating ' - 'partition via UUID %(uuid)s failed. ' - 'Error: %(err)s', - {'uuid': uuid, - 'err': e}) - try: - findfs, stderr = utils.execute( - 'findfs', 'PARTUUID=%s' % uuid) - return findfs.strip() - except processutils.ProcessExecutionError as e: - LOG.debug('Secondary fallback detection attempt for ' - 'locating partition via UUID %(uuid)s failed. ' - 'Error: %(err)s', - {'uuid': uuid, - 'err': e}) - - # Last fallback: In case we cannot find the partition by UUID - # and the deploy device is an md device, we check if the md - # device has a partition (which we assume to contain the root fs). - if hardware.is_md_device(device): - md_partition = device + 'p1' - if (os.path.exists(md_partition) - and stat.S_ISBLK(os.stat(md_partition).st_mode)): - LOG.debug("Found md device with partition %s", - md_partition) - return md_partition - else: - LOG.debug('Could not find partition %(part)s on md ' - 'device %(dev)s', - {'part': md_partition, - 'dev': device}) - - # Partition not found, time to escalate. - error_msg = ("No partition with UUID %(uuid)s found on " - "device %(dev)s" % {'uuid': uuid, 'dev': device}) - LOG.error(error_msg) - raise errors.DeviceNotFound(error_msg) - except processutils.ProcessExecutionError as e: - error_msg = ('Finding the partition with UUID %(uuid)s on ' - 'device %(dev)s failed with %(err)s' % - {'uuid': uuid, 'dev': device, 'err': e}) - LOG.error(error_msg) - raise errors.CommandExecutionError(error_msg) - def _has_dracut(root): try: @@ -209,203 +103,6 @@ def _is_bootloader_loaded(dev): return _find_bootable_device(stdout, dev) -def _get_efi_bootloaders(location): - """Get all valid efi bootloaders in a given location - - :param location: the location where it should start looking for the - efi files. - :return: a list of relative paths to valid efi bootloaders or reference - files. - """ - # Let's find all files with .efi or .EFI extension - LOG.debug('Looking for all efi files on %s', location) - valid_bootloaders = [] - for root, dirs, files in os.walk(location): - efi_files = [f for f in files if f.lower() in BOOTLOADERS_EFI] - LOG.debug('efi files found in %(location)s : %(efi_files)s', - {'location': location, 'efi_files': str(efi_files)}) - for name in efi_files: - efi_f = os.path.join(root, name) - LOG.debug('Checking if %s is executable', efi_f) - if os.access(efi_f, os.X_OK): - v_bl = efi_f.split(location)[-1][1:] - LOG.debug('%s is a valid bootloader', v_bl) - valid_bootloaders.append(v_bl) - if 'csv' in efi_f.lower(): - v_bl = efi_f.split(location)[-1][1:] - LOG.debug('%s is a pointer to a bootloader', v_bl) - # The CSV files are intended to be authortative as - # to the bootloader and the label to be used. Since - # we found one, we're going to point directly to it. - # centos7 did ship with 2, but with the same contents. - # TODO(TheJulia): Perhaps we extend this to make a list - # of CSVs instead and only return those?! But then the - # question is which is right/first/preferred. - return [v_bl] - return valid_bootloaders - - -def _run_efibootmgr(valid_efi_bootloaders, device, efi_partition, - mount_point): - """Executes efibootmgr and removes duplicate entries. - - :param valid_efi_bootloaders: the list of valid efi bootloaders - :param device: the device to be used - :param efi_partition: the efi partition on the device - :param mount_point: The mountpoint for the EFI partition so we can - read contents of files if necessary to perform - proper bootloader injection operations. - """ - - # Before updating let's get information about the bootorder - LOG.debug("Getting information about boot order.") - original_efi_output = utils.execute('efibootmgr', '-v') - # NOTE(TheJulia): regex used to identify entries in the efibootmgr - # output on stdout. - entry_label = re.compile(r'Boot([0-9a-f-A-F]+)\*?\s(.*).*$') - label_id = 1 - for v_bl in valid_efi_bootloaders: - if 'csv' in v_bl.lower(): - LOG.debug('A CSV file has been identified as a bootloader hint. ' - 'File: %s', v_bl) - # These files are always UTF-16 encoded, sometimes have a header. - # Positive bonus is python silently drops the FEFF header. - with open(mount_point + '/' + v_bl, 'r', encoding='utf-16') as csv: - contents = str(csv.read()) - csv_contents = contents.split(',', maxsplit=3) - csv_filename = v_bl.split('/')[-1] - v_efi_bl_path = v_bl.replace(csv_filename, str(csv_contents[0])) - v_efi_bl_path = '\\' + v_efi_bl_path.replace('/', '\\') - label = csv_contents[1] - else: - v_efi_bl_path = '\\' + v_bl.replace('/', '\\') - label = 'ironic' + str(label_id) - - # Iterate through standard out, and look for duplicates - for line in original_efi_output[0].split('\n'): - match = entry_label.match(line) - # Look for the base label in the string if a line match - # occurs, so we can identify if we need to eliminate the - # entry. - if match and label in match.group(2): - boot_num = match.group(1) - LOG.debug("Found bootnum %s matching label", boot_num) - utils.execute('efibootmgr', '-b', boot_num, '-B') - - LOG.debug("Adding loader %(path)s on partition %(part)s of device " - " %(dev)s", {'path': v_efi_bl_path, 'part': efi_partition, - 'dev': device}) - # Update the nvram using efibootmgr - # https://linux.die.net/man/8/efibootmgr - utils.execute('efibootmgr', '-v', '-c', '-d', device, - '-p', efi_partition, '-w', '-L', label, - '-l', v_efi_bl_path) - # Increment the ID in case the loop runs again. - label_id += 1 - - -def _manage_uefi(device, efi_system_part_uuid=None): - """Manage the device looking for valid efi bootloaders to update the nvram. - - This method checks for valid efi bootloaders in the device, if they exists - it updates the nvram using the efibootmgr. - - :param device: the device to be checked. - :param efi_system_part_uuid: efi partition uuid. - :raises: DeviceNotFound if the efi partition cannot be found. - :return: True - if it founds any efi bootloader and the nvram was updated - using the efibootmgr. - False - if no efi bootloader is found. - """ - efi_partition_mount_point = None - efi_mounted = False - LOG.debug('Attempting UEFI loader autodetection and NVRAM record setup.') - try: - # Force UEFI to rescan the device. - utils.rescan_device(device) - - local_path = tempfile.mkdtemp() - # Trust the contents on the disk in the event of a whole disk image. - efi_partition = utils.get_efi_part_on_device(device) - if not efi_partition and efi_system_part_uuid: - # _get_partition returns + and we only need the - # partition number - partition = _get_partition(device, uuid=efi_system_part_uuid) - try: - efi_partition = int(partition.replace(device, "")) - except ValueError: - # NVMe Devices get a partitioning scheme that is different from - # traditional block devices like SCSI/SATA - efi_partition = int(partition.replace(device + 'p', "")) - - if not efi_partition: - # NOTE(dtantsur): we cannot have a valid EFI deployment without an - # EFI partition at all. This code path is easily hit when using an - # image that is not UEFI compatible (which sadly applies to most - # cloud images out there, with a nice exception of Ubuntu). - raise errors.DeviceNotFound( - "No EFI partition could be detected on device %s and " - "EFI partition UUID has not been recorded during deployment " - "(which is often the case for whole disk images). " - "Are you using a UEFI-compatible image?" % device) - - efi_partition_mount_point = os.path.join(local_path, "boot/efi") - if not os.path.exists(efi_partition_mount_point): - os.makedirs(efi_partition_mount_point) - - # The mount needs the device with the partition, in case the - # device ends with a digit we add a `p` and the partition number we - # found, otherwise we just join the device and the partition number - if device[-1].isdigit(): - efi_device_part = '{}p{}'.format(device, efi_partition) - utils.execute('mount', efi_device_part, efi_partition_mount_point) - else: - efi_device_part = '{}{}'.format(device, efi_partition) - utils.execute('mount', efi_device_part, efi_partition_mount_point) - efi_mounted = True - - valid_efi_bootloaders = _get_efi_bootloaders(efi_partition_mount_point) - if valid_efi_bootloaders: - _run_efibootmgr(valid_efi_bootloaders, device, efi_partition, - efi_partition_mount_point) - return True - else: - # NOTE(dtantsur): if we have an empty EFI partition, try to use - # grub-install to populate it. - LOG.warning('Empty EFI partition detected.') - return False - - except processutils.ProcessExecutionError as e: - error_msg = ('Could not verify uefi on device %(dev)s' - 'failed with %(err)s.' % {'dev': device, 'err': e}) - LOG.error(error_msg) - raise errors.CommandExecutionError(error_msg) - finally: - LOG.debug('Executing _manage_uefi clean-up.') - umount_warn_msg = "Unable to umount %(local_path)s. Error: %(error)s" - - try: - if efi_mounted: - utils.execute('umount', efi_partition_mount_point, - attempts=3, delay_on_retry=True) - except processutils.ProcessExecutionError as e: - error_msg = ('Umounting efi system partition failed. ' - 'Attempted 3 times. Error: %s' % e) - LOG.error(error_msg) - raise errors.CommandExecutionError(error_msg) - - else: - # If umounting the binds succeed then we can try to delete it - try: - utils.execute('sync') - except processutils.ProcessExecutionError as e: - LOG.warning(umount_warn_msg, {'path': local_path, 'error': e}) - else: - # After everything is umounted we can then remove the - # temporary directory - shutil.rmtree(local_path) - - # TODO(rg): handle PreP boot parts relocation as well def _prepare_boot_partitions_for_softraid(device, holders, efi_part, target_boot_mode): @@ -583,7 +280,7 @@ def _install_grub2(device, root_uuid, efi_system_part_uuid=None, # NOTE(TheJulia): Seems we need to get this before ever possibly # restart the device in the case of multi-device RAID as pyudev # doesn't exactly like the partition disappearing. - root_partition = _get_partition(device, uuid=root_uuid) + root_partition = partition_utils.get_partition(device, uuid=root_uuid) # If the root device is an md device (or partition), restart the device # (to help grub finding it) and identify the underlying holder disks @@ -608,7 +305,8 @@ def _install_grub2(device, root_uuid, efi_system_part_uuid=None, # Mount the partition and binds path = tempfile.mkdtemp() if efi_system_part_uuid: - efi_part = _get_partition(device, uuid=efi_system_part_uuid) + efi_part = partition_utils.get_partition( + device, uuid=efi_system_part_uuid) efi_partition = efi_part if hardware.is_md_device(device): holders = hardware.get_holder_disks(device) @@ -621,7 +319,8 @@ def _install_grub2(device, root_uuid, efi_system_part_uuid=None, # For power we want to install grub directly onto the PreP partition if prep_boot_part_uuid: - device = _get_partition(device, uuid=prep_boot_part_uuid) + device = partition_utils.get_partition( + device, uuid=prep_boot_part_uuid) # If the root device is an md device (or partition), # identify the underlying holder disks to install grub. @@ -956,9 +655,8 @@ def _efi_boot_setup(device, efi_system_part_uuid=None, target_boot_mode=None): except FileNotFoundError: LOG.warning("efibootmgr is not available in the ramdisk") else: - if _manage_uefi(device, - efi_system_part_uuid=efi_system_part_uuid): - return True + return efi_utils.manage_uefi( + device, efi_system_part_uuid=efi_system_part_uuid) return False diff --git a/ironic_python_agent/partition_utils.py b/ironic_python_agent/partition_utils.py index 498fb2843..18b4cfdbd 100644 --- a/ironic_python_agent/partition_utils.py +++ b/ironic_python_agent/partition_utils.py @@ -22,7 +22,9 @@ import gzip import io import math import os +import shlex import shutil +import stat import tempfile from ironic_lib import disk_utils @@ -35,6 +37,8 @@ from oslo_utils import excutils from oslo_utils import units import requests +from ironic_python_agent import errors +from ironic_python_agent import hardware from ironic_python_agent import utils as ipa_utils @@ -487,3 +491,92 @@ def _is_disk_larger_than_max_size(device, node_uuid): disksize_mb = int(disksize_bytes.strip()) // 1024 // 1024 return disksize_mb > MAX_DISK_SIZE_MB_SUPPORTED_BY_MBR + + +def get_partition(device, uuid): + """Find the partition of a given device.""" + LOG.debug("Find the partition %(uuid)s on device %(dev)s", + {'dev': device, 'uuid': uuid}) + + try: + ipa_utils.rescan_device(device) + lsblk = utils.execute( + 'lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', device) + report = lsblk[0] + for line in report.split('\n'): + part = {} + # Split into KEY=VAL pairs + vals = shlex.split(line) + for key, val in (v.split('=', 1) for v in vals): + part[key] = val.strip() + # Ignore non partition + if part.get('TYPE') not in ['md', 'part']: + # NOTE(TheJulia): This technically creates an edge failure + # case where a filesystem on a whole block device sans + # partitioning would behave differently. + continue + + if part.get('UUID') == uuid: + LOG.debug("Partition %(uuid)s found on device " + "%(dev)s", {'uuid': uuid, 'dev': device}) + return '/dev/' + part.get('KNAME') + if part.get('PARTUUID') == uuid: + LOG.debug("Partition %(uuid)s found on device " + "%(dev)s", {'uuid': uuid, 'dev': device}) + return '/dev/' + part.get('KNAME') + if part.get('LABEL') == uuid: + LOG.debug("Partition %(uuid)s found on device " + "%(dev)s", {'uuid': uuid, 'dev': device}) + return '/dev/' + part.get('KNAME') + else: + # NOTE(TheJulia): We may want to consider moving towards using + # findfs in the future, if we're comfortable with the execution + # and interaction. There is value in either way though. + # NOTE(rg): alternative: blkid -l -t UUID=/PARTUUID= + try: + findfs, stderr = utils.execute('findfs', 'UUID=%s' % uuid) + return findfs.strip() + except processutils.ProcessExecutionError as e: + LOG.debug('First fallback detection attempt for locating ' + 'partition via UUID %(uuid)s failed. ' + 'Error: %(err)s', + {'uuid': uuid, + 'err': e}) + try: + findfs, stderr = utils.execute( + 'findfs', 'PARTUUID=%s' % uuid) + return findfs.strip() + except processutils.ProcessExecutionError as e: + LOG.debug('Secondary fallback detection attempt for ' + 'locating partition via UUID %(uuid)s failed. ' + 'Error: %(err)s', + {'uuid': uuid, + 'err': e}) + + # Last fallback: In case we cannot find the partition by UUID + # and the deploy device is an md device, we check if the md + # device has a partition (which we assume to contain the root fs). + if hardware.is_md_device(device): + md_partition = device + 'p1' + if (os.path.exists(md_partition) + and stat.S_ISBLK(os.stat(md_partition).st_mode)): + LOG.debug("Found md device with partition %s", + md_partition) + return md_partition + else: + LOG.debug('Could not find partition %(part)s on md ' + 'device %(dev)s', + {'part': md_partition, + 'dev': device}) + + # Partition not found, time to escalate. + error_msg = ("No partition with UUID %(uuid)s found on " + "device %(dev)s" % {'uuid': uuid, 'dev': device}) + LOG.error(error_msg) + raise errors.DeviceNotFound(error_msg) + except processutils.ProcessExecutionError as e: + error_msg = ('Finding the partition with UUID %(uuid)s on ' + 'device %(dev)s failed with %(err)s' % + {'uuid': uuid, 'dev': device, 'err': e}) + LOG.error(error_msg) + raise errors.CommandExecutionError(error_msg) diff --git a/ironic_python_agent/tests/unit/extensions/test_image.py b/ironic_python_agent/tests/unit/extensions/test_image.py index 6e5c2a4ed..2564ea12a 100644 --- a/ironic_python_agent/tests/unit/extensions/test_image.py +++ b/ironic_python_agent/tests/unit/extensions/test_image.py @@ -21,15 +21,17 @@ from unittest import mock from ironic_lib import utils as ilib_utils from oslo_concurrency import processutils +from ironic_python_agent import efi_utils from ironic_python_agent import errors from ironic_python_agent.extensions import image from ironic_python_agent import hardware +from ironic_python_agent import partition_utils from ironic_python_agent.tests.unit import base from ironic_python_agent import utils @mock.patch.object(hardware, 'dispatch_to_managers', autospec=True) -@mock.patch.object(utils, 'execute', autospec=True) +@mock.patch.object(ilib_utils, 'execute', autospec=True) @mock.patch.object(tempfile, 'mkdtemp', lambda *_: '/tmp/fake-dir') @mock.patch.object(shutil, 'rmtree', lambda *_: None) class TestImageExtension(base.IronicAgentTest): @@ -63,7 +65,7 @@ class TestImageExtension(base.IronicAgentTest): target_boot_mode='bios' ) - @mock.patch.object(image, '_manage_uefi', autospec=True) + @mock.patch.object(efi_utils, 'manage_uefi', autospec=True) @mock.patch.object(image, '_install_grub2', autospec=True) def test__install_bootloader_uefi(self, mock_grub2, mock_uefi, mock_execute, mock_dispatch): @@ -87,7 +89,7 @@ class TestImageExtension(base.IronicAgentTest): target_boot_mode='uefi' ) - @mock.patch.object(image, '_manage_uefi', autospec=True) + @mock.patch.object(efi_utils, 'manage_uefi', autospec=True) @mock.patch.object(image, '_install_grub2', autospec=True) def test__install_bootloader_uefi_ignores_manage_failure( self, mock_grub2, mock_uefi, @@ -114,7 +116,7 @@ class TestImageExtension(base.IronicAgentTest): target_boot_mode='uefi' ) - @mock.patch.object(image, '_manage_uefi', autospec=True) + @mock.patch.object(efi_utils, 'manage_uefi', autospec=True) @mock.patch.object(image, '_install_grub2', autospec=True) def test__install_bootloader_uefi_ignores_grub_failure( self, mock_grub2, mock_uefi, @@ -141,7 +143,7 @@ class TestImageExtension(base.IronicAgentTest): target_boot_mode='uefi' ) - @mock.patch.object(image, '_manage_uefi', autospec=True) + @mock.patch.object(efi_utils, 'manage_uefi', autospec=True) @mock.patch.object(image, '_install_grub2', autospec=True) def test__install_bootloader_uefi_ignores_grub_failure_api_override( self, mock_grub2, mock_uefi, @@ -168,7 +170,7 @@ class TestImageExtension(base.IronicAgentTest): target_boot_mode='uefi' ) - @mock.patch.object(image, '_manage_uefi', autospec=True) + @mock.patch.object(efi_utils, 'manage_uefi', autospec=True) @mock.patch.object(image, '_install_grub2', autospec=True) def test__install_bootloader_uefi_grub_failure_api_override( self, mock_grub2, mock_uefi, @@ -211,8 +213,8 @@ class TestImageExtension(base.IronicAgentTest): @mock.patch.object(hardware, 'is_md_device', lambda *_: False) @mock.patch.object(os.path, 'exists', lambda *_: False) - @mock.patch.object(image, '_get_efi_bootloaders', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(efi_utils, '_get_efi_bootloaders', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) @mock.patch.object(utils, 'get_efi_part_on_device', autospec=False) @mock.patch.object(os, 'makedirs', autospec=True) def test__uefi_bootloader_given_partition( @@ -259,8 +261,8 @@ class TestImageExtension(base.IronicAgentTest): @mock.patch.object(hardware, 'is_md_device', lambda *_: False) @mock.patch.object(os.path, 'exists', lambda *_: False) - @mock.patch.object(image, '_get_efi_bootloaders', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(efi_utils, '_get_efi_bootloaders', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) @mock.patch.object(utils, 'get_efi_part_on_device', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) def test__uefi_bootloader_find_partition( @@ -306,8 +308,8 @@ class TestImageExtension(base.IronicAgentTest): @mock.patch.object(hardware, 'is_md_device', lambda *_: False) @mock.patch.object(os.path, 'exists', lambda *_: False) - @mock.patch.object(image, '_get_efi_bootloaders', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(efi_utils, '_get_efi_bootloaders', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) @mock.patch.object(utils, 'get_efi_part_on_device', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) def test__uefi_bootloader_with_entry_removal( @@ -363,8 +365,8 @@ Boot0002 VENDMAGIC FvFile(9f3c6294-bf9b-4208-9808-be45dfc34b51) @mock.patch.object(hardware, 'is_md_device', lambda *_: False) @mock.patch.object(os.path, 'exists', lambda *_: False) - @mock.patch.object(image, '_get_efi_bootloaders', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(efi_utils, '_get_efi_bootloaders', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) @mock.patch.object(utils, 'get_efi_part_on_device', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) def test__uefi_bootloader_with_entry_removal_lenovo( @@ -425,8 +427,8 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'is_md_device', lambda *_: False) @mock.patch.object(os.path, 'exists', lambda *_: False) - @mock.patch.object(image, '_get_efi_bootloaders', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(efi_utils, '_get_efi_bootloaders', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) @mock.patch.object(utils, 'get_efi_part_on_device', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) def test__add_multi_bootloaders( @@ -518,7 +520,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'is_md_device', autospec=True) @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2(self, mock_get_part_uuid, environ_mock, mock_md_get_raid_devices, mock_is_md_device, mock_append_to_fstab, mock_execute, @@ -580,7 +582,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'is_md_device', autospec=True) @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_prep(self, mock_get_part_uuid, environ_mock, mock_md_get_raid_devices, mock_is_md_device, mock_execute, mock_dispatch): @@ -649,7 +651,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_uefi(self, mock_get_part_uuid, mkdir_mock, environ_mock, mock_md_get_raid_devices, mock_is_md_device, mock_append_to_fstab, @@ -737,7 +739,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_uefi_fstab(self, mock_get_part_uuid, mkdir_mock, environ_mock, mock_md_get_raid_devices, mock_is_md_device, mock_exists, @@ -834,7 +836,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_uefi_no_fstab( self, mock_get_part_uuid, mkdir_mock, @@ -948,7 +950,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_uefi_partition_image_with_loader( self, mock_get_part_uuid, mkdir_mock, environ_mock, mock_md_get_raid_devices, @@ -1032,7 +1034,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_uefi_partition_image_with_loader_with_grubcfg( self, mock_get_part_uuid, mkdir_mock, environ_mock, mock_md_get_raid_devices, @@ -1115,7 +1117,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_uefi_partition_image_with_preserve_failure( self, mock_get_part_uuid, mkdir_mock, environ_mock, mock_md_get_raid_devices, @@ -1229,7 +1231,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_uefi_partition_image_with_preserve_failure2( self, mock_get_part_uuid, mkdir_mock, environ_mock, mock_md_get_raid_devices, @@ -1351,7 +1353,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_uefi_partition_image_with_loader_grubcfg_fails( self, mock_get_part_uuid, mkdir_mock, environ_mock, mock_md_get_raid_devices, @@ -1440,7 +1442,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_uefi_partition_image_with_no_loader( self, mock_get_part_uuid, mkdir_mock, environ_mock, mock_md_get_raid_devices, @@ -1538,7 +1540,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_uefi_umount_fails( self, mock_get_part_uuid, mkdir_mock, environ_mock, mock_md_get_raid_devices, mock_is_md_device, mock_execute, @@ -1598,7 +1600,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 @mock.patch.object(hardware, 'md_get_raid_devices', autospec=True) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_uefi_mount_fails( self, mock_get_part_uuid, mkdir_mock, environ_mock, mock_is_md_device, mock_md_get_raid_devices, mock_execute, @@ -1637,7 +1639,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 mock_execute.assert_has_calls(expected) @mock.patch.object(image, '_is_bootloader_loaded', lambda *_: False) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) def test__install_grub2_command_fail(self, mock_get_part_uuid, mock_execute, mock_dispatch): @@ -1851,7 +1853,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 return_value=['/dev/sda', '/dev/sdb']) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) @mock.patch.object(image, '_prepare_boot_partitions_for_softraid', autospec=True, return_value='/dev/md/esp') @@ -1969,7 +1971,7 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 return_value=['/dev/sda', '/dev/sdb']) @mock.patch.object(os, 'environ', autospec=True) @mock.patch.object(os, 'makedirs', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) + @mock.patch.object(partition_utils, 'get_partition', autospec=True) @mock.patch.object(image, '_prepare_boot_partitions_for_softraid', autospec=True, return_value=[]) @@ -2053,133 +2055,6 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 mock_holder.assert_called_once_with(self.fake_dev) mock_dracut.assert_called_once_with(self.fake_dir) - @mock.patch.object(image, '_is_bootloader_loaded', autospec=True) - @mock.patch.object(hardware, 'is_md_device', autospec=True) - def test__get_partition(self, mock_is_md_device, mock_is_bootloader, - mock_execute, mock_dispatch): - mock_is_md_device.side_effect = [False] - mock_is_md_device.side_effect = [False, False] - lsblk_output = ('''KNAME="test" UUID="" TYPE="disk" - KNAME="test1" UUID="256a39e3-ca3c-4fb8-9cc2-b32eec441f47" TYPE="part" - KNAME="test2" UUID="%s" TYPE="part"''' % self.fake_root_uuid) - mock_execute.side_effect = (None, None, [lsblk_output]) - - root_part = image._get_partition(self.fake_dev, self.fake_root_uuid) - self.assertEqual('/dev/test2', root_part) - expected = [mock.call('partx', '-a', self.fake_dev, attempts=3, - delay_on_retry=True), - mock.call('udevadm', 'settle'), - mock.call('lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', - self.fake_dev)] - mock_execute.assert_has_calls(expected) - self.assertFalse(mock_dispatch.called) - self.assertFalse(mock_is_bootloader.called) - - @mock.patch.object(hardware, 'is_md_device', autospec=True) - def test__get_partition_no_device_found(self, mock_is_md_device, - mock_execute, mock_dispatch): - mock_is_md_device.side_effect = [False, False] - lsblk_output = ('''KNAME="test" UUID="" TYPE="disk" - KNAME="test1" UUID="256a39e3-ca3c-4fb8-9cc2-b32eec441f47" TYPE="part" - KNAME="test2" UUID="" TYPE="part"''') - mock_execute.side_effect = ( - None, None, [lsblk_output], - processutils.ProcessExecutionError('boom'), - processutils.ProcessExecutionError('kaboom')) - - self.assertRaises(errors.DeviceNotFound, - image._get_partition, self.fake_dev, - self.fake_root_uuid) - expected = [mock.call('partx', '-a', self.fake_dev, attempts=3, - delay_on_retry=True), - mock.call('udevadm', 'settle'), - mock.call('lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', - self.fake_dev)] - mock_execute.assert_has_calls(expected) - self.assertFalse(mock_dispatch.called) - - @mock.patch.object(hardware, 'is_md_device', autospec=True) - def test__get_partition_fallback_partuuid(self, mock_is_md_device, - mock_execute, mock_dispatch): - mock_is_md_device.side_effect = [False] - lsblk_output = ('''KNAME="test" UUID="" TYPE="disk" - KNAME="test1" UUID="256a39e3-ca3c-4fb8-9cc2-b32eec441f47" TYPE="part" - KNAME="test2" UUID="" TYPE="part"''') - findfs_output = ('/dev/loop0\n', None) - mock_execute.side_effect = ( - None, None, [lsblk_output], - processutils.ProcessExecutionError('boom'), - findfs_output) - - result = image._get_partition(self.fake_dev, self.fake_root_uuid) - self.assertEqual('/dev/loop0', result) - expected = [mock.call('partx', '-a', self.fake_dev, attempts=3, - delay_on_retry=True), - mock.call('udevadm', 'settle'), - mock.call('lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', - self.fake_dev), - mock.call('findfs', 'UUID=%s' % self.fake_root_uuid), - mock.call('findfs', 'PARTUUID=%s' % self.fake_root_uuid)] - mock_execute.assert_has_calls(expected) - self.assertFalse(mock_dispatch.called) - - @mock.patch.object(hardware, 'is_md_device', autospec=True) - def test__get_partition_command_fail(self, mock_is_md_device, - mock_execute, mock_dispatch): - mock_is_md_device.side_effect = [False, False] - mock_execute.side_effect = (None, None, - processutils.ProcessExecutionError('boom')) - self.assertRaises(errors.CommandExecutionError, - image._get_partition, self.fake_dev, - self.fake_root_uuid) - - expected = [mock.call('partx', '-a', self.fake_dev, attempts=3, - delay_on_retry=True), - mock.call('udevadm', 'settle'), - mock.call('lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', - self.fake_dev)] - mock_execute.assert_has_calls(expected) - self.assertFalse(mock_dispatch.called) - - @mock.patch.object(hardware, 'is_md_device', autospec=True) - def test__get_partition_partuuid(self, mock_is_md_device, mock_execute, - mock_dispatch): - mock_is_md_device.side_effect = [False, False] - lsblk_output = ('''KNAME="test" UUID="" TYPE="disk" - KNAME="test1" UUID="256a39e3-ca3c-4fb8-9cc2-b32eec441f47" TYPE="part" - KNAME="test2" UUID="903e7bf9-8a13-4f7f-811b-25dc16faf6f7" TYPE="part" \ - LABEL="%s"''' % self.fake_root_uuid) - mock_execute.side_effect = (None, None, [lsblk_output]) - - root_part = image._get_partition(self.fake_dev, self.fake_root_uuid) - self.assertEqual('/dev/test2', root_part) - expected = [mock.call('partx', '-a', self.fake_dev, attempts=3, - delay_on_retry=True), - mock.call('udevadm', 'settle'), - mock.call('lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', - self.fake_dev)] - mock_execute.assert_has_calls(expected) - self.assertFalse(mock_dispatch.called) - - @mock.patch.object(hardware, 'is_md_device', autospec=True) - def test__get_partition_label(self, mock_is_md_device, mock_execute, - mock_dispatch): - mock_is_md_device.side_effect = [False, False] - lsblk_output = ('''KNAME="test" UUID="" TYPE="disk" - KNAME="test1" UUID="256a39e3-ca3c-4fb8-9cc2-b32eec441f47" TYPE="part" - KNAME="test2" PARTUUID="%s" TYPE="part"''' % self.fake_root_uuid) - mock_execute.side_effect = (None, None, [lsblk_output]) - - root_part = image._get_partition(self.fake_dev, self.fake_root_uuid) - self.assertEqual('/dev/test2', root_part) - expected = [mock.call('partx', '-a', self.fake_dev, attempts=3, - delay_on_retry=True), - mock.call('udevadm', 'settle'), - mock.call('lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', - self.fake_dev)] - mock_execute.assert_has_calls(expected) - self.assertFalse(mock_dispatch.called) - def test__is_bootloader_loaded(self, mock_execute, mock_dispatch): mock_dispatch.return_value = hardware.BootInfo( @@ -2248,302 +2123,6 @@ Boot0004* ironic1 HD(1,GPT,55db8d03-c8f6-4a5b-9155-790dddc348fa,0x800,0x640 mock_dispatch.assert_any_call('get_boot_info') self.assertEqual(0, mock_execute.call_count) - @mock.patch.object(image, '_get_partition', autospec=True) - @mock.patch.object(utils, 'get_efi_part_on_device', autospec=True) - def test__manage_uefi_no_partition(self, mock_utils_efi_part, - mock_get_part_uuid, - mock_execute, mock_dispatch): - mock_utils_efi_part.return_value = None - self.assertRaises(errors.DeviceNotFound, - image._manage_uefi, self.fake_dev, None) - self.assertFalse(mock_get_part_uuid.called) - - @mock.patch.object(image, '_get_partition', autospec=True) - @mock.patch.object(utils, 'get_efi_part_on_device', autospec=True) - def test__manage_uefi_empty_partition_by_uuid(self, mock_utils_efi_part, - mock_get_part_uuid, - mock_execute, mock_dispatch): - mock_utils_efi_part.return_value = None - mock_get_part_uuid.return_value = self.fake_root_part - result = image._manage_uefi(self.fake_dev, self.fake_root_uuid) - self.assertFalse(result) - - @mock.patch.object(os.path, 'exists', lambda *_: False) - @mock.patch.object(image, '_get_efi_bootloaders', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) - @mock.patch.object(utils, 'get_efi_part_on_device', autospec=True) - @mock.patch.object(os, 'makedirs', autospec=True) - def test__manage_uefi(self, mkdir_mock, mock_utils_efi_part, - mock_get_part_uuid, mock_efi_bl, mock_execute, - mock_dispatch): - mock_utils_efi_part.return_value = '1' - mock_get_part_uuid.return_value = self.fake_dev - - mock_efi_bl.return_value = ['EFI/BOOT/BOOTX64.EFI'] - - mock_execute.side_effect = iter([('', ''), ('', ''), - ('', ''), ('', ''), - ('', ''), ('', ''), - ('', '')]) - - expected = [mock.call('partx', '-a', '/dev/fake', attempts=3, - delay_on_retry=True), - mock.call('udevadm', 'settle'), - mock.call('mount', self.fake_efi_system_part, - self.fake_dir + '/boot/efi'), - mock.call('efibootmgr', '-v'), - mock.call('efibootmgr', '-v', '-c', '-d', self.fake_dev, - '-p', '1', '-w', - '-L', 'ironic1', '-l', - '\\EFI\\BOOT\\BOOTX64.EFI'), - mock.call('umount', self.fake_dir + '/boot/efi', - attempts=3, delay_on_retry=True), - mock.call('sync')] - - result = image._manage_uefi(self.fake_dev, self.fake_root_uuid) - self.assertTrue(result) - mkdir_mock.assert_called_once_with(self.fake_dir + '/boot/efi') - mock_efi_bl.assert_called_once_with(self.fake_dir + '/boot/efi') - mock_execute.assert_has_calls(expected) - self.assertEqual(7, mock_execute.call_count) - - @mock.patch.object(os.path, 'exists', lambda *_: False) - @mock.patch.object(image, '_get_efi_bootloaders', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) - @mock.patch.object(utils, 'get_efi_part_on_device', autospec=True) - @mock.patch.object(os, 'makedirs', autospec=True) - def test__manage_uefi_found_csv(self, mkdir_mock, mock_utils_efi_part, - mock_get_part_uuid, mock_efi_bl, - mock_execute, mock_dispatch): - mock_utils_efi_part.return_value = '1' - mock_get_part_uuid.return_value = self.fake_dev - mock_efi_bl.return_value = ['EFI/vendor/BOOTX64.CSV'] - - # Format is ,,,humanfriendlytextnotused - # https://www.rodsbooks.com/efi-bootloaders/fallback.html - # Mild difference, Ubuntu ships a file without a 0xFEFF delimiter - # at the start of the file, where as Red Hat *does* - csv_file_data = u'shimx64.efi,Vendor String,,Grub2MadeUSDoThis\n' - # This test also handles deleting a pre-existing matching vendor - # string in advance. - dupe_entry = """ -BootCurrent: 0001 -Timeout: 0 seconds -BootOrder: 0000,00001 -Boot0000* Vendor String HD(1,GPT,4f3c6294-bf9b-4208-9808-be45dfc34b5c)File(\EFI\Boot\BOOTX64.EFI) -Boot0001 Vendor String HD(2,GPT,4f3c6294-bf9b-4208-9808-be45dfc34b5c)File(\EFI\Boot\BOOTX64.EFI) -Boot0002: VENDMAGIC FvFile(9f3c6294-bf9b-4208-9808-be45dfc34b51) -""" # noqa This is a giant literal string for testing. - - mock_execute.side_effect = iter([('', ''), ('', ''), - ('', ''), (dupe_entry, ''), - ('', ''), ('', ''), - ('', ''), ('', ''), - ('', '')]) - - expected = [mock.call('partx', '-a', '/dev/fake', attempts=3, - delay_on_retry=True), - mock.call('udevadm', 'settle'), - mock.call('mount', self.fake_efi_system_part, - self.fake_dir + '/boot/efi'), - mock.call('efibootmgr', '-v'), - mock.call('efibootmgr', '-b', '0000', '-B'), - mock.call('efibootmgr', '-b', '0001', '-B'), - mock.call('efibootmgr', '-v', '-c', '-d', self.fake_dev, - '-p', '1', '-w', - '-L', 'Vendor String', '-l', - '\\EFI\\vendor\\shimx64.efi'), - mock.call('umount', self.fake_dir + '/boot/efi', - attempts=3, delay_on_retry=True), - mock.call('sync')] - with mock.patch('builtins.open', - mock.mock_open(read_data=csv_file_data)): - result = image._manage_uefi(self.fake_dev, self.fake_root_uuid) - self.assertTrue(result) - mkdir_mock.assert_called_once_with(self.fake_dir + '/boot/efi') - mock_efi_bl.assert_called_once_with(self.fake_dir + '/boot/efi') - mock_execute.assert_has_calls(expected) - self.assertEqual(9, mock_execute.call_count) - - @mock.patch.object(os.path, 'exists', lambda *_: False) - @mock.patch.object(image, '_get_efi_bootloaders', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) - @mock.patch.object(utils, 'get_efi_part_on_device', autospec=True) - @mock.patch.object(os, 'makedirs', autospec=True) - def test__manage_uefi_nvme_device(self, mkdir_mock, mock_utils_efi_part, - mock_get_part_uuid, mock_efi_bl, - mock_execute, mock_dispatch): - mock_utils_efi_part.return_value = '1' - mock_get_part_uuid.return_value = '/dev/fakenvme0p1' - - mock_efi_bl.return_value = ['EFI/BOOT/BOOTX64.EFI'] - - mock_execute.side_effect = iter([('', ''), ('', ''), - ('', ''), ('', ''), - ('', ''), ('', ''), - ('', '')]) - - expected = [mock.call('partx', '-a', '/dev/fakenvme0', attempts=3, - delay_on_retry=True), - mock.call('udevadm', 'settle'), - mock.call('mount', '/dev/fakenvme0p1', - self.fake_dir + '/boot/efi'), - mock.call('efibootmgr', '-v'), - mock.call('efibootmgr', '-v', '-c', '-d', '/dev/fakenvme0', - '-p', '1', '-w', - '-L', 'ironic1', '-l', - '\\EFI\\BOOT\\BOOTX64.EFI'), - mock.call('umount', self.fake_dir + '/boot/efi', - attempts=3, delay_on_retry=True), - mock.call('sync')] - - result = image._manage_uefi('/dev/fakenvme0', self.fake_root_uuid) - self.assertTrue(result) - mkdir_mock.assert_called_once_with(self.fake_dir + '/boot/efi') - mock_efi_bl.assert_called_once_with(self.fake_dir + '/boot/efi') - mock_execute.assert_has_calls(expected) - self.assertEqual(7, mock_execute.call_count) - - @mock.patch.object(os.path, 'exists', lambda *_: False) - @mock.patch.object(image, '_get_efi_bootloaders', autospec=True) - @mock.patch.object(image, '_get_partition', autospec=True) - @mock.patch.object(utils, 'get_efi_part_on_device', autospec=True) - @mock.patch.object(os, 'makedirs', autospec=True) - def test__manage_uefi_wholedisk( - self, mkdir_mock, mock_utils_efi_part, - mock_get_part_uuid, mock_efi_bl, mock_execute, - mock_dispatch): - mock_utils_efi_part.return_value = '1' - mock_get_part_uuid.side_effect = Exception - - mock_efi_bl.return_value = ['EFI/BOOT/BOOTX64.EFI'] - - mock_execute.side_effect = iter([('', ''), ('', ''), - ('', ''), ('', ''), - ('', ''), ('', ''), - ('', '')]) - - expected = [mock.call('partx', '-a', '/dev/fake', attempts=3, - delay_on_retry=True), - mock.call('udevadm', 'settle'), - mock.call('mount', self.fake_efi_system_part, - self.fake_dir + '/boot/efi'), - mock.call('efibootmgr', '-v'), - mock.call('efibootmgr', '-v', '-c', '-d', self.fake_dev, - '-p', '1', '-w', - '-L', 'ironic1', '-l', - '\\EFI\\BOOT\\BOOTX64.EFI'), - mock.call('umount', self.fake_dir + '/boot/efi', - attempts=3, delay_on_retry=True), - mock.call('sync')] - - result = image._manage_uefi(self.fake_dev, None) - self.assertTrue(result) - mkdir_mock.assert_called_once_with(self.fake_dir + '/boot/efi') - mock_efi_bl.assert_called_once_with(self.fake_dir + '/boot/efi') - mock_execute.assert_has_calls(expected) - self.assertEqual(7, mock_execute.call_count) - - @mock.patch.object(os, 'walk', autospec=True) - @mock.patch.object(os, 'access', autospec=False) - def test__no_efi_bootloaders(self, mock_access, mock_walk, mock_execute, - mock_dispatch): - # No valid efi file. - mock_walk.return_value = [ - ('/boot/efi', ['EFI'], []), - ('/boot/efi/EFI', ['centos', 'BOOT'], []), - ('/boot/efi/EFI/centos', ['fw', 'fonts'], - ['shimx64-centos.efi', - 'MokManager.efi', 'mmx64.efi', 'shim.efi', 'fwupia32.efi', - 'fwupx64.efi', 'shimx64.efi', 'grubenv', 'grubx64.efi', - 'grub.cfg']), - ('/boot/efi/EFI/centos/fw', [], []), - ('/boot/efi/EFI/centos/fonts', [], ['unicode.pf2']), - ('/boot/efi/EFI/BOOT', [], []) - ] - - result = image._get_efi_bootloaders("/boot/efi") - self.assertEqual(result, []) - mock_access.assert_not_called() - - @mock.patch.object(os, 'walk', autospec=True) - @mock.patch.object(os, 'access', autospec=True) - def test__get_efi_bootloaders(self, mock_access, mock_walk, mock_execute, - mock_dispatch): - mock_walk.return_value = [ - ('/boot/efi', ['EFI'], []), - ('/boot/efi/EFI', ['centos', 'BOOT'], []), - ('/boot/efi/EFI/centos', ['fw', 'fonts'], - ['shimx64-centos.efi', 'BOOTX64.CSV', - 'MokManager.efi', 'mmx64.efi', 'shim.efi', 'fwupia32.efi', - 'fwupx64.efi', 'shimx64.efi', 'grubenv', 'grubx64.efi', - 'grub.cfg']), - ('/boot/efi/EFI/centos/fw', [], []), - ('/boot/efi/EFI/centos/fonts', [], ['unicode.pf2']), - ('/boot/efi/EFI/BOOT', [], - ['BOOTX64.EFI', 'fallback.efi', 'fbx64.efi']) - ] - mock_access.return_value = True - result = image._get_efi_bootloaders("/boot/efi") - self.assertEqual(result[0], 'EFI/centos/BOOTX64.CSV') - - @mock.patch.object(os, 'walk', autospec=True) - @mock.patch.object(os, 'access', autospec=True) - def test__get_efi_bootloaders_no_csv( - self, mock_access, mock_walk, mock_execute, mock_dispatch): - mock_walk.return_value = [ - ('/boot/efi', ['EFI'], []), - ('/boot/efi/EFI', ['centos', 'BOOT'], []), - ('/boot/efi/EFI/centos', ['fw', 'fonts'], - ['shimx64-centos.efi', - 'MokManager.efi', 'mmx64.efi', 'shim.efi', 'fwupia32.efi', - 'fwupx64.efi', 'shimx64.efi', 'grubenv', 'grubx64.efi', - 'grub.cfg']), - ('/boot/efi/EFI/centos/fw', [], []), - ('/boot/efi/EFI/centos/fonts', [], ['unicode.pf2']), - ('/boot/efi/EFI/BOOT', [], - ['BOOTX64.EFI', 'fallback.efi', 'fbx64.efi']) - ] - mock_access.return_value = True - result = image._get_efi_bootloaders("/boot/efi") - self.assertEqual(result[0], 'EFI/BOOT/BOOTX64.EFI') - - @mock.patch.object(os, 'walk', autospec=True) - @mock.patch.object(os, 'access', autospec=True) - def test__get_windows_efi_bootloaders(self, mock_access, mock_walk, - mock_execute, mock_dispatch): - mock_walk.return_value = [ - ('/boot/efi', ['WINDOWS'], []), - ('/boot/efi/WINDOWS', ['system32'], []), - ('/boot/efi/WINDOWS/system32', [], - ['winload.efi']) - ] - mock_access.return_value = True - result = image._get_efi_bootloaders("/boot/efi") - self.assertEqual(result[0], 'WINDOWS/system32/winload.efi') - - def test__run_efibootmgr_no_bootloaders(self, mock_execute, mock_dispatch): - result = image._run_efibootmgr([], self.fake_dev, - self.fake_efi_system_part, - self.fake_dir) - expected = [] - self.assertIsNone(result) - mock_execute.assert_has_calls(expected) - - def test__run_efibootmgr(self, mock_execute, mock_dispatch): - mock_execute.return_value = ('', '') - result = image._run_efibootmgr(['EFI/BOOT/BOOTX64.EFI'], - self.fake_dev, - self.fake_efi_system_part, - self.fake_dir) - expected = [mock.call('efibootmgr', '-v'), - mock.call('efibootmgr', '-v', '-c', '-d', self.fake_dev, - '-p', self.fake_efi_system_part, '-w', - '-L', 'ironic1', '-l', - '\\EFI\\BOOT\\BOOTX64.EFI')] - self.assertIsNone(result) - mock_execute.assert_has_calls(expected) - @mock.patch.object(os.path, 'exists', lambda *_: True) def test__append_uefi_to_fstab_handles_error(self, mock_execute, mock_dispatch): diff --git a/ironic_python_agent/tests/unit/test_efi_utils.py b/ironic_python_agent/tests/unit/test_efi_utils.py new file mode 100644 index 000000000..f41a3af04 --- /dev/null +++ b/ironic_python_agent/tests/unit/test_efi_utils.py @@ -0,0 +1,304 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import shutil +import tempfile +from unittest import mock + +from ironic_python_agent import efi_utils +from ironic_python_agent import errors +from ironic_python_agent import partition_utils +from ironic_python_agent.tests.unit import base +from ironic_python_agent import utils + + +@mock.patch.object(os, 'walk', autospec=True) +@mock.patch.object(os, 'access', autospec=False) +class TestGetEfiBootloaders(base.IronicAgentTest): + + def test__no_efi_bootloaders(self, mock_access, mock_walk): + # No valid efi file. + mock_walk.return_value = [ + ('/boot/efi', ['EFI'], []), + ('/boot/efi/EFI', ['centos', 'BOOT'], []), + ('/boot/efi/EFI/centos', ['fw', 'fonts'], + ['shimx64-centos.efi', + 'MokManager.efi', 'mmx64.efi', 'shim.efi', 'fwupia32.efi', + 'fwupx64.efi', 'shimx64.efi', 'grubenv', 'grubx64.efi', + 'grub.cfg']), + ('/boot/efi/EFI/centos/fw', [], []), + ('/boot/efi/EFI/centos/fonts', [], ['unicode.pf2']), + ('/boot/efi/EFI/BOOT', [], []) + ] + + result = efi_utils._get_efi_bootloaders("/boot/efi") + self.assertEqual(result, []) + mock_access.assert_not_called() + + def test__get_efi_bootloaders(self, mock_access, mock_walk): + mock_walk.return_value = [ + ('/boot/efi', ['EFI'], []), + ('/boot/efi/EFI', ['centos', 'BOOT'], []), + ('/boot/efi/EFI/centos', ['fw', 'fonts'], + ['shimx64-centos.efi', 'BOOTX64.CSV', + 'MokManager.efi', 'mmx64.efi', 'shim.efi', 'fwupia32.efi', + 'fwupx64.efi', 'shimx64.efi', 'grubenv', 'grubx64.efi', + 'grub.cfg']), + ('/boot/efi/EFI/centos/fw', [], []), + ('/boot/efi/EFI/centos/fonts', [], ['unicode.pf2']), + ('/boot/efi/EFI/BOOT', [], + ['BOOTX64.EFI', 'fallback.efi', 'fbx64.efi']) + ] + mock_access.return_value = True + result = efi_utils._get_efi_bootloaders("/boot/efi") + self.assertEqual(result[0], 'EFI/centos/BOOTX64.CSV') + + def test__get_efi_bootloaders_no_csv(self, mock_access, mock_walk): + mock_walk.return_value = [ + ('/boot/efi', ['EFI'], []), + ('/boot/efi/EFI', ['centos', 'BOOT'], []), + ('/boot/efi/EFI/centos', ['fw', 'fonts'], + ['shimx64-centos.efi', + 'MokManager.efi', 'mmx64.efi', 'shim.efi', 'fwupia32.efi', + 'fwupx64.efi', 'shimx64.efi', 'grubenv', 'grubx64.efi', + 'grub.cfg']), + ('/boot/efi/EFI/centos/fw', [], []), + ('/boot/efi/EFI/centos/fonts', [], ['unicode.pf2']), + ('/boot/efi/EFI/BOOT', [], + ['BOOTX64.EFI', 'fallback.efi', 'fbx64.efi']) + ] + mock_access.return_value = True + result = efi_utils._get_efi_bootloaders("/boot/efi") + self.assertEqual(result[0], 'EFI/BOOT/BOOTX64.EFI') + + def test__get_windows_efi_bootloaders(self, mock_access, mock_walk): + mock_walk.return_value = [ + ('/boot/efi', ['WINDOWS'], []), + ('/boot/efi/WINDOWS', ['system32'], []), + ('/boot/efi/WINDOWS/system32', [], + ['winload.efi']) + ] + mock_access.return_value = True + result = efi_utils._get_efi_bootloaders("/boot/efi") + self.assertEqual(result[0], 'WINDOWS/system32/winload.efi') + + +@mock.patch.object(utils, 'execute', autospec=True) +class TestRunEfiBootmgr(base.IronicAgentTest): + + fake_dev = '/dev/fake' + fake_efi_system_part = '/dev/fake1' + fake_dir = '/tmp/fake-dir' + + def test__run_efibootmgr_no_bootloaders(self, mock_execute): + result = efi_utils._run_efibootmgr([], self.fake_dev, + self.fake_efi_system_part, + self.fake_dir) + expected = [] + self.assertIsNone(result) + mock_execute.assert_has_calls(expected) + + def test__run_efibootmgr(self, mock_execute): + mock_execute.return_value = ('', '') + result = efi_utils._run_efibootmgr(['EFI/BOOT/BOOTX64.EFI'], + self.fake_dev, + self.fake_efi_system_part, + self.fake_dir) + expected = [mock.call('efibootmgr', '-v'), + mock.call('efibootmgr', '-v', '-c', '-d', self.fake_dev, + '-p', self.fake_efi_system_part, '-w', + '-L', 'ironic1', '-l', + '\\EFI\\BOOT\\BOOTX64.EFI')] + self.assertIsNone(result) + mock_execute.assert_has_calls(expected) + + +@mock.patch.object(shutil, 'rmtree', lambda *_: None) +@mock.patch.object(tempfile, 'mkdtemp', lambda *_: '/tmp/fake-dir') +@mock.patch.object(utils, 'rescan_device', autospec=True) +@mock.patch.object(utils, 'execute', autospec=True) +@mock.patch.object(partition_utils, 'get_partition', autospec=True) +@mock.patch.object(utils, 'get_efi_part_on_device', autospec=True) +class TestManageUefi(base.IronicAgentTest): + + fake_dev = '/dev/fake' + fake_efi_system_part = '/dev/fake1' + fake_root_part = '/dev/fake2' + fake_root_uuid = '11111111-2222-3333-4444-555555555555' + fake_dir = '/tmp/fake-dir' + + def test_no_partition(self, mock_utils_efi_part, + mock_get_part_uuid, mock_execute, + mock_rescan): + mock_utils_efi_part.return_value = None + self.assertRaises(errors.DeviceNotFound, + efi_utils.manage_uefi, self.fake_dev, None) + self.assertFalse(mock_get_part_uuid.called) + mock_rescan.assert_called_once_with(self.fake_dev) + + def test_empty_partition_by_uuid(self, mock_utils_efi_part, + mock_get_part_uuid, mock_execute, + mock_rescan): + mock_utils_efi_part.return_value = None + mock_get_part_uuid.return_value = self.fake_root_part + result = efi_utils.manage_uefi(self.fake_dev, self.fake_root_uuid) + self.assertFalse(result) + mock_rescan.assert_called_once_with(self.fake_dev) + + @mock.patch.object(os.path, 'exists', lambda *_: False) + @mock.patch.object(efi_utils, '_get_efi_bootloaders', autospec=True) + @mock.patch.object(os, 'makedirs', autospec=True) + def test_ok(self, mkdir_mock, mock_efi_bl, mock_utils_efi_part, + mock_get_part_uuid, mock_execute, mock_rescan): + mock_utils_efi_part.return_value = '1' + mock_get_part_uuid.return_value = self.fake_dev + + mock_efi_bl.return_value = ['EFI/BOOT/BOOTX64.EFI'] + + mock_execute.side_effect = iter([('', ''), ('', ''), + ('', ''), ('', ''), + ('', ''), ('', ''), + ('', '')]) + + expected = [mock.call('mount', self.fake_efi_system_part, + self.fake_dir + '/boot/efi'), + mock.call('efibootmgr', '-v'), + mock.call('efibootmgr', '-v', '-c', '-d', self.fake_dev, + '-p', '1', '-w', + '-L', 'ironic1', '-l', + '\\EFI\\BOOT\\BOOTX64.EFI'), + mock.call('umount', self.fake_dir + '/boot/efi', + attempts=3, delay_on_retry=True), + mock.call('sync')] + + result = efi_utils.manage_uefi(self.fake_dev, self.fake_root_uuid) + self.assertTrue(result) + mkdir_mock.assert_called_once_with(self.fake_dir + '/boot/efi') + mock_efi_bl.assert_called_once_with(self.fake_dir + '/boot/efi') + mock_execute.assert_has_calls(expected) + self.assertEqual(5, mock_execute.call_count) + mock_rescan.assert_called_once_with(self.fake_dev) + + @mock.patch.object(os.path, 'exists', lambda *_: False) + @mock.patch.object(efi_utils, '_get_efi_bootloaders', autospec=True) + @mock.patch.object(os, 'makedirs', autospec=True) + def test_found_csv(self, mkdir_mock, mock_efi_bl, mock_utils_efi_part, + mock_get_part_uuid, mock_execute, mock_rescan): + mock_utils_efi_part.return_value = '1' + mock_get_part_uuid.return_value = self.fake_dev + mock_efi_bl.return_value = ['EFI/vendor/BOOTX64.CSV'] + + # Format is ,,,humanfriendlytextnotused + # https://www.rodsbooks.com/efi-bootloaders/fallback.html + # Mild difference, Ubuntu ships a file without a 0xFEFF delimiter + # at the start of the file, where as Red Hat *does* + csv_file_data = u'shimx64.efi,Vendor String,,Grub2MadeUSDoThis\n' + # This test also handles deleting a pre-existing matching vendor + # string in advance. + dupe_entry = """ +BootCurrent: 0001 +Timeout: 0 seconds +BootOrder: 0000,00001 +Boot0000* Vendor String HD(1,GPT,4f3c6294-bf9b-4208-9808-be45dfc34b5c)File(\EFI\Boot\BOOTX64.EFI) +Boot0001 Vendor String HD(2,GPT,4f3c6294-bf9b-4208-9808-be45dfc34b5c)File(\EFI\Boot\BOOTX64.EFI) +Boot0002: VENDMAGIC FvFile(9f3c6294-bf9b-4208-9808-be45dfc34b51) +""" # noqa This is a giant literal string for testing. + + mock_execute.side_effect = iter([('', ''), (dupe_entry, ''), + ('', ''), ('', ''), + ('', ''), ('', ''), + ('', '')]) + + expected = [mock.call('mount', self.fake_efi_system_part, + self.fake_dir + '/boot/efi'), + mock.call('efibootmgr', '-v'), + mock.call('efibootmgr', '-b', '0000', '-B'), + mock.call('efibootmgr', '-b', '0001', '-B'), + mock.call('efibootmgr', '-v', '-c', '-d', self.fake_dev, + '-p', '1', '-w', + '-L', 'Vendor String', '-l', + '\\EFI\\vendor\\shimx64.efi'), + mock.call('umount', self.fake_dir + '/boot/efi', + attempts=3, delay_on_retry=True), + mock.call('sync')] + with mock.patch('builtins.open', + mock.mock_open(read_data=csv_file_data)): + result = efi_utils.manage_uefi(self.fake_dev, self.fake_root_uuid) + self.assertTrue(result) + mkdir_mock.assert_called_once_with(self.fake_dir + '/boot/efi') + mock_efi_bl.assert_called_once_with(self.fake_dir + '/boot/efi') + mock_execute.assert_has_calls(expected) + + @mock.patch.object(os.path, 'exists', lambda *_: False) + @mock.patch.object(efi_utils, '_get_efi_bootloaders', autospec=True) + @mock.patch.object(os, 'makedirs', autospec=True) + def test_nvme_device(self, mkdir_mock, mock_efi_bl, mock_utils_efi_part, + mock_get_part_uuid, mock_execute, mock_rescan): + mock_utils_efi_part.return_value = '1' + mock_get_part_uuid.return_value = '/dev/fakenvme0p1' + + mock_efi_bl.return_value = ['EFI/BOOT/BOOTX64.EFI'] + + mock_execute.side_effect = iter([('', ''), ('', ''), + ('', ''), ('', ''), + ('', ''), ('', ''), + ('', '')]) + + expected = [mock.call('mount', '/dev/fakenvme0p1', + self.fake_dir + '/boot/efi'), + mock.call('efibootmgr', '-v'), + mock.call('efibootmgr', '-v', '-c', '-d', '/dev/fakenvme0', + '-p', '1', '-w', + '-L', 'ironic1', '-l', + '\\EFI\\BOOT\\BOOTX64.EFI'), + mock.call('umount', self.fake_dir + '/boot/efi', + attempts=3, delay_on_retry=True), + mock.call('sync')] + + result = efi_utils.manage_uefi('/dev/fakenvme0', self.fake_root_uuid) + self.assertTrue(result) + mkdir_mock.assert_called_once_with(self.fake_dir + '/boot/efi') + mock_efi_bl.assert_called_once_with(self.fake_dir + '/boot/efi') + mock_execute.assert_has_calls(expected) + + @mock.patch.object(os.path, 'exists', lambda *_: False) + @mock.patch.object(efi_utils, '_get_efi_bootloaders', autospec=True) + @mock.patch.object(os, 'makedirs', autospec=True) + def test_wholedisk(self, mkdir_mock, mock_efi_bl, mock_utils_efi_part, + mock_get_part_uuid, mock_execute, mock_rescan): + mock_utils_efi_part.return_value = '1' + mock_get_part_uuid.side_effect = Exception + + mock_efi_bl.return_value = ['EFI/BOOT/BOOTX64.EFI'] + + mock_execute.side_effect = iter([('', ''), ('', ''), + ('', ''), ('', ''), + ('', ''), ('', ''), + ('', '')]) + + expected = [mock.call('mount', self.fake_efi_system_part, + self.fake_dir + '/boot/efi'), + mock.call('efibootmgr', '-v'), + mock.call('efibootmgr', '-v', '-c', '-d', self.fake_dev, + '-p', '1', '-w', + '-L', 'ironic1', '-l', + '\\EFI\\BOOT\\BOOTX64.EFI'), + mock.call('umount', self.fake_dir + '/boot/efi', + attempts=3, delay_on_retry=True), + mock.call('sync')] + + result = efi_utils.manage_uefi(self.fake_dev, None) + self.assertTrue(result) + mkdir_mock.assert_called_once_with(self.fake_dir + '/boot/efi') + mock_efi_bl.assert_called_once_with(self.fake_dir + '/boot/efi') + mock_execute.assert_has_calls(expected) diff --git a/ironic_python_agent/tests/unit/test_partition_utils.py b/ironic_python_agent/tests/unit/test_partition_utils.py index 64316dc00..1bae6d158 100644 --- a/ironic_python_agent/tests/unit/test_partition_utils.py +++ b/ironic_python_agent/tests/unit/test_partition_utils.py @@ -22,6 +22,8 @@ from ironic_lib import utils from oslo_concurrency import processutils import requests +from ironic_python_agent import errors +from ironic_python_agent import hardware from ironic_python_agent import partition_utils from ironic_python_agent.tests.unit import base @@ -1173,3 +1175,122 @@ class RealFilePartitioningTestCase(base.IronicAgentTest): self.assertEqual([6, 3], sizes[:2], "unexpected partitioning %s" % part_table) self.assertIn(sizes[2], (9, 10)) + + +@mock.patch.object(utils, 'execute', autospec=True) +@mock.patch.object(hardware, 'is_md_device', autospec=True) +class TestGetPartition(base.IronicAgentTest): + + fake_dev = '/dev/fake' + fake_root_uuid = '11111111-2222-3333-4444-555555555555' + + def test(self, mock_is_md_device, mock_execute): + mock_is_md_device.side_effect = [False] + mock_is_md_device.side_effect = [False, False] + lsblk_output = ('''KNAME="test" UUID="" TYPE="disk" + KNAME="test1" UUID="256a39e3-ca3c-4fb8-9cc2-b32eec441f47" TYPE="part" + KNAME="test2" UUID="%s" TYPE="part"''' % self.fake_root_uuid) + mock_execute.side_effect = (None, None, [lsblk_output]) + + root_part = partition_utils.get_partition( + self.fake_dev, self.fake_root_uuid) + self.assertEqual('/dev/test2', root_part) + expected = [mock.call('partx', '-a', self.fake_dev, attempts=3, + delay_on_retry=True), + mock.call('udevadm', 'settle'), + mock.call('lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', + self.fake_dev)] + mock_execute.assert_has_calls(expected) + + def test_no_device_found(self, mock_is_md_device, mock_execute): + mock_is_md_device.side_effect = [False, False] + lsblk_output = ('''KNAME="test" UUID="" TYPE="disk" + KNAME="test1" UUID="256a39e3-ca3c-4fb8-9cc2-b32eec441f47" TYPE="part" + KNAME="test2" UUID="" TYPE="part"''') + mock_execute.side_effect = ( + None, None, [lsblk_output], + processutils.ProcessExecutionError('boom'), + processutils.ProcessExecutionError('kaboom')) + + self.assertRaises(errors.DeviceNotFound, + partition_utils.get_partition, self.fake_dev, + self.fake_root_uuid) + expected = [mock.call('partx', '-a', self.fake_dev, attempts=3, + delay_on_retry=True), + mock.call('udevadm', 'settle'), + mock.call('lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', + self.fake_dev)] + mock_execute.assert_has_calls(expected) + + def test_fallback_partuuid(self, mock_is_md_device, mock_execute): + mock_is_md_device.side_effect = [False] + lsblk_output = ('''KNAME="test" UUID="" TYPE="disk" + KNAME="test1" UUID="256a39e3-ca3c-4fb8-9cc2-b32eec441f47" TYPE="part" + KNAME="test2" UUID="" TYPE="part"''') + findfs_output = ('/dev/loop0\n', None) + mock_execute.side_effect = ( + None, None, [lsblk_output], + processutils.ProcessExecutionError('boom'), + findfs_output) + + result = partition_utils.get_partition( + self.fake_dev, self.fake_root_uuid) + self.assertEqual('/dev/loop0', result) + expected = [mock.call('partx', '-a', self.fake_dev, attempts=3, + delay_on_retry=True), + mock.call('udevadm', 'settle'), + mock.call('lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', + self.fake_dev), + mock.call('findfs', 'UUID=%s' % self.fake_root_uuid), + mock.call('findfs', 'PARTUUID=%s' % self.fake_root_uuid)] + mock_execute.assert_has_calls(expected) + + def test_command_fail(self, mock_is_md_device, mock_execute): + mock_is_md_device.side_effect = [False, False] + mock_execute.side_effect = (None, None, + processutils.ProcessExecutionError('boom')) + self.assertRaises(errors.CommandExecutionError, + partition_utils.get_partition, self.fake_dev, + self.fake_root_uuid) + + expected = [mock.call('partx', '-a', self.fake_dev, attempts=3, + delay_on_retry=True), + mock.call('udevadm', 'settle'), + mock.call('lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', + self.fake_dev)] + mock_execute.assert_has_calls(expected) + + def test_partuuid(self, mock_is_md_device, mock_execute): + mock_is_md_device.side_effect = [False, False] + lsblk_output = ('''KNAME="test" UUID="" TYPE="disk" + KNAME="test1" UUID="256a39e3-ca3c-4fb8-9cc2-b32eec441f47" TYPE="part" + KNAME="test2" UUID="903e7bf9-8a13-4f7f-811b-25dc16faf6f7" TYPE="part" \ + LABEL="%s"''' % self.fake_root_uuid) + mock_execute.side_effect = (None, None, [lsblk_output]) + + root_part = partition_utils.get_partition( + self.fake_dev, self.fake_root_uuid) + self.assertEqual('/dev/test2', root_part) + expected = [mock.call('partx', '-a', self.fake_dev, attempts=3, + delay_on_retry=True), + mock.call('udevadm', 'settle'), + mock.call('lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', + self.fake_dev)] + mock_execute.assert_has_calls(expected) + + def test_label(self, mock_is_md_device, mock_execute): + mock_is_md_device.side_effect = [False, False] + lsblk_output = ('''KNAME="test" UUID="" TYPE="disk" + KNAME="test1" UUID="256a39e3-ca3c-4fb8-9cc2-b32eec441f47" TYPE="part" + KNAME="test2" PARTUUID="%s" TYPE="part"''' % self.fake_root_uuid) + mock_execute.side_effect = (None, None, [lsblk_output]) + + root_part = partition_utils.get_partition( + self.fake_dev, self.fake_root_uuid) + self.assertEqual('/dev/test2', root_part) + expected = [mock.call('partx', '-a', self.fake_dev, attempts=3, + delay_on_retry=True), + mock.call('udevadm', 'settle'), + mock.call('lsblk', '-PbioKNAME,UUID,PARTUUID,TYPE,LABEL', + self.fake_dev)] + mock_execute.assert_has_calls(expected)