1586 lines
64 KiB
Python
1586 lines
64 KiB
Python
# Copyright 2012 Grid Dynamics
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import base64
|
|
import inspect
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
|
|
import fixtures
|
|
import mock
|
|
from oslo_concurrency import lockutils
|
|
from oslo_config import cfg
|
|
from oslo_config import fixture as config_fixture
|
|
from oslo_utils import units
|
|
from oslo_utils import uuidutils
|
|
|
|
from nova import context
|
|
from nova import exception
|
|
from nova import keymgr
|
|
from nova import objects
|
|
from nova.openstack.common import imageutils
|
|
from nova import test
|
|
from nova.tests.unit import fake_processutils
|
|
from nova.tests.unit.virt.libvirt import fake_libvirt_utils
|
|
from nova.virt.image import model as imgmodel
|
|
from nova.virt import images
|
|
from nova.virt.libvirt import config as vconfig
|
|
from nova.virt.libvirt import imagebackend
|
|
from nova.virt.libvirt.storage import rbd_utils
|
|
|
|
CONF = cfg.CONF
|
|
CONF.import_opt('fixed_key', 'nova.keymgr.conf_key_mgr', group='keymgr')
|
|
|
|
|
|
class FakeSecret(object):
|
|
|
|
def value(self):
|
|
return base64.b64decode("MTIzNDU2Cg==")
|
|
|
|
|
|
class FakeConn(object):
|
|
|
|
def secretLookupByUUIDString(self, uuid):
|
|
return FakeSecret()
|
|
|
|
|
|
class _ImageTestCase(object):
|
|
|
|
def mock_create_image(self, image):
|
|
def create_image(fn, base, size, *args, **kwargs):
|
|
fn(target=base, *args, **kwargs)
|
|
image.create_image = create_image
|
|
|
|
def setUp(self):
|
|
super(_ImageTestCase, self).setUp()
|
|
self.fixture = self.useFixture(config_fixture.Config(lockutils.CONF))
|
|
self.INSTANCES_PATH = tempfile.mkdtemp(suffix='instances')
|
|
self.fixture.config(disable_process_locking=True,
|
|
group='oslo_concurrency')
|
|
self.flags(instances_path=self.INSTANCES_PATH)
|
|
self.INSTANCE = objects.Instance(id=1, uuid=uuidutils.generate_uuid())
|
|
self.DISK_INFO_PATH = os.path.join(self.INSTANCES_PATH,
|
|
self.INSTANCE['uuid'], 'disk.info')
|
|
self.NAME = 'fake.vm'
|
|
self.TEMPLATE = 'template'
|
|
self.CONTEXT = context.get_admin_context()
|
|
|
|
self.OLD_STYLE_INSTANCE_PATH = \
|
|
fake_libvirt_utils.get_instance_path(self.INSTANCE, forceold=True)
|
|
self.PATH = os.path.join(
|
|
fake_libvirt_utils.get_instance_path(self.INSTANCE), self.NAME)
|
|
|
|
# TODO(mikal): rename template_dir to base_dir and template_path
|
|
# to cached_image_path. This will be less confusing.
|
|
self.TEMPLATE_DIR = os.path.join(CONF.instances_path, '_base')
|
|
self.TEMPLATE_PATH = os.path.join(self.TEMPLATE_DIR, 'template')
|
|
|
|
self.useFixture(fixtures.MonkeyPatch(
|
|
'nova.virt.libvirt.imagebackend.libvirt_utils',
|
|
fake_libvirt_utils))
|
|
|
|
def tearDown(self):
|
|
super(_ImageTestCase, self).tearDown()
|
|
shutil.rmtree(self.INSTANCES_PATH)
|
|
|
|
def test_prealloc_image(self):
|
|
CONF.set_override('preallocate_images', 'space')
|
|
|
|
fake_processutils.fake_execute_clear_log()
|
|
fake_processutils.stub_out_processutils_execute(self.stubs)
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
def fake_fetch(target, *args, **kwargs):
|
|
return
|
|
|
|
self.stubs.Set(image, 'get_disk_size', lambda _: self.SIZE)
|
|
self.stubs.Set(os.path, 'exists', lambda _: True)
|
|
self.stubs.Set(os, 'access', lambda p, w: True)
|
|
|
|
# Call twice to verify testing fallocate is only called once.
|
|
image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
|
|
image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
|
|
|
|
self.assertEqual(fake_processutils.fake_execute_get_log(),
|
|
['fallocate -n -l 1 %s.fallocate_test' % self.PATH,
|
|
'fallocate -n -l %s %s' % (self.SIZE, self.PATH),
|
|
'fallocate -n -l %s %s' % (self.SIZE, self.PATH)])
|
|
|
|
def test_prealloc_image_without_write_access(self):
|
|
CONF.set_override('preallocate_images', 'space')
|
|
|
|
fake_processutils.fake_execute_clear_log()
|
|
fake_processutils.stub_out_processutils_execute(self.stubs)
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
def fake_fetch(target, *args, **kwargs):
|
|
return
|
|
|
|
self.stubs.Set(image, 'check_image_exists', lambda: True)
|
|
self.stubs.Set(image, '_can_fallocate', lambda: True)
|
|
self.stubs.Set(image, 'get_disk_size', lambda _: self.SIZE)
|
|
self.stubs.Set(os.path, 'exists', lambda _: True)
|
|
self.stubs.Set(os, 'access', lambda p, w: False)
|
|
|
|
# Testing fallocate is only called when user has write access.
|
|
image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
|
|
|
|
self.assertEqual(fake_processutils.fake_execute_get_log(), [])
|
|
|
|
def test_libvirt_fs_info(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
fs = image.libvirt_fs_info("/mnt")
|
|
# check that exception hasn't been raised and the method
|
|
# returned correct object
|
|
self.assertIsInstance(fs, vconfig.LibvirtConfigGuestFilesys)
|
|
self.assertEqual(fs.target_dir, "/mnt")
|
|
if image.is_block_dev:
|
|
self.assertEqual(fs.source_type, "block")
|
|
self.assertEqual(fs.source_dev, image.path)
|
|
else:
|
|
self.assertEqual(fs.source_type, "file")
|
|
self.assertEqual(fs.source_file, image.path)
|
|
|
|
def test_libvirt_info(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
extra_specs = {
|
|
'quota:disk_read_bytes_sec': 10 * units.Mi,
|
|
'quota:disk_read_iops_sec': 1 * units.Ki,
|
|
'quota:disk_write_bytes_sec': 20 * units.Mi,
|
|
'quota:disk_write_iops_sec': 2 * units.Ki,
|
|
'quota:disk_total_bytes_sec': 30 * units.Mi,
|
|
'quota:disk_total_iops_sec': 3 * units.Ki,
|
|
}
|
|
|
|
disk = image.libvirt_info(disk_bus="virtio",
|
|
disk_dev="/dev/vda",
|
|
device_type="cdrom",
|
|
cache_mode="none",
|
|
extra_specs=extra_specs,
|
|
hypervisor_version=4004001)
|
|
|
|
self.assertIsInstance(disk, vconfig.LibvirtConfigGuestDisk)
|
|
self.assertEqual("/dev/vda", disk.target_dev)
|
|
self.assertEqual("virtio", disk.target_bus)
|
|
self.assertEqual("none", disk.driver_cache)
|
|
self.assertEqual("cdrom", disk.source_device)
|
|
|
|
self.assertEqual(10 * units.Mi, disk.disk_read_bytes_sec)
|
|
self.assertEqual(1 * units.Ki, disk.disk_read_iops_sec)
|
|
self.assertEqual(20 * units.Mi, disk.disk_write_bytes_sec)
|
|
self.assertEqual(2 * units.Ki, disk.disk_write_iops_sec)
|
|
self.assertEqual(30 * units.Mi, disk.disk_total_bytes_sec)
|
|
self.assertEqual(3 * units.Ki, disk.disk_total_iops_sec)
|
|
|
|
@mock.patch('nova.virt.disk.api.get_disk_size')
|
|
def test_get_disk_size(self, get_disk_size):
|
|
get_disk_size.return_value = 2361393152
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.assertEqual(2361393152, image.get_disk_size(image.path))
|
|
get_disk_size.assert_called_once_with(image.path)
|
|
|
|
|
|
class RawTestCase(_ImageTestCase, test.NoDBTestCase):
|
|
|
|
SIZE = 1024
|
|
|
|
def setUp(self):
|
|
self.image_class = imagebackend.Raw
|
|
super(RawTestCase, self).setUp()
|
|
self.stubs.Set(imagebackend.Raw, 'correct_format', lambda _: None)
|
|
|
|
def prepare_mocks(self):
|
|
fn = self.mox.CreateMockAnything()
|
|
self.mox.StubOutWithMock(imagebackend.utils.synchronized,
|
|
'__call__')
|
|
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
|
|
self.mox.StubOutWithMock(imagebackend.disk, 'extend')
|
|
return fn
|
|
|
|
def test_cache(self):
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(False)
|
|
os.path.exists(self.PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
|
fn = self.mox.CreateMockAnything()
|
|
fn(target=self.TEMPLATE_PATH)
|
|
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
|
|
imagebackend.fileutils.ensure_tree(self.TEMPLATE_DIR)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.mock_create_image(image)
|
|
image.cache(fn, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_cache_image_exists(self):
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
|
|
os.path.exists(self.PATH).AndReturn(True)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.cache(None, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_cache_base_dir_exists(self):
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
|
|
os.path.exists(self.PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
|
fn = self.mox.CreateMockAnything()
|
|
fn(target=self.TEMPLATE_PATH)
|
|
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.mock_create_image(image)
|
|
image.cache(fn, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_cache_template_exists(self):
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
|
|
os.path.exists(self.PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.mock_create_image(image)
|
|
image.cache(None, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_create_image(self):
|
|
fn = self.prepare_mocks()
|
|
fn(target=self.TEMPLATE_PATH, max_size=None, image_id=None)
|
|
imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH, self.PATH)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH, None, image_id=None)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_create_image_generated(self):
|
|
fn = self.prepare_mocks()
|
|
fn(target=self.PATH)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH, None)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
@mock.patch.object(images, 'qemu_img_info',
|
|
return_value=imageutils.QemuImgInfo())
|
|
def test_create_image_extend(self, fake_qemu_img_info):
|
|
fn = self.prepare_mocks()
|
|
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH, image_id=None)
|
|
imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH, self.PATH)
|
|
image = imgmodel.LocalFileImage(self.PATH, imgmodel.FORMAT_RAW)
|
|
imagebackend.disk.extend(image, self.SIZE)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE, image_id=None)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_correct_format(self):
|
|
self.stubs.UnsetAll()
|
|
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
self.mox.StubOutWithMock(imagebackend.images, 'qemu_img_info')
|
|
|
|
os.path.exists(self.PATH).AndReturn(True)
|
|
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
|
|
info = self.mox.CreateMockAnything()
|
|
info.file_format = 'foo'
|
|
imagebackend.images.qemu_img_info(self.PATH).AndReturn(info)
|
|
os.path.exists(CONF.instances_path).AndReturn(True)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME, path=self.PATH)
|
|
self.assertEqual(image.driver_format, 'foo')
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
@mock.patch.object(images, 'qemu_img_info',
|
|
side_effect=exception.InvalidDiskInfo(
|
|
reason='invalid path'))
|
|
def test_resolve_driver_format(self, fake_qemu_img_info):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
driver_format = image.resolve_driver_format()
|
|
self.assertEqual(driver_format, 'raw')
|
|
|
|
def test_get_model(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
model = image.get_model(FakeConn())
|
|
self.assertEqual(imgmodel.LocalFileImage(self.PATH,
|
|
imgmodel.FORMAT_RAW),
|
|
model)
|
|
|
|
|
|
class Qcow2TestCase(_ImageTestCase, test.NoDBTestCase):
|
|
SIZE = units.Gi
|
|
|
|
def setUp(self):
|
|
self.image_class = imagebackend.Qcow2
|
|
super(Qcow2TestCase, self).setUp()
|
|
self.QCOW2_BASE = (self.TEMPLATE_PATH +
|
|
'_%d' % (self.SIZE / units.Gi))
|
|
|
|
def prepare_mocks(self):
|
|
fn = self.mox.CreateMockAnything()
|
|
self.mox.StubOutWithMock(imagebackend.utils.synchronized,
|
|
'__call__')
|
|
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
|
|
'create_cow_image')
|
|
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
|
|
self.mox.StubOutWithMock(imagebackend.disk, 'extend')
|
|
return fn
|
|
|
|
def test_cache(self):
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
|
|
os.path.exists(CONF.instances_path).AndReturn(True)
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(False)
|
|
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
|
|
os.path.exists(self.PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
|
fn = self.mox.CreateMockAnything()
|
|
fn(target=self.TEMPLATE_PATH)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.mock_create_image(image)
|
|
image.cache(fn, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_cache_image_exists(self):
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
|
|
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
|
|
os.path.exists(self.PATH).AndReturn(True)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.cache(None, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_cache_base_dir_exists(self):
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
|
|
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
|
|
os.path.exists(self.PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
|
fn = self.mox.CreateMockAnything()
|
|
fn(target=self.TEMPLATE_PATH)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.mock_create_image(image)
|
|
image.cache(fn, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_cache_template_exists(self):
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
|
|
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
|
|
os.path.exists(self.PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.mock_create_image(image)
|
|
image.cache(None, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_create_image(self):
|
|
fn = self.prepare_mocks()
|
|
fn(max_size=None, target=self.TEMPLATE_PATH)
|
|
imagebackend.libvirt_utils.create_cow_image(self.TEMPLATE_PATH,
|
|
self.PATH)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH, None)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_create_image_with_size(self):
|
|
fn = self.prepare_mocks()
|
|
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH)
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
self.mox.StubOutWithMock(imagebackend.Image,
|
|
'verify_base_size')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
|
|
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
|
os.path.exists(self.PATH).AndReturn(False)
|
|
os.path.exists(self.PATH).AndReturn(False)
|
|
imagebackend.Image.verify_base_size(self.TEMPLATE_PATH, self.SIZE)
|
|
imagebackend.libvirt_utils.create_cow_image(self.TEMPLATE_PATH,
|
|
self.PATH)
|
|
image = imgmodel.LocalFileImage(self.PATH, imgmodel.FORMAT_QCOW2)
|
|
imagebackend.disk.extend(image, self.SIZE)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_create_image_too_small(self):
|
|
fn = self.prepare_mocks()
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
self.mox.StubOutWithMock(imagebackend.Qcow2, 'get_disk_size')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
|
|
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
|
|
imagebackend.Qcow2.get_disk_size(self.TEMPLATE_PATH
|
|
).AndReturn(self.SIZE)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.assertRaises(exception.FlavorDiskSmallerThanImage,
|
|
image.create_image, fn, self.TEMPLATE_PATH, 1)
|
|
self.mox.VerifyAll()
|
|
|
|
def test_generate_resized_backing_files(self):
|
|
fn = self.prepare_mocks()
|
|
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH)
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
|
|
'get_disk_backing_file')
|
|
self.mox.StubOutWithMock(imagebackend.Image,
|
|
'verify_base_size')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
|
|
os.path.exists(CONF.instances_path).AndReturn(True)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
|
os.path.exists(self.PATH).AndReturn(True)
|
|
|
|
imagebackend.libvirt_utils.get_disk_backing_file(self.PATH)\
|
|
.AndReturn(self.QCOW2_BASE)
|
|
os.path.exists(self.QCOW2_BASE).AndReturn(False)
|
|
imagebackend.Image.verify_base_size(self.TEMPLATE_PATH, self.SIZE)
|
|
imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH,
|
|
self.QCOW2_BASE)
|
|
image = imgmodel.LocalFileImage(self.QCOW2_BASE,
|
|
imgmodel.FORMAT_QCOW2)
|
|
imagebackend.disk.extend(image, self.SIZE)
|
|
|
|
os.path.exists(self.PATH).AndReturn(True)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_qcow2_exists_and_has_no_backing_file(self):
|
|
fn = self.prepare_mocks()
|
|
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH)
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
self.mox.StubOutWithMock(imagebackend.libvirt_utils,
|
|
'get_disk_backing_file')
|
|
self.mox.StubOutWithMock(imagebackend.Image,
|
|
'verify_base_size')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
|
|
os.path.exists(self.INSTANCES_PATH).AndReturn(True)
|
|
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
|
os.path.exists(self.PATH).AndReturn(True)
|
|
|
|
imagebackend.libvirt_utils.get_disk_backing_file(self.PATH)\
|
|
.AndReturn(None)
|
|
imagebackend.Image.verify_base_size(self.TEMPLATE_PATH, self.SIZE)
|
|
os.path.exists(self.PATH).AndReturn(True)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_resolve_driver_format(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
driver_format = image.resolve_driver_format()
|
|
self.assertEqual(driver_format, 'qcow2')
|
|
|
|
def test_get_model(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
model = image.get_model(FakeConn())
|
|
self.assertEqual(imgmodel.LocalFileImage(self.PATH,
|
|
imgmodel.FORMAT_QCOW2),
|
|
model)
|
|
|
|
|
|
class LvmTestCase(_ImageTestCase, test.NoDBTestCase):
|
|
VG = 'FakeVG'
|
|
TEMPLATE_SIZE = 512
|
|
SIZE = 1024
|
|
|
|
def setUp(self):
|
|
self.image_class = imagebackend.Lvm
|
|
super(LvmTestCase, self).setUp()
|
|
self.flags(images_volume_group=self.VG, group='libvirt')
|
|
self.flags(enabled=False, group='ephemeral_storage_encryption')
|
|
self.INSTANCE['ephemeral_key_uuid'] = None
|
|
self.LV = '%s_%s' % (self.INSTANCE['uuid'], self.NAME)
|
|
self.OLD_STYLE_INSTANCE_PATH = None
|
|
self.PATH = os.path.join('/dev', self.VG, self.LV)
|
|
self.disk = imagebackend.disk
|
|
self.utils = imagebackend.utils
|
|
self.lvm = imagebackend.lvm
|
|
|
|
def prepare_mocks(self):
|
|
fn = self.mox.CreateMockAnything()
|
|
self.mox.StubOutWithMock(self.disk, 'resize2fs')
|
|
self.mox.StubOutWithMock(self.lvm, 'create_volume')
|
|
self.mox.StubOutWithMock(self.disk, 'get_disk_size')
|
|
self.mox.StubOutWithMock(self.utils, 'execute')
|
|
return fn
|
|
|
|
def _create_image(self, sparse):
|
|
fn = self.prepare_mocks()
|
|
fn(max_size=None, target=self.TEMPLATE_PATH)
|
|
self.lvm.create_volume(self.VG,
|
|
self.LV,
|
|
self.TEMPLATE_SIZE,
|
|
sparse=sparse)
|
|
self.disk.get_disk_size(self.TEMPLATE_PATH
|
|
).AndReturn(self.TEMPLATE_SIZE)
|
|
cmd = ('qemu-img', 'convert', '-O', 'raw', self.TEMPLATE_PATH,
|
|
self.PATH)
|
|
self.utils.execute(*cmd, run_as_root=True)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH, None)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def _create_image_generated(self, sparse):
|
|
fn = self.prepare_mocks()
|
|
self.lvm.create_volume(self.VG, self.LV,
|
|
self.SIZE, sparse=sparse)
|
|
fn(target=self.PATH, ephemeral_size=None)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH,
|
|
self.SIZE, ephemeral_size=None)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def _create_image_resize(self, sparse):
|
|
fn = self.prepare_mocks()
|
|
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH)
|
|
self.lvm.create_volume(self.VG, self.LV,
|
|
self.SIZE, sparse=sparse)
|
|
self.disk.get_disk_size(self.TEMPLATE_PATH
|
|
).AndReturn(self.TEMPLATE_SIZE)
|
|
cmd = ('qemu-img', 'convert', '-O', 'raw', self.TEMPLATE_PATH,
|
|
self.PATH)
|
|
self.utils.execute(*cmd, run_as_root=True)
|
|
self.disk.resize2fs(self.PATH, run_as_root=True)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_cache(self):
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(False)
|
|
os.path.exists(self.PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
|
|
|
fn = self.mox.CreateMockAnything()
|
|
fn(target=self.TEMPLATE_PATH)
|
|
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
|
|
imagebackend.fileutils.ensure_tree(self.TEMPLATE_DIR)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.mock_create_image(image)
|
|
image.cache(fn, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_cache_image_exists(self):
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
|
|
os.path.exists(self.PATH).AndReturn(True)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.cache(None, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_cache_base_dir_exists(self):
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
|
|
os.path.exists(self.PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
|
fn = self.mox.CreateMockAnything()
|
|
fn(target=self.TEMPLATE_PATH)
|
|
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.mock_create_image(image)
|
|
image.cache(fn, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_create_image(self):
|
|
self._create_image(False)
|
|
|
|
def test_create_image_sparsed(self):
|
|
self.flags(sparse_logical_volumes=True, group='libvirt')
|
|
self._create_image(True)
|
|
|
|
def test_create_image_generated(self):
|
|
self._create_image_generated(False)
|
|
|
|
def test_create_image_generated_sparsed(self):
|
|
self.flags(sparse_logical_volumes=True, group='libvirt')
|
|
self._create_image_generated(True)
|
|
|
|
def test_create_image_resize(self):
|
|
self._create_image_resize(False)
|
|
|
|
def test_create_image_resize_sparsed(self):
|
|
self.flags(sparse_logical_volumes=True, group='libvirt')
|
|
self._create_image_resize(True)
|
|
|
|
def test_create_image_negative(self):
|
|
fn = self.prepare_mocks()
|
|
fn(max_size=self.SIZE, target=self.TEMPLATE_PATH)
|
|
self.lvm.create_volume(self.VG,
|
|
self.LV,
|
|
self.SIZE,
|
|
sparse=False
|
|
).AndRaise(RuntimeError())
|
|
self.disk.get_disk_size(self.TEMPLATE_PATH
|
|
).AndReturn(self.TEMPLATE_SIZE)
|
|
self.mox.StubOutWithMock(self.lvm, 'remove_volumes')
|
|
self.lvm.remove_volumes([self.PATH])
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
self.assertRaises(RuntimeError, image.create_image, fn,
|
|
self.TEMPLATE_PATH, self.SIZE)
|
|
self.mox.VerifyAll()
|
|
|
|
def test_create_image_generated_negative(self):
|
|
fn = self.prepare_mocks()
|
|
fn(target=self.PATH,
|
|
ephemeral_size=None).AndRaise(RuntimeError())
|
|
self.lvm.create_volume(self.VG,
|
|
self.LV,
|
|
self.SIZE,
|
|
sparse=False)
|
|
self.mox.StubOutWithMock(self.lvm, 'remove_volumes')
|
|
self.lvm.remove_volumes([self.PATH])
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
self.assertRaises(RuntimeError, image.create_image, fn,
|
|
self.TEMPLATE_PATH, self.SIZE,
|
|
ephemeral_size=None)
|
|
self.mox.VerifyAll()
|
|
|
|
def test_prealloc_image(self):
|
|
CONF.set_override('preallocate_images', 'space')
|
|
|
|
fake_processutils.fake_execute_clear_log()
|
|
fake_processutils.stub_out_processutils_execute(self.stubs)
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
def fake_fetch(target, *args, **kwargs):
|
|
return
|
|
|
|
self.stubs.Set(os.path, 'exists', lambda _: True)
|
|
self.stubs.Set(image, 'check_image_exists', lambda: True)
|
|
self.stubs.Set(image, 'get_disk_size', lambda _: self.SIZE)
|
|
|
|
image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
|
|
|
|
self.assertEqual(fake_processutils.fake_execute_get_log(), [])
|
|
|
|
|
|
class EncryptedLvmTestCase(_ImageTestCase, test.NoDBTestCase):
|
|
VG = 'FakeVG'
|
|
TEMPLATE_SIZE = 512
|
|
SIZE = 1024
|
|
|
|
def setUp(self):
|
|
super(EncryptedLvmTestCase, self).setUp()
|
|
self.image_class = imagebackend.Lvm
|
|
self.flags(enabled=True, group='ephemeral_storage_encryption')
|
|
self.flags(cipher='aes-xts-plain64',
|
|
group='ephemeral_storage_encryption')
|
|
self.flags(key_size=512, group='ephemeral_storage_encryption')
|
|
self.flags(fixed_key='00000000000000000000000000000000'
|
|
'00000000000000000000000000000000',
|
|
group='keymgr')
|
|
self.flags(images_volume_group=self.VG, group='libvirt')
|
|
self.LV = '%s_%s' % (self.INSTANCE['uuid'], self.NAME)
|
|
self.OLD_STYLE_INSTANCE_PATH = None
|
|
self.LV_PATH = os.path.join('/dev', self.VG, self.LV)
|
|
self.PATH = os.path.join('/dev/mapper',
|
|
imagebackend.dmcrypt.volume_name(self.LV))
|
|
self.key_manager = keymgr.API()
|
|
self.INSTANCE['ephemeral_key_uuid'] =\
|
|
self.key_manager.create_key(self.CONTEXT)
|
|
self.KEY = self.key_manager.get_key(self.CONTEXT,
|
|
self.INSTANCE['ephemeral_key_uuid']).get_encoded()
|
|
|
|
self.lvm = imagebackend.lvm
|
|
self.disk = imagebackend.disk
|
|
self.utils = imagebackend.utils
|
|
self.libvirt_utils = imagebackend.libvirt_utils
|
|
self.dmcrypt = imagebackend.dmcrypt
|
|
|
|
def _create_image(self, sparse):
|
|
with test.nested(
|
|
mock.patch.object(self.lvm, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()),
|
|
mock.patch.object(self.disk, 'resize2fs', mock.Mock()),
|
|
mock.patch.object(self.disk, 'get_disk_size',
|
|
mock.Mock(return_value=self.TEMPLATE_SIZE)),
|
|
mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'create_lvm_image',
|
|
mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'remove_logical_volumes',
|
|
mock.Mock()),
|
|
mock.patch.object(self.utils, 'execute', mock.Mock())):
|
|
fn = mock.Mock()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH, self.TEMPLATE_SIZE,
|
|
context=self.CONTEXT)
|
|
|
|
fn.assert_called_with(context=self.CONTEXT,
|
|
max_size=self.TEMPLATE_SIZE,
|
|
target=self.TEMPLATE_PATH)
|
|
self.lvm.create_volume.assert_called_with(self.VG,
|
|
self.LV,
|
|
self.TEMPLATE_SIZE,
|
|
sparse=sparse)
|
|
self.dmcrypt.create_volume.assert_called_with(
|
|
self.PATH.rpartition('/')[2],
|
|
self.LV_PATH,
|
|
CONF.ephemeral_storage_encryption.cipher,
|
|
CONF.ephemeral_storage_encryption.key_size,
|
|
self.KEY)
|
|
cmd = ('qemu-img',
|
|
'convert',
|
|
'-O',
|
|
'raw',
|
|
self.TEMPLATE_PATH,
|
|
self.PATH)
|
|
self.utils.execute.assert_called_with(*cmd, run_as_root=True)
|
|
|
|
def _create_image_generated(self, sparse):
|
|
with test.nested(
|
|
mock.patch.object(self.lvm, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()),
|
|
mock.patch.object(self.disk, 'resize2fs', mock.Mock()),
|
|
mock.patch.object(self.disk, 'get_disk_size',
|
|
mock.Mock(return_value=self.TEMPLATE_SIZE)),
|
|
mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'create_lvm_image',
|
|
mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'remove_logical_volumes',
|
|
mock.Mock()),
|
|
mock.patch.object(self.utils, 'execute', mock.Mock())):
|
|
fn = mock.Mock()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH,
|
|
self.SIZE,
|
|
ephemeral_size=None,
|
|
context=self.CONTEXT)
|
|
|
|
self.lvm.create_volume.assert_called_with(
|
|
self.VG,
|
|
self.LV,
|
|
self.SIZE,
|
|
sparse=sparse)
|
|
self.dmcrypt.create_volume.assert_called_with(
|
|
self.PATH.rpartition('/')[2],
|
|
self.LV_PATH,
|
|
CONF.ephemeral_storage_encryption.cipher,
|
|
CONF.ephemeral_storage_encryption.key_size,
|
|
self.KEY)
|
|
fn.assert_called_with(target=self.PATH,
|
|
ephemeral_size=None, context=self.CONTEXT)
|
|
|
|
def _create_image_resize(self, sparse):
|
|
with test.nested(
|
|
mock.patch.object(self.lvm, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()),
|
|
mock.patch.object(self.disk, 'resize2fs', mock.Mock()),
|
|
mock.patch.object(self.disk, 'get_disk_size',
|
|
mock.Mock(return_value=self.TEMPLATE_SIZE)),
|
|
mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'create_lvm_image',
|
|
mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'remove_logical_volumes',
|
|
mock.Mock()),
|
|
mock.patch.object(self.utils, 'execute', mock.Mock())):
|
|
fn = mock.Mock()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE,
|
|
context=self.CONTEXT)
|
|
|
|
fn.assert_called_with(context=self.CONTEXT, max_size=self.SIZE,
|
|
target=self.TEMPLATE_PATH)
|
|
self.disk.get_disk_size.assert_called_with(self.TEMPLATE_PATH)
|
|
self.lvm.create_volume.assert_called_with(
|
|
self.VG,
|
|
self.LV,
|
|
self.SIZE,
|
|
sparse=sparse)
|
|
self.dmcrypt.create_volume.assert_called_with(
|
|
self.PATH.rpartition('/')[2],
|
|
self.LV_PATH,
|
|
CONF.ephemeral_storage_encryption.cipher,
|
|
CONF.ephemeral_storage_encryption.key_size,
|
|
self.KEY)
|
|
cmd = ('qemu-img',
|
|
'convert',
|
|
'-O',
|
|
'raw',
|
|
self.TEMPLATE_PATH,
|
|
self.PATH)
|
|
self.utils.execute.assert_called_with(*cmd, run_as_root=True)
|
|
self.disk.resize2fs.assert_called_with(self.PATH, run_as_root=True)
|
|
|
|
def test_create_image(self):
|
|
self._create_image(False)
|
|
|
|
def test_create_image_sparsed(self):
|
|
self.flags(sparse_logical_volumes=True, group='libvirt')
|
|
self._create_image(True)
|
|
|
|
def test_create_image_generated(self):
|
|
self._create_image_generated(False)
|
|
|
|
def test_create_image_generated_sparsed(self):
|
|
self.flags(sparse_logical_volumes=True, group='libvirt')
|
|
self._create_image_generated(True)
|
|
|
|
def test_create_image_resize(self):
|
|
self._create_image_resize(False)
|
|
|
|
def test_create_image_resize_sparsed(self):
|
|
self.flags(sparse_logical_volumes=True, group='libvirt')
|
|
self._create_image_resize(True)
|
|
|
|
def test_create_image_negative(self):
|
|
with test.nested(
|
|
mock.patch.object(self.lvm, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()),
|
|
mock.patch.object(self.disk, 'resize2fs', mock.Mock()),
|
|
mock.patch.object(self.disk, 'get_disk_size',
|
|
mock.Mock(return_value=self.TEMPLATE_SIZE)),
|
|
mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'create_lvm_image',
|
|
mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'remove_logical_volumes',
|
|
mock.Mock()),
|
|
mock.patch.object(self.utils, 'execute', mock.Mock())):
|
|
fn = mock.Mock()
|
|
self.lvm.create_volume.side_effect = RuntimeError()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.assertRaises(
|
|
RuntimeError,
|
|
image.create_image,
|
|
fn,
|
|
self.TEMPLATE_PATH,
|
|
self.SIZE,
|
|
context=self.CONTEXT)
|
|
|
|
fn.assert_called_with(
|
|
context=self.CONTEXT,
|
|
max_size=self.SIZE,
|
|
target=self.TEMPLATE_PATH)
|
|
self.disk.get_disk_size.assert_called_with(
|
|
self.TEMPLATE_PATH)
|
|
self.lvm.create_volume.assert_called_with(
|
|
self.VG,
|
|
self.LV,
|
|
self.SIZE,
|
|
sparse=False)
|
|
self.dmcrypt.delete_volume.assert_called_with(
|
|
self.PATH.rpartition('/')[2])
|
|
self.lvm.remove_volumes.assert_called_with([self.LV_PATH])
|
|
|
|
def test_create_image_encrypt_negative(self):
|
|
with test.nested(
|
|
mock.patch.object(self.lvm, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()),
|
|
mock.patch.object(self.disk, 'resize2fs', mock.Mock()),
|
|
mock.patch.object(self.disk, 'get_disk_size',
|
|
mock.Mock(return_value=self.TEMPLATE_SIZE)),
|
|
mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'create_lvm_image',
|
|
mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'remove_logical_volumes',
|
|
mock.Mock()),
|
|
mock.patch.object(self.utils, 'execute', mock.Mock())):
|
|
fn = mock.Mock()
|
|
self.dmcrypt.create_volume.side_effect = RuntimeError()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.assertRaises(
|
|
RuntimeError,
|
|
image.create_image,
|
|
fn,
|
|
self.TEMPLATE_PATH,
|
|
self.SIZE,
|
|
context=self.CONTEXT)
|
|
|
|
fn.assert_called_with(
|
|
context=self.CONTEXT,
|
|
max_size=self.SIZE,
|
|
target=self.TEMPLATE_PATH)
|
|
self.disk.get_disk_size.assert_called_with(self.TEMPLATE_PATH)
|
|
self.lvm.create_volume.assert_called_with(
|
|
self.VG,
|
|
self.LV,
|
|
self.SIZE,
|
|
sparse=False)
|
|
self.dmcrypt.create_volume.assert_called_with(
|
|
self.dmcrypt.volume_name(self.LV),
|
|
self.LV_PATH,
|
|
CONF.ephemeral_storage_encryption.cipher,
|
|
CONF.ephemeral_storage_encryption.key_size,
|
|
self.KEY)
|
|
self.dmcrypt.delete_volume.assert_called_with(
|
|
self.PATH.rpartition('/')[2])
|
|
self.lvm.remove_volumes.assert_called_with([self.LV_PATH])
|
|
|
|
def test_create_image_generated_negative(self):
|
|
with test.nested(
|
|
mock.patch.object(self.lvm, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()),
|
|
mock.patch.object(self.disk, 'resize2fs', mock.Mock()),
|
|
mock.patch.object(self.disk, 'get_disk_size',
|
|
mock.Mock(return_value=self.TEMPLATE_SIZE)),
|
|
mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'create_lvm_image',
|
|
mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'remove_logical_volumes',
|
|
mock.Mock()),
|
|
mock.patch.object(self.utils, 'execute', mock.Mock())):
|
|
fn = mock.Mock()
|
|
fn.side_effect = RuntimeError()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.assertRaises(RuntimeError,
|
|
image.create_image,
|
|
fn,
|
|
self.TEMPLATE_PATH,
|
|
self.SIZE,
|
|
ephemeral_size=None,
|
|
context=self.CONTEXT)
|
|
|
|
self.lvm.create_volume.assert_called_with(
|
|
self.VG,
|
|
self.LV,
|
|
self.SIZE,
|
|
sparse=False)
|
|
self.dmcrypt.create_volume.assert_called_with(
|
|
self.PATH.rpartition('/')[2],
|
|
self.LV_PATH,
|
|
CONF.ephemeral_storage_encryption.cipher,
|
|
CONF.ephemeral_storage_encryption.key_size,
|
|
self.KEY)
|
|
fn.assert_called_with(
|
|
target=self.PATH,
|
|
ephemeral_size=None,
|
|
context=self.CONTEXT)
|
|
self.dmcrypt.delete_volume.assert_called_with(
|
|
self.PATH.rpartition('/')[2])
|
|
self.lvm.remove_volumes.assert_called_with([self.LV_PATH])
|
|
|
|
def test_create_image_generated_encrypt_negative(self):
|
|
with test.nested(
|
|
mock.patch.object(self.lvm, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.lvm, 'remove_volumes', mock.Mock()),
|
|
mock.patch.object(self.disk, 'resize2fs', mock.Mock()),
|
|
mock.patch.object(self.disk, 'get_disk_size',
|
|
mock.Mock(return_value=self.TEMPLATE_SIZE)),
|
|
mock.patch.object(self.dmcrypt, 'create_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'delete_volume', mock.Mock()),
|
|
mock.patch.object(self.dmcrypt, 'list_volumes', mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'create_lvm_image',
|
|
mock.Mock()),
|
|
mock.patch.object(self.libvirt_utils, 'remove_logical_volumes',
|
|
mock.Mock()),
|
|
mock.patch.object(self.utils, 'execute', mock.Mock())):
|
|
fn = mock.Mock()
|
|
fn.side_effect = RuntimeError()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.assertRaises(
|
|
RuntimeError,
|
|
image.create_image,
|
|
fn,
|
|
self.TEMPLATE_PATH,
|
|
self.SIZE,
|
|
ephemeral_size=None,
|
|
context=self.CONTEXT)
|
|
|
|
self.lvm.create_volume.assert_called_with(
|
|
self.VG,
|
|
self.LV,
|
|
self.SIZE,
|
|
sparse=False)
|
|
self.dmcrypt.create_volume.assert_called_with(
|
|
self.PATH.rpartition('/')[2],
|
|
self.LV_PATH,
|
|
CONF.ephemeral_storage_encryption.cipher,
|
|
CONF.ephemeral_storage_encryption.key_size,
|
|
self.KEY)
|
|
self.dmcrypt.delete_volume.assert_called_with(
|
|
self.PATH.rpartition('/')[2])
|
|
self.lvm.remove_volumes.assert_called_with([self.LV_PATH])
|
|
|
|
def test_prealloc_image(self):
|
|
self.flags(preallocate_images='space')
|
|
fake_processutils.fake_execute_clear_log()
|
|
fake_processutils.stub_out_processutils_execute(self.stubs)
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
def fake_fetch(target, *args, **kwargs):
|
|
return
|
|
|
|
self.stubs.Set(os.path, 'exists', lambda _: True)
|
|
self.stubs.Set(image, 'check_image_exists', lambda: True)
|
|
self.stubs.Set(image, 'get_disk_size', lambda _: self.SIZE)
|
|
|
|
image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
|
|
|
|
self.assertEqual(fake_processutils.fake_execute_get_log(), [])
|
|
|
|
def test_get_model(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
model = image.get_model(FakeConn())
|
|
self.assertEqual(imgmodel.LocalBlockImage(self.PATH),
|
|
model)
|
|
|
|
|
|
class RbdTestCase(_ImageTestCase, test.NoDBTestCase):
|
|
POOL = "FakePool"
|
|
USER = "FakeUser"
|
|
CONF = "FakeConf"
|
|
SIZE = 1024
|
|
|
|
def setUp(self):
|
|
self.image_class = imagebackend.Rbd
|
|
super(RbdTestCase, self).setUp()
|
|
self.flags(images_rbd_pool=self.POOL,
|
|
rbd_user=self.USER,
|
|
images_rbd_ceph_conf=self.CONF,
|
|
group='libvirt')
|
|
self.libvirt_utils = imagebackend.libvirt_utils
|
|
self.utils = imagebackend.utils
|
|
self.mox.StubOutWithMock(rbd_utils, 'rbd')
|
|
self.mox.StubOutWithMock(rbd_utils, 'rados')
|
|
|
|
def test_cache(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
self.mox.StubOutWithMock(image, 'check_image_exists')
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(False)
|
|
image.check_image_exists().AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
|
fn = self.mox.CreateMockAnything()
|
|
fn(target=self.TEMPLATE_PATH)
|
|
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
|
|
imagebackend.fileutils.ensure_tree(self.TEMPLATE_DIR)
|
|
self.mox.ReplayAll()
|
|
|
|
self.mock_create_image(image)
|
|
image.cache(fn, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_cache_base_dir_exists(self):
|
|
fn = self.mox.CreateMockAnything()
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
self.mox.StubOutWithMock(image, 'check_image_exists')
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
|
|
image.check_image_exists().AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
|
fn = self.mox.CreateMockAnything()
|
|
fn(target=self.TEMPLATE_PATH)
|
|
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
|
|
self.mox.ReplayAll()
|
|
|
|
self.mock_create_image(image)
|
|
image.cache(fn, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_cache_image_exists(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
self.mox.StubOutWithMock(image, 'check_image_exists')
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
|
|
image.check_image_exists().AndReturn(True)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
|
|
self.mox.ReplayAll()
|
|
|
|
image.cache(None, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_cache_template_exists(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
self.mox.StubOutWithMock(image, 'check_image_exists')
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
|
|
image.check_image_exists().AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
|
|
self.mox.ReplayAll()
|
|
|
|
self.mock_create_image(image)
|
|
image.cache(None, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_create_image(self):
|
|
fn = self.mox.CreateMockAnything()
|
|
fn(max_size=None, target=self.TEMPLATE_PATH)
|
|
|
|
rbd_utils.rbd.RBD_FEATURE_LAYERING = 1
|
|
|
|
fake_processutils.fake_execute_clear_log()
|
|
fake_processutils.stub_out_processutils_execute(self.stubs)
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.mox.StubOutWithMock(image, 'check_image_exists')
|
|
image.check_image_exists().AndReturn(False)
|
|
image.check_image_exists().AndReturn(False)
|
|
self.mox.ReplayAll()
|
|
|
|
image.create_image(fn, self.TEMPLATE_PATH, None)
|
|
|
|
rbd_name = "%s_%s" % (self.INSTANCE['uuid'], self.NAME)
|
|
cmd = ('rbd', 'import', '--pool', self.POOL, self.TEMPLATE_PATH,
|
|
rbd_name, '--image-format=2', '--id', self.USER,
|
|
'--conf', self.CONF)
|
|
self.assertEqual(fake_processutils.fake_execute_get_log(),
|
|
[' '.join(cmd)])
|
|
self.mox.VerifyAll()
|
|
|
|
def test_create_image_resize(self):
|
|
fn = self.mox.CreateMockAnything()
|
|
full_size = self.SIZE * 2
|
|
fn(max_size=full_size, target=self.TEMPLATE_PATH)
|
|
|
|
rbd_utils.rbd.RBD_FEATURE_LAYERING = 1
|
|
|
|
fake_processutils.fake_execute_clear_log()
|
|
fake_processutils.stub_out_processutils_execute(self.stubs)
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.mox.StubOutWithMock(image, 'check_image_exists')
|
|
image.check_image_exists().AndReturn(False)
|
|
image.check_image_exists().AndReturn(False)
|
|
rbd_name = "%s_%s" % (self.INSTANCE['uuid'], self.NAME)
|
|
cmd = ('rbd', 'import', '--pool', self.POOL, self.TEMPLATE_PATH,
|
|
rbd_name, '--image-format=2', '--id', self.USER,
|
|
'--conf', self.CONF)
|
|
self.mox.StubOutWithMock(image, 'get_disk_size')
|
|
image.get_disk_size(rbd_name).AndReturn(self.SIZE)
|
|
self.mox.StubOutWithMock(image.driver, 'resize')
|
|
image.driver.resize(rbd_name, full_size)
|
|
self.mox.StubOutWithMock(image, 'verify_base_size')
|
|
image.verify_base_size(self.TEMPLATE_PATH, full_size)
|
|
|
|
self.mox.ReplayAll()
|
|
|
|
image.create_image(fn, self.TEMPLATE_PATH, full_size)
|
|
|
|
self.assertEqual(fake_processutils.fake_execute_get_log(),
|
|
[' '.join(cmd)])
|
|
self.mox.VerifyAll()
|
|
|
|
def test_create_image_already_exists(self):
|
|
rbd_utils.rbd.RBD_FEATURE_LAYERING = 1
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.mox.StubOutWithMock(image, 'check_image_exists')
|
|
image.check_image_exists().AndReturn(True)
|
|
self.mox.StubOutWithMock(image, 'get_disk_size')
|
|
image.get_disk_size(self.TEMPLATE_PATH).AndReturn(self.SIZE)
|
|
image.check_image_exists().AndReturn(True)
|
|
rbd_name = "%s_%s" % (self.INSTANCE['uuid'], self.NAME)
|
|
image.get_disk_size(rbd_name).AndReturn(self.SIZE)
|
|
|
|
self.mox.ReplayAll()
|
|
|
|
fn = self.mox.CreateMockAnything()
|
|
image.create_image(fn, self.TEMPLATE_PATH, self.SIZE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_prealloc_image(self):
|
|
CONF.set_override('preallocate_images', 'space')
|
|
|
|
fake_processutils.fake_execute_clear_log()
|
|
fake_processutils.stub_out_processutils_execute(self.stubs)
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
def fake_fetch(target, *args, **kwargs):
|
|
return
|
|
|
|
self.stubs.Set(os.path, 'exists', lambda _: True)
|
|
self.stubs.Set(image, 'check_image_exists', lambda: True)
|
|
self.stubs.Set(image, 'get_disk_size', lambda _: self.SIZE)
|
|
|
|
image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
|
|
|
|
self.assertEqual(fake_processutils.fake_execute_get_log(), [])
|
|
|
|
def test_parent_compatible(self):
|
|
self.assertEqual(inspect.getargspec(imagebackend.Image.libvirt_info),
|
|
inspect.getargspec(self.image_class.libvirt_info))
|
|
|
|
def test_image_path(self):
|
|
|
|
conf = "FakeConf"
|
|
pool = "FakePool"
|
|
user = "FakeUser"
|
|
|
|
self.flags(images_rbd_pool=pool, group='libvirt')
|
|
self.flags(images_rbd_ceph_conf=conf, group='libvirt')
|
|
self.flags(rbd_user=user, group='libvirt')
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
rbd_path = "rbd:%s/%s:id=%s:conf=%s" % (pool, image.rbd_name,
|
|
user, conf)
|
|
|
|
self.assertEqual(image.path, rbd_path)
|
|
|
|
def test_get_disk_size(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
with mock.patch.object(image.driver, 'size') as size_mock:
|
|
size_mock.return_value = 2361393152
|
|
|
|
self.assertEqual(2361393152, image.get_disk_size(image.path))
|
|
size_mock.assert_called_once_with(image.rbd_name)
|
|
|
|
def test_create_image_too_small(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
with mock.patch.object(image, 'driver') as driver_mock:
|
|
driver_mock.exists.return_value = True
|
|
driver_mock.size.return_value = 2
|
|
|
|
self.assertRaises(exception.FlavorDiskSmallerThanImage,
|
|
image.create_image, mock.MagicMock(),
|
|
self.TEMPLATE_PATH, 1)
|
|
driver_mock.size.assert_called_once_with(image.rbd_name)
|
|
|
|
@mock.patch.object(rbd_utils.RBDDriver, "get_mon_addrs")
|
|
def test_libvirt_info(self, mock_mon_addrs):
|
|
def get_mon_addrs():
|
|
hosts = ["server1", "server2"]
|
|
ports = ["1899", "1920"]
|
|
return hosts, ports
|
|
mock_mon_addrs.side_effect = get_mon_addrs
|
|
|
|
super(RbdTestCase, self).test_libvirt_info()
|
|
|
|
@mock.patch.object(rbd_utils.RBDDriver, "get_mon_addrs")
|
|
def test_get_model(self, mock_mon_addrs):
|
|
pool = "FakePool"
|
|
user = "FakeUser"
|
|
|
|
self.flags(images_rbd_pool=pool, group='libvirt')
|
|
self.flags(rbd_user=user, group='libvirt')
|
|
self.flags(rbd_secret_uuid="3306a5c4-8378-4b3c-aa1f-7b48d3a26172",
|
|
group='libvirt')
|
|
|
|
def get_mon_addrs():
|
|
hosts = ["server1", "server2"]
|
|
ports = ["1899", "1920"]
|
|
return hosts, ports
|
|
mock_mon_addrs.side_effect = get_mon_addrs
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
model = image.get_model(FakeConn())
|
|
self.assertEqual(imgmodel.RBDImage(
|
|
self.INSTANCE["uuid"] + "_fake.vm",
|
|
"FakePool",
|
|
"FakeUser",
|
|
"MTIzNDU2Cg==",
|
|
["server1:1899", "server2:1920"]),
|
|
model)
|
|
|
|
def test_import_file(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
@mock.patch.object(image, 'check_image_exists')
|
|
@mock.patch.object(image.driver, 'remove_image')
|
|
@mock.patch.object(image.driver, 'import_image')
|
|
def _test(mock_import, mock_remove, mock_exists):
|
|
mock_exists.return_value = True
|
|
image.import_file(self.INSTANCE, mock.sentinel.file,
|
|
mock.sentinel.remote_name)
|
|
name = '%s_%s' % (self.INSTANCE.uuid,
|
|
mock.sentinel.remote_name)
|
|
mock_exists.assert_called_once_with()
|
|
mock_remove.assert_called_once_with(name)
|
|
mock_import.assert_called_once_with(mock.sentinel.file, name)
|
|
_test()
|
|
|
|
def test_import_file_not_found(self):
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
@mock.patch.object(image, 'check_image_exists')
|
|
@mock.patch.object(image.driver, 'remove_image')
|
|
@mock.patch.object(image.driver, 'import_image')
|
|
def _test(mock_import, mock_remove, mock_exists):
|
|
mock_exists.return_value = False
|
|
image.import_file(self.INSTANCE, mock.sentinel.file,
|
|
mock.sentinel.remote_name)
|
|
name = '%s_%s' % (self.INSTANCE.uuid,
|
|
mock.sentinel.remote_name)
|
|
mock_exists.assert_called_once_with()
|
|
self.assertFalse(mock_remove.called)
|
|
mock_import.assert_called_once_with(mock.sentinel.file, name)
|
|
_test()
|
|
|
|
|
|
class PloopTestCase(_ImageTestCase, test.NoDBTestCase):
|
|
SIZE = 1024
|
|
|
|
def setUp(self):
|
|
self.image_class = imagebackend.Ploop
|
|
super(PloopTestCase, self).setUp()
|
|
self.utils = imagebackend.utils
|
|
|
|
def prepare_mocks(self):
|
|
fn = self.mox.CreateMockAnything()
|
|
self.mox.StubOutWithMock(imagebackend.utils.synchronized,
|
|
'__call__')
|
|
self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
|
|
self.mox.StubOutWithMock(self.utils, 'execute')
|
|
return fn
|
|
|
|
def test_cache(self):
|
|
self.mox.StubOutWithMock(os.path, 'exists')
|
|
if self.OLD_STYLE_INSTANCE_PATH:
|
|
os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_DIR).AndReturn(False)
|
|
os.path.exists(self.PATH).AndReturn(False)
|
|
os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
|
|
fn = self.mox.CreateMockAnything()
|
|
fn(target=self.TEMPLATE_PATH)
|
|
self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
|
|
imagebackend.fileutils.ensure_tree(self.TEMPLATE_DIR)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
self.mock_create_image(image)
|
|
image.cache(fn, self.TEMPLATE)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_create_image(self):
|
|
self.stubs.Set(imagebackend.Ploop, 'get_disk_size', lambda a, b: 2048)
|
|
fn = self.prepare_mocks()
|
|
fn(target=self.TEMPLATE_PATH, max_size=2048, image_id=None)
|
|
img_path = os.path.join(self.PATH, "root.hds")
|
|
imagebackend.libvirt_utils.copy_image(self.TEMPLATE_PATH, img_path)
|
|
self.utils.execute("ploop", "restore-descriptor", "-f", "raw",
|
|
self.PATH, img_path)
|
|
self.utils.execute("ploop", "grow", '-s', "2K",
|
|
os.path.join(self.PATH, "DiskDescriptor.xml"),
|
|
run_as_root=True)
|
|
self.mox.ReplayAll()
|
|
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
image.create_image(fn, self.TEMPLATE_PATH, 2048, image_id=None)
|
|
|
|
self.mox.VerifyAll()
|
|
|
|
def test_prealloc_image(self):
|
|
self.flags(preallocate_images='space')
|
|
fake_processutils.fake_execute_clear_log()
|
|
fake_processutils.stub_out_processutils_execute(self.stubs)
|
|
image = self.image_class(self.INSTANCE, self.NAME)
|
|
|
|
def fake_fetch(target, *args, **kwargs):
|
|
return
|
|
|
|
self.stubs.Set(os.path, 'exists', lambda _: True)
|
|
self.stubs.Set(image, 'check_image_exists', lambda: True)
|
|
self.stubs.Set(image, 'get_disk_size', lambda _: self.SIZE)
|
|
|
|
image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
|
|
|
|
|
|
class BackendTestCase(test.NoDBTestCase):
|
|
INSTANCE = objects.Instance(id=1, uuid=uuidutils.generate_uuid())
|
|
NAME = 'fake-name.suffix'
|
|
|
|
def setUp(self):
|
|
super(BackendTestCase, self).setUp()
|
|
self.flags(enabled=False, group='ephemeral_storage_encryption')
|
|
self.INSTANCE['ephemeral_key_uuid'] = None
|
|
|
|
def get_image(self, use_cow, image_type):
|
|
return imagebackend.Backend(use_cow).image(self.INSTANCE,
|
|
self.NAME,
|
|
image_type)
|
|
|
|
def _test_image(self, image_type, image_not_cow, image_cow):
|
|
image1 = self.get_image(False, image_type)
|
|
image2 = self.get_image(True, image_type)
|
|
|
|
def assertIsInstance(instance, class_object):
|
|
failure = ('Expected %s,' +
|
|
' but got %s.') % (class_object.__name__,
|
|
instance.__class__.__name__)
|
|
self.assertIsInstance(instance, class_object, msg=failure)
|
|
|
|
assertIsInstance(image1, image_not_cow)
|
|
assertIsInstance(image2, image_cow)
|
|
|
|
def test_image_raw(self):
|
|
self._test_image('raw', imagebackend.Raw, imagebackend.Raw)
|
|
|
|
def test_image_raw_preallocate_images(self):
|
|
flags = ('space', 'Space', 'SPACE')
|
|
for f in flags:
|
|
self.flags(preallocate_images=f)
|
|
raw = imagebackend.Raw(self.INSTANCE, 'fake_disk', '/tmp/xyz')
|
|
self.assertTrue(raw.preallocate)
|
|
|
|
def test_image_raw_preallocate_images_bad_conf(self):
|
|
self.flags(preallocate_images='space1')
|
|
raw = imagebackend.Raw(self.INSTANCE, 'fake_disk', '/tmp/xyz')
|
|
self.assertFalse(raw.preallocate)
|
|
|
|
def test_image_qcow2(self):
|
|
self._test_image('qcow2', imagebackend.Qcow2, imagebackend.Qcow2)
|
|
|
|
def test_image_qcow2_preallocate_images(self):
|
|
flags = ('space', 'Space', 'SPACE')
|
|
for f in flags:
|
|
self.flags(preallocate_images=f)
|
|
qcow = imagebackend.Qcow2(self.INSTANCE, 'fake_disk', '/tmp/xyz')
|
|
self.assertTrue(qcow.preallocate)
|
|
|
|
def test_image_qcow2_preallocate_images_bad_conf(self):
|
|
self.flags(preallocate_images='space1')
|
|
qcow = imagebackend.Qcow2(self.INSTANCE, 'fake_disk', '/tmp/xyz')
|
|
self.assertFalse(qcow.preallocate)
|
|
|
|
def test_image_lvm(self):
|
|
self.flags(images_volume_group='FakeVG', group='libvirt')
|
|
self._test_image('lvm', imagebackend.Lvm, imagebackend.Lvm)
|
|
|
|
def test_image_rbd(self):
|
|
conf = "FakeConf"
|
|
pool = "FakePool"
|
|
self.flags(images_rbd_pool=pool, group='libvirt')
|
|
self.flags(images_rbd_ceph_conf=conf, group='libvirt')
|
|
self.mox.StubOutWithMock(rbd_utils, 'rbd')
|
|
self.mox.StubOutWithMock(rbd_utils, 'rados')
|
|
self._test_image('rbd', imagebackend.Rbd, imagebackend.Rbd)
|
|
|
|
def test_image_default(self):
|
|
self._test_image('default', imagebackend.Raw, imagebackend.Qcow2)
|