# Copyright 2012 Grid Dynamics # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import base64 import errno import os import shutil import tempfile from castellan import key_manager import ddt import fixtures import mock from oslo_concurrency import lockutils from oslo_config import fixture as config_fixture from oslo_service import loopingcall from oslo_utils import imageutils from oslo_utils import units from oslo_utils import uuidutils from nova.compute import utils as compute_utils import nova.conf from nova import context from nova import exception from nova import objects from nova.storage import rbd_utils from nova import test from nova.tests.unit import fake_processutils from nova import utils from nova.virt.image import model as imgmodel from nova.virt import images from nova.virt.libvirt import config as vconfig from nova.virt.libvirt import imagebackend from nova.virt.libvirt import utils as libvirt_utils CONF = nova.conf.CONF class FakeSecret(object): def value(self): return base64.b64decode("MTIzNDU2Cg==") class FakeConn(object): def secretLookupByUUIDString(self, uuid): return FakeSecret() @ddt.ddt class _ImageTestCase(object): def mock_create_image(self, image): def create_image(fn, base, size, *args, **kwargs): fn(target=base, *args, **kwargs) image.create_image = create_image def setUp(self): super(_ImageTestCase, self).setUp() self.fixture = self.useFixture(config_fixture.Config(lockutils.CONF)) self.INSTANCES_PATH = tempfile.mkdtemp(suffix='instances') self.fixture.config(disable_process_locking=True, group='oslo_concurrency') self.flags(instances_path=self.INSTANCES_PATH) self.INSTANCE = objects.Instance(id=1, uuid=uuidutils.generate_uuid()) self.DISK_INFO_PATH = os.path.join(self.INSTANCES_PATH, self.INSTANCE['uuid'], 'disk.info') self.NAME = 'fake.vm' self.TEMPLATE = 'template' self.CONTEXT = context.get_admin_context() self.PATH = os.path.join( libvirt_utils.get_instance_path(self.INSTANCE), self.NAME) # TODO(mikal): rename template_dir to base_dir and template_path # to cached_image_path. This will be less confusing. self.TEMPLATE_DIR = os.path.join(CONF.instances_path, '_base') self.TEMPLATE_PATH = os.path.join(self.TEMPLATE_DIR, 'template') # Ensure can_fallocate is not initialised on the class if hasattr(self.image_class, 'can_fallocate'): del self.image_class.can_fallocate # This will be used to mock some decorations like utils.synchronize def _fake_deco(func): return func self._fake_deco = _fake_deco def tearDown(self): super(_ImageTestCase, self).tearDown() shutil.rmtree(self.INSTANCES_PATH) def test_prealloc_image(self): CONF.set_override('preallocate_images', 'space') fake_processutils.fake_execute_clear_log() fake_processutils.stub_out_processutils_execute(self) image = self.image_class(self.INSTANCE, self.NAME) def fake_fetch(target, *args, **kwargs): return self.stub_out('os.path.exists', lambda _: True) self.stub_out('os.access', lambda p, w: True) with mock.patch.object(image, 'get_disk_size', return_value=self.SIZE): # Call twice to verify testing fallocate is only called once. image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE) image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE) self.assertEqual(fake_processutils.fake_execute_get_log(), ['fallocate -l 1 %s.fallocate_test' % self.PATH, 'fallocate -n -l %s %s' % (self.SIZE, self.PATH), 'fallocate -n -l %s %s' % (self.SIZE, self.PATH)]) def test_prealloc_image_without_write_access(self): CONF.set_override('preallocate_images', 'space') fake_processutils.fake_execute_clear_log() fake_processutils.stub_out_processutils_execute(self) image = self.image_class(self.INSTANCE, self.NAME) def fake_fetch(target, *args, **kwargs): return with test.nested( mock.patch.object(image, 'exists', lambda: True), mock.patch.object(image, '_can_fallocate', lambda: True), mock.patch.object(image, 'get_disk_size', lambda _: self.SIZE) ) as (mock_exists, mock_can, mock_get): self.stub_out('os.path.exists', lambda _: True) self.stub_out('os.access', lambda p, w: False) # Testing fallocate is only called when user has write access. image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE) self.assertEqual(fake_processutils.fake_execute_get_log(), []) def test_libvirt_fs_info(self): image = self.image_class(self.INSTANCE, self.NAME) fs = image.libvirt_fs_info("/mnt") # check that exception hasn't been raised and the method # returned correct object self.assertIsInstance(fs, vconfig.LibvirtConfigGuestFilesys) self.assertEqual(fs.target_dir, "/mnt") if image.is_block_dev: self.assertEqual(fs.source_type, "block") self.assertEqual(fs.source_dev, image.path) else: self.assertEqual(fs.source_type, "file") self.assertEqual(fs.source_file, image.path) def test_libvirt_info(self): image = self.image_class(self.INSTANCE, self.NAME) extra_specs = { 'quota:disk_read_bytes_sec': 10 * units.Mi, 'quota:disk_read_iops_sec': 1 * units.Ki, 'quota:disk_write_bytes_sec': 20 * units.Mi, 'quota:disk_write_iops_sec': 2 * units.Ki, 'quota:disk_total_bytes_sec': 30 * units.Mi, 'quota:disk_total_iops_sec': 3 * units.Ki, } disk_info = { 'bus': 'virtio', 'dev': '/dev/vda', 'type': 'cdrom', } disk = image.libvirt_info(disk_info, cache_mode="none", extra_specs=extra_specs, hypervisor_version=4004001, boot_order="1") self.assertIsInstance(disk, vconfig.LibvirtConfigGuestDisk) self.assertEqual("/dev/vda", disk.target_dev) self.assertEqual("virtio", disk.target_bus) self.assertEqual("none", disk.driver_cache) self.assertEqual("cdrom", disk.source_device) self.assertEqual("1", disk.boot_order) self.assertEqual(10 * units.Mi, disk.disk_read_bytes_sec) self.assertEqual(1 * units.Ki, disk.disk_read_iops_sec) self.assertEqual(20 * units.Mi, disk.disk_write_bytes_sec) self.assertEqual(2 * units.Ki, disk.disk_write_iops_sec) self.assertEqual(30 * units.Mi, disk.disk_total_bytes_sec) self.assertEqual(3 * units.Ki, disk.disk_total_iops_sec) @mock.patch('nova.virt.disk.api.get_disk_size') def test_get_disk_size(self, get_disk_size): get_disk_size.return_value = 2361393152 image = self.image_class(self.INSTANCE, self.NAME) self.assertEqual(2361393152, image.get_disk_size(image.path)) get_disk_size.assert_called_once_with(image.path) def _test_libvirt_info_scsi_with_unit(self, disk_unit): # The address should be set if bus is scsi and unit is set. # Otherwise, it should not be set at all. image = self.image_class(self.INSTANCE, self.NAME) disk_info = { 'bus': 'scsi', 'dev': '/dev/sda', 'type': 'disk', } disk = image.libvirt_info(disk_info, cache_mode='none', extra_specs={}, hypervisor_version=4004001, disk_unit=disk_unit) if disk_unit: self.assertEqual(0, disk.device_addr.controller) self.assertEqual(disk_unit, disk.device_addr.unit) else: self.assertIsNone(disk.device_addr) @ddt.data(5, None) def test_libvirt_info_scsi_with_unit(self, disk_unit): self._test_libvirt_info_scsi_with_unit(disk_unit) class FlatTestCase(_ImageTestCase, test.NoDBTestCase): SIZE = 1024 def setUp(self): self.image_class = imagebackend.Flat super(FlatTestCase, self).setUp() @mock.patch.object(imagebackend.fileutils, 'ensure_tree') @mock.patch.object(os.path, 'exists') def test_cache(self, mock_exists, mock_ensure): self.stub_out('nova.virt.libvirt.imagebackend.Flat.correct_format', lambda _: None) mock_exists.side_effect = [False, False, False] exist_calls = [mock.call(self.TEMPLATE_DIR), mock.call(self.PATH), mock.call(self.TEMPLATE_PATH)] fn = mock.MagicMock() fn(target=self.TEMPLATE_PATH) image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) image.cache(fn, self.TEMPLATE) mock_ensure.assert_called_once_with(self.TEMPLATE_DIR) mock_exists.assert_has_calls(exist_calls) @mock.patch.object(os.path, 'exists') def test_cache_image_exists(self, mock_exists): self.stub_out('nova.virt.libvirt.imagebackend.Flat.correct_format', lambda _: None) mock_exists.side_effect = [True, True, True] exist_calls = [mock.call(self.TEMPLATE_DIR), mock.call(self.PATH), mock.call(self.TEMPLATE_PATH)] image = self.image_class(self.INSTANCE, self.NAME) image.cache(None, self.TEMPLATE) mock_exists.assert_has_calls(exist_calls) @mock.patch.object(os.path, 'exists') def test_cache_base_dir_exists(self, mock_exists): self.stub_out('nova.virt.libvirt.imagebackend.Flat.correct_format', lambda _: None) mock_exists.side_effect = [True, False, False] exist_calls = [mock.call(self.TEMPLATE_DIR), mock.call(self.PATH), mock.call(self.TEMPLATE_PATH)] fn = mock.MagicMock() fn(target=self.TEMPLATE_PATH) image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) image.cache(fn, self.TEMPLATE) mock_exists.assert_has_calls(exist_calls) @mock.patch.object(os.path, 'exists') def test_cache_template_exists(self, mock_exists): self.stub_out('nova.virt.libvirt.imagebackend.Flat.correct_format', lambda _: None) mock_exists.side_effect = [True, False, True] exist_calls = [mock.call(self.TEMPLATE_DIR), mock.call(self.PATH), mock.call(self.TEMPLATE_PATH)] image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) image.cache(None, self.TEMPLATE) mock_exists.assert_has_calls(exist_calls) @mock.patch('os.path.exists') def test_cache_generating_resize(self, mock_path_exists): # Test for bug 1608934 # The Flat backend doesn't write to the image cache when creating a # non-image backend. Test that we don't try to get the disk size of # a non-existent backend. base_dir = os.path.join(CONF.instances_path, CONF.image_cache.subdirectory_name) # Lets assume the base image cache directory already exists existing = set([base_dir]) def fake_exists(path): # Return True only for files previously created during # execution. This allows us to test that we're not calling # get_disk_size() on something which hasn't been previously # created. return path in existing def fake_get_disk_size(path): # get_disk_size will explode if called on a path which doesn't # exist. Specific exception not important for this test. if path not in existing: raise AssertionError # Not important, won't actually be called by patched code. return 2 * units.Gi def fake_template(target=None, **kwargs): # The template function we pass to cache. Calling this will # cause target to be created. existing.add(target) mock_path_exists.side_effect = fake_exists image = self.image_class(self.INSTANCE, self.NAME) # We're not testing preallocation image.preallocate = False with test.nested( mock.patch.object(image, 'exists'), mock.patch.object(image, 'correct_format'), mock.patch.object(image, 'get_disk_size'), mock.patch.object(image, 'resize_image') ) as ( mock_disk_exists, mock_correct_format, mock_get_disk_size, mock_resize_image ): # Assume the disk doesn't already exist mock_disk_exists.return_value = False # This won't actually be executed since change I46b5658e, # but this is how the unpatched code will fail. We include this # here as a belt-and-braces sentinel. mock_get_disk_size.side_effect = fake_get_disk_size # Try to create a 2G image image.cache(fake_template, 'fake_cache_name', 2 * units.Gi) # The real assertion is that the above call to cache() didn't # raise AssertionError which, if we get here, it clearly didn't. self.assertFalse(image.resize_image.called) @mock.patch.object(imagebackend.disk, 'extend') @mock.patch('nova.virt.libvirt.utils.copy_image') @mock.patch.object(imagebackend.utils, 'synchronized') @mock.patch('nova.privsep.path.utime') def test_create_image(self, mock_utime, mock_sync, mock_copy, mock_extend): mock_sync.side_effect = lambda *a, **kw: self._fake_deco fn = mock.MagicMock() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, None, image_id=None) mock_copy.assert_called_once_with(self.TEMPLATE_PATH, self.PATH) fn.assert_called_once_with(target=self.TEMPLATE_PATH, image_id=None) self.assertTrue(mock_sync.called) self.assertFalse(mock_extend.called) mock_utime.assert_called() @mock.patch.object(imagebackend.disk, 'extend') @mock.patch('nova.virt.libvirt.utils.copy_image') @mock.patch.object(imagebackend.utils, 'synchronized') def test_create_image_generated(self, mock_sync, mock_copy, mock_extend): mock_sync.side_effect = lambda *a, **kw: self._fake_deco fn = mock.MagicMock() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, None) fn.assert_called_once_with(target=self.PATH) self.assertFalse(mock_copy.called) self.assertTrue(mock_sync.called) self.assertFalse(mock_extend.called) @mock.patch.object(imagebackend.disk, 'extend') @mock.patch('nova.virt.libvirt.utils.copy_image') @mock.patch.object(imagebackend.utils, 'synchronized') @mock.patch.object(images, 'qemu_img_info', return_value=imageutils.QemuImgInfo()) @mock.patch('nova.privsep.path.utime') def test_create_image_extend(self, mock_utime, mock_qemu, mock_sync, mock_copy, mock_extend): mock_sync.side_effect = lambda *a, **kw: self._fake_deco fn = mock.MagicMock() mock_qemu.return_value.virtual_size = 1024 fn(target=self.TEMPLATE_PATH, image_id=None) image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.SIZE, image_id=None) mock_copy.assert_called_once_with(self.TEMPLATE_PATH, self.PATH) self.assertTrue(mock_sync.called) mock_extend.assert_called_once_with( imgmodel.LocalFileImage(self.PATH, imgmodel.FORMAT_RAW), self.SIZE) mock_qemu.assert_called_once_with(self.TEMPLATE_PATH) mock_utime.assert_called() @mock.patch.object(os.path, 'exists') @mock.patch.object(imagebackend.images, 'qemu_img_info') def test_correct_format(self, mock_qemu, mock_exist): mock_exist.side_effect = [True, False, True] info = mock.MagicMock() info.file_format = 'foo' mock_qemu.return_value = info image = self.image_class(self.INSTANCE, self.NAME, path=self.PATH) self.assertEqual(image.driver_format, 'foo') mock_qemu.assert_called_once_with(self.PATH) mock_exist.assert_has_calls([mock.call(self.PATH), mock.call(self.DISK_INFO_PATH), mock.call(CONF.instances_path)]) @mock.patch.object(images, 'qemu_img_info', side_effect=exception.InvalidDiskInfo( reason='invalid path')) def test_resolve_driver_format(self, fake_qemu_img_info): image = self.image_class(self.INSTANCE, self.NAME) driver_format = image.resolve_driver_format() self.assertEqual(driver_format, 'raw') def test_get_model(self): image = self.image_class(self.INSTANCE, self.NAME) model = image.get_model(FakeConn()) self.assertEqual(imgmodel.LocalFileImage(self.PATH, imgmodel.FORMAT_RAW), model) class Qcow2TestCase(_ImageTestCase, test.NoDBTestCase): SIZE = units.Gi def setUp(self): self.image_class = imagebackend.Qcow2 super(Qcow2TestCase, self).setUp() self.QCOW2_BASE = (self.TEMPLATE_PATH + '_%d' % (self.SIZE / units.Gi)) @mock.patch.object(os.path, 'exists') def test_cache(self, mock_exists): mock_exists.side_effect = [False, True, False, True, False, False] fn = mock.MagicMock() image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) exist_calls = [mock.call(self.DISK_INFO_PATH), mock.call(CONF.instances_path), mock.call(self.TEMPLATE_DIR), mock.call(self.INSTANCES_PATH), mock.call(self.PATH), mock.call(self.TEMPLATE_PATH)] image.cache(fn, self.TEMPLATE) fn.assert_called_once_with(target=self.TEMPLATE_PATH) mock_exists.assert_has_calls(exist_calls) @mock.patch.object(os.path, 'exists') def test_cache_image_exists(self, mock_exists): mock_exists.side_effect = [False, True, True, True, True] exist_calls = [mock.call(self.DISK_INFO_PATH), mock.call(self.INSTANCES_PATH), mock.call(self.TEMPLATE_DIR), mock.call(self.PATH), mock.call(self.TEMPLATE_PATH)] image = self.image_class(self.INSTANCE, self.NAME) image.cache(None, self.TEMPLATE) mock_exists.assert_has_calls(exist_calls) @mock.patch.object(os.path, 'exists') def test_cache_base_dir_exists(self, mock_exists): mock_exists.side_effect = [False, True, True, False, False] exist_calls = [mock.call(self.DISK_INFO_PATH), mock.call(self.INSTANCES_PATH), mock.call(self.TEMPLATE_DIR), mock.call(self.PATH), mock.call(self.TEMPLATE_PATH)] fn = mock.MagicMock() image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) image.cache(fn, self.TEMPLATE) fn.assert_called_once_with(target=self.TEMPLATE_PATH) mock_exists.assert_has_calls(exist_calls) @mock.patch.object(os.path, 'exists') def test_cache_template_exists(self, mock_exists): mock_exists.side_effect = [False, True, True, False, True] exist_calls = [mock.call(self.DISK_INFO_PATH), mock.call(self.INSTANCES_PATH), mock.call(self.TEMPLATE_DIR), mock.call(self.PATH), mock.call(self.TEMPLATE_PATH)] image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) image.cache(None, self.TEMPLATE) mock_exists.assert_has_calls(exist_calls) @mock.patch.object(imagebackend.utils, 'synchronized') @mock.patch('nova.virt.libvirt.utils.create_cow_image') @mock.patch.object(imagebackend.disk, 'extend') @mock.patch('nova.privsep.path.utime') def test_create_image(self, mock_utime, mock_extend, mock_create, mock_sync): mock_sync.side_effect = lambda *a, **kw: self._fake_deco fn = mock.MagicMock() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, None) mock_create.assert_called_once_with(self.TEMPLATE_PATH, self.PATH) fn.assert_called_once_with(target=self.TEMPLATE_PATH) self.assertTrue(mock_sync.called) self.assertFalse(mock_extend.called) mock_utime.assert_called() @mock.patch.object(imagebackend.utils, 'synchronized') @mock.patch('nova.virt.libvirt.utils.create_cow_image') @mock.patch.object(imagebackend.disk, 'extend') @mock.patch.object(os.path, 'exists', side_effect=[]) @mock.patch.object(imagebackend.Image, 'verify_base_size') @mock.patch('nova.privsep.path.utime') def test_create_image_with_size(self, mock_utime, mock_verify, mock_exist, mock_extend, mock_create, mock_sync): mock_sync.side_effect = lambda *a, **kw: self._fake_deco fn = mock.MagicMock() mock_exist.side_effect = [False, True, False, False, False] exist_calls = [mock.call(self.DISK_INFO_PATH), mock.call(self.INSTANCES_PATH), mock.call(self.TEMPLATE_PATH), mock.call(self.PATH), mock.call(self.PATH)] image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.SIZE) mock_verify.assert_called_once_with(self.TEMPLATE_PATH, self.SIZE) mock_create.assert_called_once_with(self.TEMPLATE_PATH, self.PATH) mock_extend.assert_called_once_with( imgmodel.LocalFileImage(self.PATH, imgmodel.FORMAT_QCOW2), self.SIZE) fn.assert_called_once_with(target=self.TEMPLATE_PATH) mock_exist.assert_has_calls(exist_calls) self.assertTrue(mock_sync.called) mock_utime.assert_called() @mock.patch.object(imagebackend.utils, 'synchronized') @mock.patch('nova.virt.libvirt.utils.create_cow_image') @mock.patch.object(imagebackend.disk, 'extend') @mock.patch.object(os.path, 'exists', side_effect=[]) @mock.patch.object(imagebackend.Qcow2, 'get_disk_size') @mock.patch('nova.privsep.path.utime') def test_create_image_too_small(self, mock_utime, mock_get, mock_exist, mock_extend, mock_create, mock_sync): mock_sync.side_effect = lambda *a, **kw: self._fake_deco mock_get.return_value = self.SIZE fn = mock.MagicMock() mock_exist.side_effect = [False, True, True] exist_calls = [mock.call(self.DISK_INFO_PATH), mock.call(self.INSTANCES_PATH), mock.call(self.TEMPLATE_PATH)] image = self.image_class(self.INSTANCE, self.NAME) self.assertRaises(exception.FlavorDiskSmallerThanImage, image.create_image, fn, self.TEMPLATE_PATH, 1) mock_get.assert_called_once_with(self.TEMPLATE_PATH) mock_exist.assert_has_calls(exist_calls) self.assertTrue(mock_sync.called) self.assertFalse(mock_create.called) self.assertFalse(mock_extend.called) @mock.patch.object(imagebackend.utils, 'synchronized') @mock.patch('nova.virt.libvirt.utils.create_cow_image') @mock.patch('nova.virt.libvirt.utils.get_disk_backing_file') @mock.patch.object(imagebackend.disk, 'extend') @mock.patch.object(os.path, 'exists', side_effect=[]) @mock.patch.object(imagebackend.Image, 'verify_base_size') @mock.patch('nova.virt.libvirt.utils.copy_image') @mock.patch('nova.privsep.path.utime') def test_generate_resized_backing_files(self, mock_utime, mock_copy, mock_verify, mock_exist, mock_extend, mock_get, mock_create, mock_sync): mock_sync.side_effect = lambda *a, **kw: self._fake_deco mock_get.return_value = self.QCOW2_BASE fn = mock.MagicMock() mock_exist.side_effect = [False, True, False, True, False, True] exist_calls = [mock.call(self.DISK_INFO_PATH), mock.call(CONF.instances_path), mock.call(self.TEMPLATE_PATH), mock.call(self.PATH), mock.call(self.QCOW2_BASE), mock.call(self.PATH)] image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.SIZE) mock_get.assert_called_once_with(self.PATH) mock_verify.assert_called_once_with(self.TEMPLATE_PATH, self.SIZE) mock_copy.assert_called_once_with(self.TEMPLATE_PATH, self.QCOW2_BASE) mock_extend.assert_called_once_with( imgmodel.LocalFileImage(self.QCOW2_BASE, imgmodel.FORMAT_QCOW2), self.SIZE) mock_exist.assert_has_calls(exist_calls) fn.assert_called_once_with(target=self.TEMPLATE_PATH) self.assertTrue(mock_sync.called) self.assertFalse(mock_create.called) mock_utime.assert_called() @mock.patch.object(imagebackend.utils, 'synchronized') @mock.patch('nova.virt.libvirt.utils.create_cow_image') @mock.patch('nova.virt.libvirt.utils.get_disk_backing_file') @mock.patch.object(imagebackend.disk, 'extend') @mock.patch.object(os.path, 'exists', side_effect=[]) @mock.patch.object(imagebackend.Image, 'verify_base_size') @mock.patch('nova.privsep.path.utime') def test_qcow2_exists_and_has_no_backing_file(self, mock_utime, mock_verify, mock_exist, mock_extend, mock_get, mock_create, mock_sync): mock_sync.side_effect = lambda *a, **kw: self._fake_deco mock_get.return_value = None fn = mock.MagicMock() mock_exist.side_effect = [False, True, False, True, True] exist_calls = [mock.call(self.DISK_INFO_PATH), mock.call(self.INSTANCES_PATH), mock.call(self.TEMPLATE_PATH), mock.call(self.PATH), mock.call(self.PATH)] image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.SIZE) mock_get.assert_called_once_with(self.PATH) fn.assert_called_once_with(target=self.TEMPLATE_PATH) mock_verify.assert_called_once_with(self.TEMPLATE_PATH, self.SIZE) mock_exist.assert_has_calls(exist_calls) self.assertTrue(mock_sync.called) self.assertFalse(mock_create.called) self.assertFalse(mock_extend.called) def test_resolve_driver_format(self): image = self.image_class(self.INSTANCE, self.NAME) driver_format = image.resolve_driver_format() self.assertEqual(driver_format, 'qcow2') def test_get_model(self): image = self.image_class(self.INSTANCE, self.NAME) model = image.get_model(FakeConn()) self.assertEqual(imgmodel.LocalFileImage(self.PATH, imgmodel.FORMAT_QCOW2), model) class LvmTestCase(_ImageTestCase, test.NoDBTestCase): VG = 'FakeVG' TEMPLATE_SIZE = 512 SIZE = 1024 def setUp(self): self.image_class = imagebackend.Lvm super(LvmTestCase, self).setUp() self.flags(images_volume_group=self.VG, group='libvirt') self.flags(enabled=False, group='ephemeral_storage_encryption') self.INSTANCE['ephemeral_key_uuid'] = None self.LV = '%s_%s' % (self.INSTANCE['uuid'], self.NAME) self.PATH = os.path.join('/dev', self.VG, self.LV) @mock.patch.object(compute_utils, 'disk_ops_semaphore') @mock.patch('nova.privsep.utils.supports_direct_io', return_value=True) @mock.patch.object(imagebackend.lvm, 'create_volume') @mock.patch.object(imagebackend.disk, 'get_disk_size', return_value=TEMPLATE_SIZE) @mock.patch('nova.privsep.qemu.convert_image') def _create_image(self, sparse, mock_convert_image, mock_get, mock_create, mock_ignored, mock_disk_op_sema): fn = mock.MagicMock() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, None) mock_create.assert_called_once_with(self.VG, self.LV, self.TEMPLATE_SIZE, sparse=sparse) fn.assert_called_once_with(target=self.TEMPLATE_PATH) mock_get.assert_called_once_with(self.TEMPLATE_PATH) path = '/dev/%s/%s_%s' % (self.VG, self.INSTANCE.uuid, self.NAME) mock_convert_image.assert_called_once_with( self.TEMPLATE_PATH, path, None, 'raw', CONF.instances_path, False) mock_disk_op_sema.__enter__.assert_called_once() @mock.patch.object(imagebackend.lvm, 'create_volume') def _create_image_generated(self, sparse, mock_create): fn = mock.MagicMock() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.SIZE, ephemeral_size=None) mock_create.assert_called_once_with(self.VG, self.LV, self.SIZE, sparse=sparse) fn.assert_called_once_with(target=self.PATH, ephemeral_size=None) @mock.patch.object(compute_utils, 'disk_ops_semaphore') @mock.patch('nova.privsep.utils.supports_direct_io', return_value=True) @mock.patch.object(imagebackend.disk, 'resize2fs') @mock.patch.object(imagebackend.lvm, 'create_volume') @mock.patch.object(imagebackend.disk, 'get_disk_size', return_value=TEMPLATE_SIZE) @mock.patch('nova.privsep.qemu.convert_image') def _create_image_resize(self, sparse, mock_convert_image, mock_get, mock_create, mock_resize, mock_ignored, mock_disk_op_sema): fn = mock.MagicMock() fn(target=self.TEMPLATE_PATH) image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.SIZE) mock_create.assert_called_once_with(self.VG, self.LV, self.SIZE, sparse=sparse) mock_get.assert_called_once_with(self.TEMPLATE_PATH) mock_convert_image.assert_called_once_with( self.TEMPLATE_PATH, self.PATH, None, 'raw', CONF.instances_path, False) mock_disk_op_sema.__enter__.assert_called_once() mock_resize.assert_called_once_with(self.PATH, run_as_root=True) @mock.patch.object(imagebackend.fileutils, 'ensure_tree') @mock.patch.object(os.path, 'exists') def test_cache(self, mock_exists, mock_ensure): mock_exists.side_effect = [False, False, False] exist_calls = [mock.call(self.TEMPLATE_DIR), mock.call(self.PATH), mock.call(self.TEMPLATE_PATH)] fn = mock.MagicMock() image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) image.cache(fn, self.TEMPLATE) fn.assert_called_once_with(target=self.TEMPLATE_PATH) mock_ensure.assert_called_once_with(self.TEMPLATE_DIR) mock_exists.assert_has_calls(exist_calls) @mock.patch.object(os.path, 'exists') def test_cache_image_exists(self, mock_exists): mock_exists.side_effect = [True, True, True] exist_calls = [mock.call(self.TEMPLATE_DIR), mock.call(self.PATH), mock.call(self.TEMPLATE_PATH)] image = self.image_class(self.INSTANCE, self.NAME) image.cache(None, self.TEMPLATE) mock_exists.assert_has_calls(exist_calls) @mock.patch.object(imagebackend.fileutils, 'ensure_tree') @mock.patch.object(os.path, 'exists', side_effect=[True, False, False]) def test_cache_base_dir_exists(self, mock_exists, mock_ensure): exist_calls = [mock.call(self.TEMPLATE_DIR), mock.call(self.PATH), mock.call(self.TEMPLATE_PATH)] fn = mock.MagicMock() image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) image.cache(fn, self.TEMPLATE) mock_exists.assert_has_calls(exist_calls) mock_ensure.assert_not_called() @mock.patch('os.path.exists', autospec=True) @mock.patch('nova.utils.synchronized', autospec=True) @mock.patch.object(imagebackend, 'lvm', autospec=True) @mock.patch.object(imagebackend.fileutils, 'ensure_tree', autospec=True) def test_cache_ephemeral(self, mock_ensure, mock_lvm, mock_synchronized, mock_exists): # Ignores its arguments and returns the wrapped function unmodified def fake_synchronized(*args, **kwargs): def outer(fn): def wrapper(*wargs, **wkwargs): fn(*wargs, **wkwargs) return wrapper return outer mock_synchronized.side_effect = fake_synchronized # Fake exists returns true for paths which have been added to the # exists set exists = set() def fake_exists(path): return path in exists mock_exists.side_effect = fake_exists # Fake create_volume causes exists to return true for the volume def fake_create_volume(vg, lv, size, sparse=False): exists.add(os.path.join('/dev', vg, lv)) mock_lvm.create_volume.side_effect = fake_create_volume # Assert that when we call cache() for an ephemeral disk with the # Lvm backend, we call fetch_func with a target of the Lvm disk size_gb = 1 size = size_gb * units.Gi fetch_func = mock.MagicMock() image = self.image_class(self.INSTANCE, self.NAME) image.cache(fetch_func, self.TEMPLATE, ephemeral_size=size_gb, size=size) mock_ensure.assert_called_once_with(self.TEMPLATE_DIR) mock_lvm.create_volume.assert_called_once_with(self.VG, self.LV, size, sparse=False) fetch_func.assert_called_once_with(target=self.PATH, ephemeral_size=size_gb) mock_synchronized.assert_called() def test_create_image(self): self._create_image(False) def test_create_image_sparsed(self): self.flags(sparse_logical_volumes=True, group='libvirt') self._create_image(True) def test_create_image_generated(self): self._create_image_generated(False) def test_create_image_generated_sparsed(self): self.flags(sparse_logical_volumes=True, group='libvirt') self._create_image_generated(True) def test_create_image_resize(self): self._create_image_resize(False) def test_create_image_resize_sparsed(self): self.flags(sparse_logical_volumes=True, group='libvirt') self._create_image_resize(True) @mock.patch.object(imagebackend.lvm, 'create_volume', side_effect=RuntimeError) @mock.patch.object(imagebackend.disk, 'get_disk_size', return_value=TEMPLATE_SIZE) @mock.patch.object(imagebackend.lvm, 'remove_volumes') def test_create_image_negative(self, mock_remove, mock_get, mock_create): fn = mock.MagicMock() image = self.image_class(self.INSTANCE, self.NAME) self.assertRaises(RuntimeError, image.create_image, fn, self.TEMPLATE_PATH, self.SIZE) mock_create.assert_called_once_with(self.VG, self.LV, self.SIZE, sparse=False) fn.assert_called_once_with(target=self.TEMPLATE_PATH) mock_get.assert_called_once_with(self.TEMPLATE_PATH) mock_remove.assert_called_once_with([self.PATH]) @mock.patch.object(imagebackend.lvm, 'create_volume') @mock.patch.object(imagebackend.lvm, 'remove_volumes') def test_create_image_generated_negative(self, mock_remove, mock_create): fn = mock.MagicMock() fn.side_effect = RuntimeError image = self.image_class(self.INSTANCE, self.NAME) self.assertRaises(RuntimeError, image.create_image, fn, self.TEMPLATE_PATH, self.SIZE, ephemeral_size=None) mock_create.assert_called_once_with(self.VG, self.LV, self.SIZE, sparse=False) fn.assert_called_once_with(target=self.PATH, ephemeral_size=None) mock_remove.assert_called_once_with([self.PATH]) def test_prealloc_image(self): CONF.set_override('preallocate_images', 'space') fake_processutils.fake_execute_clear_log() fake_processutils.stub_out_processutils_execute(self) image = self.image_class(self.INSTANCE, self.NAME) def fake_fetch(target, *args, **kwargs): return self.stub_out('os.path.exists', lambda _: True) self.stub_out('nova.virt.libvirt.imagebackend.Lvm.exists', lambda *a, **kw: True) self.stub_out('nova.virt.libvirt.imagebackend.Lvm.get_disk_size', lambda *a, **kw: self.SIZE) image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE) self.assertEqual(fake_processutils.fake_execute_get_log(), []) class EncryptedLvmTestCase(_ImageTestCase, test.NoDBTestCase): VG = 'FakeVG' TEMPLATE_SIZE = 512 SIZE = 1024 def setUp(self): self.image_class = imagebackend.Lvm super(EncryptedLvmTestCase, self).setUp() self.flags(enabled=True, group='ephemeral_storage_encryption') self.flags(cipher='aes-xts-plain64', group='ephemeral_storage_encryption') self.flags(key_size=512, group='ephemeral_storage_encryption') self.flags(fixed_key='00000000000000000000000000000000' '00000000000000000000000000000000', group='key_manager') self.flags(images_volume_group=self.VG, group='libvirt') self.LV = '%s_%s' % (self.INSTANCE['uuid'], self.NAME) self.LV_PATH = os.path.join('/dev', self.VG, self.LV) self.PATH = os.path.join('/dev/mapper', imagebackend.dmcrypt.volume_name(self.LV)) self.key_manager = key_manager.API() self.INSTANCE['ephemeral_key_uuid'] =\ self.key_manager.create_key(self.CONTEXT, 'AES', 256) self.KEY = self.key_manager.get(self.CONTEXT, self.INSTANCE['ephemeral_key_uuid']).get_encoded() self.lvm = imagebackend.lvm self.disk = imagebackend.disk self.utils = imagebackend.utils self.libvirt_utils = imagebackend.libvirt_utils self.dmcrypt = imagebackend.dmcrypt def _create_image(self, sparse): with test.nested( mock.patch('nova.privsep.utils.supports_direct_io', return_value=True), mock.patch.object(self.lvm, 'create_volume', mock.Mock()), mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()), mock.patch.object(self.disk, 'resize2fs', mock.Mock()), mock.patch.object(self.disk, 'get_disk_size', mock.Mock(return_value=self.TEMPLATE_SIZE)), mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()), mock.patch('nova.privsep.qemu.convert_image'), mock.patch.object(compute_utils, 'disk_ops_semaphore')): fn = mock.Mock() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.TEMPLATE_SIZE, context=self.CONTEXT) compute_utils.disk_ops_semaphore.__enter__.assert_called_once() fn.assert_called_with(context=self.CONTEXT, target=self.TEMPLATE_PATH) self.lvm.create_volume.assert_called_with(self.VG, self.LV, self.TEMPLATE_SIZE, sparse=sparse) self.dmcrypt.create_volume.assert_called_with( self.PATH.rpartition('/')[2], self.LV_PATH, CONF.ephemeral_storage_encryption.cipher, CONF.ephemeral_storage_encryption.key_size, self.KEY) nova.privsep.qemu.convert_image.assert_called_with( self.TEMPLATE_PATH, self.PATH, None, 'raw', CONF.instances_path, False) def _create_image_generated(self, sparse): with test.nested( mock.patch.object(self.lvm, 'create_volume', mock.Mock()), mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()), mock.patch.object(self.disk, 'resize2fs', mock.Mock()), mock.patch.object(self.disk, 'get_disk_size', mock.Mock(return_value=self.TEMPLATE_SIZE)), mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()), mock.patch('nova.privsep.qemu.convert_image')): fn = mock.Mock() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.SIZE, ephemeral_size=None, context=self.CONTEXT) self.lvm.create_volume.assert_called_with( self.VG, self.LV, self.SIZE, sparse=sparse) self.dmcrypt.create_volume.assert_called_with( self.PATH.rpartition('/')[2], self.LV_PATH, CONF.ephemeral_storage_encryption.cipher, CONF.ephemeral_storage_encryption.key_size, self.KEY) fn.assert_called_with(target=self.PATH, ephemeral_size=None, context=self.CONTEXT) def _create_image_resize(self, sparse): with test.nested( mock.patch('nova.privsep.utils.supports_direct_io', return_value=True), mock.patch.object(self.lvm, 'create_volume', mock.Mock()), mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()), mock.patch.object(self.disk, 'resize2fs', mock.Mock()), mock.patch.object(self.disk, 'get_disk_size', mock.Mock(return_value=self.TEMPLATE_SIZE)), mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()), mock.patch('nova.privsep.qemu.convert_image'), mock.patch.object(compute_utils, 'disk_ops_semaphore')): fn = mock.Mock() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, self.SIZE, context=self.CONTEXT) compute_utils.disk_ops_semaphore.__enter__.assert_called_once() fn.assert_called_with(context=self.CONTEXT, target=self.TEMPLATE_PATH) self.disk.get_disk_size.assert_called_with(self.TEMPLATE_PATH) self.lvm.create_volume.assert_called_with( self.VG, self.LV, self.SIZE, sparse=sparse) self.dmcrypt.create_volume.assert_called_with( self.PATH.rpartition('/')[2], self.LV_PATH, CONF.ephemeral_storage_encryption.cipher, CONF.ephemeral_storage_encryption.key_size, self.KEY) nova.privsep.qemu.convert_image.assert_called_with( self.TEMPLATE_PATH, self.PATH, None, 'raw', CONF.instances_path, False) self.disk.resize2fs.assert_called_with(self.PATH, run_as_root=True) def test_create_image(self): self._create_image(False) def test_create_image_sparsed(self): self.flags(sparse_logical_volumes=True, group='libvirt') self._create_image(True) def test_create_image_generated(self): self._create_image_generated(False) def test_create_image_generated_sparsed(self): self.flags(sparse_logical_volumes=True, group='libvirt') self._create_image_generated(True) def test_create_image_resize(self): self._create_image_resize(False) def test_create_image_resize_sparsed(self): self.flags(sparse_logical_volumes=True, group='libvirt') self._create_image_resize(True) def test_create_image_negative(self): with test.nested( mock.patch.object(self.lvm, 'create_volume', mock.Mock()), mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()), mock.patch.object(self.disk, 'resize2fs', mock.Mock()), mock.patch.object(self.disk, 'get_disk_size', mock.Mock(return_value=self.TEMPLATE_SIZE)), mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()), mock.patch('oslo_concurrency.processutils.execute', mock.Mock())): fn = mock.Mock() self.lvm.create_volume.side_effect = RuntimeError() image = self.image_class(self.INSTANCE, self.NAME) self.assertRaises( RuntimeError, image.create_image, fn, self.TEMPLATE_PATH, self.SIZE, context=self.CONTEXT) fn.assert_called_with( context=self.CONTEXT, target=self.TEMPLATE_PATH) self.disk.get_disk_size.assert_called_with( self.TEMPLATE_PATH) self.lvm.create_volume.assert_called_with( self.VG, self.LV, self.SIZE, sparse=False) self.dmcrypt.delete_volume.assert_called_with( self.PATH.rpartition('/')[2]) self.lvm.remove_volumes.assert_called_with([self.LV_PATH]) def test_create_image_encrypt_negative(self): with test.nested( mock.patch.object(self.lvm, 'create_volume', mock.Mock()), mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()), mock.patch.object(self.disk, 'resize2fs', mock.Mock()), mock.patch.object(self.disk, 'get_disk_size', mock.Mock(return_value=self.TEMPLATE_SIZE)), mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()), mock.patch('oslo_concurrency.processutils.execute', mock.Mock())): fn = mock.Mock() self.dmcrypt.create_volume.side_effect = RuntimeError() image = self.image_class(self.INSTANCE, self.NAME) self.assertRaises( RuntimeError, image.create_image, fn, self.TEMPLATE_PATH, self.SIZE, context=self.CONTEXT) fn.assert_called_with( context=self.CONTEXT, target=self.TEMPLATE_PATH) self.disk.get_disk_size.assert_called_with(self.TEMPLATE_PATH) self.lvm.create_volume.assert_called_with( self.VG, self.LV, self.SIZE, sparse=False) self.dmcrypt.create_volume.assert_called_with( self.dmcrypt.volume_name(self.LV), self.LV_PATH, CONF.ephemeral_storage_encryption.cipher, CONF.ephemeral_storage_encryption.key_size, self.KEY) self.dmcrypt.delete_volume.assert_called_with( self.PATH.rpartition('/')[2]) self.lvm.remove_volumes.assert_called_with([self.LV_PATH]) def test_create_image_generated_negative(self): with test.nested( mock.patch.object(self.lvm, 'create_volume', mock.Mock()), mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()), mock.patch.object(self.disk, 'resize2fs', mock.Mock()), mock.patch.object(self.disk, 'get_disk_size', mock.Mock(return_value=self.TEMPLATE_SIZE)), mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()), mock.patch('oslo_concurrency.processutils.execute', mock.Mock())): fn = mock.Mock() fn.side_effect = RuntimeError() image = self.image_class(self.INSTANCE, self.NAME) self.assertRaises(RuntimeError, image.create_image, fn, self.TEMPLATE_PATH, self.SIZE, ephemeral_size=None, context=self.CONTEXT) self.lvm.create_volume.assert_called_with( self.VG, self.LV, self.SIZE, sparse=False) self.dmcrypt.create_volume.assert_called_with( self.PATH.rpartition('/')[2], self.LV_PATH, CONF.ephemeral_storage_encryption.cipher, CONF.ephemeral_storage_encryption.key_size, self.KEY) fn.assert_called_with( target=self.PATH, ephemeral_size=None, context=self.CONTEXT) self.dmcrypt.delete_volume.assert_called_with( self.PATH.rpartition('/')[2]) self.lvm.remove_volumes.assert_called_with([self.LV_PATH]) def test_create_image_generated_encrypt_negative(self): with test.nested( mock.patch.object(self.lvm, 'create_volume', mock.Mock()), mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()), mock.patch.object(self.disk, 'resize2fs', mock.Mock()), mock.patch.object(self.disk, 'get_disk_size', mock.Mock(return_value=self.TEMPLATE_SIZE)), mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()), mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()), mock.patch('oslo_concurrency.processutils.execute', mock.Mock())): fn = mock.Mock() fn.side_effect = RuntimeError() image = self.image_class(self.INSTANCE, self.NAME) self.assertRaises( RuntimeError, image.create_image, fn, self.TEMPLATE_PATH, self.SIZE, ephemeral_size=None, context=self.CONTEXT) self.lvm.create_volume.assert_called_with( self.VG, self.LV, self.SIZE, sparse=False) self.dmcrypt.create_volume.assert_called_with( self.PATH.rpartition('/')[2], self.LV_PATH, CONF.ephemeral_storage_encryption.cipher, CONF.ephemeral_storage_encryption.key_size, self.KEY) self.dmcrypt.delete_volume.assert_called_with( self.PATH.rpartition('/')[2]) self.lvm.remove_volumes.assert_called_with([self.LV_PATH]) def test_prealloc_image(self): self.flags(preallocate_images='space') fake_processutils.fake_execute_clear_log() fake_processutils.stub_out_processutils_execute(self) image = self.image_class(self.INSTANCE, self.NAME) def fake_fetch(target, *args, **kwargs): return self.stub_out('os.path.exists', lambda _: True) self.stub_out('nova.virt.libvirt.imagebackend.Lvm.exists', lambda *a, **kw: True) self.stub_out('nova.virt.libvirt.imagebackend.Lvm.get_disk_size', lambda *a, **kw: self.SIZE) image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE) self.assertEqual(fake_processutils.fake_execute_get_log(), []) def test_get_model(self): image = self.image_class(self.INSTANCE, self.NAME) model = image.get_model(FakeConn()) self.assertEqual(imgmodel.LocalBlockImage(self.PATH), model) @ddt.ddt class RbdTestCase(_ImageTestCase, test.NoDBTestCase): FSID = "FakeFsID" POOL = "FakePool" USER = "FakeUser" CONF = "FakeConf" SIZE = 1024 def setUp(self): self.image_class = imagebackend.Rbd super(RbdTestCase, self).setUp() self.flags(images_rbd_pool=self.POOL, rbd_user=self.USER, images_rbd_ceph_conf=self.CONF, group='libvirt') self.libvirt_utils = imagebackend.libvirt_utils self.utils = imagebackend.utils # mock out the cephclients for avoiding ImportError exception rbd_utils.rbd = mock.Mock() rbd_utils.rados = mock.Mock() @mock.patch.object(os.path, 'exists', return_value=False) @mock.patch.object(imagebackend.Rbd, 'exists', return_value=False) @mock.patch.object(imagebackend.fileutils, 'ensure_tree') def test_cache(self, mock_ensure, mock_img_exist, mock_os_exist): image = self.image_class(self.INSTANCE, self.NAME) fn = mock.MagicMock() self.mock_create_image(image) image.cache(fn, self.TEMPLATE) mock_ensure.assert_called_once_with(self.TEMPLATE_DIR) fn.assert_called_once_with(target=self.TEMPLATE_PATH) mock_img_exist.assert_called_with() mock_os_exist.assert_has_calls([ mock.call(self.TEMPLATE_DIR), mock.call(self.TEMPLATE_PATH) ]) @mock.patch.object(os.path, 'exists') @mock.patch.object(imagebackend.Rbd, 'exists') @mock.patch.object(imagebackend.fileutils, 'ensure_tree') def test_cache_base_dir_exists(self, mock_ensure, mock_img_exist, mock_os_exist): mock_os_exist.side_effect = [True, False] mock_img_exist.return_value = False image = self.image_class(self.INSTANCE, self.NAME) fn = mock.MagicMock() self.mock_create_image(image) image.cache(fn, self.TEMPLATE) mock_img_exist.assert_called_once_with() mock_os_exist.assert_has_calls([ mock.call(self.TEMPLATE_DIR), mock.call(self.TEMPLATE_PATH) ]) fn.assert_called_once_with(target=self.TEMPLATE_PATH) @mock.patch.object(os.path, 'exists', return_value=True) @mock.patch.object(imagebackend.Rbd, 'exists', return_value=True) def test_cache_image_exists(self, mock_img_exist, mock_os_exist): image = self.image_class(self.INSTANCE, self.NAME) image.cache(None, self.TEMPLATE) mock_img_exist.assert_called_once_with() mock_os_exist.assert_has_calls([ mock.call(self.TEMPLATE_DIR), mock.call(self.TEMPLATE_PATH) ]) @mock.patch.object(os.path, 'exists') @mock.patch.object(imagebackend.Rbd, 'exists') def test_cache_template_exists(self, mock_img_exist, mock_os_exist): mock_os_exist.return_value = True mock_img_exist.return_value = False image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) image.cache(None, self.TEMPLATE) mock_img_exist.assert_called_once_with() mock_os_exist.assert_has_calls([ mock.call(self.TEMPLATE_DIR), mock.call(self.TEMPLATE_PATH) ]) @mock.patch.object(imagebackend.Rbd, 'exists') def test_create_image(self, mock_exists): fn = mock.MagicMock() rbd_utils.rbd.RBD_FEATURE_LAYERING = 1 fake_processutils.fake_execute_clear_log() fake_processutils.stub_out_processutils_execute(self) image = self.image_class(self.INSTANCE, self.NAME) mock_exists.return_value = False image.create_image(fn, self.TEMPLATE_PATH, None) rbd_name = "%s_%s" % (self.INSTANCE['uuid'], self.NAME) cmd = ('rbd', 'import', '--pool', self.POOL, self.TEMPLATE_PATH, rbd_name, '--image-format=2', '--id', self.USER, '--conf', self.CONF) self.assertEqual(fake_processutils.fake_execute_get_log(), [' '.join(cmd)]) mock_exists.assert_has_calls([mock.call(), mock.call()]) fn.assert_called_once_with(target=self.TEMPLATE_PATH) @mock.patch.object(images, 'qemu_img_info') @mock.patch.object(os.path, 'exists', return_value=False) def test__remove_non_raw_cache_image_not_exists( self, mock_exists, mock_qemu): image = self.image_class(self.INSTANCE, self.NAME) image._remove_non_raw_cache_image(self.TEMPLATE_PATH) mock_qemu.assert_not_called() @mock.patch.object(os, 'remove') @mock.patch.object(images, 'qemu_img_info', return_value=imageutils.QemuImgInfo()) @mock.patch.object(os.path, 'exists', return_value=True) def test__remove_non_raw_cache_image_with_raw_cache( self, mock_exists, mock_qemu, mock_remove): mock_qemu.return_value.file_format = 'raw' image = self.image_class(self.INSTANCE, self.NAME) image._remove_non_raw_cache_image(self.TEMPLATE_PATH) mock_remove.assert_not_called() @mock.patch.object(os, 'remove') @mock.patch.object(images, 'qemu_img_info', return_value=imageutils.QemuImgInfo()) @mock.patch.object(os.path, 'exists', return_value=True) def test__remove_non_raw_cache_image_with_qcow2_cache( self, mock_exists, mock_qemu, mock_remove): mock_qemu.return_value.file_format = 'qcow2' image = self.image_class(self.INSTANCE, self.NAME) image._remove_non_raw_cache_image(self.TEMPLATE_PATH) mock_remove.assert_called_once_with(self.TEMPLATE_PATH) @mock.patch.object(images, 'qemu_img_info', return_value=imageutils.QemuImgInfo()) @mock.patch.object(rbd_utils.RBDDriver, 'resize') @mock.patch.object(imagebackend.Rbd, 'verify_base_size') @mock.patch.object(imagebackend.Rbd, 'get_disk_size') @mock.patch.object(imagebackend.Rbd, 'exists') def test_create_image_resize(self, mock_exists, mock_get, mock_verify, mock_resize, mock_qemu): fn = mock.MagicMock() full_size = self.SIZE * 2 rbd_utils.rbd.RBD_FEATURE_LAYERING = 1 fake_processutils.fake_execute_clear_log() fake_processutils.stub_out_processutils_execute(self) image = self.image_class(self.INSTANCE, self.NAME) mock_exists.return_value = False mock_qemu.return_value.file_format = 'raw' mock_get.return_value = self.SIZE rbd_name = "%s_%s" % (self.INSTANCE['uuid'], self.NAME) cmd = ('rbd', 'import', '--pool', self.POOL, self.TEMPLATE_PATH, rbd_name, '--image-format=2', '--id', self.USER, '--conf', self.CONF) image.create_image(fn, self.TEMPLATE_PATH, full_size) self.assertEqual(fake_processutils.fake_execute_get_log(), [' '.join(cmd)]) mock_exists.assert_has_calls([mock.call(), mock.call()]) mock_get.assert_called_once_with(rbd_name) mock_resize.assert_called_once_with(rbd_name, full_size) mock_verify.assert_called_once_with(self.TEMPLATE_PATH, full_size) fn.assert_called_once_with(target=self.TEMPLATE_PATH) @mock.patch.object(images, 'qemu_img_info', return_value=imageutils.QemuImgInfo()) @mock.patch.object(imagebackend.Rbd, 'get_disk_size') @mock.patch.object(imagebackend.Rbd, 'exists') def test_create_image_already_exists(self, mock_exists, mock_get, mock_qemu): rbd_utils.rbd.RBD_FEATURE_LAYERING = 1 image = self.image_class(self.INSTANCE, self.NAME) mock_exists.return_value = True mock_qemu.return_value.file_format = 'raw' mock_get.return_value = self.SIZE rbd_name = "%s_%s" % (self.INSTANCE['uuid'], self.NAME) fn = mock.MagicMock() image.create_image(fn, self.TEMPLATE_PATH, self.SIZE) mock_exists.assert_has_calls([mock.call(), mock.call()]) mock_get.assert_has_calls([mock.call(self.TEMPLATE_PATH), mock.call(rbd_name)]) def test_prealloc_image(self): CONF.set_override('preallocate_images', 'space') fake_processutils.fake_execute_clear_log() fake_processutils.stub_out_processutils_execute(self) image = self.image_class(self.INSTANCE, self.NAME) def fake_fetch(target, *args, **kwargs): return self.stub_out('os.path.exists', lambda _: True) self.stub_out('nova.virt.libvirt.imagebackend.Rbd.exists', lambda *a, **kw: True) self.stub_out('nova.virt.libvirt.imagebackend.Rbd.get_disk_size', lambda *a, **kw: self.SIZE) image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE) self.assertEqual(fake_processutils.fake_execute_get_log(), []) def test_parent_compatible(self): self.assertEqual(utils.getargspec(imagebackend.Image.libvirt_info), utils.getargspec(self.image_class.libvirt_info)) def test_image_path(self): conf = "FakeConf" pool = "FakePool" user = "FakeUser" self.flags(images_rbd_pool=pool, group='libvirt') self.flags(images_rbd_ceph_conf=conf, group='libvirt') self.flags(rbd_user=user, group='libvirt') image = self.image_class(self.INSTANCE, self.NAME) rbd_path = "rbd:%s/%s:id=%s:conf=%s" % (pool, image.rbd_name, user, conf) self.assertEqual(image.path, rbd_path) def test_get_disk_size(self): image = self.image_class(self.INSTANCE, self.NAME) with mock.patch.object(image.driver, 'size') as size_mock: size_mock.return_value = 2361393152 self.assertEqual(2361393152, image.get_disk_size(image.path)) size_mock.assert_called_once_with(image.rbd_name) @mock.patch.object(images, 'qemu_img_info', return_value=imageutils.QemuImgInfo()) def test_create_image_too_small(self, mock_qemu): mock_qemu.return_value.file_format = 'raw' image = self.image_class(self.INSTANCE, self.NAME) with mock.patch.object(image, 'driver') as driver_mock: driver_mock.exists.return_value = True driver_mock.size.return_value = 2 self.assertRaises(exception.FlavorDiskSmallerThanImage, image.create_image, mock.MagicMock(), self.TEMPLATE_PATH, 1) driver_mock.size.assert_called_once_with(image.rbd_name) @mock.patch.object(rbd_utils.RBDDriver, "get_mon_addrs") def test_libvirt_info(self, mock_mon_addrs): def get_mon_addrs(): hosts = ["server1", "server2"] ports = ["1899", "1920"] return hosts, ports mock_mon_addrs.side_effect = get_mon_addrs super(RbdTestCase, self).test_libvirt_info() @ddt.data(5, None) @mock.patch.object(rbd_utils.RBDDriver, "get_mon_addrs") def test_libvirt_info_scsi_with_unit(self, disk_unit, mock_mon_addrs): def get_mon_addrs(): hosts = ["server1", "server2"] ports = ["1899", "1920"] return hosts, ports mock_mon_addrs.side_effect = get_mon_addrs super(RbdTestCase, self)._test_libvirt_info_scsi_with_unit(disk_unit) @mock.patch.object(rbd_utils.RBDDriver, "get_mon_addrs") def test_get_model(self, mock_mon_addrs): pool = "FakePool" user = "FakeUser" self.flags(images_rbd_pool=pool, group='libvirt') self.flags(rbd_user=user, group='libvirt') self.flags(rbd_secret_uuid="3306a5c4-8378-4b3c-aa1f-7b48d3a26172", group='libvirt') # image.get_model() should always pass strip_brackets=False # for building proper IPv6 address+ports for libguestfs def get_mon_addrs(strip_brackets=True): if strip_brackets: hosts = ["server1", "server2", "::1"] else: hosts = ["server1", "server2", "[::1]"] ports = ["1899", "1920", "1930"] return hosts, ports mock_mon_addrs.side_effect = get_mon_addrs image = self.image_class(self.INSTANCE, self.NAME) model = image.get_model(FakeConn()) self.assertEqual(imgmodel.RBDImage( self.INSTANCE["uuid"] + "_fake.vm", "FakePool", "FakeUser", b"MTIzNDU2Cg==", ["server1:1899", "server2:1920", "[::1]:1930"]), model) @mock.patch.object(rbd_utils.RBDDriver, 'parent_info') @mock.patch.object(rbd_utils.RBDDriver, 'flatten') def test_flatten(self, mock_flatten, mock_parent_info): image = self.image_class(self.INSTANCE, self.NAME) image.flatten() mock_flatten.assert_called_once_with(image.rbd_name, pool=self.POOL) mock_parent_info.assert_called_once_with( image.rbd_name, pool=self.POOL) @mock.patch.object(imagebackend, 'LOG') @mock.patch.object(rbd_utils.RBDDriver, 'parent_info') @mock.patch.object(rbd_utils.RBDDriver, 'flatten') def test_flatten_already_flat( self, mock_flatten, mock_parent_info, mock_log): mock_parent_info.side_effect = exception.ImageUnacceptable( image_id=1, reason='foo') image = self.image_class(self.INSTANCE, self.NAME) image.flatten() mock_log.debug.assert_called_once() mock_flatten.assert_not_called() mock_parent_info.assert_called_once_with( image.rbd_name, pool=self.POOL) def test_import_file(self): image = self.image_class(self.INSTANCE, self.NAME) @mock.patch.object(image, 'exists') @mock.patch.object(image.driver, 'remove_image') @mock.patch.object(image.driver, 'import_image') def _test(mock_import, mock_remove, mock_exists): mock_exists.return_value = True image.import_file(self.INSTANCE, mock.sentinel.file, mock.sentinel.remote_name) name = '%s_%s' % (self.INSTANCE.uuid, mock.sentinel.remote_name) mock_exists.assert_called_once_with() mock_remove.assert_called_once_with(name) mock_import.assert_called_once_with(mock.sentinel.file, name) _test() @mock.patch.object(imagebackend.Rbd, 'exists') @mock.patch.object(rbd_utils.RBDDriver, 'remove_image') @mock.patch.object(rbd_utils.RBDDriver, 'import_image') def test_import_file_not_found(self, mock_import, mock_remove, mock_exists): image = self.image_class(self.INSTANCE, self.NAME) mock_exists.return_value = False image.import_file(self.INSTANCE, mock.sentinel.file, mock.sentinel.remote_name) name = '%s_%s' % (self.INSTANCE.uuid, mock.sentinel.remote_name) mock_exists.assert_called_once_with() self.assertFalse(mock_remove.called) mock_import.assert_called_once_with(mock.sentinel.file, name) def test_get_parent_pool(self): image = self.image_class(self.INSTANCE, self.NAME) with mock.patch.object(rbd_utils.RBDDriver, 'parent_info') as mock_pi: mock_pi.return_value = [self.POOL, 'fake-image', 'fake-snap'] parent_pool = image._get_parent_pool(self.CONTEXT, 'fake-image', self.FSID) self.assertEqual(self.POOL, parent_pool) def test_get_parent_pool_no_parent_info(self): image = self.image_class(self.INSTANCE, self.NAME) rbd_uri = 'rbd://%s/%s/fake-image/fake-snap' % (self.FSID, self.POOL) with test.nested(mock.patch.object(rbd_utils.RBDDriver, 'parent_info'), mock.patch.object(imagebackend.IMAGE_API, 'get'), ) as (mock_pi, mock_get): mock_pi.side_effect = exception.ImageUnacceptable(image_id='test', reason='test') mock_get.return_value = {'locations': [{'url': rbd_uri}]} parent_pool = image._get_parent_pool(self.CONTEXT, 'fake-image', self.FSID) self.assertEqual(self.POOL, parent_pool) def test_get_parent_pool_non_local_image(self): image = self.image_class(self.INSTANCE, self.NAME) rbd_uri = 'rbd://remote-cluster/remote-pool/fake-image/fake-snap' with test.nested( mock.patch.object(rbd_utils.RBDDriver, 'parent_info'), mock.patch.object(imagebackend.IMAGE_API, 'get') ) as (mock_pi, mock_get): mock_pi.side_effect = exception.ImageUnacceptable(image_id='test', reason='test') mock_get.return_value = {'locations': [{'url': rbd_uri}]} self.assertRaises(exception.ImageUnacceptable, image._get_parent_pool, self.CONTEXT, 'fake-image', self.FSID) def test_direct_snapshot(self): image = self.image_class(self.INSTANCE, self.NAME) test_snap = 'rbd://%s/%s/fake-image-id/snap' % (self.FSID, self.POOL) with test.nested( mock.patch.object(rbd_utils.RBDDriver, 'get_fsid', return_value=self.FSID), mock.patch.object(image, '_get_parent_pool', return_value=self.POOL), mock.patch.object(rbd_utils.RBDDriver, 'create_snap'), mock.patch.object(rbd_utils.RBDDriver, 'clone'), mock.patch.object(rbd_utils.RBDDriver, 'flatten'), mock.patch.object(image, 'cleanup_direct_snapshot') ) as (mock_fsid, mock_parent, mock_create_snap, mock_clone, mock_flatten, mock_cleanup): location = image.direct_snapshot(self.CONTEXT, 'fake-snapshot', 'fake-format', 'fake-image-id', 'fake-base-image') mock_fsid.assert_called_once_with() mock_parent.assert_called_once_with(self.CONTEXT, 'fake-base-image', self.FSID) mock_create_snap.assert_has_calls([mock.call(image.rbd_name, 'fake-snapshot', protect=True), mock.call('fake-image-id', 'snap', pool=self.POOL, protect=True)]) mock_clone.assert_called_once_with(mock.ANY, 'fake-image-id', dest_pool=self.POOL) mock_flatten.assert_called_once_with('fake-image-id', pool=self.POOL) mock_cleanup.assert_called_once_with(mock.ANY) self.assertEqual(test_snap, location) def test_direct_snapshot_cleans_up_on_failures(self): image = self.image_class(self.INSTANCE, self.NAME) test_snap = 'rbd://%s/%s/%s/snap' % (self.FSID, image.driver.pool, image.rbd_name) with test.nested( mock.patch.object(rbd_utils.RBDDriver, 'get_fsid', return_value=self.FSID), mock.patch.object(image, '_get_parent_pool', return_value=self.POOL), mock.patch.object(rbd_utils.RBDDriver, 'create_snap'), mock.patch.object(rbd_utils.RBDDriver, 'clone', side_effect=exception.Forbidden('testing')), mock.patch.object(rbd_utils.RBDDriver, 'flatten'), mock.patch.object(image, 'cleanup_direct_snapshot')) as ( mock_fsid, mock_parent, mock_create_snap, mock_clone, mock_flatten, mock_cleanup): self.assertRaises(exception.Forbidden, image.direct_snapshot, self.CONTEXT, 'snap', 'fake-format', 'fake-image-id', 'fake-base-image') mock_create_snap.assert_called_once_with(image.rbd_name, 'snap', protect=True) self.assertFalse(mock_flatten.called) mock_cleanup.assert_called_once_with(dict(url=test_snap)) def test_cleanup_direct_snapshot(self): image = self.image_class(self.INSTANCE, self.NAME) test_snap = 'rbd://%s/%s/%s/snap' % (self.FSID, image.driver.pool, image.rbd_name) with test.nested( mock.patch.object(rbd_utils.RBDDriver, 'remove_snap'), mock.patch.object(rbd_utils.RBDDriver, 'destroy_volume') ) as (mock_rm, mock_destroy): # Ensure that the method does nothing when no location is provided image.cleanup_direct_snapshot(None) self.assertFalse(mock_rm.called) # Ensure that destroy_volume is not called image.cleanup_direct_snapshot(dict(url=test_snap)) mock_rm.assert_called_once_with(image.rbd_name, 'snap', force=True, ignore_errors=False, pool=image.driver.pool) self.assertFalse(mock_destroy.called) def test_cleanup_direct_snapshot_destroy_volume(self): image = self.image_class(self.INSTANCE, self.NAME) test_snap = 'rbd://%s/%s/%s/snap' % (self.FSID, image.driver.pool, image.rbd_name) with test.nested( mock.patch.object(rbd_utils.RBDDriver, 'remove_snap'), mock.patch.object(rbd_utils.RBDDriver, 'destroy_volume') ) as (mock_rm, mock_destroy): # Ensure that destroy_volume is called image.cleanup_direct_snapshot(dict(url=test_snap), also_destroy_volume=True) mock_rm.assert_called_once_with(image.rbd_name, 'snap', force=True, ignore_errors=False, pool=image.driver.pool) mock_destroy.assert_called_once_with(image.rbd_name, pool=image.driver.pool) @mock.patch('nova.virt.libvirt.imagebackend.IMAGE_API') def test_copy_to_store(self, mock_imgapi): # Test copy_to_store() happy path where we ask for the image # to be copied, it goes into progress and then completes. self.flags(images_rbd_glance_copy_poll_interval=0, group='libvirt') self.flags(images_rbd_glance_store_name='store', group='libvirt') image = self.image_class(self.INSTANCE, self.NAME) mock_imgapi.get.side_effect = [ # Simulate a race between starting the copy and the first poll {'stores': []}, # Second poll shows it in progress {'os_glance_importing_to_stores': ['store'], 'stores': []}, # Third poll shows it has also been copied to a non-local store {'os_glance_importing_to_stores': ['store'], 'stores': ['other']}, # Should-be-last poll shows it complete {'os_glance_importing_to_stores': [], 'stores': ['other', 'store']}, ] image.copy_to_store(self.CONTEXT, {'id': 'foo'}) mock_imgapi.copy_image_to_store.assert_called_once_with( self.CONTEXT, 'foo', 'store') self.assertEqual(4, mock_imgapi.get.call_count) @mock.patch('nova.virt.libvirt.imagebackend.IMAGE_API') def test_copy_to_store_race_with_existing(self, mock_imgapi): # Test copy_to_store() where we race to ask Glance to do the # copy with another node. One of us will get a BadRequest, which # should not cause us to fail. If our desired store is now # in progress, continue to wait like we would have if we had # won the race. self.flags(images_rbd_glance_copy_poll_interval=0, group='libvirt') self.flags(images_rbd_glance_store_name='store', group='libvirt') image = self.image_class(self.INSTANCE, self.NAME) mock_imgapi.copy_image_to_store.side_effect = ( exception.ImageBadRequest(image_id='foo', response='already in progress')) # Make the first poll indicate that the image has already # been copied mock_imgapi.get.return_value = {'stores': ['store', 'other']} # Despite the (expected) exception from the copy, we should # not raise here if the subsequent poll works. image.copy_to_store(self.CONTEXT, {'id': 'foo'}) mock_imgapi.get.assert_called_once_with(self.CONTEXT, 'foo', include_locations=True) mock_imgapi.copy_image_to_store.assert_called_once_with( self.CONTEXT, 'foo', 'store') @mock.patch('nova.virt.libvirt.imagebackend.IMAGE_API') def test_copy_to_store_import_impossible(self, mock_imgapi): # Test copy_to_store() where Glance tells us that the image # is not copy-able for some reason (like it is not active yet # or some other workflow reason). image = self.image_class(self.INSTANCE, self.NAME) mock_imgapi.copy_image_to_store.side_effect = ( exception.ImageImportImpossible(image_id='foo', reason='because tests')) self.assertRaises(exception.ImageUnacceptable, image.copy_to_store, self.CONTEXT, {'id': 'foo'}) @mock.patch('nova.virt.libvirt.imagebackend.IMAGE_API') def test_copy_to_store_import_failed_other_reason(self, mock_imgapi): # Test copy_to_store() where some unexpected failure gets raised. # We should bubble that up so it gets all the way back to the caller # of the clone() itself, which can handle it independent of one of # the image-specific exceptions. image = self.image_class(self.INSTANCE, self.NAME) mock_imgapi.copy_image_to_store.side_effect = test.TestingException # Make sure any other exception makes it through, as those are already # expected failures by the callers of the imagebackend code. self.assertRaises(test.TestingException, image.copy_to_store, self.CONTEXT, {'id': 'foo'}) @mock.patch('nova.virt.libvirt.imagebackend.IMAGE_API') def test_copy_to_store_import_failed_in_progress(self, mock_imgapi): # Test copy_to_store() in the situation where we ask for the copy, # things start to look good (in progress) and later get reported # as failed. self.flags(images_rbd_glance_copy_poll_interval=0, group='libvirt') self.flags(images_rbd_glance_store_name='store', group='libvirt') image = self.image_class(self.INSTANCE, self.NAME) mock_imgapi.get.side_effect = [ # First poll shows it in progress {'os_glance_importing_to_stores': ['store'], 'stores': []}, # Second poll shows it failed {'os_glance_failed_import': ['store'], 'stores': []}, ] exc = self.assertRaises(exception.ImageUnacceptable, image.copy_to_store, self.CONTEXT, {'id': 'foo'}) self.assertIn('unsuccessful because', str(exc)) @mock.patch.object(loopingcall.FixedIntervalWithTimeoutLoopingCall, 'start') @mock.patch('nova.virt.libvirt.imagebackend.IMAGE_API') def test_copy_to_store_import_failed_timeout(self, mock_imgapi, mock_timer_start): # Test copy_to_store() simulating the case where we timeout waiting # for Glance to do the copy. self.flags(images_rbd_glance_store_name='store', group='libvirt') image = self.image_class(self.INSTANCE, self.NAME) mock_timer_start.side_effect = loopingcall.LoopingCallTimeOut() exc = self.assertRaises(exception.ImageUnacceptable, image.copy_to_store, self.CONTEXT, {'id': 'foo'}) self.assertIn('timed out', str(exc)) mock_imgapi.copy_image_to_store.assert_called_once_with( self.CONTEXT, 'foo', 'store') @mock.patch('nova.storage.rbd_utils.RBDDriver') @mock.patch('nova.virt.libvirt.imagebackend.IMAGE_API') def test_clone_copy_to_store(self, mock_imgapi, mock_driver_): # Call image.clone() in a way that will cause it to fall through # the locations check to the copy-to-store behavior, and assert # that after the copy, we recurse (without becoming infinite) and # do the check again. self.flags(images_rbd_glance_store_name='store', group='libvirt') fake_image = { 'id': 'foo', 'disk_format': 'raw', 'locations': ['fake'], } mock_imgapi.get.return_value = fake_image mock_driver = mock_driver_.return_value mock_driver.is_cloneable.side_effect = [False, True] image = self.image_class(self.INSTANCE, self.NAME) with mock.patch.object(image, 'copy_to_store') as mock_copy: image.clone(self.CONTEXT, 'foo') mock_copy.assert_called_once_with(self.CONTEXT, fake_image) mock_driver.is_cloneable.assert_has_calls([ # First call is the initial check mock.call('fake', fake_image), # Second call with the same location must be because we # recursed after the copy-to-store operation mock.call('fake', fake_image)]) @mock.patch('nova.storage.rbd_utils.RBDDriver') @mock.patch('nova.virt.libvirt.imagebackend.IMAGE_API') def test_clone_copy_to_store_failed(self, mock_imgapi, mock_driver_): # Call image.clone() in a way that will cause it to fall through # the locations check to the copy-to-store behavior, but simulate # some situation where we didn't actually copy the image and the # recursed check does not succeed. Assert that we do not copy again, # nor recurse again, and raise the expected error. self.flags(images_rbd_glance_store_name='store', group='libvirt') fake_image = { 'id': 'foo', 'disk_format': 'raw', 'locations': ['fake'], } mock_imgapi.get.return_value = fake_image mock_driver = mock_driver_.return_value mock_driver.is_cloneable.side_effect = [False, False] image = self.image_class(self.INSTANCE, self.NAME) with mock.patch.object(image, 'copy_to_store') as mock_copy: self.assertRaises(exception.ImageUnacceptable, image.clone, self.CONTEXT, 'foo') mock_copy.assert_called_once_with(self.CONTEXT, fake_image) mock_driver.is_cloneable.assert_has_calls([ # First call is the initial check mock.call('fake', fake_image), # Second call with the same location must be because we # recursed after the copy-to-store operation mock.call('fake', fake_image)]) @mock.patch('nova.storage.rbd_utils.RBDDriver') @mock.patch('nova.virt.libvirt.imagebackend.IMAGE_API') def test_clone_without_needed_copy(self, mock_imgapi, mock_driver_): # Call image.clone() in a way that will cause it to pass the locations # check the first time. Assert that we do not call copy-to-store # nor recurse. self.flags(images_rbd_glance_store_name='store', group='libvirt') fake_image = { 'id': 'foo', 'disk_format': 'raw', 'locations': ['fake'], } mock_imgapi.get.return_value = fake_image mock_driver = mock_driver_.return_value mock_driver.is_cloneable.return_value = True image = self.image_class(self.INSTANCE, self.NAME) with mock.patch.object(image, 'copy_to_store') as mock_copy: image.clone(self.CONTEXT, 'foo') mock_copy.assert_not_called() mock_driver.is_cloneable.assert_called_once_with('fake', fake_image) @mock.patch('nova.storage.rbd_utils.RBDDriver') @mock.patch('nova.virt.libvirt.imagebackend.IMAGE_API') def test_clone_copy_not_configured(self, mock_imgapi, mock_driver_): # Call image.clone() in a way that will cause it to fail the locations # check the first time. Assert that if the store name is not configured # we do not try to copy-to-store and just raise the original exception # indicating that the image is not reachable. fake_image = { 'id': 'foo', 'disk_format': 'raw', 'locations': ['fake'], } mock_imgapi.get.return_value = fake_image mock_driver = mock_driver_.return_value mock_driver.is_cloneable.return_value = False image = self.image_class(self.INSTANCE, self.NAME) with mock.patch.object(image, 'copy_to_store') as mock_copy: self.assertRaises(exception.ImageUnacceptable, image.clone, self.CONTEXT, 'foo') mock_copy.assert_not_called() mock_driver.is_cloneable.assert_called_once_with('fake', fake_image) class PloopTestCase(_ImageTestCase, test.NoDBTestCase): SIZE = 1024 def setUp(self): self.image_class = imagebackend.Ploop super(PloopTestCase, self).setUp() self.utils = imagebackend.utils @mock.patch.object(imagebackend.fileutils, 'ensure_tree') @mock.patch.object(os.path, 'exists') def test_cache(self, mock_exists, mock_ensure): mock_exists.side_effect = [False, False, False] exist_calls = [mock.call(self.TEMPLATE_DIR), mock.call(self.PATH), mock.call(self.TEMPLATE_PATH)] fn = mock.MagicMock() image = self.image_class(self.INSTANCE, self.NAME) self.mock_create_image(image) image.cache(fn, self.TEMPLATE) mock_ensure.assert_called_once_with(self.TEMPLATE_DIR) mock_exists.assert_has_calls(exist_calls) fn.assert_called_once_with(target=self.TEMPLATE_PATH) @mock.patch.object(imagebackend.Ploop, 'get_disk_size', return_value=2048) @mock.patch.object(imagebackend.utils, 'synchronized') @mock.patch('nova.virt.libvirt.utils.copy_image') @mock.patch('nova.privsep.libvirt.ploop_restore_descriptor') @mock.patch.object(imagebackend.disk, 'extend') def test_create_image(self, mock_extend, mock_ploop_restore_descriptor, mock_copy, mock_sync, mock_get): mock_sync.side_effect = lambda *a, **kw: self._fake_deco fn = mock.MagicMock() img_path = os.path.join(self.PATH, "root.hds") image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, 2048, image_id=None) mock_copy.assert_called_once_with(self.TEMPLATE_PATH, img_path) mock_ploop_restore_descriptor.assert_called_once_with(self.PATH, img_path, "raw") self.assertTrue(mock_sync.called) fn.assert_called_once_with(target=self.TEMPLATE_PATH, image_id=None) mock_extend.assert_called_once_with( imgmodel.LocalFileImage(self.PATH, imgmodel.FORMAT_PLOOP), 2048) def test_create_image_generated(self): fn = mock.Mock() image = self.image_class(self.INSTANCE, self.NAME) image.create_image(fn, self.TEMPLATE_PATH, 2048, ephemeral_size=2) fn.assert_called_with(target=self.PATH, ephemeral_size=2) def test_prealloc_image(self): self.flags(preallocate_images='space') fake_processutils.fake_execute_clear_log() fake_processutils.stub_out_processutils_execute(self) image = self.image_class(self.INSTANCE, self.NAME) def fake_fetch(target, *args, **kwargs): return self.stub_out('os.path.exists', lambda _: True) self.stub_out('nova.virt.libvirt.imagebackend.Ploop.exists', lambda *a, **kw: True) self.stub_out('nova.virt.libvirt.imagebackend.Ploop.get_disk_size', lambda *a, **kw: self.SIZE) image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE) class BackendTestCase(test.NoDBTestCase): INSTANCE = objects.Instance(id=1, uuid=uuidutils.generate_uuid()) NAME = 'fake-name.suffix' def setUp(self): super(BackendTestCase, self).setUp() self.flags(enabled=False, group='ephemeral_storage_encryption') self.flags(instances_path=self.useFixture(fixtures.TempDir()).path) self.INSTANCE['ephemeral_key_uuid'] = None def get_image(self, use_cow, image_type): return imagebackend.Backend(use_cow).by_name(self.INSTANCE, self.NAME, image_type) def _test_image(self, image_type, image_not_cow, image_cow): image1 = self.get_image(False, image_type) image2 = self.get_image(True, image_type) def assertIsInstance(instance, class_object): failure = ('Expected %s,' + ' but got %s.') % (class_object.__name__, instance.__class__.__name__) self.assertIsInstance(instance, class_object, msg=failure) assertIsInstance(image1, image_not_cow) assertIsInstance(image2, image_cow) def test_image_flat(self): self._test_image('raw', imagebackend.Flat, imagebackend.Flat) def test_image_flat_preallocate_images(self): self.flags(preallocate_images='space') raw = imagebackend.Flat(self.INSTANCE, 'fake_disk', '/tmp/xyz') self.assertTrue(raw.preallocate) def test_image_flat_native_io(self): self.flags(preallocate_images="space") raw = imagebackend.Flat(self.INSTANCE, 'fake_disk', '/tmp/xyz') self.assertEqual(raw.driver_io, "native") def test_image_qcow2(self): self._test_image('qcow2', imagebackend.Qcow2, imagebackend.Qcow2) def test_image_qcow2_preallocate_images(self): self.flags(preallocate_images='space') qcow = imagebackend.Qcow2(self.INSTANCE, 'fake_disk', '/tmp/xyz') self.assertTrue(qcow.preallocate) def test_image_qcow2_native_io(self): self.flags(preallocate_images="space") qcow = imagebackend.Qcow2(self.INSTANCE, 'fake_disk', '/tmp/xyz') self.assertEqual(qcow.driver_io, "native") def test_image_lvm_native_io(self): def _test_native_io(is_sparse, driver_io): self.flags(images_volume_group='FakeVG', group='libvirt') self.flags(sparse_logical_volumes=is_sparse, group='libvirt') lvm = imagebackend.Lvm(self.INSTANCE, 'fake_disk') self.assertEqual(lvm.driver_io, driver_io) _test_native_io(is_sparse=False, driver_io="native") _test_native_io(is_sparse=True, driver_io=None) def test_image_lvm(self): self.flags(images_volume_group='FakeVG', group='libvirt') self._test_image('lvm', imagebackend.Lvm, imagebackend.Lvm) @mock.patch.object(rbd_utils, 'rbd') @mock.patch.object(rbd_utils, 'rados') def test_image_rbd(self, mock_rados, mock_rbd): conf = "FakeConf" pool = "FakePool" self.flags(images_rbd_pool=pool, group='libvirt') self.flags(images_rbd_ceph_conf=conf, group='libvirt') self._test_image('rbd', imagebackend.Rbd, imagebackend.Rbd) def test_image_default(self): self._test_image('default', imagebackend.Flat, imagebackend.Qcow2) class UtimeWorkaroundTestCase(test.NoDBTestCase): ERROR_STUB = "sentinel.path: [Errno 13] Permission Denied" def setUp(self): super(UtimeWorkaroundTestCase, self).setUp() self.mock_utime = self.useFixture( fixtures.MockPatch('nova.privsep.path.utime')).mock def test_update_utime_no_error(self): # If utime doesn't raise an error we shouldn't raise or log anything imagebackend._update_utime_ignore_eacces(mock.sentinel.path) self.mock_utime.assert_called_once_with(mock.sentinel.path) self.assertNotIn(self.ERROR_STUB, self.stdlog.logger.output) def test_update_utime_eacces(self): # If utime raises EACCES we should log the error, but ignore it e = OSError() e.errno = errno.EACCES e.strerror = "Permission Denied" self.mock_utime.side_effect = e imagebackend._update_utime_ignore_eacces(mock.sentinel.path) self.mock_utime.assert_called_once_with(mock.sentinel.path) self.assertIn(self.ERROR_STUB, self.stdlog.logger.output) def test_update_utime_eio(self): # If utime raises any other error we should raise it e = OSError() e.errno = errno.EIO e.strerror = "IO Error" self.mock_utime.side_effect = e ex = self.assertRaises( OSError, imagebackend._update_utime_ignore_eacces, mock.sentinel.path) self.assertIs(ex, e) self.mock_utime.assert_called_once_with(mock.sentinel.path) self.assertNotIn(self.ERROR_STUB, self.stdlog.logger.output)