Extend test_virt_driver to also test libvirt driver.

To support this, I've added a fake libvirt implementation. It's supposed
to expose an API and behaviour identical to that of libvirt itself
except without actually running any VM's or setting up any firewall or
anything, but still responding correctly when asked for a domain's XML,
a list of defined domains, running domains, etc.

I've also split out everything from libvirt.connection that is
potentially destructive or otherwise undesirable to run during testing,
and moved it to a new nova.virt.libvirt.utils. I added tests for those
things separately as well as stub version of it for testing. I hope
eventually to make it similar to fakelibvirt in style (e.g. keep track
of files created and deleted and attempts to open a file that it doesn't
know about, you'll get proper exceptions with proper errnos set and
whatnot).

Change-Id: Id90b260933e3443b4ffb3b29e4bc0cbc82c19ba6
This commit is contained in:
Soren Hansen
2011-09-26 16:15:18 +02:00
parent dc26109044
commit 37d9591f7e
5 changed files with 1580 additions and 188 deletions

View File

@@ -0,0 +1,104 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import StringIO
files = {}
disk_sizes = {}
disk_backing_files = {}
def create_image(disk_format, path, size):
pass
def create_cow_image(backing_file, path):
pass
def get_disk_size(path):
return disk_sizes.get(path, 1024 * 1024 * 20)
def get_backing_file(path):
return disk_backing_files.get(path, None)
def copy_image(src, dest):
pass
def mkfs(fs, path):
pass
def ensure_tree(path):
pass
def write_to_file(path, contents, umask=None):
pass
def chown(path, owner):
pass
def extract_snapshot(disk_path, source_fmt, snapshot_name, out_path, dest_fmt):
files[out_path] = ''
class File(object):
def __init__(self, path, mode=None):
self.fp = StringIO.StringIO(files[path])
def __enter__(self):
return self.fp
def __exit__(self, *args):
return
def file_open(path, mode=None):
return File(path, mode)
def load_file(path):
return ''
def file_delete(path):
return True
def get_open_port(start_port, end_port):
# Return the port in the middle
return int((start_port + end_port) / 2)
def run_ajaxterm(cmd, token, port):
pass
def get_fs_info(path):
return {'total': 128 * (1024 ** 3),
'used': 44 * (1024 ** 3),
'free': 84 * (1024 ** 3)}
def fetch_image(context, target, image_id, user_id, project_id,
size=None):
pass

779
nova/tests/fakelibvirt.py Normal file
View File

@@ -0,0 +1,779 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2010 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from xml.etree.ElementTree import fromstring as xml_to_tree
from xml.etree.ElementTree import ParseError
import uuid
# Allow passing None to the various connect methods
# (i.e. allow the client to rely on default URLs)
allow_default_uri_connection = True
# string indicating the CPU arch
node_arch = 'x86_64' # or 'i686' (or whatever else uname -m might return)
# memory size in kilobytes
node_kB_mem = 4096
# the number of active CPUs
node_cpus = 2
# expected CPU frequency
node_mhz = 800
# the number of NUMA cell, 1 for unusual NUMA topologies or uniform
# memory access; check capabilities XML for the actual NUMA topology
node_nodes = 1 # NUMA nodes
# number of CPU sockets per node if nodes > 1, total number of CPU
# sockets otherwise
node_sockets = 1
# number of cores per socket
node_cores = 2
# number of threads per core
node_threads = 1
# CPU model
node_cpu_model = "Penryn"
# CPU vendor
node_cpu_vendor = "Intel"
def _reset():
global allow_default_uri_connection
allow_default_uri_connection = True
# virDomainState
VIR_DOMAIN_NOSTATE = 0
VIR_DOMAIN_RUNNING = 1
VIR_DOMAIN_BLOCKED = 2
VIR_DOMAIN_PAUSED = 3
VIR_DOMAIN_SHUTDOWN = 4
VIR_DOMAIN_SHUTOFF = 5
VIR_DOMAIN_CRASHED = 6
VIR_CPU_COMPARE_ERROR = -1
VIR_CPU_COMPARE_INCOMPATIBLE = 0
VIR_CPU_COMPARE_IDENTICAL = 1
VIR_CPU_COMPARE_SUPERSET = 2
VIR_CRED_AUTHNAME = 2
VIR_CRED_NOECHOPROMPT = 7
# libvirtError enums
# (Intentionally different from what's in libvirt. We do this to check,
# that consumers of the library are using the symbolic names rather than
# hardcoding the numerical values)
VIR_FROM_QEMU = 100
VIR_FROM_DOMAIN = 200
VIR_FROM_NWFILTER = 330
VIR_ERR_XML_DETAIL = 350
VIR_ERR_NO_DOMAIN = 420
VIR_ERR_NO_NWFILTER = 620
def _parse_disk_info(element):
disk_info = {}
disk_info['type'] = element.get('type', 'file')
disk_info['device'] = element.get('device', 'disk')
driver = element.find('./driver')
if driver is not None:
disk_info['driver_name'] = driver.get('name')
disk_info['driver_type'] = driver.get('type')
source = element.find('./source')
if source is not None:
disk_info['source'] = source.get('file')
if not disk_info['source']:
disk_info['source'] = source.get('dev')
if not disk_info['source']:
disk_info['source'] = source.get('path')
target = element.find('./target')
if target is not None:
disk_info['target_dev'] = target.get('dev')
disk_info['target_bus'] = target.get('bus')
return disk_info
class libvirtError(Exception):
def __init__(self, error_code, error_domain, msg):
self.error_code = error_code
self.error_domain = error_domain
Exception(self, msg)
def get_error_code(self):
return self.error_code
def get_error_domain(self):
return self.error_domain
class NWFilter(object):
def __init__(self, connection, xml):
self._connection = connection
self._xml = xml
self._parse_xml(xml)
def _parse_xml(self, xml):
tree = xml_to_tree(xml)
root = tree.find('.')
self._name = root.get('name')
def undefine(self):
self._connection._remove_filter(self)
class Domain(object):
def __init__(self, connection, xml, running=False, transient=False):
self._connection = connection
if running:
connection._mark_running(self)
self._state = running and VIR_DOMAIN_RUNNING or VIR_DOMAIN_SHUTOFF
self._transient = transient
self._def = self._parse_definition(xml)
self._has_saved_state = False
self._snapshots = {}
def _parse_definition(self, xml):
try:
tree = xml_to_tree(xml)
except ParseError:
raise libvirtError(VIR_ERR_XML_DETAIL, VIR_FROM_DOMAIN,
"Invalid XML.")
definition = {}
name = tree.find('./name')
if name is not None:
definition['name'] = name.text
uuid_elem = tree.find('./uuid')
if uuid_elem is not None:
definition['uuid'] = uuid_elem.text
else:
definition['uuid'] = str(uuid.uuid4())
vcpu = tree.find('./vcpu')
if vcpu is not None:
definition['vcpu'] = int(vcpu.text)
memory = tree.find('./memory')
if memory is not None:
definition['memory'] = int(memory.text)
os = {}
os_type = tree.find('./os/type')
if os_type is not None:
os['type'] = os_type.text
os['arch'] = os_type.get('arch', node_arch)
os_kernel = tree.find('./os/kernel')
if os_kernel is not None:
os['kernel'] = os_kernel.text
os_initrd = tree.find('./os/initrd')
if os_initrd is not None:
os['initrd'] = os_initrd.text
os_cmdline = tree.find('./os/cmdline')
if os_cmdline is not None:
os['cmdline'] = os_cmdline.text
os_boot = tree.find('./os/boot')
if os_boot is not None:
os['boot_dev'] = os_boot.get('dev')
definition['os'] = os
features = {}
acpi = tree.find('./features/acpi')
if acpi is not None:
features['acpi'] = True
definition['features'] = features
devices = {}
device_nodes = tree.find('./devices')
if device_nodes is not None:
disks_info = []
disks = device_nodes.findall('./disk')
for disk in disks:
disks_info += [_parse_disk_info(disk)]
devices['disks'] = disks_info
nics_info = []
nics = device_nodes.findall('./interface')
for nic in nics:
nic_info = {}
nic_info['type'] = nic.get('type')
mac = nic.find('./mac')
if mac is not None:
nic_info['mac'] = mac.get('address')
source = nic.find('./source')
if source is not None:
if nic_info['type'] == 'network':
nic_info['source'] = source.get('network')
elif nic_info['type'] == 'bridge':
nic_info['source'] = source.get('bridge')
nics_info += [nic_info]
devices['nics'] = nics_info
definition['devices'] = devices
return definition
def create(self):
self.createWithFlags(0)
def createWithFlags(self, flags):
# FIXME: Not handling flags at the moment
self._state = VIR_DOMAIN_RUNNING
self._connection._mark_running(self)
self._has_saved_state = False
def isActive(self):
return int(self._state == VIR_DOMAIN_RUNNING)
def undefine(self):
self._connection._undefine(self)
def destroy(self):
self._state = VIR_DOMAIN_SHUTOFF
self._connection._mark_not_running(self)
def name(self):
return self._def['name']
def UUIDString(self):
return self._def['uuid']
def interfaceStats(self, device):
return [10000242400, 1234, 0, 2, 213412343233, 34214234, 23, 3]
def blockStats(self, device):
return [2, 10000242400, 234, 2343424234, 34]
def suspend(self):
self._state = VIR_DOMAIN_PAUSED
def info(self):
return [VIR_DOMAIN_RUNNING,
long(self._def['memory']),
long(self._def['memory']),
self._def['vcpu'],
123456789L]
def attachDevice(self, xml):
disk_info = _parse_disk_info(xml_to_tree(xml))
disk_info['_attached'] = True
self._def['devices']['disks'] += [disk_info]
return True
def detachDevice(self, xml):
disk_info = _parse_disk_info(xml_to_tree(xml))
disk_info['_attached'] = True
return disk_info in self._def['devices']['disks']
def XMLDesc(self, flags):
disks = ''
for disk in self._def['devices']['disks']:
disks += '''<disk type='%(type)s' device='%(device)s'>
<driver name='%(driver_name)s' type='%(driver_type)s'/>
<source file='%(source)s'/>
<target dev='%(target_dev)s' bus='%(target_bus)s'/>
<address type='drive' controller='0' bus='0' unit='0'/>
</disk>''' % disk
nics = ''
for nic in self._def['devices']['nics']:
nics += '''<interface type='%(type)s'>
<mac address='%(mac)s'/>
<source %(type)s='%(source)s'/>
<address type='pci' domain='0x0000' bus='0x00' slot='0x03'
function='0x0'/>
</interface>''' % nic
return '''<domain type='kvm'>
<name>%(name)s</name>
<uuid>%(uuid)s</uuid>
<memory>%(memory)s</memory>
<currentMemory>%(memory)s</currentMemory>
<vcpu>%(vcpu)s</vcpu>
<os>
<type arch='%(arch)s' machine='pc-0.12'>hvm</type>
<boot dev='hd'/>
</os>
<features>
<acpi/>
<apic/>
<pae/>
</features>
<clock offset='localtime'/>
<on_poweroff>destroy</on_poweroff>
<on_reboot>restart</on_reboot>
<on_crash>restart</on_crash>
<devices>
<emulator>/usr/bin/kvm</emulator>
%(disks)s
<controller type='ide' index='0'>
<address type='pci' domain='0x0000' bus='0x00' slot='0x01'
function='0x1'/>
</controller>
%(nics)s
<serial type='pty'>
<source pty='/dev/pts/27'/>
<target port='0'/>
</serial>
<console type='pty'>
<target type='serial' port='0'/>
</console>
<input type='tablet' bus='usb'/>
<input type='mouse' bus='ps2'/>
<graphics type='vnc' port='-1' autoport='yes'/>
<video>
<model type='cirrus' vram='9216' heads='1'/>
<address type='pci' domain='0x0000' bus='0x00' slot='0x02'
function='0x0'/>
</video>
<memballoon model='virtio'>
<address type='pci' domain='0x0000' bus='0x00' slot='0x04'
function='0x0'/>
</memballoon>
</devices>
</domain>''' % {'name': self._def['name'],
'uuid': self._def['uuid'],
'memory': self._def['memory'],
'vcpu': self._def['vcpu'],
'arch': self._def['os']['arch'],
'disks': disks,
'nics': nics}
def managedSave(self, flags):
self._connection._mark_not_running(self)
self._has_saved_state = True
def managedSaveRemove(self, flags):
self._has_saved_state = False
def hasManagedSaveImage(self, flags):
return int(self._has_saved_state)
def resume(self):
self._state = VIR_DOMAIN_RUNNING
def snapshotCreateXML(self, xml, flags):
tree = xml_to_tree(xml)
name = tree.find('./name').text
snapshot = DomainSnapshot(name, self)
self._snapshots[name] = snapshot
return snapshot
class DomainSnapshot(object):
def __init__(self, name, domain):
self._name = name
self._domain = domain
def delete(self, flags):
del self._domain._snapshots[self._name]
class Connection(object):
def __init__(self, uri, readonly):
if not uri:
if allow_default_uri_connection:
uri = 'qemu:///session'
else:
raise Exception("URI was None, but fake libvirt is configured"
" to not accept this.")
uri_whitelist = ['qemu:///system',
'qemu:///session',
'xen:///system',
'uml:///system']
if uri not in uri_whitelist:
raise libvirtError(5, 0,
"libvir: error : no connection driver "
"available for No connection for URI %s" % uri)
self.readonly = readonly
self._uri = uri
self._vms = {}
self._running_vms = {}
self._id_counter = 0
self._nwfilters = {}
def _add_filter(self, nwfilter):
self._nwfilters[nwfilter._name] = nwfilter
def _remove_filter(self, nwfilter):
del self._nwfilters[nwfilter._name]
def _mark_running(self, dom):
self._running_vms[self._id_counter] = dom
self._id_counter += 1
def _mark_not_running(self, dom):
if dom._transient:
self._undefine(dom)
for (k, v) in self._running_vms.iteritems():
if v == dom:
del self._running_vms[k]
return
def _undefine(self, dom):
del self._vms[dom.name()]
def getInfo(self):
return [node_arch,
node_kB_mem,
node_cpus,
node_mhz,
node_nodes,
node_sockets,
node_cores,
node_threads]
def listDomainsID(self):
return self._running_vms.keys()
def lookupByID(self, id):
if id in self._running_vms:
return self._running_vms[id]
raise libvirtError(VIR_ERR_NO_DOMAIN, VIR_FROM_QEMU,
'Domain not found: no domain with matching '
'id %d' % id)
def lookupByName(self, name):
if name in self._vms:
return self._vms[name]
raise libvirtError(VIR_ERR_NO_DOMAIN, VIR_FROM_QEMU,
'Domain not found: no domain with matching '
'name "%s"' % name)
def defineXML(self, xml):
dom = Domain(connection=self, running=False, transient=False, xml=xml)
self._vms[dom.name()] = dom
return dom
def createXML(self, xml, flags):
dom = Domain(connection=self, running=True, transient=True, xml=xml)
self._vms[dom.name()] = dom
return dom
def getType(self):
if self._uri == 'qemu:///system':
return 'QEMU'
def getVersion(self):
return 14000
def getCapabilities(self):
return '''<capabilities>
<host>
<uuid>cef19ce0-0ca2-11df-855d-b19fbce37686</uuid>
<cpu>
<arch>x86_64</arch>
<model>Penryn</model>
<vendor>Intel</vendor>
<topology sockets='1' cores='2' threads='1'/>
<feature name='xtpr'/>
<feature name='tm2'/>
<feature name='est'/>
<feature name='vmx'/>
<feature name='ds_cpl'/>
<feature name='monitor'/>
<feature name='pbe'/>
<feature name='tm'/>
<feature name='ht'/>
<feature name='ss'/>
<feature name='acpi'/>
<feature name='ds'/>
<feature name='vme'/>
</cpu>
<migration_features>
<live/>
<uri_transports>
<uri_transport>tcp</uri_transport>
</uri_transports>
</migration_features>
<secmodel>
<model>apparmor</model>
<doi>0</doi>
</secmodel>
</host>
<guest>
<os_type>hvm</os_type>
<arch name='i686'>
<wordsize>32</wordsize>
<emulator>/usr/bin/qemu</emulator>
<machine>pc-0.14</machine>
<machine canonical='pc-0.14'>pc</machine>
<machine>pc-0.13</machine>
<machine>pc-0.12</machine>
<machine>pc-0.11</machine>
<machine>pc-0.10</machine>
<machine>isapc</machine>
<domain type='qemu'>
</domain>
<domain type='kvm'>
<emulator>/usr/bin/kvm</emulator>
<machine>pc-0.14</machine>
<machine canonical='pc-0.14'>pc</machine>
<machine>pc-0.13</machine>
<machine>pc-0.12</machine>
<machine>pc-0.11</machine>
<machine>pc-0.10</machine>
<machine>isapc</machine>
</domain>
</arch>
<features>
<cpuselection/>
<deviceboot/>
<pae/>
<nonpae/>
<acpi default='on' toggle='yes'/>
<apic default='on' toggle='no'/>
</features>
</guest>
<guest>
<os_type>hvm</os_type>
<arch name='x86_64'>
<wordsize>64</wordsize>
<emulator>/usr/bin/qemu-system-x86_64</emulator>
<machine>pc-0.14</machine>
<machine canonical='pc-0.14'>pc</machine>
<machine>pc-0.13</machine>
<machine>pc-0.12</machine>
<machine>pc-0.11</machine>
<machine>pc-0.10</machine>
<machine>isapc</machine>
<domain type='qemu'>
</domain>
<domain type='kvm'>
<emulator>/usr/bin/kvm</emulator>
<machine>pc-0.14</machine>
<machine canonical='pc-0.14'>pc</machine>
<machine>pc-0.13</machine>
<machine>pc-0.12</machine>
<machine>pc-0.11</machine>
<machine>pc-0.10</machine>
<machine>isapc</machine>
</domain>
</arch>
<features>
<cpuselection/>
<deviceboot/>
<acpi default='on' toggle='yes'/>
<apic default='on' toggle='no'/>
</features>
</guest>
<guest>
<os_type>hvm</os_type>
<arch name='arm'>
<wordsize>32</wordsize>
<emulator>/usr/bin/qemu-system-arm</emulator>
<machine>integratorcp</machine>
<machine>vexpress-a9</machine>
<machine>syborg</machine>
<machine>musicpal</machine>
<machine>mainstone</machine>
<machine>n800</machine>
<machine>n810</machine>
<machine>n900</machine>
<machine>cheetah</machine>
<machine>sx1</machine>
<machine>sx1-v1</machine>
<machine>beagle</machine>
<machine>beaglexm</machine>
<machine>tosa</machine>
<machine>akita</machine>
<machine>spitz</machine>
<machine>borzoi</machine>
<machine>terrier</machine>
<machine>connex</machine>
<machine>verdex</machine>
<machine>lm3s811evb</machine>
<machine>lm3s6965evb</machine>
<machine>realview-eb</machine>
<machine>realview-eb-mpcore</machine>
<machine>realview-pb-a8</machine>
<machine>realview-pbx-a9</machine>
<machine>versatilepb</machine>
<machine>versatileab</machine>
<domain type='qemu'>
</domain>
</arch>
<features>
<deviceboot/>
</features>
</guest>
<guest>
<os_type>hvm</os_type>
<arch name='mips'>
<wordsize>32</wordsize>
<emulator>/usr/bin/qemu-system-mips</emulator>
<machine>malta</machine>
<machine>mipssim</machine>
<machine>magnum</machine>
<machine>pica61</machine>
<machine>mips</machine>
<domain type='qemu'>
</domain>
</arch>
<features>
<deviceboot/>
</features>
</guest>
<guest>
<os_type>hvm</os_type>
<arch name='mipsel'>
<wordsize>32</wordsize>
<emulator>/usr/bin/qemu-system-mipsel</emulator>
<machine>malta</machine>
<machine>mipssim</machine>
<machine>magnum</machine>
<machine>pica61</machine>
<machine>mips</machine>
<domain type='qemu'>
</domain>
</arch>
<features>
<deviceboot/>
</features>
</guest>
<guest>
<os_type>hvm</os_type>
<arch name='sparc'>
<wordsize>32</wordsize>
<emulator>/usr/bin/qemu-system-sparc</emulator>
<machine>SS-5</machine>
<machine>leon3_generic</machine>
<machine>SS-10</machine>
<machine>SS-600MP</machine>
<machine>SS-20</machine>
<machine>Voyager</machine>
<machine>LX</machine>
<machine>SS-4</machine>
<machine>SPARCClassic</machine>
<machine>SPARCbook</machine>
<machine>SS-1000</machine>
<machine>SS-2000</machine>
<machine>SS-2</machine>
<domain type='qemu'>
</domain>
</arch>
</guest>
<guest>
<os_type>hvm</os_type>
<arch name='ppc'>
<wordsize>32</wordsize>
<emulator>/usr/bin/qemu-system-ppc</emulator>
<machine>g3beige</machine>
<machine>virtex-ml507</machine>
<machine>mpc8544ds</machine>
<machine canonical='bamboo-0.13'>bamboo</machine>
<machine>bamboo-0.13</machine>
<machine>bamboo-0.12</machine>
<machine>ref405ep</machine>
<machine>taihu</machine>
<machine>mac99</machine>
<machine>prep</machine>
<domain type='qemu'>
</domain>
</arch>
<features>
<deviceboot/>
</features>
</guest>
</capabilities>'''
def compareCPU(self, xml, flags):
tree = xml_to_tree(xml)
arch_node = tree.find('./arch')
if arch_node is not None:
if arch_node.text not in ['x86_64', 'i686']:
return VIR_CPU_COMPARE_INCOMPATIBLE
model_node = tree.find('./model')
if model_node is not None:
if model_node.text != node_cpu_model:
return VIR_CPU_COMPARE_INCOMPATIBLE
vendor_node = tree.find('./vendor')
if vendor_node is not None:
if vendor_node.text != node_cpu_vendor:
return VIR_CPU_COMPARE_INCOMPATIBLE
# The rest of the stuff libvirt implements is rather complicated
# and I don't think it adds much value to replicate it here.
return VIR_CPU_COMPARE_IDENTICAL
def nwfilterLookupByName(self, name):
try:
return self._nwfilters[name]
except KeyError:
raise libvirtError(VIR_ERR_NO_NWFILTER, VIR_FROM_NWFILTER,
"no nwfilter with matching name %s" % name)
def nwfilterDefineXML(self, xml):
nwfilter = NWFilter(self, xml)
self._add_filter(nwfilter)
def openReadOnly(uri):
return Connection(uri, readonly=True)
def openAuth(uri, auth, flags):
if flags != 0:
raise Exception(_("Please extend mock libvirt module to support "
"flags"))
if auth != [[VIR_CRED_AUTHNAME, VIR_CRED_NOECHOPROMPT],
'root',
None]:
raise Exception(_("Please extend fake libvirt module to support "
"this auth method"))
return Connection(uri, readonly=False)

View File

@@ -0,0 +1,403 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2010 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import test
from xml.etree.ElementTree import fromstring as xml_to_tree
import fakelibvirt as libvirt
def get_vm_xml(name="testname", uuid=None, source_type='file',
interface_type='bridge'):
uuid_tag = ''
if uuid:
uuid_tag = '<uuid>%s</uuid>' % (uuid,)
return '''<domain type='kvm'>
<name>%(name)s</name>
%(uuid_tag)s
<memory>128000</memory>
<vcpu>1</vcpu>
<os>
<type>hvm</type>
<kernel>/somekernel</kernel>
<cmdline>root=/dev/sda</cmdline>
<boot dev='hd'/>
</os>
<features>
<acpi/>
</features>
<devices>
<disk type='file' device='disk'>
<driver name='qemu' type='qcow2'/>
<source %(source_type)s='/somefile'/>
<target dev='vda' bus='virtio'/>
</disk>
<interface type='%(interface_type)s'>
<mac address='05:26:3e:31:28:1f'/>
<source %(interface_type)s='br100'/>
</interface>
<input type='mouse' bus='ps2'/>
<graphics type='vnc' port='5901' autoport='yes' keymap='en-us'/>
</devices>
</domain>''' % {'name': name,
'uuid_tag': uuid_tag,
'source_type': source_type,
'interface_type': interface_type}
class FakeLibvirtTests(test.TestCase):
def setUp(self):
super(FakeLibvirtTests, self).setUp()
libvirt._reset()
def get_openReadOnly_curry_func(self):
return lambda uri: libvirt.openReadOnly(uri)
def get_openAuth_curry_func(self):
return lambda uri: libvirt.openAuth(uri,
[[libvirt.VIR_CRED_AUTHNAME,
libvirt.VIR_CRED_NOECHOPROMPT],
'root',
None], 0)
def _test_connect_method_accepts_None_uri_by_default(self, conn_method):
conn = conn_method(None)
self.assertNotEqual(conn, None, "Connecting to fake libvirt failed")
def test_openReadOnly_accepts_None_uri_by_default(self):
conn_method = self.get_openReadOnly_curry_func()
self._test_connect_method_accepts_None_uri_by_default(conn_method)
def test_openAuth_accepts_None_uri_by_default(self):
conn_method = self.get_openAuth_curry_func()
self._test_connect_method_accepts_None_uri_by_default(conn_method)
def _test_connect_method_can_refuse_None_uri(self, conn_method):
libvirt.allow_default_uri_connection = False
self.assertRaises(Exception, conn_method, None)
def test_openReadOnly_can_refuse_None_uri(self):
conn_method = self.get_openReadOnly_curry_func()
self._test_connect_method_can_refuse_None_uri(conn_method)
def test_openAuth_can_refuse_None_uri(self):
conn_method = self.get_openAuth_curry_func()
self._test_connect_method_can_refuse_None_uri(conn_method)
def _test_connect_method_refuses_invalid_URI(self, conn_method):
self.assertRaises(libvirt.libvirtError, conn_method, 'blah')
def test_openReadOnly_refuses_invalid_URI(self):
conn_method = self.get_openReadOnly_curry_func()
self._test_connect_method_refuses_invalid_URI(conn_method)
def test_openAuth_refuses_invalid_URI(self):
conn_method = self.get_openAuth_curry_func()
self._test_connect_method_refuses_invalid_URI(conn_method)
def test_getInfo(self):
conn = libvirt.openReadOnly(None)
res = conn.getInfo()
self.assertIn(res[0], ('i686', 'x86_64'))
self.assertTrue(1024 <= res[1] <= 16384,
"Memory unusually high or low.")
self.assertTrue(1 <= res[2] <= 32,
"Active CPU count unusually high or low.")
self.assertTrue(800 <= res[3] <= 4500,
"CPU speed unusually high or low.")
self.assertTrue(res[2] <= (res[5] * res[6]),
"More active CPUs than num_sockets*cores_per_socket")
def test_createXML_detects_invalid_xml(self):
self._test_XML_func_detects_invalid_xml('createXML', [0])
def test_defineXML_detects_invalid_xml(self):
self._test_XML_func_detects_invalid_xml('defineXML', [])
def _test_XML_func_detects_invalid_xml(self, xmlfunc_name, args):
conn = self.get_openAuth_curry_func()('qemu:///system')
try:
getattr(conn, xmlfunc_name)("this is not valid </xml>", *args)
except libvirt.libvirtError, e:
self.assertEqual(e.get_error_code(), libvirt.VIR_ERR_XML_DETAIL)
self.assertEqual(e.get_error_domain(), libvirt.VIR_FROM_DOMAIN)
return
raise self.failureException("Invalid XML didn't raise libvirtError")
def test_defineXML_defines_domain(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
conn.defineXML(get_vm_xml())
dom = conn.lookupByName('testname')
self.assertEqual('testname', dom.name())
self.assertEqual(0, dom.isActive())
dom.undefine()
self.assertRaises(libvirt.libvirtError,
conn.lookupByName,
'testname')
def test_blockStats(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
conn.createXML(get_vm_xml(), 0)
dom = conn.lookupByName('testname')
blockstats = dom.blockStats('vda')
self.assertEqual(len(blockstats), 5)
for x in blockstats:
self.assertTrue(type(x) in [int, long])
def test_attach_detach(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
conn.createXML(get_vm_xml(), 0)
dom = conn.lookupByName('testname')
xml = '''<disk type='block'>
<driver name='qemu' type='raw'/>
<source dev='/dev/nbd0'/>
<target dev='/dev/vdc' bus='virtio'/>
</disk>'''
self.assertTrue(dom.attachDevice(xml))
self.assertTrue(dom.detachDevice(xml))
def test_info(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
conn.createXML(get_vm_xml(), 0)
dom = conn.lookupByName('testname')
info = dom.info()
self.assertEqual(info[0], libvirt.VIR_DOMAIN_RUNNING)
self.assertEqual(info[1], 128000)
self.assertTrue(info[2] <= 128000)
self.assertEqual(info[3], 1)
self.assertTrue(type(info[4]) in [int, long])
def test_createXML_runs_domain(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
conn.createXML(get_vm_xml(), 0)
dom = conn.lookupByName('testname')
self.assertEqual('testname', dom.name())
self.assertEqual(1, dom.isActive())
dom.destroy()
try:
dom = conn.lookupByName('testname')
except libvirt.libvirtError as e:
self.assertEqual(e.get_error_code(), libvirt.VIR_ERR_NO_DOMAIN)
self.assertEqual(e.get_error_domain(), libvirt.VIR_FROM_QEMU)
return
self.fail("lookupByName succeeded for destroyed non-defined VM")
def test_defineXML_remembers_uuid(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
uuid = 'b21f957d-a72f-4b93-b5a5-45b1161abb02'
conn.defineXML(get_vm_xml(uuid=uuid))
dom = conn.lookupByName('testname')
self.assertEquals(dom.UUIDString(), uuid)
def test_createWithFlags(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
conn.defineXML(get_vm_xml())
dom = conn.lookupByName('testname')
self.assertFalse(dom.isActive(), 'Defined domain was running.')
dom.createWithFlags(0)
self.assertTrue(dom.isActive(),
'Domain wasn\'t running after createWithFlags')
def test_managedSave(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
conn.defineXML(get_vm_xml())
dom = conn.lookupByName('testname')
self.assertFalse(dom.isActive(), 'Defined domain was running.')
dom.createWithFlags(0)
self.assertEquals(dom.hasManagedSaveImage(0), 0)
dom.managedSave(0)
self.assertEquals(dom.hasManagedSaveImage(0), 1)
dom.managedSaveRemove(0)
self.assertEquals(dom.hasManagedSaveImage(0), 0)
def test_listDomainsId_and_lookupById(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
self.assertEquals(conn.listDomainsID(), [])
conn.defineXML(get_vm_xml())
dom = conn.lookupByName('testname')
dom.createWithFlags(0)
self.assertEquals(len(conn.listDomainsID()), 1)
dom_id = conn.listDomainsID()[0]
self.assertEquals(conn.lookupByID(dom_id), dom)
dom_id = conn.listDomainsID()[0]
try:
conn.lookupByID(dom_id + 1)
except libvirt.libvirtError, e:
self.assertEqual(e.get_error_code(), libvirt.VIR_ERR_NO_DOMAIN)
self.assertEqual(e.get_error_domain(), libvirt.VIR_FROM_QEMU)
return
raise self.failureException("Looking up an invalid domain ID didn't "
"raise libvirtError")
def test_define_and_retrieve(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
self.assertEquals(conn.listDomainsID(), [])
conn.defineXML(get_vm_xml())
dom = conn.lookupByName('testname')
xml = dom.XMLDesc(0)
xml_to_tree(xml)
def _test_accepts_source_type(self, source_type):
conn = self.get_openAuth_curry_func()('qemu:///system')
self.assertEquals(conn.listDomainsID(), [])
conn.defineXML(get_vm_xml(source_type=source_type))
dom = conn.lookupByName('testname')
xml = dom.XMLDesc(0)
tree = xml_to_tree(xml)
elem = tree.find('./devices/disk/source')
self.assertEquals(elem.get('file'), '/somefile')
def test_accepts_source_dev(self):
self._test_accepts_source_type('dev')
def test_accepts_source_path(self):
self._test_accepts_source_type('path')
def test_network_type_bridge_sticks(self):
self._test_network_type_sticks('bridge')
def test_network_type_network_sticks(self):
self._test_network_type_sticks('network')
def _test_network_type_sticks(self, network_type):
conn = self.get_openAuth_curry_func()('qemu:///system')
self.assertEquals(conn.listDomainsID(), [])
conn.defineXML(get_vm_xml(interface_type=network_type))
dom = conn.lookupByName('testname')
xml = dom.XMLDesc(0)
tree = xml_to_tree(xml)
elem = tree.find('./devices/interface')
self.assertEquals(elem.get('type'), network_type)
elem = elem.find('./source')
self.assertEquals(elem.get(network_type), 'br100')
def test_getType(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
self.assertEquals(conn.getType(), 'QEMU')
def test_getVersion(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
self.assertTrue(type(conn.getVersion()) is int)
def test_getCapabilities(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
xml_to_tree(conn.getCapabilities())
def test_nwfilter_define_undefine(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
# Will raise an exception if it's not valid XML
xml = '''<filter name='nova-instance-instance-789' chain='root'>
<uuid>946878c6-3ad3-82b2-87f3-c709f3807f58</uuid>
</filter>'''
conn.nwfilterDefineXML(xml)
nwfilter = conn.nwfilterLookupByName('nova-instance-instance-789')
nwfilter.undefine()
try:
conn.nwfilterLookupByName('nova-instance-instance-789320334')
except libvirt.libvirtError, e:
self.assertEqual(e.get_error_code(), libvirt.VIR_ERR_NO_NWFILTER)
self.assertEqual(e.get_error_domain(), libvirt.VIR_FROM_NWFILTER)
return
raise self.failureException("Invalid NWFilter name didn't"
" raise libvirtError")
def test_compareCPU_compatible(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
xml = '''<cpu>
<arch>%s</arch>
<model>%s</model>
<vendor>%s</vendor>
<topology sockets="%d" cores="%d" threads="%d"/>
</cpu>''' % (libvirt.node_arch,
libvirt.node_cpu_model,
libvirt.node_cpu_vendor,
libvirt.node_sockets,
libvirt.node_cores,
libvirt.node_threads)
self.assertEqual(conn.compareCPU(xml, 0),
libvirt.VIR_CPU_COMPARE_IDENTICAL)
def test_compareCPU_incompatible_vendor(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
xml = '''<cpu>
<arch>%s</arch>
<model>%s</model>
<vendor>%s</vendor>
<topology sockets="%d" cores="%d" threads="%d"/>
</cpu>''' % (libvirt.node_arch,
libvirt.node_cpu_model,
"AnotherVendor",
libvirt.node_sockets,
libvirt.node_cores,
libvirt.node_threads)
self.assertEqual(conn.compareCPU(xml, 0),
libvirt.VIR_CPU_COMPARE_INCOMPATIBLE)
def test_compareCPU_incompatible_arch(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
xml = '''<cpu>
<arch>%s</arch>
<model>%s</model>
<vendor>%s</vendor>
<topology sockets="%d" cores="%d" threads="%d"/>
</cpu>''' % ('not-a-valid-arch',
libvirt.node_cpu_model,
libvirt.node_cpu_vendor,
libvirt.node_sockets,
libvirt.node_cores,
libvirt.node_threads)
self.assertEqual(conn.compareCPU(xml, 0),
libvirt.VIR_CPU_COMPARE_INCOMPATIBLE)
def test_compareCPU_incompatible_model(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
xml = '''<cpu>
<arch>%s</arch>
<model>%s</model>
<vendor>%s</vendor>
<topology sockets="%d" cores="%d" threads="%d"/>
</cpu>''' % (libvirt.node_arch,
"AnotherModel",
libvirt.node_cpu_vendor,
libvirt.node_sockets,
libvirt.node_cores,
libvirt.node_threads)
self.assertEqual(conn.compareCPU(xml, 0),
libvirt.VIR_CPU_COMPARE_INCOMPATIBLE)
def test_compareCPU_compatible_unspecified_model(self):
conn = self.get_openAuth_curry_func()('qemu:///system')
xml = '''<cpu>
<arch>%s</arch>
<vendor>%s</vendor>
<topology sockets="%d" cores="%d" threads="%d"/>
</cpu>''' % (libvirt.node_arch,
libvirt.node_cpu_vendor,
libvirt.node_sockets,
libvirt.node_cores,
libvirt.node_threads)
self.assertEqual(conn.compareCPU(xml, 0),
libvirt.VIR_CPU_COMPARE_IDENTICAL)

View File

@@ -36,12 +36,16 @@ from nova import utils
from nova.api.ec2 import cloud
from nova.compute import power_state
from nova.compute import vm_states
from nova.virt import disk
from nova.virt import images
from nova.virt import driver
from nova.virt.libvirt import connection
from nova.virt.libvirt import firewall
from nova.virt.libvirt import volume
from nova.volume import driver as volume_driver
from nova.virt.libvirt import utils as libvirt_utils
from nova.tests import fake_network
from nova.tests import fake_libvirt_utils
try:
@@ -187,6 +191,11 @@ class CacheConcurrencyTestCase(test.TestCase):
self.stubs.Set(os.path, 'exists', fake_exists)
self.stubs.Set(utils, 'execute', fake_execute)
connection.libvirt_utils = fake_libvirt_utils
def tearDown(self):
connection.libvirt_utils = libvirt_utils
super(CacheConcurrencyTestCase, self).tearDown()
def test_same_fname_concurrency(self):
"""Ensures that the same fname cache runs at a sequentially"""
@@ -260,6 +269,11 @@ class LibvirtConnTestCase(test.TestCase):
self.context = context.get_admin_context()
self.flags(instances_path='')
self.call_libvirt_dependant_setup = False
connection.libvirt_utils = fake_libvirt_utils
def tearDown(self):
connection.libvirt_utils = libvirt_utils
super(LibvirtConnTestCase, self).tearDown()
test_instance = {'memory_kb': '1024000',
'basepath': '/some/path',
@@ -905,10 +919,6 @@ class LibvirtConnTestCase(test.TestCase):
# Preparing mocks
# qemu-img should be mockd since test environment might not have
# large disk space.
self.mox.StubOutWithMock(utils, "execute")
utils.execute('qemu-img', 'create', '-f', 'raw',
'%s/%s/disk' % (tmpdir, instance_ref.name), '10G')
self.mox.ReplayAll()
conn = connection.LibvirtConnection(False)
conn.pre_block_migration(self.context, instance_ref,
@@ -937,12 +947,6 @@ class LibvirtConnTestCase(test.TestCase):
"<target dev='vdb' bus='virtio'/></disk>"
"</devices></domain>")
ret = ("image: /test/disk\nfile format: raw\n"
"virtual size: 20G (21474836480 bytes)\ndisk size: 3.1G\n"
"disk size: 102M\n"
"cluster_size: 2097152\n"
"backing file: /test/dummy (actual path: /backing/file)\n")
# Preparing mocks
vdmock = self.mox.CreateMock(libvirt.virDomain)
self.mox.StubOutWithMock(vdmock, "XMLDesc")
@@ -953,27 +957,23 @@ class LibvirtConnTestCase(test.TestCase):
return vdmock
self.create_fake_libvirt_mock(lookupByName=fake_lookup)
self.mox.StubOutWithMock(os.path, "getsize")
# based on above testdata, one is raw image, so getsize is mocked.
os.path.getsize("/test/disk").AndReturn(10 * 1024 * 1024 * 1024)
# another is qcow image, so qemu-img should be mocked.
self.mox.StubOutWithMock(utils, "execute")
utils.execute('qemu-img', 'info', '/test/disk.local').\
AndReturn((ret, ''))
GB = 1024 * 1024 * 1024
fake_libvirt_utils.disk_sizes['/test/disk'] = 10 * GB
fake_libvirt_utils.disk_sizes['/test/disk.local'] = 20 * GB
fake_libvirt_utils.disk_backing_files['/test/disk.local'] = 'file'
self.mox.ReplayAll()
conn = connection.LibvirtConnection(False)
info = conn.get_instance_disk_info(self.context, instance_ref)
info = utils.loads(info)
self.assertTrue(info[0]['type'] == 'raw' and
info[1]['type'] == 'qcow2' and
info[0]['path'] == '/test/disk' and
info[1]['path'] == '/test/disk.local' and
info[0]['local_gb'] == '10G' and
info[1]['local_gb'] == '20G' and
info[0]['backing_file'] == "" and
info[1]['backing_file'] == "file")
self.assertEquals(info[0]['type'], 'raw')
self.assertEquals(info[1]['type'], 'qcow2')
self.assertEquals(info[0]['path'], '/test/disk')
self.assertEquals(info[1]['path'], '/test/disk.local')
self.assertEquals(info[0]['local_gb'], '10G')
self.assertEquals(info[1]['local_gb'], '20G')
self.assertEquals(info[0]['backing_file'], "")
self.assertEquals(info[1]['backing_file'], "file")
db.instance_destroy(self.context, instance_ref['id'])
@@ -1011,8 +1011,13 @@ class LibvirtConnTestCase(test.TestCase):
', not int'))
count = (0 <= str(e.message).find('Unexpected method call'))
shutil.rmtree(os.path.join(FLAGS.instances_path, instance.name))
shutil.rmtree(os.path.join(FLAGS.instances_path, '_base'))
path = os.path.join(FLAGS.instances_path, instance.name)
if os.path.isdir(path):
shutil.rmtree(path)
path = os.path.join(FLAGS.instances_path, '_base')
if os.path.isdir(path):
shutil.rmtree(os.path.join(FLAGS.instances_path, '_base'))
def test_get_host_ip_addr(self):
conn = connection.LibvirtConnection(False)
@@ -1055,50 +1060,6 @@ class LibvirtConnTestCase(test.TestCase):
_assert_volume_in_mapping('sdg', False)
_assert_volume_in_mapping('sdh1', False)
def test_reboot_signature(self):
"""Test that libvirt driver method sig matches interface"""
def fake_reboot_with_correct_sig(ignore, instance,
network_info, reboot_type):
pass
def fake_destroy(instance, network_info, cleanup=False):
pass
def fake_plug_vifs(instance, network_info):
pass
def fake_create_new_domain(xml):
return
def fake_none(self, instance):
return
instance = db.instance_create(self.context, self.test_instance)
network_info = _fake_network_info(self.stubs, 1)
self.mox.StubOutWithMock(connection.LibvirtConnection, '_conn')
connection.LibvirtConnection._conn.lookupByName = self.fake_lookup
conn = connection.LibvirtConnection(False)
self.stubs.Set(conn, 'destroy', fake_destroy)
self.stubs.Set(conn, 'plug_vifs', fake_plug_vifs)
self.stubs.Set(conn.firewall_driver,
'setup_basic_filtering',
fake_none)
self.stubs.Set(conn.firewall_driver,
'prepare_instance_filter',
fake_none)
self.stubs.Set(conn, '_create_new_domain', fake_create_new_domain)
self.stubs.Set(conn.firewall_driver,
'apply_instance_filter',
fake_none)
args = [instance, network_info, 'SOFT']
conn.reboot(*args)
compute_driver = driver.ComputeDriver()
self.assertRaises(NotImplementedError, compute_driver.reboot, *args)
@test.skip_if(missing_libvirt(), "Test requires libvirt")
def test_immediate_delete(self):
conn = connection.LibvirtConnection(False)
@@ -1732,3 +1693,181 @@ class NWFilterTestCase(test.TestCase):
self.assertEqual(original_filter_count - len(fakefilter.filters), 2)
db.instance_destroy(admin_ctxt, instance_ref['id'])
class LibvirtUtilsTestCase(test.TestCase):
def test_create_image(self):
self.mox.StubOutWithMock(utils, 'execute')
utils.execute('qemu-img', 'create', '-f', 'raw',
'/some/path', '10G')
utils.execute('qemu-img', 'create', '-f', 'qcow2',
'/some/stuff', '1234567891234')
# Start test
self.mox.ReplayAll()
libvirt_utils.create_image('raw', '/some/path', '10G')
libvirt_utils.create_image('qcow2', '/some/stuff', '1234567891234')
def test_create_cow_image(self):
self.mox.StubOutWithMock(utils, 'execute')
utils.execute('qemu-img', 'create', '-f', 'qcow2',
'-o', 'cluster_size=2M,backing_file=/some/path',
'/the/new/cow')
# Start test
self.mox.ReplayAll()
libvirt_utils.create_cow_image('/some/path', '/the/new/cow')
def test_get_disk_size(self):
self.mox.StubOutWithMock(utils, 'execute')
utils.execute('qemu-img',
'info',
'/some/path').AndReturn(('''image: 00000001
file format: raw
virtual size: 4.4M (4592640 bytes)
disk size: 4.4M''', ''))
# Start test
self.mox.ReplayAll()
self.assertEquals(libvirt_utils.get_disk_size('/some/path'), 4592640)
def test_copy_image(self):
dst_fd, dst_path = tempfile.mkstemp()
try:
os.close(dst_fd)
src_fd, src_path = tempfile.mkstemp()
try:
with os.fdopen(src_fd, 'w') as fp:
fp.write('canary')
libvirt_utils.copy_image(src_path, dst_path)
with open(dst_path, 'r') as fp:
self.assertEquals(fp.read(), 'canary')
finally:
os.unlink(src_path)
finally:
os.unlink(dst_path)
def test_mkfs(self):
self.mox.StubOutWithMock(utils, 'execute')
utils.execute('mkfs', '-t', 'ext4', '/my/block/dev')
utils.execute('mkswap', '/my/swap/block/dev')
self.mox.ReplayAll()
libvirt_utils.mkfs('ext4', '/my/block/dev')
libvirt_utils.mkfs('swap', '/my/swap/block/dev')
def test_ensure_tree(self):
tmpdir = tempfile.mkdtemp()
try:
testdir = '%s/foo/bar/baz' % (tmpdir,)
libvirt_utils.ensure_tree(testdir)
self.assertTrue(os.path.isdir(testdir))
finally:
shutil.rmtree(tmpdir)
def test_write_to_file(self):
dst_fd, dst_path = tempfile.mkstemp()
try:
os.close(dst_fd)
libvirt_utils.write_to_file(dst_path, 'hello')
with open(dst_path, 'r') as fp:
self.assertEquals(fp.read(), 'hello')
finally:
os.unlink(dst_path)
def test_write_to_file_with_umask(self):
dst_fd, dst_path = tempfile.mkstemp()
try:
os.close(dst_fd)
os.unlink(dst_path)
libvirt_utils.write_to_file(dst_path, 'hello', umask=0277)
with open(dst_path, 'r') as fp:
self.assertEquals(fp.read(), 'hello')
mode = os.stat(dst_path).st_mode
self.assertEquals(mode & 0277, 0)
finally:
os.unlink(dst_path)
def test_chown(self):
self.mox.StubOutWithMock(utils, 'execute')
utils.execute('chown', 'soren', '/some/path', run_as_root=True)
self.mox.ReplayAll()
libvirt_utils.chown('/some/path', 'soren')
def test_extract_snapshot(self):
self.mox.StubOutWithMock(utils, 'execute')
utils.execute('qemu-img', 'convert', '-f', 'qcow2', '-O', 'raw',
'-s', 'snap1', '/path/to/disk/image', '/extracted/snap')
# Start test
self.mox.ReplayAll()
libvirt_utils.extract_snapshot('/path/to/disk/image', 'qcow2',
'snap1', '/extracted/snap', 'raw')
def test_load_file(self):
dst_fd, dst_path = tempfile.mkstemp()
try:
os.close(dst_fd)
# We have a test for write_to_file. If that is sound, this suffices
libvirt_utils.write_to_file(dst_path, 'hello')
self.assertEquals(libvirt_utils.load_file(dst_path), 'hello')
finally:
os.unlink(dst_path)
def test_file_open(self):
dst_fd, dst_path = tempfile.mkstemp()
try:
os.close(dst_fd)
# We have a test for write_to_file. If that is sound, this suffices
libvirt_utils.write_to_file(dst_path, 'hello')
with libvirt_utils.file_open(dst_path, 'r') as fp:
self.assertEquals(fp.read(), 'hello')
finally:
os.unlink(dst_path)
def test_run_ajaxterm(self):
self.mox.StubOutWithMock(utils, 'execute')
token = 's3cr3tt0ken'
shell_cmd = 'shell-cmd.py'
port = 2048
utils.execute(mox.IgnoreArg(),
'--command', shell_cmd,
'-t', token,
'-p', port)
# Start test
self.mox.ReplayAll()
libvirt_utils.run_ajaxterm(shell_cmd, token, port)
def test_get_fs_info(self):
stdout, stderr = utils.execute('df', '-B1', '/tmp')
info_line = ' '.join(stdout.split('\n')[1:])
_dev, total, used, free, _percentage, _mntpnt = info_line.split()
fs_info = libvirt_utils.get_fs_info('/tmp')
self.assertEquals(int(total), fs_info['total'])
self.assertEquals(int(free), fs_info['free'])
self.assertEquals(int(used), fs_info['used'])
def test_fetch_image(self):
self.mox.StubOutWithMock(images, 'fetch')
self.mox.StubOutWithMock(disk, 'extend')
context = 'opaque context'
target = '/tmp/targetfile'
image_id = '4'
user_id = 'fake'
project_id = 'fake'
images.fetch(context, image_id, target, user_id, project_id)
images.fetch(context, image_id, target, user_id, project_id)
disk.extend(target, '10G')
self.mox.ReplayAll()
libvirt_utils.fetch_image(context, target, image_id,
user_id, project_id)
libvirt_utils.fetch_image(context, target, image_id,
user_id, project_id, size='10G')

View File

@@ -59,6 +59,15 @@ class _VirtDriverTestCase(test.TestCase):
self.ctxt = test_utils.get_test_admin_context()
self.image_service = image.get_default_image_service()
def _get_running_instance(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
image_info = test_utils.get_test_image_info(None, instance_ref)
self.connection.spawn(self.ctxt, instance=instance_ref,
image_meta=image_info,
network_info=network_info)
return instance_ref, network_info
@catch_notimplementederror
def test_init_host(self):
self.connection.init_host('myhostname')
@@ -73,10 +82,7 @@ class _VirtDriverTestCase(test.TestCase):
@catch_notimplementederror
def test_spawn(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
domains = self.connection.list_instances()
self.assertIn(instance_ref['name'], domains)
@@ -93,18 +99,14 @@ class _VirtDriverTestCase(test.TestCase):
@catch_notimplementederror
def test_snapshot_running(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
img_ref = self.image_service.create(self.ctxt, {'name': 'snap-1'})
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.snapshot(self.ctxt, instance_ref, img_ref['id'])
@catch_notimplementederror
def test_reboot(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
reboot_type = "SOFT"
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.reboot(instance_ref, network_info, reboot_type)
@catch_notimplementederror
@@ -119,54 +121,40 @@ class _VirtDriverTestCase(test.TestCase):
@catch_notimplementederror
def test_resize_running(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.resize(instance_ref, 7)
@catch_notimplementederror
def test_set_admin_password(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.set_admin_password(instance_ref, 'p4ssw0rd')
@catch_notimplementederror
def test_inject_file(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.inject_file(instance_ref,
base64.b64encode('/testfile'),
base64.b64encode('testcontents'))
@catch_notimplementederror
def test_agent_update(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.agent_update(instance_ref, 'http://www.openstack.org/',
'd41d8cd98f00b204e9800998ecf8427e')
@catch_notimplementederror
def test_rescue(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.rescue(self.ctxt, instance_ref, network_info)
@catch_notimplementederror
def test_unrescue_unrescued_instance(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.unrescue(instance_ref, network_info)
@catch_notimplementederror
def test_unrescue_rescued_instance(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.rescue(self.ctxt, instance_ref, network_info)
self.connection.unrescue(instance_ref, network_info)
@@ -184,53 +172,39 @@ class _VirtDriverTestCase(test.TestCase):
@catch_notimplementederror
def test_migrate_disk_and_power_off(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.migrate_disk_and_power_off(
self.ctxt, instance_ref, 'dest_host')
@catch_notimplementederror
def test_pause(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.pause(instance_ref)
@catch_notimplementederror
def test_unpause_unpaused_instance(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.unpause(instance_ref)
@catch_notimplementederror
def test_unpause_paused_instance(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.pause(instance_ref)
self.connection.unpause(instance_ref)
@catch_notimplementederror
def test_suspend(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.suspend(instance_ref)
@catch_notimplementederror
def test_resume_unsuspended_instance(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.resume(instance_ref)
@catch_notimplementederror
def test_resume_suspended_instance(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.suspend(instance_ref)
self.connection.resume(instance_ref)
@@ -242,9 +216,7 @@ class _VirtDriverTestCase(test.TestCase):
@catch_notimplementederror
def test_destroy_instance(self):
instance_ref = test_utils.get_test_instance()
network_info = test_utils.get_test_network_info()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.assertIn(instance_ref['name'],
self.connection.list_instances())
self.connection.destroy(instance_ref, network_info)
@@ -253,9 +225,7 @@ class _VirtDriverTestCase(test.TestCase):
@catch_notimplementederror
def test_attach_detach_volume(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.attach_volume({'driver_volume_type': 'fake'},
instance_ref['name'],
'/mnt/nova/something')
@@ -265,9 +235,7 @@ class _VirtDriverTestCase(test.TestCase):
@catch_notimplementederror
def test_get_info(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
info = self.connection.get_info(instance_ref['name'])
self.assertIn('state', info)
self.assertIn('max_mem', info)
@@ -282,54 +250,40 @@ class _VirtDriverTestCase(test.TestCase):
@catch_notimplementederror
def test_get_diagnostics(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.get_diagnostics(instance_ref['name'])
@catch_notimplementederror
def test_list_disks(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.list_disks(instance_ref['name'])
@catch_notimplementederror
def test_list_interfaces(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.list_interfaces(instance_ref['name'])
@catch_notimplementederror
def test_block_stats(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
stats = self.connection.block_stats(instance_ref['name'], 'someid')
self.assertEquals(len(stats), 5)
@catch_notimplementederror
def test_interface_stats(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
stats = self.connection.interface_stats(instance_ref['name'], 'someid')
self.assertEquals(len(stats), 8)
@catch_notimplementederror
def test_get_console_output(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
console_output = self.connection.get_console_output(instance_ref)
self.assertTrue(isinstance(console_output, basestring))
@catch_notimplementederror
def test_get_ajax_console(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
ajax_console = self.connection.get_ajax_console(instance_ref)
self.assertIn('token', ajax_console)
self.assertIn('host', ajax_console)
@@ -337,9 +291,7 @@ class _VirtDriverTestCase(test.TestCase):
@catch_notimplementederror
def test_get_vnc_console(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
vnc_console = self.connection.get_vnc_console(instance_ref)
self.assertIn('token', vnc_console)
self.assertIn('host', vnc_console)
@@ -347,9 +299,7 @@ class _VirtDriverTestCase(test.TestCase):
@catch_notimplementederror
def test_get_console_pool_info(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
console_pool = self.connection.get_console_pool_info(instance_ref)
self.assertIn('address', console_pool)
self.assertIn('username', console_pool)
@@ -357,25 +307,19 @@ class _VirtDriverTestCase(test.TestCase):
@catch_notimplementederror
def test_refresh_security_group_rules(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
# FIXME: Create security group and add the instance to it
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.refresh_security_group_rules(1)
@catch_notimplementederror
def test_refresh_security_group_members(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
# FIXME: Create security group and add the instance to it
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.refresh_security_group_members(1)
@catch_notimplementederror
def test_refresh_provider_fw_rules(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.refresh_provider_fw_rules()
@catch_notimplementederror
@@ -424,28 +368,16 @@ class _VirtDriverTestCase(test.TestCase):
@catch_notimplementederror
def test_live_migration(self):
network_info = test_utils.get_test_network_info()
instance_ref = test_utils.get_test_instance()
self.connection.spawn(self.ctxt, instance_ref, network_info)
instance_ref, network_info = self._get_running_instance()
self.connection.live_migration(self.ctxt, instance_ref, 'otherhost',
None, None)
@catch_notimplementederror
def _check_host_status_fields(self, host_status):
self.assertIn('host_name-description', host_status)
self.assertIn('host_hostname', host_status)
self.assertIn('host_memory_total', host_status)
self.assertIn('host_memory_overhead', host_status)
self.assertIn('host_memory_free', host_status)
self.assertIn('host_memory_free_computed', host_status)
self.assertIn('host_other_config', host_status)
self.assertIn('host_ip_address', host_status)
self.assertIn('host_cpu_info', host_status)
self.assertIn('disk_available', host_status)
self.assertIn('disk_total', host_status)
self.assertIn('disk_used', host_status)
self.assertIn('host_uuid', host_status)
self.assertIn('host_name_label', host_status)
self.assertIn('host_memory_total', host_status)
self.assertIn('host_memory_free', host_status)
@catch_notimplementederror
def test_update_host_status(self):
@@ -493,7 +425,42 @@ class FakeConnectionTestCase(_VirtDriverTestCase):
self.driver_module = nova.virt.fake
super(FakeConnectionTestCase, self).setUp()
# Before long, we'll add the real hypervisor drivers here as well
# with whatever instrumentation they need to work independently of
# their hypervisor. This way, we can verify that they all act the
# same.
class LibvirtConnTestCase(_VirtDriverTestCase):
def setUp(self):
# Put fakelibvirt in place
if 'libvirt' in sys.modules:
self.saved_libvirt = sys.modules['libvirt']
else:
self.saved_libvirt = None
import fakelibvirt
import fake_libvirt_utils
sys.modules['libvirt'] = fakelibvirt
import nova.virt.libvirt.connection
import nova.virt.libvirt.firewall
nova.virt.libvirt.connection.libvirt = fakelibvirt
nova.virt.libvirt.connection.libvirt_utils = fake_libvirt_utils
nova.virt.libvirt.firewall.libvirt = fakelibvirt
# Point _VirtDriverTestCase at the right module
self.driver_module = nova.virt.libvirt.connection
super(LibvirtConnTestCase, self).setUp()
FLAGS.rescue_image_id = "2"
FLAGS.rescue_kernel_id = "3"
FLAGS.rescue_ramdisk_id = None
def tearDown(self):
super(LibvirtConnTestCase, self).setUp()
# Restore libvirt
import nova.virt.libvirt.connection
import nova.virt.libvirt.firewall
if self.saved_libvirt:
sys.modules['libvirt'] = self.saved_libvirt
nova.virt.libvirt.connection.libvirt = self.saved_libvirt
nova.virt.libvirt.connection.libvirt_utils = self.saved_libvirt
nova.virt.libvirt.firewall.libvirt = self.saved_libvirt