From 62f6727441e1b17a0bfaddd789f87ad68d873c72 Mon Sep 17 00:00:00 2001 From: Rick Harris Date: Tue, 21 May 2013 16:24:39 +0000 Subject: [PATCH] virt: Move generic virt tests to nova/tests/virt/ In the interest of better organization, this patch moves generic virt testing code into nova/tests/virt. Change-Id: I79473726a848f0abbcb13865efe30e02aea09ae5 --- nova/tests/test_driver.py | 60 --- nova/tests/test_virt.py | 139 ----- nova/tests/test_virt_disk.py | 219 -------- nova/tests/test_virt_disk_vfs_guestfs.py | 203 ------- nova/tests/test_virt_disk_vfs_localfs.py | 387 -------------- nova/tests/test_virt_drivers.py | 642 ----------------------- 6 files changed, 1650 deletions(-) delete mode 100644 nova/tests/test_driver.py delete mode 100644 nova/tests/test_virt.py delete mode 100644 nova/tests/test_virt_disk.py delete mode 100644 nova/tests/test_virt_disk_vfs_guestfs.py delete mode 100644 nova/tests/test_virt_disk_vfs_localfs.py delete mode 100644 nova/tests/test_virt_drivers.py diff --git a/nova/tests/test_driver.py b/nova/tests/test_driver.py deleted file mode 100644 index 6fd8b00b..00000000 --- a/nova/tests/test_driver.py +++ /dev/null @@ -1,60 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (c) 2013 Citrix Systems, Inc. -# Copyright 2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from nova import test -from nova.virt import driver - - -class FakeDriver(object): - def __init__(self, *args, **kwargs): - self.args = args - self.kwargs = kwargs - - -class FakeDriver2(FakeDriver): - pass - - -class ToDriverRegistryTestCase(test.TestCase): - - def assertDriverInstance(self, inst, class_, *args, **kwargs): - self.assertEquals(class_, inst.__class__) - self.assertEquals(args, inst.args) - self.assertEquals(kwargs, inst.kwargs) - - def test_driver_dict_from_config(self): - drvs = driver.driver_dict_from_config( - [ - 'key1=nova.tests.test_driver.FakeDriver', - 'key2=nova.tests.test_driver.FakeDriver2', - ], 'arg1', 'arg2', param1='value1', param2='value2' - ) - - self.assertEquals( - sorted(['key1', 'key2']), - sorted(drvs.keys()) - ) - - self.assertDriverInstance( - drvs['key1'], - FakeDriver, 'arg1', 'arg2', param1='value1', - param2='value2') - - self.assertDriverInstance( - drvs['key2'], - FakeDriver2, 'arg1', 'arg2', param1='value1', - param2='value2') diff --git a/nova/tests/test_virt.py b/nova/tests/test_virt.py deleted file mode 100644 index 452277c5..00000000 --- a/nova/tests/test_virt.py +++ /dev/null @@ -1,139 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2011 Isaku Yamahata -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os - -from nova import test -from nova import utils -from nova.virt.disk import api as disk_api -from nova.virt import driver - - -class TestVirtDriver(test.TestCase): - def test_block_device(self): - swap = {'device_name': '/dev/sdb', - 'swap_size': 1} - ephemerals = [{'num': 0, - 'virtual_name': 'ephemeral0', - 'device_name': '/dev/sdc1', - 'size': 1}] - block_device_mapping = [{'mount_device': '/dev/sde', - 'device_path': 'fake_device'}] - block_device_info = { - 'root_device_name': '/dev/sda', - 'swap': swap, - 'ephemerals': ephemerals, - 'block_device_mapping': block_device_mapping} - - empty_block_device_info = {} - - self.assertEqual( - driver.block_device_info_get_root(block_device_info), '/dev/sda') - self.assertEqual( - driver.block_device_info_get_root(empty_block_device_info), None) - self.assertEqual( - driver.block_device_info_get_root(None), None) - - self.assertEqual( - driver.block_device_info_get_swap(block_device_info), swap) - self.assertEqual(driver.block_device_info_get_swap( - empty_block_device_info)['device_name'], None) - self.assertEqual(driver.block_device_info_get_swap( - empty_block_device_info)['swap_size'], 0) - self.assertEqual( - driver.block_device_info_get_swap({'swap': None})['device_name'], - None) - self.assertEqual( - driver.block_device_info_get_swap({'swap': None})['swap_size'], - 0) - self.assertEqual( - driver.block_device_info_get_swap(None)['device_name'], None) - self.assertEqual( - driver.block_device_info_get_swap(None)['swap_size'], 0) - - self.assertEqual( - driver.block_device_info_get_ephemerals(block_device_info), - ephemerals) - self.assertEqual( - driver.block_device_info_get_ephemerals(empty_block_device_info), - []) - self.assertEqual( - driver.block_device_info_get_ephemerals(None), - []) - - def test_swap_is_usable(self): - self.assertFalse(driver.swap_is_usable(None)) - self.assertFalse(driver.swap_is_usable({'device_name': None})) - self.assertFalse(driver.swap_is_usable({'device_name': '/dev/sdb', - 'swap_size': 0})) - self.assertTrue(driver.swap_is_usable({'device_name': '/dev/sdb', - 'swap_size': 1})) - - -class TestVirtDisk(test.TestCase): - def setUp(self): - super(TestVirtDisk, self).setUp() - self.executes = [] - - def fake_execute(*cmd, **kwargs): - self.executes.append(cmd) - return None, None - - self.stubs.Set(utils, 'execute', fake_execute) - - def test_lxc_teardown_container(self): - - def proc_mounts(self, mount_point): - mount_points = { - '/mnt/loop/nopart': '/dev/loop0', - '/mnt/loop/part': '/dev/mapper/loop0p1', - '/mnt/nbd/nopart': '/dev/nbd15', - '/mnt/nbd/part': '/dev/mapper/nbd15p1', - } - return mount_points[mount_point] - - self.stubs.Set(os.path, 'exists', lambda _: True) - self.stubs.Set(disk_api._DiskImage, '_device_for_path', proc_mounts) - expected_commands = [] - - disk_api.teardown_container('/mnt/loop/nopart') - expected_commands += [ - ('umount', '/dev/loop0'), - ('losetup', '--detach', '/dev/loop0'), - ] - - disk_api.teardown_container('/mnt/loop/part') - expected_commands += [ - ('umount', '/dev/mapper/loop0p1'), - ('kpartx', '-d', '/dev/loop0'), - ('losetup', '--detach', '/dev/loop0'), - ] - - disk_api.teardown_container('/mnt/nbd/nopart') - expected_commands += [ - ('umount', '/dev/nbd15'), - ('qemu-nbd', '-d', '/dev/nbd15'), - ] - - disk_api.teardown_container('/mnt/nbd/part') - expected_commands += [ - ('umount', '/dev/mapper/nbd15p1'), - ('kpartx', '-d', '/dev/nbd15'), - ('qemu-nbd', '-d', '/dev/nbd15'), - ] - - self.assertEqual(self.executes, expected_commands) diff --git a/nova/tests/test_virt_disk.py b/nova/tests/test_virt_disk.py deleted file mode 100644 index 0c51e826..00000000 --- a/nova/tests/test_virt_disk.py +++ /dev/null @@ -1,219 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright (C) 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import sys - -from nova import exception -from nova import test -from nova.tests import fakeguestfs -from nova.virt.disk import api as diskapi -from nova.virt.disk.vfs import guestfs as vfsguestfs - - -class VirtDiskTest(test.TestCase): - - def setUp(self): - super(VirtDiskTest, self).setUp() - sys.modules['guestfs'] = fakeguestfs - vfsguestfs.guestfs = fakeguestfs - - def test_inject_data(self): - - self.assertTrue(diskapi.inject_data("/some/file", use_cow=True)) - - self.assertTrue(diskapi.inject_data("/some/file", - mandatory=('files',))) - - self.assertTrue(diskapi.inject_data("/some/file", key="mysshkey", - mandatory=('key',))) - - os_name = os.name - os.name = 'nt' # Cause password injection to fail - self.assertRaises(exception.NovaException, - diskapi.inject_data, - "/some/file", admin_password="p", - mandatory=('admin_password',)) - self.assertFalse(diskapi.inject_data("/some/file", admin_password="p")) - os.name = os_name - - def test_inject_data_key(self): - - vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") - vfs.setup() - - diskapi._inject_key_into_fs("mysshkey", vfs) - - self.assertTrue("/root/.ssh" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/root/.ssh"], - {'isdir': True, 'gid': 0, 'uid': 0, 'mode': 0700}) - self.assertTrue("/root/.ssh/authorized_keys" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/root/.ssh/authorized_keys"], - {'isdir': False, - 'content': "Hello World\n# The following ssh " + - "key was injected by Nova\nmysshkey\n", - 'gid': 100, - 'uid': 100, - 'mode': 0600}) - - vfs.teardown() - - def test_inject_data_key_with_selinux(self): - - vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") - vfs.setup() - - vfs.make_path("etc/selinux") - vfs.make_path("etc/rc.d") - diskapi._inject_key_into_fs("mysshkey", vfs) - - self.assertTrue("/etc/rc.d/rc.local" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/etc/rc.d/rc.local"], - {'isdir': False, - 'content': "Hello World#!/bin/sh\n# Added by " + - "Nova to ensure injected ssh keys " + - "have the right context\nrestorecon " + - "-RF root/.ssh 2>/dev/null || :\n", - 'gid': 100, - 'uid': 100, - 'mode': 0700}) - - self.assertTrue("/root/.ssh" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/root/.ssh"], - {'isdir': True, 'gid': 0, 'uid': 0, 'mode': 0700}) - self.assertTrue("/root/.ssh/authorized_keys" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/root/.ssh/authorized_keys"], - {'isdir': False, - 'content': "Hello World\n# The following ssh " + - "key was injected by Nova\nmysshkey\n", - 'gid': 100, - 'uid': 100, - 'mode': 0600}) - - vfs.teardown() - - def test_inject_data_key_with_selinux_append_with_newline(self): - - vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") - vfs.setup() - - vfs.replace_file("/etc/rc.d/rc.local", "#!/bin/sh\necho done") - vfs.make_path("etc/selinux") - vfs.make_path("etc/rc.d") - diskapi._inject_key_into_fs("mysshkey", vfs) - - self.assertTrue("/etc/rc.d/rc.local" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/etc/rc.d/rc.local"], - {'isdir': False, - 'content': "#!/bin/sh\necho done\n# Added " - "by Nova to ensure injected ssh keys have " - "the right context\nrestorecon -RF " - "root/.ssh 2>/dev/null || :\n", - 'gid': 100, - 'uid': 100, - 'mode': 0700}) - vfs.teardown() - - def test_inject_net(self): - - vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") - vfs.setup() - - diskapi._inject_net_into_fs("mynetconfig", vfs) - - self.assertTrue("/etc/network/interfaces" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/etc/network/interfaces"], - {'content': 'mynetconfig', - 'gid': 100, - 'isdir': False, - 'mode': 0700, - 'uid': 100}) - vfs.teardown() - - def test_inject_metadata(self): - vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") - vfs.setup() - - diskapi._inject_metadata_into_fs([{"key": "foo", - "value": "bar"}, - {"key": "eek", - "value": "wizz"}], vfs) - - self.assertTrue("/meta.js" in vfs.handle.files) - self.assertEquals(vfs.handle.files["/meta.js"], - {'content': '{"foo": "bar", ' + - '"eek": "wizz"}', - 'gid': 100, - 'isdir': False, - 'mode': 0700, - 'uid': 100}) - vfs.teardown() - - def test_inject_admin_password(self): - vfs = vfsguestfs.VFSGuestFS("/some/file", "qcow2") - vfs.setup() - - def fake_salt(): - return "1234567890abcdef" - - self.stubs.Set(diskapi, '_generate_salt', fake_salt) - - vfs.handle.write("/etc/shadow", - "root:$1$12345678$xxxxx:14917:0:99999:7:::\n" + - "bin:*:14495:0:99999:7:::\n" + - "daemon:*:14495:0:99999:7:::\n") - - vfs.handle.write("/etc/passwd", - "root:x:0:0:root:/root:/bin/bash\n" + - "bin:x:1:1:bin:/bin:/sbin/nologin\n" + - "daemon:x:2:2:daemon:/sbin:/sbin/nologin\n") - - diskapi._inject_admin_password_into_fs("123456", vfs) - - self.assertEquals(vfs.handle.files["/etc/passwd"], - {'content': "root:x:0:0:root:/root:/bin/bash\n" + - "bin:x:1:1:bin:/bin:/sbin/nologin\n" + - "daemon:x:2:2:daemon:/sbin:" + - "/sbin/nologin\n", - 'gid': 100, - 'isdir': False, - 'mode': 0700, - 'uid': 100}) - shadow = vfs.handle.files["/etc/shadow"] - - # if the encrypted password is only 13 characters long, then - # nova.virt.disk.api:_set_password fell back to DES. - if len(shadow['content']) == 91: - self.assertEquals(shadow, - {'content': "root:12tir.zIbWQ3c" + - ":14917:0:99999:7:::\n" + - "bin:*:14495:0:99999:7:::\n" + - "daemon:*:14495:0:99999:7:::\n", - 'gid': 100, - 'isdir': False, - 'mode': 0700, - 'uid': 100}) - else: - self.assertEquals(shadow, - {'content': "root:$1$12345678$a4ge4d5iJ5vw" + - "vbFS88TEN0:14917:0:99999:7:::\n" + - "bin:*:14495:0:99999:7:::\n" + - "daemon:*:14495:0:99999:7:::\n", - 'gid': 100, - 'isdir': False, - 'mode': 0700, - 'uid': 100}) - vfs.teardown() diff --git a/nova/tests/test_virt_disk_vfs_guestfs.py b/nova/tests/test_virt_disk_vfs_guestfs.py deleted file mode 100644 index 16c85f81..00000000 --- a/nova/tests/test_virt_disk_vfs_guestfs.py +++ /dev/null @@ -1,203 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright (C) 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import sys - -from nova import exception -from nova import test - -from nova.tests import fakeguestfs -from nova.virt.disk.vfs import guestfs as vfsimpl - - -class VirtDiskVFSGuestFSTest(test.TestCase): - - def setUp(self): - super(VirtDiskVFSGuestFSTest, self).setUp() - sys.modules['guestfs'] = fakeguestfs - vfsimpl.guestfs = fakeguestfs - - def test_appliance_setup_inspect(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", - imgfmt="qcow2", - partition=-1) - vfs.setup() - - self.assertEqual(vfs.handle.running, True) - self.assertEqual(len(vfs.handle.mounts), 2) - self.assertEqual(vfs.handle.mounts[0][1], - "/dev/mapper/guestvgf-lv_root") - self.assertEqual(vfs.handle.mounts[1][1], "/dev/vda1") - self.assertEqual(vfs.handle.mounts[0][2], "/") - self.assertEqual(vfs.handle.mounts[1][2], "/boot") - - handle = vfs.handle - vfs.teardown() - - self.assertEqual(vfs.handle, None) - self.assertEqual(handle.running, False) - self.assertEqual(handle.closed, True) - self.assertEqual(len(handle.mounts), 0) - - def test_appliance_setup_inspect_no_root_raises(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", - imgfmt="qcow2", - partition=-1) - # call setup to init the handle so we can stub it - vfs.setup() - - def fake_inspect_os(): - return [] - - self.stubs.Set(vfs.handle, 'inspect_os', fake_inspect_os) - self.assertRaises(exception.NovaException, vfs.setup_os_inspect) - - def test_appliance_setup_inspect_multi_boots_raises(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", - imgfmt="qcow2", - partition=-1) - # call setup to init the handle so we can stub it - vfs.setup() - - def fake_inspect_os(): - return ['fake1', 'fake2'] - - self.stubs.Set(vfs.handle, 'inspect_os', fake_inspect_os) - self.assertRaises(exception.NovaException, vfs.setup_os_inspect) - - def test_appliance_setup_static_nopart(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", - imgfmt="qcow2", - partition=None) - vfs.setup() - - self.assertEqual(vfs.handle.running, True) - self.assertEqual(len(vfs.handle.mounts), 1) - self.assertEqual(vfs.handle.mounts[0][1], "/dev/sda") - self.assertEqual(vfs.handle.mounts[0][2], "/") - - handle = vfs.handle - vfs.teardown() - - self.assertEqual(vfs.handle, None) - self.assertEqual(handle.running, False) - self.assertEqual(handle.closed, True) - self.assertEqual(len(handle.mounts), 0) - - def test_appliance_setup_static_part(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", - imgfmt="qcow2", - partition=2) - vfs.setup() - - self.assertEqual(vfs.handle.running, True) - self.assertEqual(len(vfs.handle.mounts), 1) - self.assertEqual(vfs.handle.mounts[0][1], "/dev/sda2") - self.assertEqual(vfs.handle.mounts[0][2], "/") - - handle = vfs.handle - vfs.teardown() - - self.assertEqual(vfs.handle, None) - self.assertEqual(handle.running, False) - self.assertEqual(handle.closed, True) - self.assertEqual(len(handle.mounts), 0) - - def test_makepath(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - vfs.make_path("/some/dir") - vfs.make_path("/other/dir") - - self.assertTrue("/some/dir" in vfs.handle.files) - self.assertTrue("/other/dir" in vfs.handle.files) - self.assertTrue(vfs.handle.files["/some/dir"]["isdir"]) - self.assertTrue(vfs.handle.files["/other/dir"]["isdir"]) - - vfs.teardown() - - def test_append_file(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - vfs.append_file("/some/file", " Goodbye") - - self.assertTrue("/some/file" in vfs.handle.files) - self.assertEqual(vfs.handle.files["/some/file"]["content"], - "Hello World Goodbye") - - vfs.teardown() - - def test_replace_file(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - vfs.replace_file("/some/file", "Goodbye") - - self.assertTrue("/some/file" in vfs.handle.files) - self.assertEqual(vfs.handle.files["/some/file"]["content"], - "Goodbye") - - vfs.teardown() - - def test_read_file(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - self.assertEqual(vfs.read_file("/some/file"), "Hello World") - - vfs.teardown() - - def test_has_file(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - vfs.read_file("/some/file") - - self.assertTrue(vfs.has_file("/some/file")) - self.assertFalse(vfs.has_file("/other/file")) - - vfs.teardown() - - def test_set_permissions(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - vfs.read_file("/some/file") - - self.assertEquals(vfs.handle.files["/some/file"]["mode"], 0700) - - vfs.set_permissions("/some/file", 0777) - self.assertEquals(vfs.handle.files["/some/file"]["mode"], 0777) - - vfs.teardown() - - def test_set_ownership(self): - vfs = vfsimpl.VFSGuestFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.setup() - vfs.read_file("/some/file") - - self.assertEquals(vfs.handle.files["/some/file"]["uid"], 100) - self.assertEquals(vfs.handle.files["/some/file"]["gid"], 100) - - vfs.set_ownership("/some/file", "fred", None) - self.assertEquals(vfs.handle.files["/some/file"]["uid"], 105) - self.assertEquals(vfs.handle.files["/some/file"]["gid"], 100) - - vfs.set_ownership("/some/file", None, "users") - self.assertEquals(vfs.handle.files["/some/file"]["uid"], 105) - self.assertEquals(vfs.handle.files["/some/file"]["gid"], 500) - - vfs.set_ownership("/some/file", "joe", "admins") - self.assertEquals(vfs.handle.files["/some/file"]["uid"], 110) - self.assertEquals(vfs.handle.files["/some/file"]["gid"], 600) - - vfs.teardown() diff --git a/nova/tests/test_virt_disk_vfs_localfs.py b/nova/tests/test_virt_disk_vfs_localfs.py deleted file mode 100644 index b52817b1..00000000 --- a/nova/tests/test_virt_disk_vfs_localfs.py +++ /dev/null @@ -1,387 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright (C) 2012 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo.config import cfg - -from nova import exception -from nova.openstack.common import processutils -from nova import test -from nova.tests import utils as tests_utils - -from nova.virt.disk.vfs import localfs as vfsimpl - -CONF = cfg.CONF - -dirs = [] -files = {} -commands = [] - - -def fake_execute(*args, **kwargs): - commands.append({"args": args, "kwargs": kwargs}) - - if args[0] == "readlink": - if args[1] == "-nm": - if args[2] in ["/scratch/dir/some/file", - "/scratch/dir/some/dir", - "/scratch/dir/other/dir", - "/scratch/dir/other/file"]: - return args[2], "" - elif args[1] == "-e": - if args[2] in files: - return args[2], "" - - return "", "No such file" - elif args[0] == "mkdir": - dirs.append(args[2]) - elif args[0] == "chown": - owner = args[1] - path = args[2] - if path not in files: - raise Exception("No such file: " + path) - - sep = owner.find(':') - if sep != -1: - user = owner[0:sep] - group = owner[sep + 1:] - else: - user = owner - group = None - - if user: - if user == "fred": - uid = 105 - else: - uid = 110 - files[path]["uid"] = uid - if group: - if group == "users": - gid = 500 - else: - gid = 600 - files[path]["gid"] = gid - elif args[0] == "chgrp": - group = args[1] - path = args[2] - if path not in files: - raise Exception("No such file: " + path) - - if group == "users": - gid = 500 - else: - gid = 600 - files[path]["gid"] = gid - elif args[0] == "chmod": - mode = args[1] - path = args[2] - if path not in files: - raise Exception("No such file: " + path) - - files[path]["mode"] = int(mode, 8) - elif args[0] == "cat": - path = args[1] - if path not in files: - files[path] = { - "content": "Hello World", - "gid": 100, - "uid": 100, - "mode": 0700 - } - return files[path]["content"], "" - elif args[0] == "tee": - if args[1] == "-a": - path = args[2] - append = True - else: - path = args[1] - append = False - if path not in files: - files[path] = { - "content": "Hello World", - "gid": 100, - "uid": 100, - "mode": 0700, - } - if append: - files[path]["content"] += kwargs["process_input"] - else: - files[path]["content"] = kwargs["process_input"] - - -class VirtDiskVFSLocalFSTestPaths(test.TestCase): - def setUp(self): - super(VirtDiskVFSLocalFSTestPaths, self).setUp() - - real_execute = processutils.execute - - def nonroot_execute(*cmd_parts, **kwargs): - kwargs.pop('run_as_root', None) - return real_execute(*cmd_parts, **kwargs) - - self.stubs.Set(processutils, 'execute', nonroot_execute) - - def test_check_safe_path(self): - if tests_utils.is_osx(): - self.skipTest("Unable to test on OSX") - vfs = vfsimpl.VFSLocalFS("dummy.img") - vfs.imgdir = "/foo" - ret = vfs._canonical_path('etc/something.conf') - self.assertEquals(ret, '/foo/etc/something.conf') - - def test_check_unsafe_path(self): - if tests_utils.is_osx(): - self.skipTest("Unable to test on OSX") - vfs = vfsimpl.VFSLocalFS("dummy.img") - vfs.imgdir = "/foo" - self.assertRaises(exception.Invalid, - vfs._canonical_path, - 'etc/../../../something.conf') - - -class VirtDiskVFSLocalFSTest(test.TestCase): - def test_makepath(self): - global dirs, commands - dirs = [] - commands = [] - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - vfs.make_path("/some/dir") - vfs.make_path("/other/dir") - - self.assertEqual(dirs, - ["/scratch/dir/some/dir", "/scratch/dir/other/dir"]), - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/dir'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('mkdir', '-p', - '/scratch/dir/some/dir'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/other/dir'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('mkdir', '-p', - '/scratch/dir/other/dir'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}]) - - def test_append_file(self): - global files, commands - files = {} - commands = [] - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - vfs.append_file("/some/file", " Goodbye") - - self.assertTrue("/scratch/dir/some/file" in files) - self.assertEquals(files["/scratch/dir/some/file"]["content"], - "Hello World Goodbye") - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('tee', '-a', - '/scratch/dir/some/file'), - 'kwargs': {'process_input': ' Goodbye', - 'run_as_root': True, - 'root_helper': root_helper}}]) - - def test_replace_file(self): - global files, commands - files = {} - commands = [] - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - vfs.replace_file("/some/file", "Goodbye") - - self.assertTrue("/scratch/dir/some/file" in files) - self.assertEquals(files["/scratch/dir/some/file"]["content"], - "Goodbye") - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('tee', '/scratch/dir/some/file'), - 'kwargs': {'process_input': 'Goodbye', - 'run_as_root': True, - 'root_helper': root_helper}}]) - - def test_read_file(self): - global commands, files - files = {} - commands = [] - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - self.assertEqual(vfs.read_file("/some/file"), "Hello World") - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('cat', '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}]) - - def test_has_file(self): - global commands, files - files = {} - commands = [] - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - vfs.read_file("/some/file") - - self.assertTrue(vfs.has_file("/some/file")) - self.assertFalse(vfs.has_file("/other/file")) - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('cat', '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-e', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/other/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-e', - '/scratch/dir/other/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - ]) - - def test_set_permissions(self): - global commands, files - commands = [] - files = {} - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - vfs.read_file("/some/file") - - vfs.set_permissions("/some/file", 0777) - self.assertEquals(files["/scratch/dir/some/file"]["mode"], 0777) - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('cat', '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('chmod', '777', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}]) - - def test_set_ownership(self): - global commands, files - commands = [] - files = {} - self.stubs.Set(processutils, 'execute', fake_execute) - - vfs = vfsimpl.VFSLocalFS(imgfile="/dummy.qcow2", imgfmt="qcow2") - vfs.imgdir = "/scratch/dir" - vfs.read_file("/some/file") - - self.assertEquals(files["/scratch/dir/some/file"]["uid"], 100) - self.assertEquals(files["/scratch/dir/some/file"]["gid"], 100) - - vfs.set_ownership("/some/file", "fred", None) - self.assertEquals(files["/scratch/dir/some/file"]["uid"], 105) - self.assertEquals(files["/scratch/dir/some/file"]["gid"], 100) - - vfs.set_ownership("/some/file", None, "users") - self.assertEquals(files["/scratch/dir/some/file"]["uid"], 105) - self.assertEquals(files["/scratch/dir/some/file"]["gid"], 500) - - vfs.set_ownership("/some/file", "joe", "admins") - self.assertEquals(files["/scratch/dir/some/file"]["uid"], 110) - self.assertEquals(files["/scratch/dir/some/file"]["gid"], 600) - - root_helper = 'sudo nova-rootwrap %s' % CONF.rootwrap_config - self.assertEqual(commands, - [{'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('cat', '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('chown', 'fred', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('chgrp', 'users', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('readlink', '-nm', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}, - {'args': ('chown', 'joe:admins', - '/scratch/dir/some/file'), - 'kwargs': {'run_as_root': True, - 'root_helper': root_helper}}]) diff --git a/nova/tests/test_virt_drivers.py b/nova/tests/test_virt_drivers.py deleted file mode 100644 index c054b962..00000000 --- a/nova/tests/test_virt_drivers.py +++ /dev/null @@ -1,642 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 -# -# Copyright 2010 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import base64 -import fixtures -import netaddr -import sys -import traceback - -from nova.compute import manager -from nova import exception -from nova.openstack.common import importutils -from nova.openstack.common import log as logging -from nova import test -from nova.tests.image import fake as fake_image -from nova.tests import utils as test_utils -from nova.tests.virt.libvirt import fake_libvirt_utils -from nova.virt import event as virtevent -from nova.virt import fake - -LOG = logging.getLogger(__name__) - - -def catch_notimplementederror(f): - """Decorator to simplify catching drivers raising NotImplementedError - - If a particular call makes a driver raise NotImplementedError, we - log it so that we can extract this information afterwards to - automatically generate a hypervisor/feature support matrix.""" - def wrapped_func(self, *args, **kwargs): - try: - return f(self, *args, **kwargs) - except NotImplementedError: - frame = traceback.extract_tb(sys.exc_info()[2])[-1] - LOG.error('%(driver)s does not implement %(method)s' % { - 'driver': type(self.connection), - 'method': frame[2]}) - - wrapped_func.__name__ = f.__name__ - wrapped_func.__doc__ = f.__doc__ - return wrapped_func - - -class _FakeDriverBackendTestCase(object): - def _setup_fakelibvirt(self): - # So that the _supports_direct_io does the test based - # on the current working directory, instead of the - # default instances_path which doesn't exist - self.flags(instances_path='') - - # Put fakelibvirt in place - if 'libvirt' in sys.modules: - self.saved_libvirt = sys.modules['libvirt'] - else: - self.saved_libvirt = None - - import nova.tests.virt.libvirt.fake_imagebackend as fake_imagebackend - import nova.tests.virt.libvirt.fake_libvirt_utils as fake_libvirt_utils - import nova.tests.virt.libvirt.fakelibvirt as fakelibvirt - - sys.modules['libvirt'] = fakelibvirt - import nova.virt.libvirt.driver - import nova.virt.libvirt.firewall - - self.useFixture(fixtures.MonkeyPatch( - 'nova.virt.libvirt.driver.imagebackend', - fake_imagebackend)) - self.useFixture(fixtures.MonkeyPatch( - 'nova.virt.libvirt.driver.libvirt', - fakelibvirt)) - self.useFixture(fixtures.MonkeyPatch( - 'nova.virt.libvirt.driver.libvirt_utils', - fake_libvirt_utils)) - self.useFixture(fixtures.MonkeyPatch( - 'nova.virt.libvirt.imagebackend.libvirt_utils', - fake_libvirt_utils)) - self.useFixture(fixtures.MonkeyPatch( - 'nova.virt.libvirt.firewall.libvirt', - fakelibvirt)) - - self.flags(rescue_image_id="2", - rescue_kernel_id="3", - rescue_ramdisk_id=None, - libvirt_snapshots_directory='./') - - def fake_extend(image, size): - pass - - def fake_migrateToURI(*a): - pass - - def fake_make_drive(_self, _path): - pass - - def fake_get_instance_disk_info(_self, instance, xml=None): - return '[]' - - self.stubs.Set(nova.virt.libvirt.driver.LibvirtDriver, - 'get_instance_disk_info', - fake_get_instance_disk_info) - - self.stubs.Set(nova.virt.libvirt.driver.disk, - 'extend', fake_extend) - - # Like the existing fakelibvirt.migrateToURI, do nothing, - # but don't fail for these tests. - self.stubs.Set(nova.virt.libvirt.driver.libvirt.Domain, - 'migrateToURI', fake_migrateToURI) - - # We can't actually make a config drive v2 because ensure_tree has - # been faked out - self.stubs.Set(nova.virt.configdrive.ConfigDriveBuilder, - 'make_drive', fake_make_drive) - - def _teardown_fakelibvirt(self): - # Restore libvirt - if self.saved_libvirt: - sys.modules['libvirt'] = self.saved_libvirt - - def setUp(self): - super(_FakeDriverBackendTestCase, self).setUp() - # TODO(sdague): it would be nice to do this in a way that only - # the relevant backends where replaced for tests, though this - # should not harm anything by doing it for all backends - fake_image.stub_out_image_service(self.stubs) - self._setup_fakelibvirt() - - def tearDown(self): - fake_image.FakeImageService_reset() - self._teardown_fakelibvirt() - super(_FakeDriverBackendTestCase, self).tearDown() - - -class VirtDriverLoaderTestCase(_FakeDriverBackendTestCase, test.TestCase): - """Test that ComputeManager can successfully load both - old style and new style drivers and end up with the correct - final class""" - - # if your driver supports being tested in a fake way, it can go here - # - # both long form and short form drivers are supported - new_drivers = { - 'nova.virt.fake.FakeDriver': 'FakeDriver', - 'nova.virt.libvirt.LibvirtDriver': 'LibvirtDriver', - 'fake.FakeDriver': 'FakeDriver', - 'libvirt.LibvirtDriver': 'LibvirtDriver' - } - - def test_load_new_drivers(self): - for cls, driver in self.new_drivers.iteritems(): - self.flags(compute_driver=cls) - # NOTE(sdague) the try block is to make it easier to debug a - # failure by knowing which driver broke - try: - cm = manager.ComputeManager() - except Exception as e: - self.fail("Couldn't load driver %s - %s" % (cls, e)) - - self.assertEqual(cm.driver.__class__.__name__, driver, - "Could't load driver %s" % cls) - - def test_fail_to_load_new_drivers(self): - self.flags(compute_driver='nova.virt.amiga') - - def _fake_exit(error): - raise test.TestingException() - - self.stubs.Set(sys, 'exit', _fake_exit) - self.assertRaises(test.TestingException, manager.ComputeManager) - - -class _VirtDriverTestCase(_FakeDriverBackendTestCase): - def setUp(self): - super(_VirtDriverTestCase, self).setUp() - - self.flags(instances_path=self.useFixture(fixtures.TempDir()).path) - self.connection = importutils.import_object(self.driver_module, - fake.FakeVirtAPI()) - self.ctxt = test_utils.get_test_admin_context() - self.image_service = fake_image.FakeImageService() - - def _get_running_instance(self): - instance_ref = test_utils.get_test_instance() - network_info = test_utils.get_test_network_info() - image_info = test_utils.get_test_image_info(None, instance_ref) - self.connection.spawn(self.ctxt, instance_ref, image_info, - [], 'herp', network_info=network_info) - return instance_ref, network_info - - @catch_notimplementederror - def test_init_host(self): - self.connection.init_host('myhostname') - - @catch_notimplementederror - def test_list_instances(self): - self.connection.list_instances() - - @catch_notimplementederror - def test_spawn(self): - instance_ref, network_info = self._get_running_instance() - domains = self.connection.list_instances() - self.assertIn(instance_ref['name'], domains) - - num_instances = self.connection.get_num_instances() - self.assertEqual(1, num_instances) - - @catch_notimplementederror - def test_snapshot_not_running(self): - instance_ref = test_utils.get_test_instance() - img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'}) - self.assertRaises(exception.InstanceNotRunning, - self.connection.snapshot, - self.ctxt, instance_ref, img_ref['id'], - lambda *args, **kwargs: None) - - @catch_notimplementederror - def test_snapshot_running(self): - img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'}) - instance_ref, network_info = self._get_running_instance() - self.connection.snapshot(self.ctxt, instance_ref, img_ref['id'], - lambda *args, **kwargs: None) - - @catch_notimplementederror - def test_reboot(self): - reboot_type = "SOFT" - instance_ref, network_info = self._get_running_instance() - self.connection.reboot(self.ctxt, instance_ref, network_info, - reboot_type) - - @catch_notimplementederror - def test_get_host_ip_addr(self): - host_ip = self.connection.get_host_ip_addr() - - # Will raise an exception if it's not a valid IP at all - ip = netaddr.IPAddress(host_ip) - - # For now, assume IPv4. - self.assertEquals(ip.version, 4) - - @catch_notimplementederror - def test_set_admin_password(self): - instance_ref, network_info = self._get_running_instance() - self.connection.set_admin_password(instance_ref, 'p4ssw0rd') - - @catch_notimplementederror - def test_inject_file(self): - instance_ref, network_info = self._get_running_instance() - self.connection.inject_file(instance_ref, - base64.b64encode('/testfile'), - base64.b64encode('testcontents')) - - @catch_notimplementederror - def test_resume_state_on_host_boot(self): - instance_ref, network_info = self._get_running_instance() - self.connection.resume_state_on_host_boot(self.ctxt, instance_ref, - network_info) - - @catch_notimplementederror - def test_rescue(self): - instance_ref, network_info = self._get_running_instance() - self.connection.rescue(self.ctxt, instance_ref, network_info, None, '') - - @catch_notimplementederror - def test_unrescue_unrescued_instance(self): - instance_ref, network_info = self._get_running_instance() - self.connection.unrescue(instance_ref, network_info) - - @catch_notimplementederror - def test_unrescue_rescued_instance(self): - instance_ref, network_info = self._get_running_instance() - self.connection.rescue(self.ctxt, instance_ref, network_info, None, '') - self.connection.unrescue(instance_ref, network_info) - - @catch_notimplementederror - def test_poll_rebooting_instances(self): - instances = [self._get_running_instance()] - self.connection.poll_rebooting_instances(10, instances) - - @catch_notimplementederror - def test_migrate_disk_and_power_off(self): - instance_ref, network_info = self._get_running_instance() - instance_type_ref = test_utils.get_test_instance_type() - self.connection.migrate_disk_and_power_off( - self.ctxt, instance_ref, 'dest_host', instance_type_ref, - network_info) - - @catch_notimplementederror - def test_power_off(self): - instance_ref, network_info = self._get_running_instance() - self.connection.power_off(instance_ref) - - @catch_notimplementederror - def test_power_on_running(self): - instance_ref, network_info = self._get_running_instance() - self.connection.power_on(instance_ref) - - @catch_notimplementederror - def test_power_on_powered_off(self): - instance_ref, network_info = self._get_running_instance() - self.connection.power_off(instance_ref) - self.connection.power_on(instance_ref) - - @catch_notimplementederror - def test_soft_delete(self): - instance_ref, network_info = self._get_running_instance() - self.connection.soft_delete(instance_ref) - - @catch_notimplementederror - def test_restore_running(self): - instance_ref, network_info = self._get_running_instance() - self.connection.restore(instance_ref) - - @catch_notimplementederror - def test_restore_soft_deleted(self): - instance_ref, network_info = self._get_running_instance() - self.connection.soft_delete(instance_ref) - self.connection.restore(instance_ref) - - @catch_notimplementederror - def test_pause(self): - instance_ref, network_info = self._get_running_instance() - self.connection.pause(instance_ref) - - @catch_notimplementederror - def test_unpause_unpaused_instance(self): - instance_ref, network_info = self._get_running_instance() - self.connection.unpause(instance_ref) - - @catch_notimplementederror - def test_unpause_paused_instance(self): - instance_ref, network_info = self._get_running_instance() - self.connection.pause(instance_ref) - self.connection.unpause(instance_ref) - - @catch_notimplementederror - def test_suspend(self): - instance_ref, network_info = self._get_running_instance() - self.connection.suspend(instance_ref) - - @catch_notimplementederror - def test_resume_unsuspended_instance(self): - instance_ref, network_info = self._get_running_instance() - self.connection.resume(instance_ref, network_info) - - @catch_notimplementederror - def test_resume_suspended_instance(self): - instance_ref, network_info = self._get_running_instance() - self.connection.suspend(instance_ref) - self.connection.resume(instance_ref, network_info) - - @catch_notimplementederror - def test_destroy_instance_nonexistent(self): - fake_instance = {'id': 42, 'name': 'I just made this up!', - 'uuid': 'bda5fb9e-b347-40e8-8256-42397848cb00'} - network_info = test_utils.get_test_network_info() - self.connection.destroy(fake_instance, network_info) - - @catch_notimplementederror - def test_destroy_instance(self): - instance_ref, network_info = self._get_running_instance() - self.assertIn(instance_ref['name'], - self.connection.list_instances()) - self.connection.destroy(instance_ref, network_info) - self.assertNotIn(instance_ref['name'], - self.connection.list_instances()) - - @catch_notimplementederror - def test_get_volume_connector(self): - result = self.connection.get_volume_connector({'id': 'fake'}) - self.assertTrue('ip' in result) - self.assertTrue('initiator' in result) - self.assertTrue('host' in result) - - @catch_notimplementederror - def test_attach_detach_volume(self): - instance_ref, network_info = self._get_running_instance() - self.connection.attach_volume({'driver_volume_type': 'fake'}, - instance_ref, - '/dev/sda') - self.connection.detach_volume({'driver_volume_type': 'fake'}, - instance_ref, - '/dev/sda') - - @catch_notimplementederror - def test_attach_detach_different_power_states(self): - instance_ref, network_info = self._get_running_instance() - self.connection.power_off(instance_ref) - self.connection.attach_volume({'driver_volume_type': 'fake'}, - instance_ref, - '/dev/sda') - self.connection.power_on(instance_ref) - self.connection.detach_volume({'driver_volume_type': 'fake'}, - instance_ref, - '/dev/sda') - - @catch_notimplementederror - def test_get_info(self): - instance_ref, network_info = self._get_running_instance() - info = self.connection.get_info(instance_ref) - self.assertIn('state', info) - self.assertIn('max_mem', info) - self.assertIn('mem', info) - self.assertIn('num_cpu', info) - self.assertIn('cpu_time', info) - - @catch_notimplementederror - def test_get_info_for_unknown_instance(self): - self.assertRaises(exception.NotFound, - self.connection.get_info, - {'name': 'I just made this name up'}) - - @catch_notimplementederror - def test_get_diagnostics(self): - instance_ref, network_info = self._get_running_instance() - self.connection.get_diagnostics(instance_ref) - - @catch_notimplementederror - def test_block_stats(self): - instance_ref, network_info = self._get_running_instance() - stats = self.connection.block_stats(instance_ref['name'], 'someid') - self.assertEquals(len(stats), 5) - - @catch_notimplementederror - def test_interface_stats(self): - instance_ref, network_info = self._get_running_instance() - stats = self.connection.interface_stats(instance_ref['name'], 'someid') - self.assertEquals(len(stats), 8) - - @catch_notimplementederror - def test_get_console_output(self): - fake_libvirt_utils.files['dummy.log'] = '' - instance_ref, network_info = self._get_running_instance() - console_output = self.connection.get_console_output(instance_ref) - self.assertTrue(isinstance(console_output, basestring)) - - @catch_notimplementederror - def test_get_vnc_console(self): - instance_ref, network_info = self._get_running_instance() - vnc_console = self.connection.get_vnc_console(instance_ref) - self.assertIn('internal_access_path', vnc_console) - self.assertIn('host', vnc_console) - self.assertIn('port', vnc_console) - - @catch_notimplementederror - def test_get_spice_console(self): - instance_ref, network_info = self._get_running_instance() - spice_console = self.connection.get_spice_console(instance_ref) - self.assertIn('internal_access_path', spice_console) - self.assertIn('host', spice_console) - self.assertIn('port', spice_console) - self.assertIn('tlsPort', spice_console) - - @catch_notimplementederror - def test_get_console_pool_info(self): - instance_ref, network_info = self._get_running_instance() - console_pool = self.connection.get_console_pool_info(instance_ref) - self.assertIn('address', console_pool) - self.assertIn('username', console_pool) - self.assertIn('password', console_pool) - - @catch_notimplementederror - def test_refresh_security_group_rules(self): - # FIXME: Create security group and add the instance to it - instance_ref, network_info = self._get_running_instance() - self.connection.refresh_security_group_rules(1) - - @catch_notimplementederror - def test_refresh_security_group_members(self): - # FIXME: Create security group and add the instance to it - instance_ref, network_info = self._get_running_instance() - self.connection.refresh_security_group_members(1) - - @catch_notimplementederror - def test_refresh_provider_fw_rules(self): - instance_ref, network_info = self._get_running_instance() - self.connection.refresh_provider_fw_rules() - - @catch_notimplementederror - def test_ensure_filtering_for_instance(self): - instance_ref = test_utils.get_test_instance() - network_info = test_utils.get_test_network_info() - self.connection.ensure_filtering_rules_for_instance(instance_ref, - network_info) - - @catch_notimplementederror - def test_unfilter_instance(self): - instance_ref = test_utils.get_test_instance() - network_info = test_utils.get_test_network_info() - self.connection.unfilter_instance(instance_ref, network_info) - - @catch_notimplementederror - def test_live_migration(self): - instance_ref, network_info = self._get_running_instance() - self.connection.live_migration(self.ctxt, instance_ref, 'otherhost', - lambda *a: None, lambda *a: None) - - @catch_notimplementederror - def _check_host_status_fields(self, host_status): - self.assertIn('disk_total', host_status) - self.assertIn('disk_used', host_status) - self.assertIn('host_memory_total', host_status) - self.assertIn('host_memory_free', host_status) - - @catch_notimplementederror - def test_get_host_stats(self): - host_status = self.connection.get_host_stats() - self._check_host_status_fields(host_status) - - @catch_notimplementederror - def test_set_host_enabled(self): - self.connection.set_host_enabled('a useless argument?', True) - - @catch_notimplementederror - def test_get_host_uptime(self): - self.connection.get_host_uptime('a useless argument?') - - @catch_notimplementederror - def test_host_power_action_reboot(self): - self.connection.host_power_action('a useless argument?', 'reboot') - - @catch_notimplementederror - def test_host_power_action_shutdown(self): - self.connection.host_power_action('a useless argument?', 'shutdown') - - @catch_notimplementederror - def test_host_power_action_startup(self): - self.connection.host_power_action('a useless argument?', 'startup') - - @catch_notimplementederror - def test_add_to_aggregate(self): - self.connection.add_to_aggregate(self.ctxt, 'aggregate', 'host') - - @catch_notimplementederror - def test_remove_from_aggregate(self): - self.connection.remove_from_aggregate(self.ctxt, 'aggregate', 'host') - - def test_events(self): - got_events = [] - - def handler(event): - got_events.append(event) - - self.connection.register_event_listener(handler) - - event1 = virtevent.LifecycleEvent( - "cef19ce0-0ca2-11df-855d-b19fbce37686", - virtevent.EVENT_LIFECYCLE_STARTED) - event2 = virtevent.LifecycleEvent( - "cef19ce0-0ca2-11df-855d-b19fbce37686", - virtevent.EVENT_LIFECYCLE_PAUSED) - - self.connection.emit_event(event1) - self.connection.emit_event(event2) - want_events = [event1, event2] - self.assertEqual(want_events, got_events) - - event3 = virtevent.LifecycleEvent( - "cef19ce0-0ca2-11df-855d-b19fbce37686", - virtevent.EVENT_LIFECYCLE_RESUMED) - event4 = virtevent.LifecycleEvent( - "cef19ce0-0ca2-11df-855d-b19fbce37686", - virtevent.EVENT_LIFECYCLE_STOPPED) - - self.connection.emit_event(event3) - self.connection.emit_event(event4) - - want_events = [event1, event2, event3, event4] - self.assertEqual(want_events, got_events) - - def test_event_bad_object(self): - # Passing in something which does not inherit - # from virtevent.Event - - def handler(event): - pass - - self.connection.register_event_listener(handler) - - badevent = { - "foo": "bar" - } - - self.assertRaises(ValueError, - self.connection.emit_event, - badevent) - - def test_event_bad_callback(self): - # Check that if a callback raises an exception, - # it does not propagate back out of the - # 'emit_event' call - - def handler(event): - raise Exception("Hit Me!") - - self.connection.register_event_listener(handler) - - event1 = virtevent.LifecycleEvent( - "cef19ce0-0ca2-11df-855d-b19fbce37686", - virtevent.EVENT_LIFECYCLE_STARTED) - - self.connection.emit_event(event1) - - -class AbstractDriverTestCase(_VirtDriverTestCase, test.TestCase): - def setUp(self): - self.driver_module = "nova.virt.driver.ComputeDriver" - super(AbstractDriverTestCase, self).setUp() - - -class FakeConnectionTestCase(_VirtDriverTestCase, test.TestCase): - def setUp(self): - self.driver_module = 'nova.virt.fake.FakeDriver' - super(FakeConnectionTestCase, self).setUp() - - -class LibvirtConnTestCase(_VirtDriverTestCase, test.TestCase): - def setUp(self): - # Point _VirtDriverTestCase at the right module - self.driver_module = 'nova.virt.libvirt.LibvirtDriver' - super(LibvirtConnTestCase, self).setUp() - - def test_force_hard_reboot(self): - self.flags(libvirt_wait_soft_reboot_seconds=0) - self.test_reboot() - - def test_migrate_disk_and_power_off(self): - # there is lack of fake stuff to execute this method. so pass. - self.skipTest("Test nothing, but this method" - " needed to override superclass.")