virt: Move generic virt tests to nova/tests/virt/

In the interest of better organization, this patch moves generic virt testing
code into nova/tests/virt.

Change-Id: I79473726a848f0abbcb13865efe30e02aea09ae5
This commit is contained in:
Rick Harris
2013-05-21 16:24:39 +00:00
parent b712206e71
commit 62f6727441
6 changed files with 0 additions and 1650 deletions

View File

@@ -1,60 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 Citrix Systems, Inc.
# Copyright 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import test
from nova.virt import driver
class FakeDriver(object):
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
class FakeDriver2(FakeDriver):
pass
class ToDriverRegistryTestCase(test.TestCase):
def assertDriverInstance(self, inst, class_, *args, **kwargs):
self.assertEquals(class_, inst.__class__)
self.assertEquals(args, inst.args)
self.assertEquals(kwargs, inst.kwargs)
def test_driver_dict_from_config(self):
drvs = driver.driver_dict_from_config(
[
'key1=nova.tests.test_driver.FakeDriver',
'key2=nova.tests.test_driver.FakeDriver2',
], 'arg1', 'arg2', param1='value1', param2='value2'
)
self.assertEquals(
sorted(['key1', 'key2']),
sorted(drvs.keys())
)
self.assertDriverInstance(
drvs['key1'],
FakeDriver, 'arg1', 'arg2', param1='value1',
param2='value2')
self.assertDriverInstance(
drvs['key2'],
FakeDriver2, 'arg1', 'arg2', param1='value1',
param2='value2')

View File

@@ -1,139 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Isaku Yamahata
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from nova import test
from nova import utils
from nova.virt.disk import api as disk_api
from nova.virt import driver
class TestVirtDriver(test.TestCase):
def test_block_device(self):
swap = {'device_name': '/dev/sdb',
'swap_size': 1}
ephemerals = [{'num': 0,
'virtual_name': 'ephemeral0',
'device_name': '/dev/sdc1',
'size': 1}]
block_device_mapping = [{'mount_device': '/dev/sde',
'device_path': 'fake_device'}]
block_device_info = {
'root_device_name': '/dev/sda',
'swap': swap,
'ephemerals': ephemerals,
'block_device_mapping': block_device_mapping}
empty_block_device_info = {}
self.assertEqual(
driver.block_device_info_get_root(block_device_info), '/dev/sda')
self.assertEqual(
driver.block_device_info_get_root(empty_block_device_info), None)
self.assertEqual(
driver.block_device_info_get_root(None), None)
self.assertEqual(
driver.block_device_info_get_swap(block_device_info), swap)
self.assertEqual(driver.block_device_info_get_swap(
empty_block_device_info)['device_name'], None)
self.assertEqual(driver.block_device_info_get_swap(
empty_block_device_info)['swap_size'], 0)
self.assertEqual(
driver.block_device_info_get_swap({'swap': None})['device_name'],
None)
self.assertEqual(
driver.block_device_info_get_swap({'swap': None})['swap_size'],
0)
self.assertEqual(
driver.block_device_info_get_swap(None)['device_name'], None)
self.assertEqual(
driver.block_device_info_get_swap(None)['swap_size'], 0)
self.assertEqual(
driver.block_device_info_get_ephemerals(block_device_info),
ephemerals)
self.assertEqual(
driver.block_device_info_get_ephemerals(empty_block_device_info),
[])
self.assertEqual(
driver.block_device_info_get_ephemerals(None),
[])
def test_swap_is_usable(self):
self.assertFalse(driver.swap_is_usable(None))
self.assertFalse(driver.swap_is_usable({'device_name': None}))
self.assertFalse(driver.swap_is_usable({'device_name': '/dev/sdb',
'swap_size': 0}))
self.assertTrue(driver.swap_is_usable({'device_name': '/dev/sdb',
'swap_size': 1}))
class TestVirtDisk(test.TestCase):
def setUp(self):
super(TestVirtDisk, self).setUp()
self.executes = []
def fake_execute(*cmd, **kwargs):
self.executes.append(cmd)
return None, None
self.stubs.Set(utils, 'execute', fake_execute)
def test_lxc_teardown_container(self):
def proc_mounts(self, mount_point):
mount_points = {
'/mnt/loop/nopart': '/dev/loop0',
'/mnt/loop/part': '/dev/mapper/loop0p1',
'/mnt/nbd/nopart': '/dev/nbd15',
'/mnt/nbd/part': '/dev/mapper/nbd15p1',
}
return mount_points[mount_point]
self.stubs.Set(os.path, 'exists', lambda _: True)
self.stubs.Set(disk_api._DiskImage, '_device_for_path', proc_mounts)
expected_commands = []
disk_api.teardown_container('/mnt/loop/nopart')
expected_commands += [
('umount', '/dev/loop0'),
('losetup', '--detach', '/dev/loop0'),
]
disk_api.teardown_container('/mnt/loop/part')
expected_commands += [
('umount', '/dev/mapper/loop0p1'),
('kpartx', '-d', '/dev/loop0'),
('losetup', '--detach', '/dev/loop0'),
]
disk_api.teardown_container('/mnt/nbd/nopart')
expected_commands += [
('umount', '/dev/nbd15'),
('qemu-nbd', '-d', '/dev/nbd15'),
]
disk_api.teardown_container('/mnt/nbd/part')
expected_commands += [
('umount', '/dev/mapper/nbd15p1'),
('kpartx', '-d', '/dev/nbd15'),
('qemu-nbd', '-d', '/dev/nbd15'),
]
self.assertEqual(self.executes, expected_commands)

View File

@@ -1,219 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
from nova import exception
from nova import test
from nova.tests import fakeguestfs
from nova.virt.disk import api as diskapi
from nova.virt.disk.vfs import guestfs as vfsguestfs
class VirtDiskTest(test.TestCase):
def setUp(self):
super(VirtDiskTest, self).setUp()
sys.modules['guestfs'] = fakeguestfs
vfsguestfs.guestfs = fakeguestfs
def test_inject_data(self):
self.assertTrue(diskapi.inject_data("/some/file", use_cow=True))
self.assertTrue(diskapi.inject_data("/some/file",
mandatory=('files',)))
self.assertTrue(diskapi.inject_data("/some/file", key="mysshkey",
mandatory=('key',)))
os_name = os.name
os.name = 'nt' # Cause password injection to fail
self.assertRaises(exception.NovaException,
diskapi.inject_data,
"/some/file", admin_password="p",
mandatory=('admin_password',))
self.assertFalse(diskapi.inject_data("/some/file", admin_password="p"))
os.name = os_name
def test_inject_data_key(self):
vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2")
vfs.setup()
diskapi._inject_key_into_fs("mysshkey", vfs)
self.assertTrue("/root/.ssh" in vfs.handle.files)
self.assertEquals(vfs.handle.files["/root/.ssh"],
{'isdir': True, 'gid': 0, 'uid': 0, 'mode': 0700})
self.assertTrue("/root/.ssh/authorized_keys" in vfs.handle.files)
self.assertEquals(vfs.handle.files["/root/.ssh/authorized_keys"],
{'isdir': False,
'content': "Hello World\n# The following ssh " +
"key was injected by Nova\nmysshkey\n",
'gid': 100,
'uid': 100,
'mode': 0600})
vfs.teardown()
def test_inject_data_key_with_selinux(self):
vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2")
vfs.setup()
vfs.make_path("etc/selinux")
vfs.make_path("etc/rc.d")
diskapi._inject_key_into_fs("mysshkey", vfs)
self.assertTrue("/etc/rc.d/rc.local" in vfs.handle.files)
self.assertEquals(vfs.handle.files["/etc/rc.d/rc.local"],
{'isdir': False,
'content': "Hello World#!/bin/sh\n# Added by " +
"Nova to ensure injected ssh keys " +
"have the right context\nrestorecon " +
"-RF root/.ssh 2>/dev/null || :\n",
'gid': 100,
'uid': 100,
'mode': 0700})
self.assertTrue("/root/.ssh" in vfs.handle.files)
self.assertEquals(vfs.handle.files["/root/.ssh"],
{'isdir': True, 'gid': 0, 'uid': 0, 'mode': 0700})
self.assertTrue("/root/.ssh/authorized_keys" in vfs.handle.files)
self.assertEquals(vfs.handle.files["/root/.ssh/authorized_keys"],
{'isdir': False,
'content': "Hello World\n# The following ssh " +
"key was injected by Nova\nmysshkey\n",
'gid': 100,
'uid': 100,
'mode': 0600})
vfs.teardown()
def test_inject_data_key_with_selinux_append_with_newline(self):
vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2")
vfs.setup()
vfs.replace_file("/etc/rc.d/rc.local", "#!/bin/sh\necho done")
vfs.make_path("etc/selinux")
vfs.make_path("etc/rc.d")
diskapi._inject_key_into_fs("mysshkey", vfs)
self.assertTrue("/etc/rc.d/rc.local" in vfs.handle.files)
self.assertEquals(vfs.handle.files["/etc/rc.d/rc.local"],
{'isdir': False,
'content': "#!/bin/sh\necho done\n# Added "
"by Nova to ensure injected ssh keys have "
"the right context\nrestorecon -RF "
"root/.ssh 2>/dev/null || :\n",
'gid': 100,
'uid': 100,
'mode': 0700})
vfs.teardown()
def test_inject_net(self):
vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2")
vfs.setup()
diskapi._inject_net_into_fs("mynetconfig", vfs)
self.assertTrue("/etc/network/interfaces" in vfs.handle.files)
self.assertEquals(vfs.handle.files["/etc/network/interfaces"],
{'content': 'mynetconfig',
'gid': 100,
'isdir': False,
'mode': 0700,
'uid': 100})
vfs.teardown()
def test_inject_metadata(self):
vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2")
vfs.setup()
diskapi._inject_metadata_into_fs([{"key": "foo",
"value": "bar"},
{"key": "eek",
"value": "wizz"}], vfs)
self.assertTrue("/meta.js" in vfs.handle.files)
self.assertEquals(vfs.handle.files["/meta.js"],
{'content': '{"foo": "bar", ' +
'"eek": "wizz"}',
'gid': 100,
'isdir': False,
'mode': 0700,
'uid': 100})
vfs.teardown()
def test_inject_admin_password(self):
vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2")
vfs.setup()
def fake_salt():
return "1234567890abcdef"
self.stubs.Set(diskapi, '_generate_salt', fake_salt)
vfs.handle.write("/etc/shadow",
"root:$1$12345678$xxxxx:14917:0:99999:7:::\n" +
"bin:*:14495:0:99999:7:::\n" +
"daemon:*:14495:0:99999:7:::\n")
vfs.handle.write("/etc/passwd",
"root:x:0:0:root:/root:/bin/bash\n" +
"bin:x:1:1:bin:/bin:/sbin/nologin\n" +
"daemon:x:2:2:daemon:/sbin:/sbin/nologin\n")
diskapi._inject_admin_password_into_fs("123456", vfs)
self.assertEquals(vfs.handle.files["/etc/passwd"],
{'content': "root:x:0:0:root:/root:/bin/bash\n" +
"bin:x:1:1:bin:/bin:/sbin/nologin\n" +
"daemon:x:2:2:daemon:/sbin:" +
"/sbin/nologin\n",
'gid': 100,
'isdir': False,
'mode': 0700,
'uid': 100})
shadow = vfs.handle.files["/etc/shadow"]
# if the encrypted password is only 13 characters long, then
# nova.virt.disk.api:_set_password fell back to DES.
if len(shadow['content']) == 91:
self.assertEquals(shadow,
{'content': "root:12tir.zIbWQ3c" +
":14917:0:99999:7:::\n" +
"bin:*:14495:0:99999:7:::\n" +
"daemon:*:14495:0:99999:7:::\n",
'gid': 100,
'isdir': False,
'mode': 0700,
'uid': 100})
else:
self.assertEquals(shadow,
{'content': "root:$1$12345678$a4ge4d5iJ5vw" +
"vbFS88TEN0:14917:0:99999:7:::\n" +
"bin:*:14495:0:99999:7:::\n" +
"daemon:*:14495:0:99999:7:::\n",
'gid': 100,
'isdir': False,
'mode': 0700,
'uid': 100})
vfs.teardown()

View File

@@ -1,203 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
from nova import exception
from nova import test
from nova.tests import fakeguestfs
from nova.virt.disk.vfs import guestfs as vfsimpl
class VirtDiskVFSGuestFSTest(test.TestCase):
def setUp(self):
super(VirtDiskVFSGuestFSTest, self).setUp()
sys.modules['guestfs'] = fakeguestfs
vfsimpl.guestfs = fakeguestfs
def test_appliance_setup_inspect(self):
vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2",
imgfmt="qcow2",
partition=-1)
vfs.setup()
self.assertEqual(vfs.handle.running, True)
self.assertEqual(len(vfs.handle.mounts), 2)
self.assertEqual(vfs.handle.mounts[0][1],
"/dev/mapper/guestvgf-lv_root")
self.assertEqual(vfs.handle.mounts[1][1], "/dev/vda1")
self.assertEqual(vfs.handle.mounts[0][2], "/")
self.assertEqual(vfs.handle.mounts[1][2], "/boot")
handle = vfs.handle
vfs.teardown()
self.assertEqual(vfs.handle, None)
self.assertEqual(handle.running, False)
self.assertEqual(handle.closed, True)
self.assertEqual(len(handle.mounts), 0)
def test_appliance_setup_inspect_no_root_raises(self):
vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2",
imgfmt="qcow2",
partition=-1)
# call setup to init the handle so we can stub it
vfs.setup()
def fake_inspect_os():
return []
self.stubs.Set(vfs.handle, 'inspect_os', fake_inspect_os)
self.assertRaises(exception.NovaException, vfs.setup_os_inspect)
def test_appliance_setup_inspect_multi_boots_raises(self):
vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2",
imgfmt="qcow2",
partition=-1)
# call setup to init the handle so we can stub it
vfs.setup()
def fake_inspect_os():
return ['fake1', 'fake2']
self.stubs.Set(vfs.handle, 'inspect_os', fake_inspect_os)
self.assertRaises(exception.NovaException, vfs.setup_os_inspect)
def test_appliance_setup_static_nopart(self):
vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2",
imgfmt="qcow2",
partition=None)
vfs.setup()
self.assertEqual(vfs.handle.running, True)
self.assertEqual(len(vfs.handle.mounts), 1)
self.assertEqual(vfs.handle.mounts[0][1], "/dev/sda")
self.assertEqual(vfs.handle.mounts[0][2], "/")
handle = vfs.handle
vfs.teardown()
self.assertEqual(vfs.handle, None)
self.assertEqual(handle.running, False)
self.assertEqual(handle.closed, True)
self.assertEqual(len(handle.mounts), 0)
def test_appliance_setup_static_part(self):
vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2",
imgfmt="qcow2",
partition=2)
vfs.setup()
self.assertEqual(vfs.handle.running, True)
self.assertEqual(len(vfs.handle.mounts), 1)
self.assertEqual(vfs.handle.mounts[0][1], "/dev/sda2")
self.assertEqual(vfs.handle.mounts[0][2], "/")
handle = vfs.handle
vfs.teardown()
self.assertEqual(vfs.handle, None)
self.assertEqual(handle.running, False)
self.assertEqual(handle.closed, True)
self.assertEqual(len(handle.mounts), 0)
def test_makepath(self):
vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.setup()
vfs.make_path("/some/dir")
vfs.make_path("/other/dir")
self.assertTrue("/some/dir" in vfs.handle.files)
self.assertTrue("/other/dir" in vfs.handle.files)
self.assertTrue(vfs.handle.files["/some/dir"]["isdir"])
self.assertTrue(vfs.handle.files["/other/dir"]["isdir"])
vfs.teardown()
def test_append_file(self):
vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.setup()
vfs.append_file("/some/file", " Goodbye")
self.assertTrue("/some/file" in vfs.handle.files)
self.assertEqual(vfs.handle.files["/some/file"]["content"],
"Hello World Goodbye")
vfs.teardown()
def test_replace_file(self):
vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.setup()
vfs.replace_file("/some/file", "Goodbye")
self.assertTrue("/some/file" in vfs.handle.files)
self.assertEqual(vfs.handle.files["/some/file"]["content"],
"Goodbye")
vfs.teardown()
def test_read_file(self):
vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.setup()
self.assertEqual(vfs.read_file("/some/file"), "Hello World")
vfs.teardown()
def test_has_file(self):
vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.setup()
vfs.read_file("/some/file")
self.assertTrue(vfs.has_file("/some/file"))
self.assertFalse(vfs.has_file("/other/file"))
vfs.teardown()
def test_set_permissions(self):
vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.setup()
vfs.read_file("/some/file")
self.assertEquals(vfs.handle.files["/some/file"]["mode"], 0700)
vfs.set_permissions("/some/file", 0777)
self.assertEquals(vfs.handle.files["/some/file"]["mode"], 0777)
vfs.teardown()
def test_set_ownership(self):
vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.setup()
vfs.read_file("/some/file")
self.assertEquals(vfs.handle.files["/some/file"]["uid"], 100)
self.assertEquals(vfs.handle.files["/some/file"]["gid"], 100)
vfs.set_ownership("/some/file", "fred", None)
self.assertEquals(vfs.handle.files["/some/file"]["uid"], 105)
self.assertEquals(vfs.handle.files["/some/file"]["gid"], 100)
vfs.set_ownership("/some/file", None, "users")
self.assertEquals(vfs.handle.files["/some/file"]["uid"], 105)
self.assertEquals(vfs.handle.files["/some/file"]["gid"], 500)
vfs.set_ownership("/some/file", "joe", "admins")
self.assertEquals(vfs.handle.files["/some/file"]["uid"], 110)
self.assertEquals(vfs.handle.files["/some/file"]["gid"], 600)
vfs.teardown()

View File

@@ -1,387 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from nova import exception
from nova.openstack.common import processutils
from nova import test
from nova.tests import utils as tests_utils
from nova.virt.disk.vfs import localfs as vfsimpl
CONF = cfg.CONF
dirs = []
files = {}
commands = []
def fake_execute(*args, **kwargs):
commands.append({"args": args, "kwargs": kwargs})
if args[0] == "readlink":
if args[1] == "-nm":
if args[2] in ["/scratch/dir/some/file",
"/scratch/dir/some/dir",
"/scratch/dir/other/dir",
"/scratch/dir/other/file"]:
return args[2], ""
elif args[1] == "-e":
if args[2] in files:
return args[2], ""
return "", "No such file"
elif args[0] == "mkdir":
dirs.append(args[2])
elif args[0] == "chown":
owner = args[1]
path = args[2]
if path not in files:
raise Exception("No such file: " + path)
sep = owner.find(':')
if sep != -1:
user = owner[0:sep]
group = owner[sep + 1:]
else:
user = owner
group = None
if user:
if user == "fred":
uid = 105
else:
uid = 110
files[path]["uid"] = uid
if group:
if group == "users":
gid = 500
else:
gid = 600
files[path]["gid"] = gid
elif args[0] == "chgrp":
group = args[1]
path = args[2]
if path not in files:
raise Exception("No such file: " + path)
if group == "users":
gid = 500
else:
gid = 600
files[path]["gid"] = gid
elif args[0] == "chmod":
mode = args[1]
path = args[2]
if path not in files:
raise Exception("No such file: " + path)
files[path]["mode"] = int(mode, 8)
elif args[0] == "cat":
path = args[1]
if path not in files:
files[path] = {
"content": "Hello World",
"gid": 100,
"uid": 100,
"mode": 0700
}
return files[path]["content"], ""
elif args[0] == "tee":
if args[1] == "-a":
path = args[2]
append = True
else:
path = args[1]
append = False
if path not in files:
files[path] = {
"content": "Hello World",
"gid": 100,
"uid": 100,
"mode": 0700,
}
if append:
files[path]["content"] += kwargs["process_input"]
else:
files[path]["content"] = kwargs["process_input"]
class VirtDiskVFSLocalFSTestPaths(test.TestCase):
def setUp(self):
super(VirtDiskVFSLocalFSTestPaths, self).setUp()
real_execute = processutils.execute
def nonroot_execute(*cmd_parts, **kwargs):
kwargs.pop('run_as_root', None)
return real_execute(*cmd_parts, **kwargs)
self.stubs.Set(processutils, 'execute', nonroot_execute)
def test_check_safe_path(self):
if tests_utils.is_osx():
self.skipTest("Unable to test on OSX")
vfs = vfsimpl.VFSLocalFS("dummy.img")
vfs.imgdir = "/foo"
ret = vfs._canonical_path('etc/something.conf')
self.assertEquals(ret, '/foo/etc/something.conf')
def test_check_unsafe_path(self):
if tests_utils.is_osx():
self.skipTest("Unable to test on OSX")
vfs = vfsimpl.VFSLocalFS("dummy.img")
vfs.imgdir = "/foo"
self.assertRaises(exception.Invalid,
vfs._canonical_path,
'etc/../../../something.conf')
class VirtDiskVFSLocalFSTest(test.TestCase):
def test_makepath(self):
global dirs, commands
dirs = []
commands = []
self.stubs.Set(processutils, 'execute', fake_execute)
vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.imgdir = "/scratch/dir"
vfs.make_path("/some/dir")
vfs.make_path("/other/dir")
self.assertEqual(dirs,
["/scratch/dir/some/dir", "/scratch/dir/other/dir"]),
root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config
self.assertEqual(commands,
[{'args': ('readlink', '-nm',
'/scratch/dir/some/dir'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('mkdir', '-p',
'/scratch/dir/some/dir'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('readlink', '-nm',
'/scratch/dir/other/dir'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('mkdir', '-p',
'/scratch/dir/other/dir'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}}])
def test_append_file(self):
global files, commands
files = {}
commands = []
self.stubs.Set(processutils, 'execute', fake_execute)
vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.imgdir = "/scratch/dir"
vfs.append_file("/some/file", " Goodbye")
self.assertTrue("/scratch/dir/some/file" in files)
self.assertEquals(files["/scratch/dir/some/file"]["content"],
"Hello World Goodbye")
root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config
self.assertEqual(commands,
[{'args': ('readlink', '-nm',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('tee', '-a',
'/scratch/dir/some/file'),
'kwargs': {'process_input': ' Goodbye',
'run_as_root': True,
'root_helper': root_helper}}])
def test_replace_file(self):
global files, commands
files = {}
commands = []
self.stubs.Set(processutils, 'execute', fake_execute)
vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.imgdir = "/scratch/dir"
vfs.replace_file("/some/file", "Goodbye")
self.assertTrue("/scratch/dir/some/file" in files)
self.assertEquals(files["/scratch/dir/some/file"]["content"],
"Goodbye")
root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config
self.assertEqual(commands,
[{'args': ('readlink', '-nm',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('tee', '/scratch/dir/some/file'),
'kwargs': {'process_input': 'Goodbye',
'run_as_root': True,
'root_helper': root_helper}}])
def test_read_file(self):
global commands, files
files = {}
commands = []
self.stubs.Set(processutils, 'execute', fake_execute)
vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.imgdir = "/scratch/dir"
self.assertEqual(vfs.read_file("/some/file"), "Hello World")
root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config
self.assertEqual(commands,
[{'args': ('readlink', '-nm',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('cat', '/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}}])
def test_has_file(self):
global commands, files
files = {}
commands = []
self.stubs.Set(processutils, 'execute', fake_execute)
vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.imgdir = "/scratch/dir"
vfs.read_file("/some/file")
self.assertTrue(vfs.has_file("/some/file"))
self.assertFalse(vfs.has_file("/other/file"))
root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config
self.assertEqual(commands,
[{'args': ('readlink', '-nm',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('cat', '/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('readlink', '-nm',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('readlink', '-e',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('readlink', '-nm',
'/scratch/dir/other/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('readlink', '-e',
'/scratch/dir/other/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
])
def test_set_permissions(self):
global commands, files
commands = []
files = {}
self.stubs.Set(processutils, 'execute', fake_execute)
vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.imgdir = "/scratch/dir"
vfs.read_file("/some/file")
vfs.set_permissions("/some/file", 0777)
self.assertEquals(files["/scratch/dir/some/file"]["mode"], 0777)
root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config
self.assertEqual(commands,
[{'args': ('readlink', '-nm',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('cat', '/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('readlink', '-nm',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('chmod', '777',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}}])
def test_set_ownership(self):
global commands, files
commands = []
files = {}
self.stubs.Set(processutils, 'execute', fake_execute)
vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2")
vfs.imgdir = "/scratch/dir"
vfs.read_file("/some/file")
self.assertEquals(files["/scratch/dir/some/file"]["uid"], 100)
self.assertEquals(files["/scratch/dir/some/file"]["gid"], 100)
vfs.set_ownership("/some/file", "fred", None)
self.assertEquals(files["/scratch/dir/some/file"]["uid"], 105)
self.assertEquals(files["/scratch/dir/some/file"]["gid"], 100)
vfs.set_ownership("/some/file", None, "users")
self.assertEquals(files["/scratch/dir/some/file"]["uid"], 105)
self.assertEquals(files["/scratch/dir/some/file"]["gid"], 500)
vfs.set_ownership("/some/file", "joe", "admins")
self.assertEquals(files["/scratch/dir/some/file"]["uid"], 110)
self.assertEquals(files["/scratch/dir/some/file"]["gid"], 600)
root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config
self.assertEqual(commands,
[{'args': ('readlink', '-nm',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('cat', '/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('readlink', '-nm',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('chown', 'fred',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('readlink', '-nm',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('chgrp', 'users',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('readlink', '-nm',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}},
{'args': ('chown', 'joe:admins',
'/scratch/dir/some/file'),
'kwargs': {'run_as_root': True,
'root_helper': root_helper}}])

View File

@@ -1,642 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2010 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import fixtures
import netaddr
import sys
import traceback
from nova.compute import manager
from nova import exception
from nova.openstack.common import importutils
from nova.openstack.common import log as logging
from nova import test
from nova.tests.image import fake as fake_image
from nova.tests import utils as test_utils
from nova.tests.virt.libvirt import fake_libvirt_utils
from nova.virt import event as virtevent
from nova.virt import fake
LOG = logging.getLogger(__name__)
def catch_notimplementederror(f):
"""Decorator to simplify catching drivers raising NotImplementedError
If a particular call makes a driver raise NotImplementedError, we
log it so that we can extract this information afterwards to
automatically generate a hypervisor/feature support matrix."""
def wrapped_func(self, *args, **kwargs):
try:
return f(self, *args, **kwargs)
except NotImplementedError:
frame = traceback.extract_tb(sys.exc_info()[2])[-1]
LOG.error('%(driver)s does not implement %(method)s' % {
'driver': type(self.connection),
'method': frame[2]})
wrapped_func.__name__ = f.__name__
wrapped_func.__doc__ = f.__doc__
return wrapped_func
class _FakeDriverBackendTestCase(object):
def _setup_fakelibvirt(self):
# So that the _supports_direct_io does the test based
# on the current working directory, instead of the
# default instances_path which doesn't exist
self.flags(instances_path='')
# Put fakelibvirt in place
if 'libvirt' in sys.modules:
self.saved_libvirt = sys.modules['libvirt']
else:
self.saved_libvirt = None
import nova.tests.virt.libvirt.fake_imagebackend as fake_imagebackend
import nova.tests.virt.libvirt.fake_libvirt_utils as fake_libvirt_utils
import nova.tests.virt.libvirt.fakelibvirt as fakelibvirt
sys.modules['libvirt'] = fakelibvirt
import nova.virt.libvirt.driver
import nova.virt.libvirt.firewall
self.useFixture(fixtures.MonkeyPatch(
'nova.virt.libvirt.driver.imagebackend',
fake_imagebackend))
self.useFixture(fixtures.MonkeyPatch(
'nova.virt.libvirt.driver.libvirt',
fakelibvirt))
self.useFixture(fixtures.MonkeyPatch(
'nova.virt.libvirt.driver.libvirt_utils',
fake_libvirt_utils))
self.useFixture(fixtures.MonkeyPatch(
'nova.virt.libvirt.imagebackend.libvirt_utils',
fake_libvirt_utils))
self.useFixture(fixtures.MonkeyPatch(
'nova.virt.libvirt.firewall.libvirt',
fakelibvirt))
self.flags(rescue_image_id="2",
rescue_kernel_id="3",
rescue_ramdisk_id=None,
libvirt_snapshots_directory='./')
def fake_extend(image, size):
pass
def fake_migrateToURI(*a):
pass
def fake_make_drive(_self, _path):
pass
def fake_get_instance_disk_info(_self, instance, xml=None):
return '[]'
self.stubs.Set(nova.virt.libvirt.driver.LibvirtDriver,
'get_instance_disk_info',
fake_get_instance_disk_info)
self.stubs.Set(nova.virt.libvirt.driver.disk,
'extend', fake_extend)
# Like the existing fakelibvirt.migrateToURI, do nothing,
# but don't fail for these tests.
self.stubs.Set(nova.virt.libvirt.driver.libvirt.Domain,
'migrateToURI', fake_migrateToURI)
# We can't actually make a config drive v2 because ensure_tree has
# been faked out
self.stubs.Set(nova.virt.configdrive.ConfigDriveBuilder,
'make_drive', fake_make_drive)
def _teardown_fakelibvirt(self):
# Restore libvirt
if self.saved_libvirt:
sys.modules['libvirt'] = self.saved_libvirt
def setUp(self):
super(_FakeDriverBackendTestCase, self).setUp()
# TODO(sdague): it would be nice to do this in a way that only
# the relevant backends where replaced for tests, though this
# should not harm anything by doing it for all backends
fake_image.stub_out_image_service(self.stubs)
self._setup_fakelibvirt()
def tearDown(self):
fake_image.FakeImageService_reset()
self._teardown_fakelibvirt()
super(_FakeDriverBackendTestCase, self).tearDown()
class VirtDriverLoaderTestCase(_FakeDriverBackendTestCase, test.TestCase):
"""Test that ComputeManager can successfully load both
old style and new style drivers and end up with the correct
final class"""
# if your driver supports being tested in a fake way, it can go here
#
# both long form and short form drivers are supported
new_drivers = {
'nova.virt.fake.FakeDriver': 'FakeDriver',
'nova.virt.libvirt.LibvirtDriver': 'LibvirtDriver',
'fake.FakeDriver': 'FakeDriver',
'libvirt.LibvirtDriver': 'LibvirtDriver'
}
def test_load_new_drivers(self):
for cls, driver in self.new_drivers.iteritems():
self.flags(compute_driver=cls)
# NOTE(sdague) the try block is to make it easier to debug a
# failure by knowing which driver broke
try:
cm = manager.ComputeManager()
except Exception as e:
self.fail("Couldn't load driver %s - %s" % (cls, e))
self.assertEqual(cm.driver.__class__.__name__, driver,
"Could't load driver %s" % cls)
def test_fail_to_load_new_drivers(self):
self.flags(compute_driver='nova.virt.amiga')
def _fake_exit(error):
raise test.TestingException()
self.stubs.Set(sys, 'exit', _fake_exit)
self.assertRaises(test.TestingException, manager.ComputeManager)
class _VirtDriverTestCase(_FakeDriverBackendTestCase):
def setUp(self):
super(_VirtDriverTestCase, self).setUp()
self.flags(instances_path=self.useFixture(fixtures.TempDir()).path)
self.connection = importutils.import_object(self.driver_module,
fake.FakeVirtAPI())
self.ctxt = test_utils.get_test_admin_context()
self.image_service = fake_image.FakeImageService()
def _get_running_instance(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
image_info = test_utils.get_test_image_info(None, instance_ref)
self.connection.spawn(self.ctxt, instance_ref, image_info,
[], 'herp', network_info=network_info)
return instance_ref, network_info
@catch_notimplementederror
def test_init_host(self):
self.connection.init_host('myhostname')
@catch_notimplementederror
def test_list_instances(self):
self.connection.list_instances()
@catch_notimplementederror
def test_spawn(self):
instance_ref, network_info = self._get_running_instance()
domains = self.connection.list_instances()
self.assertIn(instance_ref['name'], domains)
num_instances = self.connection.get_num_instances()
self.assertEqual(1, num_instances)
@catch_notimplementederror
def test_snapshot_not_running(self):
instance_ref = test_utils.get_test_instance()
img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'})
self.assertRaises(exception.InstanceNotRunning,
self.connection.snapshot,
self.ctxt, instance_ref, img_ref['id'],
lambda *args, **kwargs: None)
@catch_notimplementederror
def test_snapshot_running(self):
img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'})
instance_ref, network_info = self._get_running_instance()
self.connection.snapshot(self.ctxt, instance_ref, img_ref['id'],
lambda *args, **kwargs: None)
@catch_notimplementederror
def test_reboot(self):
reboot_type = "SOFT"
instance_ref, network_info = self._get_running_instance()
self.connection.reboot(self.ctxt, instance_ref, network_info,
reboot_type)
@catch_notimplementederror
def test_get_host_ip_addr(self):
host_ip = self.connection.get_host_ip_addr()
# Will raise an exception if it's not a valid IP at all
ip = netaddr.IPAddress(host_ip)
# For now, assume IPv4.
self.assertEquals(ip.version, 4)
@catch_notimplementederror
def test_set_admin_password(self):
instance_ref, network_info = self._get_running_instance()
self.connection.set_admin_password(instance_ref, 'p4ssw0rd')
@catch_notimplementederror
def test_inject_file(self):
instance_ref, network_info = self._get_running_instance()
self.connection.inject_file(instance_ref,
base64.b64encode('/testfile'),
base64.b64encode('testcontents'))
@catch_notimplementederror
def test_resume_state_on_host_boot(self):
instance_ref, network_info = self._get_running_instance()
self.connection.resume_state_on_host_boot(self.ctxt, instance_ref,
network_info)
@catch_notimplementederror
def test_rescue(self):
instance_ref, network_info = self._get_running_instance()
self.connection.rescue(self.ctxt, instance_ref, network_info, None, '')
@catch_notimplementederror
def test_unrescue_unrescued_instance(self):
instance_ref, network_info = self._get_running_instance()
self.connection.unrescue(instance_ref, network_info)
@catch_notimplementederror
def test_unrescue_rescued_instance(self):
instance_ref, network_info = self._get_running_instance()
self.connection.rescue(self.ctxt, instance_ref, network_info, None, '')
self.connection.unrescue(instance_ref, network_info)
@catch_notimplementederror
def test_poll_rebooting_instances(self):
instances = [self._get_running_instance()]
self.connection.poll_rebooting_instances(10, instances)
@catch_notimplementederror
def test_migrate_disk_and_power_off(self):
instance_ref, network_info = self._get_running_instance()
instance_type_ref = test_utils.get_test_instance_type()
self.connection.migrate_disk_and_power_off(
self.ctxt, instance_ref, 'dest_host', instance_type_ref,
network_info)
@catch_notimplementederror
def test_power_off(self):
instance_ref, network_info = self._get_running_instance()
self.connection.power_off(instance_ref)
@catch_notimplementederror
def test_power_on_running(self):
instance_ref, network_info = self._get_running_instance()
self.connection.power_on(instance_ref)
@catch_notimplementederror
def test_power_on_powered_off(self):
instance_ref, network_info = self._get_running_instance()
self.connection.power_off(instance_ref)
self.connection.power_on(instance_ref)
@catch_notimplementederror
def test_soft_delete(self):
instance_ref, network_info = self._get_running_instance()
self.connection.soft_delete(instance_ref)
@catch_notimplementederror
def test_restore_running(self):
instance_ref, network_info = self._get_running_instance()
self.connection.restore(instance_ref)
@catch_notimplementederror
def test_restore_soft_deleted(self):
instance_ref, network_info = self._get_running_instance()
self.connection.soft_delete(instance_ref)
self.connection.restore(instance_ref)
@catch_notimplementederror
def test_pause(self):
instance_ref, network_info = self._get_running_instance()
self.connection.pause(instance_ref)
@catch_notimplementederror
def test_unpause_unpaused_instance(self):
instance_ref, network_info = self._get_running_instance()
self.connection.unpause(instance_ref)
@catch_notimplementederror
def test_unpause_paused_instance(self):
instance_ref, network_info = self._get_running_instance()
self.connection.pause(instance_ref)
self.connection.unpause(instance_ref)
@catch_notimplementederror
def test_suspend(self):
instance_ref, network_info = self._get_running_instance()
self.connection.suspend(instance_ref)
@catch_notimplementederror
def test_resume_unsuspended_instance(self):
instance_ref, network_info = self._get_running_instance()
self.connection.resume(instance_ref, network_info)
@catch_notimplementederror
def test_resume_suspended_instance(self):
instance_ref, network_info = self._get_running_instance()
self.connection.suspend(instance_ref)
self.connection.resume(instance_ref, network_info)
@catch_notimplementederror
def test_destroy_instance_nonexistent(self):
fake_instance = {'id': 42, 'name': 'I just made this up!',
'uuid': 'bda5fb9e-b347-40e8-8256-42397848cb00'}
network_info = test_utils.get_test_network_info()
self.connection.destroy(fake_instance, network_info)
@catch_notimplementederror
def test_destroy_instance(self):
instance_ref, network_info = self._get_running_instance()
self.assertIn(instance_ref['name'],
self.connection.list_instances())
self.connection.destroy(instance_ref, network_info)
self.assertNotIn(instance_ref['name'],
self.connection.list_instances())
@catch_notimplementederror
def test_get_volume_connector(self):
result = self.connection.get_volume_connector({'id': 'fake'})
self.assertTrue('ip' in result)
self.assertTrue('initiator' in result)
self.assertTrue('host' in result)
@catch_notimplementederror
def test_attach_detach_volume(self):
instance_ref, network_info = self._get_running_instance()
self.connection.attach_volume({'driver_volume_type': 'fake'},
instance_ref,
'/dev/sda')
self.connection.detach_volume({'driver_volume_type': 'fake'},
instance_ref,
'/dev/sda')
@catch_notimplementederror
def test_attach_detach_different_power_states(self):
instance_ref, network_info = self._get_running_instance()
self.connection.power_off(instance_ref)
self.connection.attach_volume({'driver_volume_type': 'fake'},
instance_ref,
'/dev/sda')
self.connection.power_on(instance_ref)
self.connection.detach_volume({'driver_volume_type': 'fake'},
instance_ref,
'/dev/sda')
@catch_notimplementederror
def test_get_info(self):
instance_ref, network_info = self._get_running_instance()
info = self.connection.get_info(instance_ref)
self.assertIn('state', info)
self.assertIn('max_mem', info)
self.assertIn('mem', info)
self.assertIn('num_cpu', info)
self.assertIn('cpu_time', info)
@catch_notimplementederror
def test_get_info_for_unknown_instance(self):
self.assertRaises(exception.NotFound,
self.connection.get_info,
{'name': 'I just made this name up'})
@catch_notimplementederror
def test_get_diagnostics(self):
instance_ref, network_info = self._get_running_instance()
self.connection.get_diagnostics(instance_ref)
@catch_notimplementederror
def test_block_stats(self):
instance_ref, network_info = self._get_running_instance()
stats = self.connection.block_stats(instance_ref['name'], 'someid')
self.assertEquals(len(stats), 5)
@catch_notimplementederror
def test_interface_stats(self):
instance_ref, network_info = self._get_running_instance()
stats = self.connection.interface_stats(instance_ref['name'], 'someid')
self.assertEquals(len(stats), 8)
@catch_notimplementederror
def test_get_console_output(self):
fake_libvirt_utils.files['dummy.log'] = ''
instance_ref, network_info = self._get_running_instance()
console_output = self.connection.get_console_output(instance_ref)
self.assertTrue(isinstance(console_output, basestring))
@catch_notimplementederror
def test_get_vnc_console(self):
instance_ref, network_info = self._get_running_instance()
vnc_console = self.connection.get_vnc_console(instance_ref)
self.assertIn('internal_access_path', vnc_console)
self.assertIn('host', vnc_console)
self.assertIn('port', vnc_console)
@catch_notimplementederror
def test_get_spice_console(self):
instance_ref, network_info = self._get_running_instance()
spice_console = self.connection.get_spice_console(instance_ref)
self.assertIn('internal_access_path', spice_console)
self.assertIn('host', spice_console)
self.assertIn('port', spice_console)
self.assertIn('tlsPort', spice_console)
@catch_notimplementederror
def test_get_console_pool_info(self):
instance_ref, network_info = self._get_running_instance()
console_pool = self.connection.get_console_pool_info(instance_ref)
self.assertIn('address', console_pool)
self.assertIn('username', console_pool)
self.assertIn('password', console_pool)
@catch_notimplementederror
def test_refresh_security_group_rules(self):
# FIXME: Create security group and add the instance to it
instance_ref, network_info = self._get_running_instance()
self.connection.refresh_security_group_rules(1)
@catch_notimplementederror
def test_refresh_security_group_members(self):
# FIXME: Create security group and add the instance to it
instance_ref, network_info = self._get_running_instance()
self.connection.refresh_security_group_members(1)
@catch_notimplementederror
def test_refresh_provider_fw_rules(self):
instance_ref, network_info = self._get_running_instance()
self.connection.refresh_provider_fw_rules()
@catch_notimplementederror
def test_ensure_filtering_for_instance(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.ensure_filtering_rules_for_instance(instance_ref,
network_info)
@catch_notimplementederror
def test_unfilter_instance(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.unfilter_instance(instance_ref, network_info)
@catch_notimplementederror
def test_live_migration(self):
instance_ref, network_info = self._get_running_instance()
self.connection.live_migration(self.ctxt, instance_ref, 'otherhost',
lambda *a: None, lambda *a: None)
@catch_notimplementederror
def _check_host_status_fields(self, host_status):
self.assertIn('disk_total', host_status)
self.assertIn('disk_used', host_status)
self.assertIn('host_memory_total', host_status)
self.assertIn('host_memory_free', host_status)
@catch_notimplementederror
def test_get_host_stats(self):
host_status = self.connection.get_host_stats()
self._check_host_status_fields(host_status)
@catch_notimplementederror
def test_set_host_enabled(self):
self.connection.set_host_enabled('a useless argument?', True)
@catch_notimplementederror
def test_get_host_uptime(self):
self.connection.get_host_uptime('a useless argument?')
@catch_notimplementederror
def test_host_power_action_reboot(self):
self.connection.host_power_action('a useless argument?', 'reboot')
@catch_notimplementederror
def test_host_power_action_shutdown(self):
self.connection.host_power_action('a useless argument?', 'shutdown')
@catch_notimplementederror
def test_host_power_action_startup(self):
self.connection.host_power_action('a useless argument?', 'startup')
@catch_notimplementederror
def test_add_to_aggregate(self):
self.connection.add_to_aggregate(self.ctxt, 'aggregate', 'host')
@catch_notimplementederror
def test_remove_from_aggregate(self):
self.connection.remove_from_aggregate(self.ctxt, 'aggregate', 'host')
def test_events(self):
got_events = []
def handler(event):
got_events.append(event)
self.connection.register_event_listener(handler)
event1 = virtevent.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686",
virtevent.EVENT_LIFECYCLE_STARTED)
event2 = virtevent.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686",
virtevent.EVENT_LIFECYCLE_PAUSED)
self.connection.emit_event(event1)
self.connection.emit_event(event2)
want_events = [event1, event2]
self.assertEqual(want_events, got_events)
event3 = virtevent.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686",
virtevent.EVENT_LIFECYCLE_RESUMED)
event4 = virtevent.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686",
virtevent.EVENT_LIFECYCLE_STOPPED)
self.connection.emit_event(event3)
self.connection.emit_event(event4)
want_events = [event1, event2, event3, event4]
self.assertEqual(want_events, got_events)
def test_event_bad_object(self):
# Passing in something which does not inherit
# from virtevent.Event
def handler(event):
pass
self.connection.register_event_listener(handler)
badevent = {
"foo": "bar"
}
self.assertRaises(ValueError,
self.connection.emit_event,
badevent)
def test_event_bad_callback(self):
# Check that if a callback raises an exception,
# it does not propagate back out of the
# 'emit_event' call
def handler(event):
raise Exception("Hit Me!")
self.connection.register_event_listener(handler)
event1 = virtevent.LifecycleEvent(
"cef19ce0-0ca2-11df-855d-b19fbce37686",
virtevent.EVENT_LIFECYCLE_STARTED)
self.connection.emit_event(event1)
class AbstractDriverTestCase(_VirtDriverTestCase, test.TestCase):
def setUp(self):
self.driver_module = "nova.virt.driver.ComputeDriver"
super(AbstractDriverTestCase, self).setUp()
class FakeConnectionTestCase(_VirtDriverTestCase, test.TestCase):
def setUp(self):
self.driver_module = 'nova.virt.fake.FakeDriver'
super(FakeConnectionTestCase, self).setUp()
class LibvirtConnTestCase(_VirtDriverTestCase, test.TestCase):
def setUp(self):
# Point _VirtDriverTestCase at the right module
self.driver_module = 'nova.virt.libvirt.LibvirtDriver'
super(LibvirtConnTestCase, self).setUp()
def test_force_hard_reboot(self):
self.flags(libvirt_wait_soft_reboot_seconds=0)
self.test_reboot()
def test_migrate_disk_and_power_off(self):
# there is lack of fake stuff to execute this method. so pass.
self.skipTest("Test nothing, but this method"
" needed to override superclass.")