Merge "Move some generic functions from IPA"

This commit is contained in:
Zuul 2020-09-02 22:54:40 +00:00 committed by Gerrit Code Review
commit 07d98520f0
5 changed files with 327 additions and 24 deletions

120
ironic_lib/capabilities.py Normal file
View File

@ -0,0 +1,120 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Code for working with capabilities."""
import json
import logging
from ironic_lib.common.i18n import _
LOG = logging.getLogger(__name__)
def _parse_old_format(cap_str, skip_malformed=True):
"""Extract capabilities from string.
:param cap_str: A string in the key1:value1,key2:value2 format.
:param skip_malformed: Whether to skip malformed items or raise ValueError.
:return: a dictionary
"""
capabilities = {}
for node_capability in cap_str.split(','):
parts = node_capability.split(':', 1)
if len(parts) == 2 and parts[0] and parts[1]:
capabilities[parts[0]] = parts[1]
else:
if skip_malformed:
LOG.warning("Ignoring malformed capability '%s'. "
"Format should be 'key:val'.", node_capability)
else:
raise ValueError(
_("Malformed capability %s. Format should be 'key:val'")
% node_capability)
return capabilities
def parse(capabilities, compat=True, skip_malformed=False):
"""Extract capabilities from provided object.
The capabilities value can either be a dict, or a json str, or
a key1:value1,key2:value2 formatted string (if compat is True).
If None, an empty dictionary is returned.
:param capabilities: The capabilities value. Can either be a dict, or
a json str, or a key1:value1,key2:value2 formatted
string (if compat is True).
:param compat: Whether to parse the old format key1:value1,key2:value2.
:param skip_malformed: Whether to skip malformed items or raise ValueError.
:returns: A dictionary with the capabilities if found and well formatted,
otherwise an empty dictionary.
:raises: TypeError if the capabilities are of invalid type.
:raises: ValueError on a malformed capability if skip_malformed is False
or on invalid JSON with compat is False.
"""
if capabilities is None:
return {}
elif isinstance(capabilities, str):
try:
return json.loads(capabilities)
except (ValueError, TypeError) as exc:
if compat:
return _parse_old_format(capabilities,
skip_malformed=skip_malformed)
else:
raise ValueError(
_('Invalid JSON capabilities %(value)s: %(error)s')
% {'value': capabilities, 'error': exc})
elif not isinstance(capabilities, dict):
raise TypeError(
_('Invalid capabilities, expected a string or a dict, got %s')
% capabilities)
else:
return capabilities
def combine(capabilities_dict, skip_none=False):
"""Combine capabilities into the old format.
:param capabilities_dict: Capabilities as a mapping.
:param skip_none: If True, skips all items with value of None.
:returns: Capabilities as a string key1:value1,key2:value2.
"""
return ','.join(["%s:%s" % (key, value)
for key, value in capabilities_dict.items()
if not skip_none or value is not None])
def update_and_combine(capabilities, new_values, skip_malformed=False,
skip_none=False):
"""Parses capabilities, updated them with new values and re-combines.
:param capabilities: The capabilities value. Can either be a dict, or
a json str, or a key1:value1,key2:value2 formatted
string (if compat is True).
:param new_values: New values as a dictionary.
:param skip_malformed: Whether to skip malformed items or raise ValueError.
:param skip_none: If True, skips all items with value of None.
:returns: Capabilities in the old format (key1:value1,key2:value2).
:raises: TypeError if the capabilities are of invalid type.
:raises: ValueError on a malformed capability if skip_malformed is False.
"""
if not isinstance(new_values, dict):
raise TypeError(
_("Cannot update capabilities. The new capabilities should be in "
"a dictionary. Provided value is %s") % new_values)
capabilities = parse(capabilities, skip_malformed=skip_malformed)
capabilities.update(new_values)
return combine(capabilities, skip_none=skip_none)

View File

@ -346,7 +346,7 @@ def make_partitions(dev, root_mb, swap_mb, ephemeral_mb,
if commit: if commit:
# write to the disk # write to the disk
dp.commit() dp.commit()
_trigger_device_rescan(dev) trigger_device_rescan(dev)
return part_dict return part_dict
@ -773,11 +773,8 @@ def _get_labelled_partition(device_path, label, node_uuid):
:returns: block device file for partition if it exists; otherwise it :returns: block device file for partition if it exists; otherwise it
returns None. returns None.
""" """
partprobe(device_path)
try: try:
utils.execute('partprobe', device_path, run_as_root=True,
attempts=CONF.disk_utils.partprobe_attempts)
# lsblk command
output, err = utils.execute('lsblk', '-Po', 'name,label', device_path, output, err = utils.execute('lsblk', '-Po', 'name,label', device_path,
check_exit_code=[0, 1], check_exit_code=[0, 1],
use_standard_locale=True, run_as_root=True) use_standard_locale=True, run_as_root=True)
@ -882,7 +879,50 @@ def fix_gpt_partition(device, node_uuid):
raise exception.InstanceDeployFailure(msg) raise exception.InstanceDeployFailure(msg)
def _trigger_device_rescan(device): def udev_settle():
"""Wait for the udev event queue to settle.
Wait for the udev event queue to settle to make sure all devices
are detected once the machine boots up.
:return: True on success, False otherwise.
"""
LOG.debug('Waiting until udev event queue is empty')
try:
utils.execute('udevadm', 'settle')
except processutils.ProcessExecutionError as e:
LOG.warning('Something went wrong when waiting for udev '
'to settle. Error: %s', e)
return False
else:
return True
def partprobe(device, attempts=None):
"""Probe partitions on the given device.
:param device: The block device containing paritions that is attempting
to be updated.
:param attempts: Number of attempts to run partprobe, the default is read
from the configuration.
:return: True on success, False otherwise.
"""
if attempts is None:
attempts = CONF.disk_utils.partprobe_attempts
try:
utils.execute('partprobe', device, run_as_root=True, attempts=attempts)
except (processutils.UnknownArgumentError,
processutils.ProcessExecutionError, OSError) as e:
LOG.warning("Unable to probe for partitions on device %(device)s, "
"the partitioning table may be broken. Error: %(error)s",
{'device': device, 'error': e})
return False
else:
return True
def trigger_device_rescan(device, attempts=None):
"""Sync and trigger device rescan. """Sync and trigger device rescan.
Disk partition performed via parted, when performed on a ramdisk Disk partition performed via parted, when performed on a ramdisk
@ -900,25 +940,25 @@ def _trigger_device_rescan(device):
:param device: The block device containing paritions that is attempting :param device: The block device containing paritions that is attempting
to be updated. to be updated.
:param attempts: Number of attempts to run partprobe, the default is read
from the configuration.
:return: True on success, False otherwise.
""" """
# TODO(TheJulia): This helper method was broken out for re-use, and LOG.debug('Explicitly calling sync to force buffer/cache flush')
# does not have an explicit test case for itself, and it should, but
# it's steps are fairly explicitly checked with mocks on execute in
# the partition code.
LOG.debug('Explicitly calling sync to force buffer/cache flush.')
utils.execute('sync') utils.execute('sync')
# Make sure any additions to the partitioning are reflected in the # Make sure any additions to the partitioning are reflected in the
# kernel. # kernel.
LOG.debug('Waiting until udev event queue is empty') udev_settle()
utils.execute('udevadm', 'settle') partprobe(device, attempts=attempts)
try: try:
utils.execute('partprobe', device, run_as_root=True,
attempts=CONF.disk_utils.partprobe_attempts)
# Also verify that the partitioning is correct now. # Also verify that the partitioning is correct now.
utils.execute('sgdisk', '-v', device, run_as_root=True) utils.execute('sgdisk', '-v', device, run_as_root=True)
except processutils.ProcessExecutionError as exc: except processutils.ProcessExecutionError as exc:
LOG.warning('Failed to verify partitioning after creating ' LOG.warning('Failed to verify partition tables on device %(dev)s: '
'partition(s): %s', exc) '%(err)s', {'dev': device, 'err': exc})
return False
else:
return True
def create_config_drive_partition(node_uuid, device, configdrive): def create_config_drive_partition(node_uuid, device, configdrive):
@ -1004,7 +1044,7 @@ def create_config_drive_partition(node_uuid, device, configdrive):
'mkpart', 'primary', 'fat32', startlimit, 'mkpart', 'primary', 'fat32', startlimit,
endlimit, run_as_root=True) endlimit, run_as_root=True)
# Trigger device rescan # Trigger device rescan
_trigger_device_rescan(device) trigger_device_rescan(device)
upd_parts = set(part['number'] for part in list_partitions(device)) upd_parts = set(part['number'] for part in list_partitions(device))
new_part = set(upd_parts) - set(cur_parts) new_part = set(upd_parts) - set(cur_parts)
@ -1021,8 +1061,7 @@ def create_config_drive_partition(node_uuid, device, configdrive):
else: else:
config_drive_part = '%s%s' % (device, new_part.pop()) config_drive_part = '%s%s' % (device, new_part.pop())
LOG.debug('Waiting until udev event queue is empty') udev_settle()
utils.execute('udevadm', 'settle')
# NOTE(vsaienko): check that devise actually exists, # NOTE(vsaienko): check that devise actually exists,
# it is not handled by udevadm when using ISCSI, for more info see: # it is not handled by udevadm when using ISCSI, for more info see:

View File

@ -0,0 +1,85 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from ironic_lib import capabilities
from ironic_lib.tests import base
class ParseTestCase(base.IronicLibTestCase):
def test_none(self):
self.assertEqual({}, capabilities.parse(None))
def test_from_dict(self):
expected_dict = {"hello": "world"}
self.assertDictEqual(expected_dict, capabilities.parse(expected_dict))
def test_from_json_string(self):
caps = '{"test": "world"}'
self.assertDictEqual({"test": "world"}, capabilities.parse(caps))
def test_from_old_format(self):
caps = 'hello:test1,cat:meow'
self.assertDictEqual({'hello': 'test1', 'cat': 'meow'},
capabilities.parse(caps))
def test_from_old_format_with_malformed(self):
caps = 'hello:test1,badformat'
self.assertRaisesRegex(ValueError, 'Malformed capability',
capabilities.parse, caps)
def test_from_old_format_skip_malformed(self):
caps = 'hello:test1,badformat'
self.assertDictEqual({'hello': 'test1'},
capabilities.parse(caps, skip_malformed=True))
def test_no_old_format(self):
caps = 'hello:test1,cat:meow'
self.assertRaisesRegex(ValueError, 'Invalid JSON capabilities',
capabilities.parse, caps, compat=False)
def test_unexpected_type(self):
self.assertRaisesRegex(TypeError, 'Invalid capabilities',
capabilities.parse, 42)
class CombineTestCase(base.IronicLibTestCase):
def test_combine(self):
caps = capabilities.combine(
collections.OrderedDict([('hello', None), ('cat', 'meow')]))
self.assertEqual('hello:None,cat:meow', caps)
def test_skip_none(self):
caps = capabilities.combine(
collections.OrderedDict([('hello', None), ('cat', 'meow')]),
skip_none=True)
self.assertEqual('cat:meow', caps)
class UpdateAndCombineTestCase(base.IronicLibTestCase):
def test_from_dict(self):
result = capabilities.update_and_combine(
{'key1': 'old value', 'key2': 'value2'}, {'key1': 'value1'})
self.assertIn(result, ['key1:value1,key2:value2',
'key2:value2,key1:value1'])
def test_from_old_format(self):
result = capabilities.update_and_combine(
'key1:old value,key2:value2', {'key1': 'value1'})
self.assertIn(result, ['key1:value1,key2:value2',
'key2:value2,key1:value1'])
def test_skip_none(self):
result = capabilities.update_and_combine(
'key1:old value,key2:value2', {'key1': None}, skip_none=True)
self.assertEqual('key2:value2', result)

View File

@ -845,9 +845,9 @@ class PopulateImageTestCase(base.IronicLibTestCase):
self.assertFalse(mock_dd.called) self.assertFalse(mock_dd.called)
# NOTE(TheJulia): _trigger_device_rescan is systemwide thus pointless # NOTE(TheJulia): trigger_device_rescan is systemwide thus pointless
# to execute in the file test case. Also, CI unit test jobs lack sgdisk. # to execute in the file test case. Also, CI unit test jobs lack sgdisk.
@mock.patch.object(disk_utils, '_trigger_device_rescan', lambda *_: None) @mock.patch.object(disk_utils, 'trigger_device_rescan', lambda *_: None)
@mock.patch.object(utils, 'wait_for_disk_to_become_available', lambda *_: None) @mock.patch.object(utils, 'wait_for_disk_to_become_available', lambda *_: None)
@mock.patch.object(disk_utils, 'is_block_device', lambda d: True) @mock.patch.object(disk_utils, 'is_block_device', lambda d: True)
@mock.patch.object(disk_utils, 'block_uuid', lambda p: 'uuid') @mock.patch.object(disk_utils, 'block_uuid', lambda p: 'uuid')
@ -1269,8 +1269,13 @@ class WholeDiskPartitionTestCases(base.IronicLibTestCase):
'Failed to retrieve partition labels', 'Failed to retrieve partition labels',
disk_utils._get_labelled_partition, self.dev, disk_utils._get_labelled_partition, self.dev,
self.config_part_label, self.node_uuid) self.config_part_label, self.node_uuid)
mock_execute.assert_called_once_with( execute_calls = [
'partprobe', self.dev, run_as_root=True, attempts=10) mock.call('partprobe', self.dev, run_as_root=True, attempts=10),
mock.call('lsblk', '-Po', 'name,label', self.dev,
check_exit_code=[0, 1],
use_standard_locale=True, run_as_root=True)
]
mock_execute.assert_has_calls(execute_calls)
self.assertEqual(1, mock_log.call_count) self.assertEqual(1, mock_log.call_count)
def _test_is_disk_larger_than_max_size(self, mock_execute, blk_out): def _test_is_disk_larger_than_max_size(self, mock_execute, blk_out):
@ -1896,3 +1901,37 @@ class WholeDiskConfigDriveTestCases(base.IronicLibTestCase):
mock_fix_gpt_partition.assert_called_with(self.dev, self.node_uuid) mock_fix_gpt_partition.assert_called_with(self.dev, self.node_uuid)
mock_is_disk_gpt.assert_called_with(self.dev, self.node_uuid) mock_is_disk_gpt.assert_called_with(self.dev, self.node_uuid)
mock_count.assert_called_once_with(self.dev) mock_count.assert_called_once_with(self.dev)
@mock.patch.object(utils, 'execute', autospec=True)
class TriggerDeviceRescanTestCase(base.IronicLibTestCase):
def test_trigger(self, mock_execute):
self.assertTrue(disk_utils.trigger_device_rescan('/dev/fake'))
mock_execute.assert_has_calls([
mock.call('sync'),
mock.call('udevadm', 'settle'),
mock.call('partprobe', '/dev/fake', run_as_root=True, attempts=10),
mock.call('sgdisk', '-v', '/dev/fake', run_as_root=True),
])
def test_custom_attempts(self, mock_execute):
self.assertTrue(
disk_utils.trigger_device_rescan('/dev/fake', attempts=1))
mock_execute.assert_has_calls([
mock.call('sync'),
mock.call('udevadm', 'settle'),
mock.call('partprobe', '/dev/fake', run_as_root=True, attempts=1),
mock.call('sgdisk', '-v', '/dev/fake', run_as_root=True),
])
def test_fails(self, mock_execute):
mock_execute.side_effect = [('', '')] * 3 + [
processutils.ProcessExecutionError
]
self.assertFalse(disk_utils.trigger_device_rescan('/dev/fake'))
mock_execute.assert_has_calls([
mock.call('sync'),
mock.call('udevadm', 'settle'),
mock.call('partprobe', '/dev/fake', run_as_root=True, attempts=10),
mock.call('sgdisk', '-v', '/dev/fake', run_as_root=True),
])

View File

@ -105,6 +105,26 @@ def execute(*cmd, **kwargs):
return result return result
def try_execute(*cmd, **kwargs):
"""The same as execute but returns None on error.
Executes and logs results from a system command. See docs for
oslo_concurrency.processutils.execute for usage.
Instead of raising an exception on failure, this method simply
returns None in case of failure.
:param cmd: positional arguments to pass to processutils.execute()
:param kwargs: keyword arguments to pass to processutils.execute()
:raises: UnknownArgumentError on receiving unknown arguments
:returns: tuple of (stdout, stderr) or None in some error cases
"""
try:
return execute(*cmd, **kwargs)
except (processutils.ProcessExecutionError, OSError) as e:
LOG.debug('Command failed: %s', e)
def mkfs(fs, path, label=None): def mkfs(fs, path, label=None):
"""Format a file or block device """Format a file or block device