744 lines
31 KiB
Python
744 lines
31 KiB
Python
# Copyright 2012 NTT Data. All Rights Reserved.
|
|
# Copyright 2012 Yahoo! Inc. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import functools
|
|
import os
|
|
import tempfile
|
|
|
|
import ddt
|
|
import mock
|
|
import os_traits
|
|
from oslo_concurrency import processutils
|
|
from oslo_config import cfg
|
|
from oslo_utils import fileutils
|
|
from oslo_utils.fixture import uuidsentinel as uuids
|
|
import six
|
|
|
|
from nova.compute import utils as compute_utils
|
|
from nova import context
|
|
from nova import exception
|
|
from nova import objects
|
|
from nova.objects import fields as obj_fields
|
|
import nova.privsep.fs
|
|
import nova.privsep.qemu
|
|
from nova import test
|
|
from nova.tests import fixtures as nova_fixtures
|
|
from nova.tests.unit import fake_instance
|
|
from nova.tests.unit.virt.libvirt import fakelibvirt
|
|
from nova.virt import images
|
|
from nova.virt.libvirt import guest as libvirt_guest
|
|
from nova.virt.libvirt import utils as libvirt_utils
|
|
|
|
CONF = cfg.CONF
|
|
|
|
|
|
@ddt.ddt
|
|
class LibvirtUtilsTestCase(test.NoDBTestCase):
|
|
|
|
@mock.patch('oslo_concurrency.processutils.execute')
|
|
def test_copy_image_local(self, mock_execute):
|
|
libvirt_utils.copy_image('src', 'dest')
|
|
mock_execute.assert_called_once_with('cp', '-r', 'src', 'dest')
|
|
|
|
@mock.patch('nova.virt.libvirt.volume.remotefs.SshDriver.copy_file')
|
|
def test_copy_image_remote_ssh(self, mock_rem_fs_remove):
|
|
self.flags(remote_filesystem_transport='ssh', group='libvirt')
|
|
libvirt_utils.copy_image('src', 'dest', host='host')
|
|
mock_rem_fs_remove.assert_called_once_with('src', 'host:dest',
|
|
on_completion=None, on_execute=None, compression=True)
|
|
|
|
@mock.patch('nova.virt.libvirt.volume.remotefs.RsyncDriver.copy_file')
|
|
def test_copy_image_remote_rsync(self, mock_rem_fs_remove):
|
|
self.flags(remote_filesystem_transport='rsync', group='libvirt')
|
|
libvirt_utils.copy_image('src', 'dest', host='host')
|
|
mock_rem_fs_remove.assert_called_once_with('src', 'host:dest',
|
|
on_completion=None, on_execute=None, compression=True)
|
|
|
|
@mock.patch('os.path.exists', return_value=True)
|
|
def test_disk_type_from_path(self, mock_exists):
|
|
# Seems like lvm detection
|
|
# if its in /dev ??
|
|
for p in ['/dev/b', '/dev/blah/blah']:
|
|
d_type = libvirt_utils.get_disk_type_from_path(p)
|
|
self.assertEqual('lvm', d_type)
|
|
|
|
# Try rbd detection
|
|
d_type = libvirt_utils.get_disk_type_from_path('rbd:pool/instance')
|
|
self.assertEqual('rbd', d_type)
|
|
|
|
# Try the other types
|
|
path = '/myhome/disk.config'
|
|
d_type = libvirt_utils.get_disk_type_from_path(path)
|
|
self.assertIsNone(d_type)
|
|
|
|
@mock.patch('os.path.exists', return_value=True)
|
|
@mock.patch('os.path.isdir', return_value=True)
|
|
def test_disk_type_ploop(self, mock_isdir, mock_exists):
|
|
path = '/some/path'
|
|
d_type = libvirt_utils.get_disk_type_from_path(path)
|
|
mock_isdir.assert_called_once_with(path)
|
|
mock_exists.assert_called_once_with("%s/DiskDescriptor.xml" % path)
|
|
self.assertEqual('ploop', d_type)
|
|
|
|
def test_valid_hostname_normal(self):
|
|
self.assertTrue(libvirt_utils.is_valid_hostname("hello.world.com"))
|
|
|
|
def test_valid_hostname_ipv4addr(self):
|
|
self.assertTrue(libvirt_utils.is_valid_hostname("10.0.2.1"))
|
|
|
|
def test_valid_hostname_ipv6addr(self):
|
|
self.assertTrue(libvirt_utils.is_valid_hostname("240:2ac3::2"))
|
|
|
|
def test_valid_hostname_bad(self):
|
|
self.assertFalse(libvirt_utils.is_valid_hostname("foo/?com=/bin/sh"))
|
|
|
|
@mock.patch('oslo_concurrency.processutils.execute')
|
|
def test_create_image(self, mock_execute):
|
|
libvirt_utils.create_image('raw', '/some/path', '10G')
|
|
libvirt_utils.create_image('qcow2', '/some/stuff', '1234567891234')
|
|
expected_args = [(('qemu-img', 'create', '-f', 'raw',
|
|
'/some/path', '10G'),),
|
|
(('qemu-img', 'create', '-f', 'qcow2',
|
|
'/some/stuff', '1234567891234'),)]
|
|
self.assertEqual(expected_args, mock_execute.call_args_list)
|
|
|
|
@mock.patch('os.path.exists', return_value=True)
|
|
@mock.patch('oslo_concurrency.processutils.execute')
|
|
@mock.patch('nova.virt.images.qemu_img_info')
|
|
def test_create_cow_image(self, mock_info, mock_execute, mock_exists):
|
|
mock_execute.return_value = ('stdout', None)
|
|
mock_info.return_value = mock.Mock(
|
|
file_format=mock.sentinel.backing_fmt,
|
|
cluster_size=mock.sentinel.cluster_size)
|
|
libvirt_utils.create_cow_image(mock.sentinel.backing_path,
|
|
mock.sentinel.new_path)
|
|
mock_info.assert_called_once_with(mock.sentinel.backing_path)
|
|
mock_execute.assert_has_calls([mock.call(
|
|
'qemu-img', 'create', '-f', 'qcow2', '-o',
|
|
'backing_file=%s,backing_fmt=%s,cluster_size=%s' % (
|
|
mock.sentinel.backing_path, mock.sentinel.backing_fmt,
|
|
mock.sentinel.cluster_size),
|
|
mock.sentinel.new_path)])
|
|
|
|
@ddt.unpack
|
|
@ddt.data({'fs_type': 'some_fs_type',
|
|
'default_eph_format': None,
|
|
'expected_fs_type': 'some_fs_type'},
|
|
{'fs_type': None,
|
|
'default_eph_format': None,
|
|
'expected_fs_type': nova.privsep.fs.FS_FORMAT_EXT4},
|
|
{'fs_type': None,
|
|
'default_eph_format': 'eph_format',
|
|
'expected_fs_type': 'eph_format'})
|
|
def test_create_ploop_image(self, fs_type,
|
|
default_eph_format,
|
|
expected_fs_type):
|
|
with test.nested(mock.patch('oslo_utils.fileutils.ensure_tree'),
|
|
mock.patch('nova.privsep.libvirt.ploop_init')
|
|
) as (mock_ensure_tree, mock_ploop_init):
|
|
self.flags(default_ephemeral_format=default_eph_format)
|
|
libvirt_utils.create_ploop_image('expanded', '/some/path',
|
|
'5G', fs_type)
|
|
mock_ensure_tree.assert_has_calls([
|
|
mock.call('/some/path')])
|
|
mock_ploop_init.assert_has_calls([
|
|
mock.call('5G', 'expanded', expected_fs_type,
|
|
'/some/path/root.hds')])
|
|
|
|
def test_pick_disk_driver_name(self):
|
|
type_map = {'kvm': ([True, 'qemu'], [False, 'qemu'], [None, 'qemu']),
|
|
'qemu': ([True, 'qemu'], [False, 'qemu'], [None, 'qemu']),
|
|
'uml': ([True, None], [False, None], [None, None]),
|
|
'lxc': ([True, None], [False, None], [None, None])}
|
|
# NOTE(aloga): Xen is tested in test_pick_disk_driver_name_xen
|
|
|
|
version = 1005001
|
|
for (virt_type, checks) in type_map.items():
|
|
self.flags(virt_type=virt_type, group='libvirt')
|
|
for (is_block_dev, expected_result) in checks:
|
|
result = libvirt_utils.pick_disk_driver_name(version,
|
|
is_block_dev)
|
|
self.assertEqual(result, expected_result)
|
|
|
|
@mock.patch('nova.privsep.libvirt.xend_probe')
|
|
@mock.patch('oslo_concurrency.processutils.execute')
|
|
def test_pick_disk_driver_name_xen(self, mock_execute, mock_xend_probe):
|
|
|
|
def execute_side_effect(*args, **kwargs):
|
|
if args == ('tap-ctl', 'check'):
|
|
if mock_execute.blktap is True:
|
|
return ('ok\n', '')
|
|
elif mock_execute.blktap is False:
|
|
return ('some error\n', '')
|
|
else:
|
|
raise OSError(2, "No such file or directory")
|
|
raise Exception('Unexpected call')
|
|
mock_execute.side_effect = execute_side_effect
|
|
|
|
def xend_probe_side_effect():
|
|
if mock_execute.xend is True:
|
|
return ('', '')
|
|
elif mock_execute.xend is False:
|
|
raise processutils.ProcessExecutionError("error")
|
|
else:
|
|
raise OSError(2, "No such file or directory")
|
|
mock_xend_probe.side_effect = xend_probe_side_effect
|
|
|
|
self.flags(virt_type="xen", group='libvirt')
|
|
versions = [4000000, 4001000, 4002000, 4003000, 4005000]
|
|
for version in versions:
|
|
# block dev
|
|
result = libvirt_utils.pick_disk_driver_name(version, True)
|
|
self.assertEqual(result, "phy")
|
|
self.assertFalse(mock_execute.called)
|
|
mock_execute.reset_mock()
|
|
# file dev
|
|
for blktap in True, False, None:
|
|
mock_execute.blktap = blktap
|
|
for xend in True, False, None:
|
|
mock_execute.xend = xend
|
|
result = libvirt_utils.pick_disk_driver_name(version,
|
|
False)
|
|
# qemu backend supported only by libxl which is
|
|
# production since xen 4.2. libvirt use libxl if
|
|
# xend service not started.
|
|
if version >= 4002000 and xend is not True:
|
|
self.assertEqual(result, 'qemu')
|
|
elif blktap:
|
|
if version == 4000000:
|
|
self.assertEqual(result, 'tap')
|
|
else:
|
|
self.assertEqual(result, 'tap2')
|
|
else:
|
|
self.assertEqual(result, 'file')
|
|
# default is_block_dev False
|
|
self.assertEqual(result,
|
|
libvirt_utils.pick_disk_driver_name(version))
|
|
mock_execute.reset_mock()
|
|
|
|
def test_copy_image(self):
|
|
dst_fd, dst_path = tempfile.mkstemp()
|
|
try:
|
|
os.close(dst_fd)
|
|
|
|
src_fd, src_path = tempfile.mkstemp()
|
|
try:
|
|
with os.fdopen(src_fd, 'w') as fp:
|
|
fp.write('canary')
|
|
|
|
libvirt_utils.copy_image(src_path, dst_path)
|
|
with open(dst_path, 'r') as fp:
|
|
self.assertEqual(fp.read(), 'canary')
|
|
finally:
|
|
os.unlink(src_path)
|
|
finally:
|
|
os.unlink(dst_path)
|
|
|
|
def test_write_to_file(self):
|
|
dst_fd, dst_path = tempfile.mkstemp()
|
|
try:
|
|
os.close(dst_fd)
|
|
|
|
libvirt_utils.write_to_file(dst_path, 'hello')
|
|
with open(dst_path, 'r') as fp:
|
|
self.assertEqual(fp.read(), 'hello')
|
|
finally:
|
|
os.unlink(dst_path)
|
|
|
|
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
|
|
@mock.patch('nova.privsep.utils.supports_direct_io', return_value=False)
|
|
@mock.patch('oslo_concurrency.processutils.execute')
|
|
def test_extract_snapshot_no_directio(self, mock_execute,
|
|
mock_direct_io,
|
|
mock_disk_op_sema):
|
|
# Test a single variant with no support for direct IO.
|
|
# This could be removed if we add unit tests for convert_image().
|
|
src_format = 'qcow2'
|
|
dest_format = 'raw'
|
|
out_format = 'raw'
|
|
|
|
libvirt_utils.extract_snapshot('/path/to/disk/image', src_format,
|
|
'/extracted/snap', dest_format)
|
|
qemu_img_cmd = ('qemu-img', 'convert', '-t', 'writeback',
|
|
'-O', out_format, '-f', src_format, )
|
|
if CONF.libvirt.snapshot_compression and dest_format == "qcow2":
|
|
qemu_img_cmd += ('-c',)
|
|
qemu_img_cmd += ('/path/to/disk/image', '/extracted/snap')
|
|
mock_disk_op_sema.__enter__.assert_called_once()
|
|
mock_direct_io.assert_called_once_with(CONF.instances_path)
|
|
mock_execute.assert_called_once_with(*qemu_img_cmd)
|
|
|
|
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
|
|
@mock.patch('nova.privsep.utils.supports_direct_io', return_value=True)
|
|
def _do_test_extract_snapshot(self, mock_execute, mock_direct_io,
|
|
mock_disk_op_sema,
|
|
src_format='qcow2',
|
|
dest_format='raw', out_format='raw'):
|
|
libvirt_utils.extract_snapshot('/path/to/disk/image', src_format,
|
|
'/extracted/snap', dest_format)
|
|
qemu_img_cmd = ('qemu-img', 'convert', '-t', 'none',
|
|
'-O', out_format, '-f', src_format, )
|
|
if CONF.libvirt.snapshot_compression and dest_format == "qcow2":
|
|
qemu_img_cmd += ('-c',)
|
|
qemu_img_cmd += ('/path/to/disk/image', '/extracted/snap')
|
|
mock_disk_op_sema.__enter__.assert_called_once()
|
|
mock_direct_io.assert_called_once_with(CONF.instances_path)
|
|
mock_execute.assert_called_once_with(*qemu_img_cmd)
|
|
|
|
@mock.patch('oslo_concurrency.processutils.execute')
|
|
def test_extract_snapshot_raw(self, mock_execute):
|
|
self._do_test_extract_snapshot(mock_execute)
|
|
|
|
@mock.patch('oslo_concurrency.processutils.execute')
|
|
def test_extract_snapshot_iso(self, mock_execute):
|
|
self._do_test_extract_snapshot(mock_execute, dest_format='iso')
|
|
|
|
@mock.patch('oslo_concurrency.processutils.execute')
|
|
def test_extract_snapshot_qcow2(self, mock_execute):
|
|
self._do_test_extract_snapshot(mock_execute,
|
|
dest_format='qcow2', out_format='qcow2')
|
|
|
|
@mock.patch('oslo_concurrency.processutils.execute')
|
|
def test_extract_snapshot_qcow2_and_compression(self, mock_execute):
|
|
self.flags(snapshot_compression=True, group='libvirt')
|
|
self._do_test_extract_snapshot(mock_execute,
|
|
dest_format='qcow2', out_format='qcow2')
|
|
|
|
@mock.patch('oslo_concurrency.processutils.execute')
|
|
def test_extract_snapshot_parallels(self, mock_execute):
|
|
self._do_test_extract_snapshot(mock_execute,
|
|
src_format='raw',
|
|
dest_format='ploop',
|
|
out_format='parallels')
|
|
|
|
def test_load_file(self):
|
|
dst_fd, dst_path = tempfile.mkstemp()
|
|
try:
|
|
os.close(dst_fd)
|
|
|
|
# We have a test for write_to_file. If that is sound, this suffices
|
|
libvirt_utils.write_to_file(dst_path, 'hello')
|
|
self.assertEqual(libvirt_utils.load_file(dst_path), 'hello')
|
|
finally:
|
|
os.unlink(dst_path)
|
|
|
|
def test_file_open(self):
|
|
dst_fd, dst_path = tempfile.mkstemp()
|
|
try:
|
|
os.close(dst_fd)
|
|
|
|
# We have a test for write_to_file. If that is sound, this suffices
|
|
libvirt_utils.write_to_file(dst_path, 'hello')
|
|
with libvirt_utils.file_open(dst_path, 'r') as fp:
|
|
self.assertEqual(fp.read(), 'hello')
|
|
finally:
|
|
os.unlink(dst_path)
|
|
|
|
def test_get_fs_info(self):
|
|
|
|
class FakeStatResult(object):
|
|
|
|
def __init__(self):
|
|
self.f_bsize = 4096
|
|
self.f_frsize = 4096
|
|
self.f_blocks = 2000
|
|
self.f_bfree = 1000
|
|
self.f_bavail = 900
|
|
self.f_files = 2000
|
|
self.f_ffree = 1000
|
|
self.f_favail = 900
|
|
self.f_flag = 4096
|
|
self.f_namemax = 255
|
|
|
|
self.path = None
|
|
|
|
def fake_statvfs(path):
|
|
self.path = path
|
|
return FakeStatResult()
|
|
|
|
self.stub_out('os.statvfs', fake_statvfs)
|
|
|
|
fs_info = libvirt_utils.get_fs_info('/some/file/path')
|
|
self.assertEqual('/some/file/path', self.path)
|
|
self.assertEqual(8192000, fs_info['total'])
|
|
self.assertEqual(3686400, fs_info['free'])
|
|
self.assertEqual(4096000, fs_info['used'])
|
|
|
|
@mock.patch('nova.virt.images.fetch_to_raw')
|
|
def test_fetch_image(self, mock_images):
|
|
context = 'opaque context'
|
|
target = '/tmp/targetfile'
|
|
image_id = '4'
|
|
trusted_certs = objects.TrustedCerts(
|
|
ids=['0b5d2c72-12cc-4ba6-a8d7-3ff5cc1d8cb8',
|
|
'674736e3-f25c-405c-8362-bbf991e0ce0a'])
|
|
libvirt_utils.fetch_image(context, target, image_id, trusted_certs)
|
|
mock_images.assert_called_once_with(
|
|
context, image_id, target, trusted_certs)
|
|
|
|
@mock.patch('nova.virt.images.fetch')
|
|
def test_fetch_initrd_image(self, mock_images):
|
|
_context = context.RequestContext(project_id=123,
|
|
project_name="aubergine",
|
|
user_id=456,
|
|
user_name="pie")
|
|
target = '/tmp/targetfile'
|
|
image_id = '4'
|
|
trusted_certs = objects.TrustedCerts(
|
|
ids=['0b5d2c72-12cc-4ba6-a8d7-3ff5cc1d8cb8',
|
|
'674736e3-f25c-405c-8362-bbf991e0ce0a'])
|
|
libvirt_utils.fetch_raw_image(_context, target, image_id,
|
|
trusted_certs)
|
|
mock_images.assert_called_once_with(
|
|
_context, image_id, target, trusted_certs)
|
|
|
|
@mock.patch.object(compute_utils, 'disk_ops_semaphore')
|
|
@mock.patch('nova.privsep.utils.supports_direct_io', return_value=True)
|
|
@mock.patch('nova.privsep.qemu.unprivileged_convert_image')
|
|
def test_fetch_raw_image(self, mock_convert_image, mock_direct_io,
|
|
mock_disk_op_sema):
|
|
|
|
def fake_rename(old, new):
|
|
self.executes.append(('mv', old, new))
|
|
|
|
def fake_unlink(path):
|
|
self.executes.append(('rm', path))
|
|
|
|
def fake_rm_on_error(path, remove=None):
|
|
self.executes.append(('rm', '-f', path))
|
|
|
|
def fake_qemu_img_info(path):
|
|
class FakeImgInfo(object):
|
|
pass
|
|
|
|
file_format = path.split('.')[-1]
|
|
if file_format == 'part':
|
|
file_format = path.split('.')[-2]
|
|
elif file_format == 'converted':
|
|
file_format = 'raw'
|
|
|
|
if 'backing' in path:
|
|
backing_file = 'backing'
|
|
else:
|
|
backing_file = None
|
|
|
|
FakeImgInfo.file_format = file_format
|
|
FakeImgInfo.backing_file = backing_file
|
|
FakeImgInfo.virtual_size = 1
|
|
|
|
return FakeImgInfo()
|
|
|
|
self.stub_out('os.rename', fake_rename)
|
|
self.stub_out('os.unlink', fake_unlink)
|
|
self.stub_out('nova.virt.images.fetch', lambda *_, **__: None)
|
|
self.stub_out('nova.virt.images.qemu_img_info', fake_qemu_img_info)
|
|
self.stub_out('oslo_utils.fileutils.delete_if_exists',
|
|
fake_rm_on_error)
|
|
|
|
# Since the remove param of fileutils.remove_path_on_error()
|
|
# is initialized at load time, we must provide a wrapper
|
|
# that explicitly resets it to our fake delete_if_exists()
|
|
old_rm_path_on_error = fileutils.remove_path_on_error
|
|
f = functools.partial(old_rm_path_on_error, remove=fake_rm_on_error)
|
|
self.stub_out('oslo_utils.fileutils.remove_path_on_error', f)
|
|
|
|
context = 'opaque context'
|
|
image_id = '4'
|
|
|
|
target = 't.qcow2'
|
|
self.executes = []
|
|
expected_commands = [('rm', 't.qcow2.part'),
|
|
('mv', 't.qcow2.converted', 't.qcow2')]
|
|
images.fetch_to_raw(context, image_id, target)
|
|
self.assertEqual(self.executes, expected_commands)
|
|
mock_disk_op_sema.__enter__.assert_called_once()
|
|
mock_convert_image.assert_called_with(
|
|
't.qcow2.part', 't.qcow2.converted', 'qcow2', 'raw',
|
|
CONF.instances_path, False)
|
|
mock_convert_image.reset_mock()
|
|
|
|
target = 't.raw'
|
|
self.executes = []
|
|
expected_commands = [('mv', 't.raw.part', 't.raw')]
|
|
images.fetch_to_raw(context, image_id, target)
|
|
self.assertEqual(self.executes, expected_commands)
|
|
mock_convert_image.assert_not_called()
|
|
|
|
target = 'backing.qcow2'
|
|
self.executes = []
|
|
expected_commands = [('rm', '-f', 'backing.qcow2.part')]
|
|
self.assertRaises(exception.ImageUnacceptable,
|
|
images.fetch_to_raw, context, image_id, target)
|
|
self.assertEqual(self.executes, expected_commands)
|
|
mock_convert_image.assert_not_called()
|
|
|
|
del self.executes
|
|
|
|
def test_get_instance_path_at_destination(self):
|
|
instance = fake_instance.fake_instance_obj(None, name='fake_inst',
|
|
uuid=uuids.instance)
|
|
|
|
migrate_data = None
|
|
inst_path_at_dest = libvirt_utils.get_instance_path_at_destination(
|
|
instance, migrate_data)
|
|
expected_path = os.path.join(CONF.instances_path, instance['uuid'])
|
|
self.assertEqual(expected_path, inst_path_at_dest)
|
|
|
|
migrate_data = {}
|
|
inst_path_at_dest = libvirt_utils.get_instance_path_at_destination(
|
|
instance, migrate_data)
|
|
expected_path = os.path.join(CONF.instances_path, instance['uuid'])
|
|
self.assertEqual(expected_path, inst_path_at_dest)
|
|
|
|
migrate_data = objects.LibvirtLiveMigrateData(
|
|
instance_relative_path='fake_relative_path')
|
|
inst_path_at_dest = libvirt_utils.get_instance_path_at_destination(
|
|
instance, migrate_data)
|
|
expected_path = os.path.join(CONF.instances_path, 'fake_relative_path')
|
|
self.assertEqual(expected_path, inst_path_at_dest)
|
|
|
|
def test_get_arch(self):
|
|
image_meta = objects.ImageMeta.from_dict(
|
|
{'properties': {'architecture': "X86_64"}})
|
|
image_arch = libvirt_utils.get_arch(image_meta)
|
|
self.assertEqual(obj_fields.Architecture.X86_64, image_arch)
|
|
|
|
def test_is_mounted(self):
|
|
mount_path = "/var/lib/nova/mnt"
|
|
source = "192.168.0.1:/nova"
|
|
proc_with_mnt = """/dev/sda3 / xfs rw,seclabel,attr2,inode64 0 0
|
|
tmpfs /tmp tmpfs rw,seclabel 0 0
|
|
hugetlbfs /dev/hugepages hugetlbfs rw,seclabel,relatime 0 0
|
|
mqueue /dev/mqueue mqueue rw,seclabel,relatime 0 0
|
|
debugfs /sys/kernel/debug debugfs rw,seclabel,relatime 0 0
|
|
nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
|
|
/dev/sda1 /boot ext4 rw,seclabel,relatime,data=ordered 0 0
|
|
sunrpc /var/lib/nfs/rpc_pipefs rpc_pipefs rw,relatime 0 0
|
|
192.168.0.1:/nova /var/lib/nova/mnt nfs4 rw,relatime,vers=4.1
|
|
"""
|
|
proc_wrong_mnt = """/dev/sda3 / xfs rw,seclabel,attr2,inode64 0 0
|
|
tmpfs /tmp tmpfs rw,seclabel 0 0
|
|
hugetlbfs /dev/hugepages hugetlbfs rw,seclabel,relatime 0 0
|
|
mqueue /dev/mqueue mqueue rw,seclabel,relatime 0 0
|
|
debugfs /sys/kernel/debug debugfs rw,seclabel,relatime 0 0
|
|
nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
|
|
/dev/sda1 /boot ext4 rw,seclabel,relatime,data=ordered 0 0
|
|
sunrpc /var/lib/nfs/rpc_pipefs rpc_pipefs rw,relatime 0 0
|
|
192.168.0.2:/nova /var/lib/nova/mnt nfs4 rw,relatime,vers=4.1
|
|
"""
|
|
proc_without_mnt = """/dev/sda3 / xfs rw,seclabel,,attr2,inode64 0 0
|
|
tmpfs /tmp tmpfs rw,seclabel 0 0
|
|
hugetlbfs /dev/hugepages hugetlbfs rw,seclabel,relatime 0 0
|
|
mqueue /dev/mqueue mqueue rw,seclabel,relatime 0 0
|
|
debugfs /sys/kernel/debug debugfs rw,seclabel,relatime 0 0
|
|
nfsd /proc/fs/nfsd nfsd rw,relatime 0 0
|
|
/dev/sda1 /boot ext4 rw,seclabel,relatime,data=ordered 0 0
|
|
sunrpc /var/lib/nfs/rpc_pipefs rpc_pipefs rw,relatime 0 0
|
|
"""
|
|
with mock.patch.object(os.path, 'ismount') as mock_ismount:
|
|
# is_mounted(mount_path) with no source is equivalent to
|
|
# os.path.ismount(mount_path)
|
|
mock_ismount.return_value = False
|
|
self.assertFalse(libvirt_utils.is_mounted(mount_path))
|
|
|
|
mock_ismount.return_value = True
|
|
self.assertTrue(libvirt_utils.is_mounted(mount_path))
|
|
|
|
# Source is given, and matches source in /proc/mounts
|
|
proc_mnt = mock.mock_open(read_data=proc_with_mnt)
|
|
with mock.patch.object(six.moves.builtins, "open", proc_mnt):
|
|
self.assertTrue(libvirt_utils.is_mounted(mount_path, source))
|
|
|
|
# Source is given, and doesn't match source in /proc/mounts
|
|
proc_mnt = mock.mock_open(read_data=proc_wrong_mnt)
|
|
with mock.patch.object(six.moves.builtins, "open", proc_mnt):
|
|
self.assertFalse(libvirt_utils.is_mounted(mount_path, source))
|
|
|
|
# Source is given, and mountpoint isn't present in /proc/mounts
|
|
# Note that this shouldn't occur, as os.path.ismount should have
|
|
# previously returned False in this case.
|
|
proc_umnt = mock.mock_open(read_data=proc_without_mnt)
|
|
with mock.patch.object(six.moves.builtins, "open", proc_umnt):
|
|
self.assertFalse(libvirt_utils.is_mounted(mount_path, source))
|
|
|
|
def test_find_disk_file_device(self):
|
|
self.useFixture(fakelibvirt.FakeLibvirtFixture())
|
|
xml = """
|
|
<domain type='kvm'>
|
|
<os>
|
|
<type>linux</type>
|
|
</os>
|
|
<devices>
|
|
<disk type="file" device="disk">
|
|
<driver name="qemu" type="qcow2" cache="none" io="native"/>
|
|
<source file="/tmp/hello"/>
|
|
<target bus="ide" dev="/dev/hda"/>
|
|
</disk>
|
|
</devices>
|
|
</domain>
|
|
"""
|
|
virt_dom = mock.Mock(XMLDesc=mock.Mock(return_value=xml))
|
|
guest = libvirt_guest.Guest(virt_dom)
|
|
disk_path, format = libvirt_utils.find_disk(guest)
|
|
self.assertEqual('/tmp/hello', disk_path)
|
|
self.assertEqual('qcow2', format)
|
|
|
|
def test_find_disk_block_device(self):
|
|
self.useFixture(fakelibvirt.FakeLibvirtFixture())
|
|
xml = """
|
|
<domain type='kvm'>
|
|
<os>
|
|
<type>linux</type>
|
|
</os>
|
|
<devices>
|
|
<disk type="block" device="disk">
|
|
<driver name="qemu" type="raw"/>
|
|
<source dev="/dev/nova-vg/hello"/>
|
|
<target bus="ide" dev="/dev/hda"/>
|
|
</disk>
|
|
</devices>
|
|
</domain>
|
|
"""
|
|
virt_dom = mock.Mock(XMLDesc=mock.Mock(return_value=xml))
|
|
guest = libvirt_guest.Guest(virt_dom)
|
|
disk_path, format = libvirt_utils.find_disk(guest)
|
|
self.assertEqual('/dev/nova-vg/hello', disk_path)
|
|
self.assertEqual('raw', format)
|
|
|
|
def test_find_disk_rbd(self):
|
|
self.useFixture(fakelibvirt.FakeLibvirtFixture())
|
|
xml = """
|
|
<domain type='kvm'>
|
|
<os>
|
|
<type>linux</type>
|
|
</os>
|
|
<devices>
|
|
<disk type="network" device="disk">
|
|
<driver name="qemu" type="raw"/>
|
|
<source name="pool/image" protocol="rbd">
|
|
<host name="1.2.3.4" port="456"/>
|
|
</source>
|
|
<target bus="virtio" dev="/dev/vda"/>
|
|
</disk>
|
|
</devices>
|
|
</domain>
|
|
"""
|
|
virt_dom = mock.Mock(XMLDesc=mock.Mock(return_value=xml))
|
|
guest = libvirt_guest.Guest(virt_dom)
|
|
disk_path, format = libvirt_utils.find_disk(guest)
|
|
self.assertEqual('rbd:pool/image', disk_path)
|
|
self.assertEqual('raw', format)
|
|
|
|
def test_find_disk_lxc(self):
|
|
self.useFixture(fakelibvirt.FakeLibvirtFixture())
|
|
xml = """
|
|
<domain type='lxc'>
|
|
<os>
|
|
<type>exe</type>
|
|
</os>
|
|
<devices>
|
|
<filesystem type="mount">
|
|
<source dir="/myhome/rootfs"/>
|
|
<target dir="/"/>
|
|
</filesystem>
|
|
</devices>
|
|
</domain>
|
|
"""
|
|
virt_dom = mock.Mock(XMLDesc=mock.Mock(return_value=xml))
|
|
guest = libvirt_guest.Guest(virt_dom)
|
|
disk_path, format = libvirt_utils.find_disk(guest)
|
|
self.assertEqual('/myhome/disk', disk_path)
|
|
self.assertIsNone(format)
|
|
|
|
def test_find_disk_parallels(self):
|
|
self.useFixture(fakelibvirt.FakeLibvirtFixture())
|
|
xml = """
|
|
<domain type='parallels'>
|
|
<os>
|
|
<type>exe</type>
|
|
</os>
|
|
<devices>
|
|
<filesystem type='file'>"
|
|
<driver format='ploop' type='ploop'/>"
|
|
<source file='/test/disk'/>"
|
|
<target dir='/'/>
|
|
</filesystem>"
|
|
</devices>
|
|
</domain>
|
|
"""
|
|
virt_dom = mock.Mock(XMLDesc=mock.Mock(return_value=xml))
|
|
guest = libvirt_guest.Guest(virt_dom)
|
|
disk_path, format = libvirt_utils.find_disk(guest)
|
|
self.assertEqual('/test/disk', disk_path)
|
|
self.assertEqual('ploop', format)
|
|
|
|
@mock.patch('nova.virt.libvirt.utils.get_arch')
|
|
def test_get_machine_type_from_fallbacks(self, mock_get_arch):
|
|
"""Test hardcoded arch-specific fallbacks for default machine type"""
|
|
image_meta = objects.ImageMeta.from_dict({"disk_format": "raw"})
|
|
host_cpu_archs = {
|
|
obj_fields.Architecture.ARMV7: "virt",
|
|
obj_fields.Architecture.AARCH64: "virt",
|
|
obj_fields.Architecture.S390: "s390-ccw-virtio",
|
|
obj_fields.Architecture.S390X: "s390-ccw-virtio",
|
|
obj_fields.Architecture.I686: "pc",
|
|
obj_fields.Architecture.X86_64: "pc",
|
|
}
|
|
for arch, expected_mtype in host_cpu_archs.items():
|
|
mock_get_arch.return_value = arch
|
|
mtype = libvirt_utils.get_machine_type(image_meta)
|
|
self.assertEqual(expected_mtype, mtype)
|
|
|
|
def test_get_machine_type_from_conf(self):
|
|
self.useFixture(nova_fixtures.ConfPatcher(
|
|
group="libvirt", hw_machine_type=['x86_64=q35', 'i686=legacy']))
|
|
self.assertEqual('q35',
|
|
libvirt_utils.get_default_machine_type('x86_64'))
|
|
|
|
def test_get_machine_type_no_conf_or_fallback(self):
|
|
self.assertIsNone(libvirt_utils.get_default_machine_type('sparc'))
|
|
|
|
def test_get_machine_type_missing_conf_and_fallback(self):
|
|
self.useFixture(nova_fixtures.ConfPatcher(
|
|
group="libvirt", hw_machine_type=['x86_64=q35', 'i686=legacy']))
|
|
self.assertIsNone(libvirt_utils.get_default_machine_type('sparc'))
|
|
|
|
def test_get_machine_type_survives_invalid_conf(self):
|
|
self.useFixture(nova_fixtures.ConfPatcher(
|
|
group="libvirt", hw_machine_type=['x86_64=q35', 'foo']))
|
|
self.assertEqual('q35',
|
|
libvirt_utils.get_default_machine_type('x86_64'))
|
|
|
|
def test_get_machine_type_from_image(self):
|
|
image_meta = objects.ImageMeta.from_dict({
|
|
"disk_format": "raw", "properties": {"hw_machine_type": "q35"}
|
|
})
|
|
os_mach_type = libvirt_utils.get_machine_type(image_meta)
|
|
self.assertEqual('q35', os_mach_type)
|
|
|
|
def test_get_flags_by_flavor_specs(self):
|
|
flavor = objects.Flavor(
|
|
id=1, flavorid='fakeid-1', name='fake1.small', memory_mb=128,
|
|
vcpus=1, root_gb=1, ephemeral_gb=0, swap=0, rxtx_factor=0,
|
|
deleted=False, extra_specs={
|
|
'trait:%s' % os_traits.HW_CPU_X86_3DNOW: 'required',
|
|
'trait:%s' % os_traits.HW_CPU_X86_SSE2: 'required',
|
|
'trait:%s' % os_traits.HW_CPU_HYPERTHREADING: 'required',
|
|
})
|
|
traits = libvirt_utils.get_flags_by_flavor_specs(flavor)
|
|
# we shouldn't see the hyperthreading trait since that's a valid trait
|
|
# but not a CPU flag
|
|
self.assertEqual(set(['3dnow', 'sse2']), traits)
|