# Copyright 2014 Red Hat, Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import os import stat from unittest import mock from ironic_lib import exception from ironic_lib import qemu_img from ironic_lib.tests import base from ironic_lib import utils from oslo_concurrency import processutils from oslo_config import cfg from ironic_python_agent import disk_utils CONF = cfg.CONF @mock.patch.object(utils, 'execute', autospec=True) class ListPartitionsTestCase(base.IronicLibTestCase): def test_correct(self, execute_mock): output = """ BYT; /dev/sda:500107862016B:scsi:512:4096:msdos:ATA HGST HTS725050A7:; 1:1.00MiB:501MiB:500MiB:ext4::boot; 2:501MiB:476940MiB:476439MiB:::; """ expected = [ {'number': 1, 'start': 1, 'end': 501, 'size': 500, 'filesystem': 'ext4', 'partition_name': '', 'flags': 'boot', 'path': '/dev/fake1'}, {'number': 2, 'start': 501, 'end': 476940, 'size': 476439, 'filesystem': '', 'partition_name': '', 'flags': '', 'path': '/dev/fake2'}, ] execute_mock.return_value = (output, '') result = disk_utils.list_partitions('/dev/fake') self.assertEqual(expected, result) execute_mock.assert_called_once_with( 'parted', '-s', '-m', '/dev/fake', 'unit', 'MiB', 'print', use_standard_locale=True) @mock.patch.object(disk_utils.LOG, 'warning', autospec=True) def test_incorrect(self, log_mock, execute_mock): output = """ BYT; /dev/sda:500107862016B:scsi:512:4096:msdos:ATA HGST HTS725050A7:; 1:XX1076MiB:---:524MiB:ext4::boot; """ execute_mock.return_value = (output, '') self.assertEqual([], disk_utils.list_partitions('/dev/fake')) self.assertEqual(1, log_mock.call_count) def test_correct_gpt_nvme(self, execute_mock): output = """ BYT; /dev/vda:40960MiB:virtblk:512:512:gpt:Virtio Block Device:; 2:1.00MiB:2.00MiB:1.00MiB::Bios partition:bios_grub; 1:4.00MiB:5407MiB:5403MiB:ext4:Root partition:; 3:5407MiB:5507MiB:100MiB:fat16:Boot partition:boot, esp; """ expected = [ {'end': 2, 'number': 2, 'start': 1, 'flags': 'bios_grub', 'filesystem': '', 'partition_name': 'Bios partition', 'size': 1, 'path': '/dev/fake0p2'}, {'end': 5407, 'number': 1, 'start': 4, 'flags': '', 'filesystem': 'ext4', 'partition_name': 'Root partition', 'size': 5403, 'path': '/dev/fake0p1'}, {'end': 5507, 'number': 3, 'start': 5407, 'flags': 'boot, esp', 'filesystem': 'fat16', 'partition_name': 'Boot partition', 'size': 100, 'path': '/dev/fake0p3'}, ] execute_mock.return_value = (output, '') result = disk_utils.list_partitions('/dev/fake0') self.assertEqual(expected, result) execute_mock.assert_called_once_with( 'parted', '-s', '-m', '/dev/fake0', 'unit', 'MiB', 'print', use_standard_locale=True) @mock.patch.object(disk_utils.LOG, 'warning', autospec=True) def test_incorrect_gpt(self, log_mock, execute_mock): output = """ BYT; /dev/vda:40960MiB:virtblk:512:512:gpt:Virtio Block Device:; 2:XX1.00MiB:---:1.00MiB::primary:bios_grub; """ execute_mock.return_value = (output, '') self.assertEqual([], disk_utils.list_partitions('/dev/fake')) self.assertEqual(1, log_mock.call_count) @mock.patch.object(utils, 'execute', autospec=True) class MakePartitionsTestCase(base.IronicLibTestCase): def setUp(self): super(MakePartitionsTestCase, self).setUp() self.dev = 'fake-dev' self.root_mb = 1024 self.swap_mb = 512 self.ephemeral_mb = 0 self.configdrive_mb = 0 self.node_uuid = "12345678-1234-1234-1234-1234567890abcxyz" self.efi_size = CONF.disk_utils.efi_system_partition_size self.bios_size = CONF.disk_utils.bios_boot_partition_size def _get_parted_cmd(self, dev, label=None): if label is None: label = 'msdos' return ['parted', '-a', 'optimal', '-s', dev, '--', 'unit', 'MiB', 'mklabel', label] def _add_efi_sz(self, x): return str(x + self.efi_size) def _add_bios_sz(self, x): return str(x + self.bios_size) def _test_make_partitions(self, mock_exc, boot_option, boot_mode='bios', disk_label=None, cpu_arch=""): mock_exc.return_value = ('', '') disk_utils.make_partitions(self.dev, self.root_mb, self.swap_mb, self.ephemeral_mb, self.configdrive_mb, self.node_uuid, boot_option=boot_option, boot_mode=boot_mode, disk_label=disk_label, cpu_arch=cpu_arch) if boot_option == "local" and boot_mode == "uefi": expected_mkpart = ['mkpart', 'primary', 'fat32', '1', self._add_efi_sz(1), 'set', '1', 'boot', 'on', 'mkpart', 'primary', 'linux-swap', self._add_efi_sz(1), self._add_efi_sz(513), 'mkpart', 'primary', '', self._add_efi_sz(513), self._add_efi_sz(1537)] else: if boot_option == "local": if disk_label == "gpt": if cpu_arch.startswith('ppc64'): expected_mkpart = ['mkpart', 'primary', '', '1', '9', 'set', '1', 'prep', 'on', 'mkpart', 'primary', 'linux-swap', '9', '521', 'mkpart', 'primary', '', '521', '1545'] else: expected_mkpart = ['mkpart', 'primary', '', '1', self._add_bios_sz(1), 'set', '1', 'bios_grub', 'on', 'mkpart', 'primary', 'linux-swap', self._add_bios_sz(1), self._add_bios_sz(513), 'mkpart', 'primary', '', self._add_bios_sz(513), self._add_bios_sz(1537)] elif cpu_arch.startswith('ppc64'): expected_mkpart = ['mkpart', 'primary', '', '1', '9', 'set', '1', 'boot', 'on', 'set', '1', 'prep', 'on', 'mkpart', 'primary', 'linux-swap', '9', '521', 'mkpart', 'primary', '', '521', '1545'] else: expected_mkpart = ['mkpart', 'primary', 'linux-swap', '1', '513', 'mkpart', 'primary', '', '513', '1537', 'set', '2', 'boot', 'on'] else: expected_mkpart = ['mkpart', 'primary', 'linux-swap', '1', '513', 'mkpart', 'primary', '', '513', '1537'] self.dev = 'fake-dev' parted_cmd = (self._get_parted_cmd(self.dev, disk_label) + expected_mkpart) parted_call = mock.call(*parted_cmd, use_standard_locale=True) fuser_cmd = ['fuser', 'fake-dev'] fuser_call = mock.call(*fuser_cmd, check_exit_code=[0, 1]) sync_calls = [mock.call('sync'), mock.call('udevadm', 'settle'), mock.call('partprobe', self.dev, attempts=10), mock.call('udevadm', 'settle'), mock.call('sgdisk', '-v', self.dev)] mock_exc.assert_has_calls([parted_call, fuser_call] + sync_calls) def test_make_partitions(self, mock_exc): self._test_make_partitions(mock_exc, boot_option="netboot") def test_make_partitions_local_boot(self, mock_exc): self._test_make_partitions(mock_exc, boot_option="local") def test_make_partitions_local_boot_uefi(self, mock_exc): self._test_make_partitions(mock_exc, boot_option="local", boot_mode="uefi", disk_label="gpt") def test_make_partitions_local_boot_gpt_bios(self, mock_exc): self._test_make_partitions(mock_exc, boot_option="local", disk_label="gpt") def test_make_partitions_disk_label_gpt(self, mock_exc): self._test_make_partitions(mock_exc, boot_option="netboot", disk_label="gpt") def test_make_partitions_mbr_with_prep(self, mock_exc): self._test_make_partitions(mock_exc, boot_option="local", disk_label="msdos", cpu_arch="ppc64le") def test_make_partitions_gpt_with_prep(self, mock_exc): self._test_make_partitions(mock_exc, boot_option="local", disk_label="gpt", cpu_arch="ppc64le") def test_make_partitions_with_ephemeral(self, mock_exc): self.ephemeral_mb = 2048 expected_mkpart = ['mkpart', 'primary', '', '1', '2049', 'mkpart', 'primary', 'linux-swap', '2049', '2561', 'mkpart', 'primary', '', '2561', '3585'] self.dev = 'fake-dev' cmd = self._get_parted_cmd(self.dev) + expected_mkpart mock_exc.return_value = ('', '') disk_utils.make_partitions(self.dev, self.root_mb, self.swap_mb, self.ephemeral_mb, self.configdrive_mb, self.node_uuid) parted_call = mock.call(*cmd, use_standard_locale=True) mock_exc.assert_has_calls([parted_call]) def test_make_partitions_with_iscsi_device(self, mock_exc): self.ephemeral_mb = 2048 expected_mkpart = ['mkpart', 'primary', '', '1', '2049', 'mkpart', 'primary', 'linux-swap', '2049', '2561', 'mkpart', 'primary', '', '2561', '3585'] self.dev = '/dev/iqn.2008-10.org.openstack:%s.fake-9' % self.node_uuid ep = '/dev/iqn.2008-10.org.openstack:%s.fake-9-part1' % self.node_uuid swap = ('/dev/iqn.2008-10.org.openstack:%s.fake-9-part2' % self.node_uuid) root = ('/dev/iqn.2008-10.org.openstack:%s.fake-9-part3' % self.node_uuid) expected_result = {'ephemeral': ep, 'swap': swap, 'root': root} cmd = self._get_parted_cmd(self.dev) + expected_mkpart mock_exc.return_value = ('', '') result = disk_utils.make_partitions( self.dev, self.root_mb, self.swap_mb, self.ephemeral_mb, self.configdrive_mb, self.node_uuid) parted_call = mock.call(*cmd, use_standard_locale=True) mock_exc.assert_has_calls([parted_call]) self.assertEqual(expected_result, result) def test_make_partitions_with_nvme_device(self, mock_exc): self.ephemeral_mb = 2048 expected_mkpart = ['mkpart', 'primary', '', '1', '2049', 'mkpart', 'primary', 'linux-swap', '2049', '2561', 'mkpart', 'primary', '', '2561', '3585'] self.dev = '/dev/nvmefake-9' ep = '/dev/nvmefake-9p1' swap = '/dev/nvmefake-9p2' root = '/dev/nvmefake-9p3' expected_result = {'ephemeral': ep, 'swap': swap, 'root': root} cmd = self._get_parted_cmd(self.dev) + expected_mkpart mock_exc.return_value = ('', '') result = disk_utils.make_partitions( self.dev, self.root_mb, self.swap_mb, self.ephemeral_mb, self.configdrive_mb, self.node_uuid) parted_call = mock.call(*cmd, use_standard_locale=True) mock_exc.assert_has_calls([parted_call]) self.assertEqual(expected_result, result) def test_make_partitions_with_local_device(self, mock_exc): self.ephemeral_mb = 2048 expected_mkpart = ['mkpart', 'primary', '', '1', '2049', 'mkpart', 'primary', 'linux-swap', '2049', '2561', 'mkpart', 'primary', '', '2561', '3585'] self.dev = 'fake-dev' expected_result = {'ephemeral': 'fake-dev1', 'swap': 'fake-dev2', 'root': 'fake-dev3'} cmd = self._get_parted_cmd(self.dev) + expected_mkpart mock_exc.return_value = ('', '') result = disk_utils.make_partitions( self.dev, self.root_mb, self.swap_mb, self.ephemeral_mb, self.configdrive_mb, self.node_uuid) parted_call = mock.call(*cmd, use_standard_locale=True) mock_exc.assert_has_calls([parted_call]) self.assertEqual(expected_result, result) @mock.patch.object(utils, 'execute', autospec=True) class DestroyMetaDataTestCase(base.IronicLibTestCase): def setUp(self): super(DestroyMetaDataTestCase, self).setUp() self.dev = 'fake-dev' self.node_uuid = "12345678-1234-1234-1234-1234567890abcxyz" def test_destroy_disk_metadata_4096(self, mock_exec): mock_exec.side_effect = iter([ (None, None), ('4096\n', None), ('524288\n', None), (None, None), (None, None), (None, None), (None, None)]) expected_calls = [mock.call('wipefs', '--force', '--all', 'fake-dev', use_standard_locale=True), mock.call('blockdev', '--getss', 'fake-dev'), mock.call('blockdev', '--getsize64', 'fake-dev'), mock.call('dd', 'bs=4096', 'if=/dev/zero', 'of=fake-dev', 'count=5', 'oflag=direct', use_standard_locale=True), mock.call('dd', 'bs=4096', 'if=/dev/zero', 'of=fake-dev', 'count=5', 'oflag=direct', 'seek=123', use_standard_locale=True), mock.call('sgdisk', '-Z', 'fake-dev', use_standard_locale=True), mock.call('fuser', self.dev, check_exit_code=[0, 1])] disk_utils.destroy_disk_metadata(self.dev, self.node_uuid) mock_exec.assert_has_calls(expected_calls) def test_destroy_disk_metadata(self, mock_exec): # Note(TheJulia): This list will get-reused, but only the second # execution returning a string is needed for the test as otherwise # command output is not used. mock_exec.side_effect = iter([ (None, None), ('512\n', None), ('524288\n', None), (None, None), (None, None), (None, None), (None, None)]) expected_calls = [mock.call('wipefs', '--force', '--all', 'fake-dev', use_standard_locale=True), mock.call('blockdev', '--getss', 'fake-dev'), mock.call('blockdev', '--getsize64', 'fake-dev'), mock.call('dd', 'bs=512', 'if=/dev/zero', 'of=fake-dev', 'count=33', 'oflag=direct', use_standard_locale=True), mock.call('dd', 'bs=512', 'if=/dev/zero', 'of=fake-dev', 'count=33', 'oflag=direct', 'seek=991', use_standard_locale=True), mock.call('sgdisk', '-Z', 'fake-dev', use_standard_locale=True), mock.call('fuser', self.dev, check_exit_code=[0, 1])] disk_utils.destroy_disk_metadata(self.dev, self.node_uuid) mock_exec.assert_has_calls(expected_calls) def test_destroy_disk_metadata_wipefs_fail(self, mock_exec): mock_exec.side_effect = processutils.ProcessExecutionError expected_call = [mock.call('wipefs', '--force', '--all', 'fake-dev', use_standard_locale=True)] self.assertRaises(processutils.ProcessExecutionError, disk_utils.destroy_disk_metadata, self.dev, self.node_uuid) mock_exec.assert_has_calls(expected_call) def test_destroy_disk_metadata_sgdisk_fail(self, mock_exec): expected_calls = [mock.call('wipefs', '--force', '--all', 'fake-dev', use_standard_locale=True), mock.call('blockdev', '--getss', 'fake-dev'), mock.call('blockdev', '--getsize64', 'fake-dev'), mock.call('dd', 'bs=512', 'if=/dev/zero', 'of=fake-dev', 'count=33', 'oflag=direct', use_standard_locale=True), mock.call('dd', 'bs=512', 'if=/dev/zero', 'of=fake-dev', 'count=33', 'oflag=direct', 'seek=991', use_standard_locale=True), mock.call('sgdisk', '-Z', 'fake-dev', use_standard_locale=True)] mock_exec.side_effect = iter([ (None, None), ('512\n', None), ('524288\n', None), (None, None), (None, None), processutils.ProcessExecutionError()]) self.assertRaises(processutils.ProcessExecutionError, disk_utils.destroy_disk_metadata, self.dev, self.node_uuid) mock_exec.assert_has_calls(expected_calls) def test_destroy_disk_metadata_wipefs_not_support_force(self, mock_exec): mock_exec.side_effect = iter([ processutils.ProcessExecutionError(description='--force'), (None, None), ('512\n', None), ('524288\n', None), (None, None), (None, None), (None, None), (None, None)]) expected_call = [mock.call('wipefs', '--force', '--all', 'fake-dev', use_standard_locale=True), mock.call('wipefs', '--all', 'fake-dev', use_standard_locale=True)] disk_utils.destroy_disk_metadata(self.dev, self.node_uuid) mock_exec.assert_has_calls(expected_call) def test_destroy_disk_metadata_ebr(self, mock_exec): expected_calls = [mock.call('wipefs', '--force', '--all', 'fake-dev', use_standard_locale=True), mock.call('blockdev', '--getss', 'fake-dev'), mock.call('blockdev', '--getsize64', 'fake-dev'), mock.call('dd', 'bs=512', 'if=/dev/zero', 'of=fake-dev', 'count=2', 'oflag=direct', use_standard_locale=True), mock.call('sgdisk', '-Z', 'fake-dev', use_standard_locale=True)] mock_exec.side_effect = iter([ (None, None), ('512\n', None), ('1024\n', None), # an EBR is 2 sectors (None, None), (None, None), (None, None), (None, None)]) disk_utils.destroy_disk_metadata(self.dev, self.node_uuid) mock_exec.assert_has_calls(expected_calls) def test_destroy_disk_metadata_tiny_partition(self, mock_exec): expected_calls = [mock.call('wipefs', '--force', '--all', 'fake-dev', use_standard_locale=True), mock.call('blockdev', '--getss', 'fake-dev'), mock.call('blockdev', '--getsize64', 'fake-dev'), mock.call('dd', 'bs=512', 'if=/dev/zero', 'of=fake-dev', 'count=33', 'oflag=direct', use_standard_locale=True), mock.call('dd', 'bs=512', 'if=/dev/zero', 'of=fake-dev', 'count=33', 'oflag=direct', 'seek=9', use_standard_locale=True), mock.call('sgdisk', '-Z', 'fake-dev', use_standard_locale=True)] mock_exec.side_effect = iter([ (None, None), ('512\n', None), ('21504\n', None), (None, None), (None, None), (None, None), (None, None)]) disk_utils.destroy_disk_metadata(self.dev, self.node_uuid) mock_exec.assert_has_calls(expected_calls) @mock.patch.object(utils, 'execute', autospec=True) class GetDeviceByteSizeTestCase(base.IronicLibTestCase): def setUp(self): super(GetDeviceByteSizeTestCase, self).setUp() self.dev = 'fake-dev' self.node_uuid = "12345678-1234-1234-1234-1234567890abcxyz" def test_get_dev_byte_size(self, mock_exec): mock_exec.return_value = ("64", "") expected_call = [mock.call('blockdev', '--getsize64', self.dev)] disk_utils.get_dev_byte_size(self.dev) mock_exec.assert_has_calls(expected_call) @mock.patch.object(disk_utils, 'dd', autospec=True) @mock.patch.object(qemu_img, 'image_info', autospec=True) @mock.patch.object(qemu_img, 'convert_image', autospec=True) class PopulateImageTestCase(base.IronicLibTestCase): def test_populate_raw_image(self, mock_cg, mock_qinfo, mock_dd): type(mock_qinfo.return_value).file_format = mock.PropertyMock( return_value='raw') disk_utils.populate_image('src', 'dst') mock_dd.assert_called_once_with('src', 'dst', conv_flags=None) self.assertFalse(mock_cg.called) def test_populate_raw_image_with_convert(self, mock_cg, mock_qinfo, mock_dd): type(mock_qinfo.return_value).file_format = mock.PropertyMock( return_value='raw') disk_utils.populate_image('src', 'dst', conv_flags='sparse') mock_dd.assert_called_once_with('src', 'dst', conv_flags='sparse') self.assertFalse(mock_cg.called) def test_populate_qcow2_image(self, mock_cg, mock_qinfo, mock_dd): type(mock_qinfo.return_value).file_format = mock.PropertyMock( return_value='qcow2') disk_utils.populate_image('src', 'dst') mock_cg.assert_called_once_with('src', 'dst', 'raw', True, sparse_size='0') self.assertFalse(mock_dd.called) @mock.patch('time.sleep', lambda sec: None) class OtherFunctionTestCase(base.IronicLibTestCase): @mock.patch.object(os, 'stat', autospec=True) @mock.patch.object(stat, 'S_ISBLK', autospec=True) def test_is_block_device_works(self, mock_is_blk, mock_os): device = '/dev/disk/by-path/ip-1.2.3.4:5678-iscsi-iqn.fake-lun-9' mock_is_blk.return_value = True mock_os().st_mode = 10000 self.assertTrue(disk_utils.is_block_device(device)) mock_is_blk.assert_called_once_with(mock_os().st_mode) @mock.patch.object(os, 'stat', autospec=True) def test_is_block_device_raises(self, mock_os): device = '/dev/disk/by-path/ip-1.2.3.4:5678-iscsi-iqn.fake-lun-9' mock_os.side_effect = OSError self.assertRaises(exception.InstanceDeployFailure, disk_utils.is_block_device, device) mock_os.assert_has_calls([mock.call(device)] * 3) @mock.patch.object(os, 'stat', autospec=True) def test_is_block_device_attempts(self, mock_os): CONF.set_override('partition_detection_attempts', 2, group='disk_utils') device = '/dev/disk/by-path/ip-1.2.3.4:5678-iscsi-iqn.fake-lun-9' mock_os.side_effect = OSError self.assertRaises(exception.InstanceDeployFailure, disk_utils.is_block_device, device) mock_os.assert_has_calls([mock.call(device)] * 2) @mock.patch.object(os.path, 'getsize', autospec=True) @mock.patch.object(qemu_img, 'image_info', autospec=True) def test_get_image_mb(self, mock_qinfo, mock_getsize): mb = 1024 * 1024 mock_getsize.return_value = 0 type(mock_qinfo.return_value).virtual_size = mock.PropertyMock( return_value=0) self.assertEqual(0, disk_utils.get_image_mb('x', False)) self.assertEqual(0, disk_utils.get_image_mb('x', True)) mock_getsize.return_value = 1 type(mock_qinfo.return_value).virtual_size = mock.PropertyMock( return_value=1) self.assertEqual(1, disk_utils.get_image_mb('x', False)) self.assertEqual(1, disk_utils.get_image_mb('x', True)) mock_getsize.return_value = mb type(mock_qinfo.return_value).virtual_size = mock.PropertyMock( return_value=mb) self.assertEqual(1, disk_utils.get_image_mb('x', False)) self.assertEqual(1, disk_utils.get_image_mb('x', True)) mock_getsize.return_value = mb + 1 type(mock_qinfo.return_value).virtual_size = mock.PropertyMock( return_value=mb + 1) self.assertEqual(2, disk_utils.get_image_mb('x', False)) self.assertEqual(2, disk_utils.get_image_mb('x', True)) def _test_count_mbr_partitions(self, output, mock_execute): mock_execute.return_value = (output, '') out = disk_utils.count_mbr_partitions('/dev/fake') mock_execute.assert_called_once_with('partprobe', '-d', '-s', '/dev/fake', use_standard_locale=True) return out @mock.patch.object(utils, 'execute', autospec=True) def test_count_mbr_partitions(self, mock_execute): output = "/dev/fake: msdos partitions 1 2 3 <5 6>" pp, lp = self._test_count_mbr_partitions(output, mock_execute) self.assertEqual(3, pp) self.assertEqual(2, lp) @mock.patch.object(utils, 'execute', autospec=True) def test_count_mbr_partitions_no_logical_partitions(self, mock_execute): output = "/dev/fake: msdos partitions 1 2" pp, lp = self._test_count_mbr_partitions(output, mock_execute) self.assertEqual(2, pp) self.assertEqual(0, lp) @mock.patch.object(utils, 'execute', autospec=True) def test_count_mbr_partitions_wrong_partition_table(self, mock_execute): output = "/dev/fake: gpt partitions 1 2 3 4 5 6" mock_execute.return_value = (output, '') self.assertRaises(ValueError, disk_utils.count_mbr_partitions, '/dev/fake') mock_execute.assert_called_once_with('partprobe', '-d', '-s', '/dev/fake', use_standard_locale=True) @mock.patch.object(disk_utils, 'get_device_information', autospec=True) def test_block_uuid(self, mock_get_device_info): mock_get_device_info.return_value = {'UUID': '123', 'PARTUUID': '123456'} self.assertEqual('123', disk_utils.block_uuid('/dev/fake')) mock_get_device_info.assert_called_once_with( '/dev/fake', fields=['UUID', 'PARTUUID']) @mock.patch.object(disk_utils, 'get_device_information', autospec=True) def test_block_uuid_fallback_to_uuid(self, mock_get_device_info): mock_get_device_info.return_value = {'PARTUUID': '123456'} self.assertEqual('123456', disk_utils.block_uuid('/dev/fake')) mock_get_device_info.assert_called_once_with( '/dev/fake', fields=['UUID', 'PARTUUID']) @mock.patch.object(utils, 'execute', autospec=True) class FixGptStructsTestCases(base.IronicLibTestCase): def setUp(self): super(FixGptStructsTestCases, self).setUp() self.dev = "/dev/fake" self.config_part_label = "config-2" self.node_uuid = "12345678-1234-1234-1234-1234567890abcxyz" def test_fix_gpt_structs_fix_required(self, mock_execute): sgdisk_v_output = """ Problem: The secondary header's self-pointer indicates that it doesn't reside at the end of the disk. If you've added a disk to a RAID array, use the 'e' option on the experts' menu to adjust the secondary header's and partition table's locations. Identified 1 problems! """ mock_execute.return_value = (sgdisk_v_output, '') execute_calls = [ mock.call('sgdisk', '-v', '/dev/fake'), mock.call('sgdisk', '-e', '/dev/fake') ] disk_utils._fix_gpt_structs('/dev/fake', self.node_uuid) mock_execute.assert_has_calls(execute_calls) def test_fix_gpt_structs_fix_not_required(self, mock_execute): mock_execute.return_value = ('', '') disk_utils._fix_gpt_structs('/dev/fake', self.node_uuid) mock_execute.assert_called_once_with('sgdisk', '-v', '/dev/fake') @mock.patch.object(disk_utils.LOG, 'error', autospec=True) def test_fix_gpt_structs_exc(self, mock_log, mock_execute): mock_execute.side_effect = processutils.ProcessExecutionError self.assertRaisesRegex(exception.InstanceDeployFailure, 'Failed to fix GPT data structures on disk', disk_utils._fix_gpt_structs, self.dev, self.node_uuid) mock_execute.assert_called_once_with('sgdisk', '-v', '/dev/fake') self.assertEqual(1, mock_log.call_count) @mock.patch.object(utils, 'execute', autospec=True) class TriggerDeviceRescanTestCase(base.IronicLibTestCase): def test_trigger(self, mock_execute): self.assertTrue(disk_utils.trigger_device_rescan('/dev/fake')) mock_execute.assert_has_calls([ mock.call('sync'), mock.call('udevadm', 'settle'), mock.call('partprobe', '/dev/fake', attempts=10), mock.call('udevadm', 'settle'), mock.call('sgdisk', '-v', '/dev/fake'), ]) def test_custom_attempts(self, mock_execute): self.assertTrue( disk_utils.trigger_device_rescan('/dev/fake', attempts=1)) mock_execute.assert_has_calls([ mock.call('sync'), mock.call('udevadm', 'settle'), mock.call('partprobe', '/dev/fake', attempts=1), mock.call('udevadm', 'settle'), mock.call('sgdisk', '-v', '/dev/fake'), ]) def test_fails(self, mock_execute): mock_execute.side_effect = [('', '')] * 4 + [ processutils.ProcessExecutionError ] self.assertFalse(disk_utils.trigger_device_rescan('/dev/fake')) mock_execute.assert_has_calls([ mock.call('sync'), mock.call('udevadm', 'settle'), mock.call('partprobe', '/dev/fake', attempts=10), mock.call('udevadm', 'settle'), mock.call('sgdisk', '-v', '/dev/fake'), ]) BLKID_PROBE = (""" /dev/disk/by-path/ip-10.1.0.52:3260-iscsi-iqn.2008-10.org.openstack: """ """PTUUID="123456" PTTYPE="gpt" """) LSBLK_NORMAL = ( 'UUID="123" BLOCK_SIZE="512" TYPE="vfat" ' 'PARTLABEL="EFI System Partition" PARTUUID="123456"' ) @mock.patch.object(utils, 'execute', autospec=True) class GetDeviceInformationTestCase(base.IronicLibTestCase): def test_normal(self, mock_execute): mock_execute.return_value = LSBLK_NORMAL, "" result = disk_utils.get_device_information('/dev/fake') self.assertEqual( {'UUID': '123', 'BLOCK_SIZE': '512', 'TYPE': 'vfat', 'PARTLABEL': 'EFI System Partition', 'PARTUUID': '123456'}, result ) mock_execute.assert_called_once_with( 'lsblk', '/dev/fake', '--pairs', '--bytes', '--ascii', '--nodeps', '--output-all', use_standard_locale=True) def test_fields(self, mock_execute): mock_execute.return_value = LSBLK_NORMAL, "" result = disk_utils.get_device_information('/dev/fake', fields=['UUID', 'LABEL']) # No filtering on our side, so returning all fake fields self.assertEqual( {'UUID': '123', 'BLOCK_SIZE': '512', 'TYPE': 'vfat', 'PARTLABEL': 'EFI System Partition', 'PARTUUID': '123456'}, result ) mock_execute.assert_called_once_with( 'lsblk', '/dev/fake', '--pairs', '--bytes', '--ascii', '--nodeps', '--output', 'UUID,LABEL', use_standard_locale=True) def test_empty(self, mock_execute): mock_execute.return_value = "\n", "" result = disk_utils.get_device_information('/dev/fake') self.assertEqual({}, result) mock_execute.assert_called_once_with( 'lsblk', '/dev/fake', '--pairs', '--bytes', '--ascii', '--nodeps', '--output-all', use_standard_locale=True) @mock.patch.object(utils, 'execute', autospec=True) class GetPartitionTableTypeTestCase(base.IronicLibTestCase): def test_gpt(self, mocked_execute): self._test_by_type(mocked_execute, 'gpt', 'gpt') def test_msdos(self, mocked_execute): self._test_by_type(mocked_execute, 'msdos', 'msdos') def test_unknown(self, mocked_execute): self._test_by_type(mocked_execute, 'whatever', 'unknown') def _test_by_type(self, mocked_execute, table_type_output, expected_table_type): parted_ret = PARTED_OUTPUT_UNFORMATTED.format(table_type_output) mocked_execute.side_effect = [ (parted_ret, None), ] ret = disk_utils.get_partition_table_type('hello') mocked_execute.assert_called_once_with( 'parted', '--script', 'hello', '--', 'print', use_standard_locale=True) self.assertEqual(expected_table_type, ret) PARTED_OUTPUT_UNFORMATTED = '''Model: whatever Disk /dev/sda: 450GB Sector size (logical/physical): 512B/512B Partition Table: {} Disk Flags: Number Start End Size File system Name Flags 14 1049kB 5243kB 4194kB bios_grub 15 5243kB 116MB 111MB fat32 boot, esp 1 116MB 2361MB 2245MB ext4 ''' @mock.patch.object(disk_utils, 'list_partitions', autospec=True) @mock.patch.object(disk_utils, 'get_partition_table_type', autospec=True) class FindEfiPartitionTestCase(base.IronicLibTestCase): def test_find_efi_partition(self, mocked_type, mocked_parts): mocked_parts.return_value = [ {'number': '1', 'flags': ''}, {'number': '14', 'flags': 'bios_grub'}, {'number': '15', 'flags': 'esp, boot'}, ] ret = disk_utils.find_efi_partition('/dev/sda') self.assertEqual({'number': '15', 'flags': 'esp, boot'}, ret) def test_find_efi_partition_only_boot_flag_gpt(self, mocked_type, mocked_parts): mocked_type.return_value = 'gpt' mocked_parts.return_value = [ {'number': '1', 'flags': ''}, {'number': '14', 'flags': 'bios_grub'}, {'number': '15', 'flags': 'boot'}, ] ret = disk_utils.find_efi_partition('/dev/sda') self.assertEqual({'number': '15', 'flags': 'boot'}, ret) def test_find_efi_partition_only_boot_flag_mbr(self, mocked_type, mocked_parts): mocked_type.return_value = 'msdos' mocked_parts.return_value = [ {'number': '1', 'flags': ''}, {'number': '14', 'flags': 'bios_grub'}, {'number': '15', 'flags': 'boot'}, ] self.assertIsNone(disk_utils.find_efi_partition('/dev/sda')) def test_find_efi_partition_not_found(self, mocked_type, mocked_parts): mocked_parts.return_value = [ {'number': '1', 'flags': ''}, {'number': '14', 'flags': 'bios_grub'}, ] self.assertIsNone(disk_utils.find_efi_partition('/dev/sda')) class WaitForDisk(base.IronicLibTestCase): def setUp(self): super(WaitForDisk, self).setUp() CONF.set_override('check_device_interval', .01, group='disk_partitioner') CONF.set_override('check_device_max_retries', 2, group='disk_partitioner') @mock.patch.object(utils, 'execute', autospec=True) def test_wait_for_disk_to_become_available(self, mock_exc): mock_exc.return_value = ('', '') disk_utils.wait_for_disk_to_become_available('fake-dev') fuser_cmd = ['fuser', 'fake-dev'] fuser_call = mock.call(*fuser_cmd, check_exit_code=[0, 1]) self.assertEqual(1, mock_exc.call_count) mock_exc.assert_has_calls([fuser_call]) @mock.patch.object(utils, 'execute', autospec=True, side_effect=processutils.ProcessExecutionError( stderr='fake')) def test_wait_for_disk_to_become_available_no_fuser(self, mock_exc): self.assertRaises(exception.IronicException, disk_utils.wait_for_disk_to_become_available, 'fake-dev') fuser_cmd = ['fuser', 'fake-dev'] fuser_call = mock.call(*fuser_cmd, check_exit_code=[0, 1]) self.assertEqual(2, mock_exc.call_count) mock_exc.assert_has_calls([fuser_call, fuser_call]) @mock.patch.object(utils, 'execute', autospec=True) def test_wait_for_disk_to_become_available_device_in_use_psmisc( self, mock_exc): # Test that the device is not available. This version has the 'psmisc' # version of 'fuser' values for stdout and stderr. # NOTE(TheJulia): Looks like fuser returns the actual list of pids # in the stdout output, where as all other text is returned in # stderr. # The 'psmisc' version has a leading space character in stdout. The # filename is output to stderr mock_exc.side_effect = [(' 1234 ', 'fake-dev: '), (' 15503 3919 15510 15511', 'fake-dev:')] expected_error = ('Processes with the following PIDs are ' 'holding device fake-dev: 15503, 3919, 15510, ' '15511. Timed out waiting for completion.') self.assertRaisesRegex( exception.IronicException, expected_error, disk_utils.wait_for_disk_to_become_available, 'fake-dev') fuser_cmd = ['fuser', 'fake-dev'] fuser_call = mock.call(*fuser_cmd, check_exit_code=[0, 1]) self.assertEqual(2, mock_exc.call_count) mock_exc.assert_has_calls([fuser_call, fuser_call]) @mock.patch.object(utils, 'execute', autospec=True) def test_wait_for_disk_to_become_available_device_in_use_busybox( self, mock_exc): # Test that the device is not available. This version has the 'busybox' # version of 'fuser' values for stdout and stderr. # NOTE(TheJulia): Looks like fuser returns the actual list of pids # in the stdout output, where as all other text is returned in # stderr. # The 'busybox' version does not have a leading space character in # stdout. Also nothing is output to stderr. mock_exc.side_effect = [('1234', ''), ('15503 3919 15510 15511', '')] expected_error = ('Processes with the following PIDs are ' 'holding device fake-dev: 15503, 3919, 15510, ' '15511. Timed out waiting for completion.') self.assertRaisesRegex( exception.IronicException, expected_error, disk_utils.wait_for_disk_to_become_available, 'fake-dev') fuser_cmd = ['fuser', 'fake-dev'] fuser_call = mock.call(*fuser_cmd, check_exit_code=[0, 1]) self.assertEqual(2, mock_exc.call_count) mock_exc.assert_has_calls([fuser_call, fuser_call]) @mock.patch.object(utils, 'execute', autospec=True) def test_wait_for_disk_to_become_available_no_device(self, mock_exc): # NOTE(TheJulia): Looks like fuser returns the actual list of pids # in the stdout output, where as all other text is returned in # stderr. mock_exc.return_value = ('', 'Specified filename /dev/fake ' 'does not exist.') expected_error = ('Fuser exited with "Specified filename ' '/dev/fake does not exist." while checking ' 'locks for device fake-dev. Timed out waiting ' 'for completion.') self.assertRaisesRegex( exception.IronicException, expected_error, disk_utils.wait_for_disk_to_become_available, 'fake-dev') fuser_cmd = ['fuser', 'fake-dev'] fuser_call = mock.call(*fuser_cmd, check_exit_code=[0, 1]) self.assertEqual(2, mock_exc.call_count) mock_exc.assert_has_calls([fuser_call, fuser_call]) @mock.patch.object(utils, 'execute', autospec=True) def test_wait_for_disk_to_become_available_dev_becomes_avail_psmisc( self, mock_exc): # Test that initially device is not available but then becomes # available. This version has the 'psmisc' version of 'fuser' values # for stdout and stderr. # The 'psmisc' version has a leading space character in stdout. The # filename is output to stderr mock_exc.side_effect = [(' 1234 ', 'fake-dev: '), ('', '')] disk_utils.wait_for_disk_to_become_available('fake-dev') fuser_cmd = ['fuser', 'fake-dev'] fuser_call = mock.call(*fuser_cmd, check_exit_code=[0, 1]) self.assertEqual(2, mock_exc.call_count) mock_exc.assert_has_calls([fuser_call, fuser_call]) @mock.patch.object(utils, 'execute', autospec=True) def test_wait_for_disk_to_become_available_dev_becomes_avail_busybox( self, mock_exc): # Test that initially device is not available but then becomes # available. This version has the 'busybox' version of 'fuser' values # for stdout and stderr. # The 'busybox' version does not have a leading space character in # stdout. Also nothing is output to stderr. mock_exc.side_effect = [('1234 5895', ''), ('', '')] disk_utils.wait_for_disk_to_become_available('fake-dev') fuser_cmd = ['fuser', 'fake-dev'] fuser_call = mock.call(*fuser_cmd, check_exit_code=[0, 1]) self.assertEqual(2, mock_exc.call_count) mock_exc.assert_has_calls([fuser_call, fuser_call])