Merge "Avoid using blkid in favour of lsblk and parted" into stable/xena

This commit is contained in:
Zuul 2021-09-20 15:30:47 +00:00 committed by Gerrit Code Review
commit df502f976a
2 changed files with 88 additions and 40 deletions

View File

@ -69,6 +69,8 @@ LOG = logging.getLogger(__name__)
_PARTED_PRINT_RE = re.compile(r"^(\d+):([\d\.]+)MiB:"
r"([\d\.]+)MiB:([\d\.]+)MiB:(\w*):(.*):(.*);")
_PARTED_TABLE_TYPE_RE = re.compile(r'^.*partition\s+table\s*:\s*(gpt|msdos)',
re.IGNORECASE | re.MULTILINE)
CONFIGDRIVE_LABEL = "config-2"
MAX_CONFIG_DRIVE_SIZE_MB = 64
@ -176,20 +178,17 @@ def get_partition_table_type(device):
:param device: the name of the device
:return: dos, gpt or None
"""
return get_device_information(device, probe=True).get('PTTYPE')
out = utils.execute('parted', '--script', device, '--', 'print',
run_as_root=True, use_standard_locale=True)[0]
m = _PARTED_TABLE_TYPE_RE.search(out)
if m:
return m.group(1)
LOG.warning("Unable to get partition table type for device %s", device)
return 'unknown'
def get_device_information(device, probe=False, fields=None):
"""Get information about a device using blkid.
Can be applied to all block devices: disks, RAID, partitions.
:param device: Device name.
:param probe: Switch to low-level probing mode.
:param fields: A list of fields to request (all by default).
:return: A dictionary with requested fields as keys.
:raises: ProcessExecutionError
"""
def _blkid(device, probe=False, fields=None):
args = []
if probe:
args.append('--probe')
@ -199,7 +198,42 @@ def get_device_information(device, probe=False, fields=None):
output, err = utils.execute('blkid', device, *args,
use_standard_locale=True, run_as_root=True)
if output.strip():
output = output.split(': ', 1)[1]
return output.split(': ', 1)[1]
else:
return ""
def _lsblk(device, deps=True, fields=None):
args = ['--pairs', '--bytes', '--ascii']
if not deps:
args.append('--nodeps')
if fields:
args.extend(['--output', ','.join(fields)])
else:
args.append('--output-all')
output, err = utils.execute('lsblk', device, *args,
use_standard_locale=True, run_as_root=True)
return output.strip()
def get_device_information(device, probe=False, fields=None):
"""Get information about a device using blkid.
Can be applied to all block devices: disks, RAID, partitions.
:param device: Device name.
:param probe: DEPRECATED, do not use.
:param fields: A list of fields to request (all by default).
:return: A dictionary with requested fields as keys.
:raises: ProcessExecutionError
"""
if probe:
output = _blkid(device, probe=True, fields=fields)
else:
output = _lsblk(device, fields=fields, deps=False)
if output:
return next(utils.parse_device_tags(output))
else:
return {}
@ -248,8 +282,7 @@ def get_uefi_disk_identifier(dev):
vals = line.split()
partition_id = vals[0]
try:
lsblk_output, _ = utils.execute('lsblk', '-PbioUUID', partition_id,
run_as_root=True)
lsblk_output = _lsblk(partition_id, fields=['UUID'])
disk_identifier = lsblk_output.split("=")[1].strip()
disk_identifier = disk_identifier.strip('"')
except processutils.ProcessExecutionError as e:

View File

@ -134,7 +134,8 @@ Device Start End Sectors Size Type
self.assertEqual(part_result, result)
execute_calls = [
mock.call('fdisk', '-l', self.dev, run_as_root=True),
mock.call('lsblk', '-PbioUUID', partition_id,
mock.call('lsblk', partition_id, '--pairs', '--bytes', '--ascii',
'--output', 'UUID', use_standard_locale=True,
run_as_root=True)
]
mock_execute.assert_has_calls(execute_calls)
@ -161,7 +162,9 @@ Device Boot Start End Sectors Size Id Type
disk_utils.get_uefi_disk_identifier, self.dev)
execute_calls = [
mock.call('fdisk', '-l', self.dev, run_as_root=True),
mock.call('lsblk', '-PbioUUID', partition_id, run_as_root=True)
mock.call('lsblk', partition_id, '--pairs', '--bytes', '--ascii',
'--output', 'UUID', use_standard_locale=True,
run_as_root=True)
]
mock_execute.assert_has_calls(execute_calls)
@ -898,8 +901,8 @@ BLKID_PROBE = ("""
"""PTUUID="123456" PTTYPE="gpt"
""")
BLKID_NORMAL = (
'dev/nvme0n1p1: UUID="123" BLOCK_SIZE="512" TYPE="vfat" '
LSBLK_NORMAL = (
'UUID="123" BLOCK_SIZE="512" TYPE="vfat" '
'PARTLABEL="EFI System Partition" PARTUUID="123456"'
)
@ -908,16 +911,16 @@ BLKID_NORMAL = (
class GetDeviceInformationTestCase(base.IronicLibTestCase):
def test_normal(self, mock_execute):
mock_execute.return_value = BLKID_NORMAL, ""
mock_execute.return_value = LSBLK_NORMAL, ""
result = disk_utils.get_device_information('/dev/fake')
self.assertEqual(
{'UUID': '123', 'BLOCK_SIZE': '512', 'TYPE': 'vfat',
'PARTLABEL': 'EFI System Partition', 'PARTUUID': '123456'},
result
)
mock_execute.assert_called_once_with('blkid', '/dev/fake',
use_standard_locale=True,
run_as_root=True)
mock_execute.assert_called_once_with(
'lsblk', '/dev/fake', '--pairs', '--bytes', '--ascii', '--nodeps',
'--output-all', use_standard_locale=True, run_as_root=True)
def test_probe(self, mock_execute):
mock_execute.return_value = BLKID_PROBE, ""
@ -928,7 +931,7 @@ class GetDeviceInformationTestCase(base.IronicLibTestCase):
run_as_root=True)
def test_fields(self, mock_execute):
mock_execute.return_value = BLKID_NORMAL, ""
mock_execute.return_value = LSBLK_NORMAL, ""
result = disk_utils.get_device_information('/dev/fake',
fields=['UUID', 'LABEL'])
# No filtering on our side, so returning all fake fields
@ -937,33 +940,45 @@ class GetDeviceInformationTestCase(base.IronicLibTestCase):
'PARTLABEL': 'EFI System Partition', 'PARTUUID': '123456'},
result
)
mock_execute.assert_called_once_with('blkid', '/dev/fake',
'--match-tag', 'UUID',
'--match-tag', 'LABEL',
use_standard_locale=True,
run_as_root=True)
mock_execute.assert_called_once_with(
'lsblk', '/dev/fake', '--pairs', '--bytes', '--ascii', '--nodeps',
'--output', 'UUID,LABEL',
use_standard_locale=True, run_as_root=True)
def test_empty(self, mock_execute):
mock_execute.return_value = "\n", ""
result = disk_utils.get_device_information('/dev/fake')
result = disk_utils.get_device_information('/dev/fake', probe=True)
self.assertEqual({}, result)
mock_execute.assert_called_once_with('blkid', '/dev/fake',
'--probe',
use_standard_locale=True,
run_as_root=True)
@mock.patch.object(disk_utils, 'get_device_information', autospec=True)
@mock.patch.object(utils, 'execute', autospec=True)
class GetPartitionTableTypeTestCase(base.IronicLibTestCase):
def test_ok(self, mock_get_device_info):
mock_get_device_info.return_value = {'PTTYPE': 'gpt'}
self.assertEqual('gpt',
disk_utils.get_partition_table_type('/dev/fake'))
mock_get_device_info.assert_called_once_with('/dev/fake', probe=True)
def test_gpt(self, mocked_execute):
self._test_by_type(mocked_execute, 'gpt', 'gpt')
def test_missing(self, mock_get_device_info):
mock_get_device_info.return_value = {}
self.assertIsNone(disk_utils.get_partition_table_type('/dev/fake'))
mock_get_device_info.assert_called_once_with('/dev/fake', probe=True)
def test_msdos(self, mocked_execute):
self._test_by_type(mocked_execute, 'msdos', 'msdos')
def test_unknown(self, mocked_execute):
self._test_by_type(mocked_execute, 'whatever', 'unknown')
def _test_by_type(self, mocked_execute, table_type_output,
expected_table_type):
parted_ret = PARTED_OUTPUT_UNFORMATTED.format(table_type_output)
mocked_execute.side_effect = [
(parted_ret, None),
]
ret = disk_utils.get_partition_table_type('hello')
mocked_execute.assert_called_once_with(
'parted', '--script', 'hello', '--', 'print',
run_as_root=True, use_standard_locale=True)
self.assertEqual(expected_table_type, ret)
PARTED_OUTPUT_UNFORMATTED = '''Model: whatever