Improved unit tests

Fixed docstring formatting

Merged with trunk
This commit is contained in:
Salvatore Orlando
2011-04-06 14:28:05 +01:00
10 changed files with 97 additions and 516 deletions

View File

@@ -32,6 +32,7 @@ Jesse Andrews <anotherjesse@gmail.com>
Joe Heck <heckj@mac.com> Joe Heck <heckj@mac.com>
Joel Moore <joelbm24@gmail.com> Joel Moore <joelbm24@gmail.com>
John Dewey <john@dewey.ws> John Dewey <john@dewey.ws>
John Tran <jtran@attinteractive.com>
Jonathan Bryce <jbryce@jbryce.com> Jonathan Bryce <jbryce@jbryce.com>
Jordan Rinke <jordan@openstack.org> Jordan Rinke <jordan@openstack.org>
Josh Durgin <joshd@hq.newdream.net> Josh Durgin <joshd@hq.newdream.net>

View File

@@ -108,17 +108,17 @@ class AjaxConsoleProxy(object):
return "Server Error" return "Server Error"
def register_listeners(self): def register_listeners(self):
class Callback: class TopicProxy():
def __call__(self, data, message): @staticmethod
if data['method'] == 'authorize_ajax_console': def authorize_ajax_console(context, **kwargs):
AjaxConsoleProxy.tokens[data['args']['token']] = \ AjaxConsoleProxy.tokens[kwargs['token']] = \
{'args': data['args'], 'last_activity': time.time()} {'args': kwargs, 'last_activity': time.time()}
conn = rpc.Connection.instance(new=True) conn = rpc.Connection.instance(new=True)
consumer = rpc.TopicAdapterConsumer( consumer = rpc.TopicAdapterConsumer(
connection=conn, connection=conn,
topic=FLAGS.ajax_console_proxy_topic) proxy=TopicProxy,
consumer.register_callback(Callback()) topic=FLAGS.ajax_console_proxy_topic)
def delete_expired_tokens(): def delete_expired_tokens():
now = time.time() now = time.time()
@@ -130,8 +130,7 @@ class AjaxConsoleProxy(object):
for k in to_delete: for k in to_delete:
del AjaxConsoleProxy.tokens[k] del AjaxConsoleProxy.tokens[k]
utils.LoopingCall(consumer.fetch, auto_ack=True, utils.LoopingCall(consumer.fetch, enable_callbacks=True).start(0.1)
enable_callbacks=True).start(0.1)
utils.LoopingCall(delete_expired_tokens).start(1) utils.LoopingCall(delete_expired_tokens).start(1)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -902,7 +902,7 @@ class ImageCommands(object):
'disk_format': disk_format, 'disk_format': disk_format,
'container_format': container_format, 'container_format': container_format,
'properties': {'image_state': 'available', 'properties': {'image_state': 'available',
'owner': owner, 'owner_id': owner,
'type': image_type, 'type': image_type,
'architecture': architecture, 'architecture': architecture,
'image_location': 'local', 'image_location': 'local',
@@ -980,7 +980,7 @@ class ImageCommands(object):
'is_public': True, 'is_public': True,
'name': old['imageId'], 'name': old['imageId'],
'properties': {'image_state': old['imageState'], 'properties': {'image_state': old['imageState'],
'owner': old['imageOwnerId'], 'owner_id': old['imageOwnerId'],
'architecture': old['architecture'], 'architecture': old['architecture'],
'type': old['type'], 'type': old['type'],
'image_location': old['imageLocation'], 'image_location': old['imageLocation'],

View File

@@ -1,473 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Nova User API client library.
"""
import base64
import boto
import boto.exception
import httplib
import re
import string
from boto.ec2.regioninfo import RegionInfo
DEFAULT_CLC_URL = 'http://127.0.0.1:8773'
DEFAULT_REGION = 'nova'
class UserInfo(object):
"""
Information about a Nova user, as parsed through SAX.
**Fields Include**
* username
* accesskey
* secretkey
* file (optional) containing zip of X509 cert & rc file
"""
def __init__(self, connection=None, username=None, endpoint=None):
self.connection = connection
self.username = username
self.endpoint = endpoint
def __repr__(self):
return 'UserInfo:%s' % self.username
def startElement(self, name, attrs, connection):
return None
def endElement(self, name, value, connection):
if name == 'username':
self.username = str(value)
elif name == 'file':
self.file = base64.b64decode(str(value))
elif name == 'accesskey':
self.accesskey = str(value)
elif name == 'secretkey':
self.secretkey = str(value)
class UserRole(object):
"""
Information about a Nova user's role, as parsed through SAX.
**Fields include**
* role
"""
def __init__(self, connection=None):
self.connection = connection
self.role = None
def __repr__(self):
return 'UserRole:%s' % self.role
def startElement(self, name, attrs, connection):
return None
def endElement(self, name, value, connection):
if name == 'role':
self.role = value
else:
setattr(self, name, str(value))
class ProjectInfo(object):
"""
Information about a Nova project, as parsed through SAX.
**Fields include**
* projectname
* description
* projectManagerId
* memberIds
"""
def __init__(self, connection=None):
self.connection = connection
self.projectname = None
self.description = None
self.projectManagerId = None
self.memberIds = []
def __repr__(self):
return 'ProjectInfo:%s' % self.projectname
def startElement(self, name, attrs, connection):
return None
def endElement(self, name, value, connection):
if name == 'projectname':
self.projectname = value
elif name == 'description':
self.description = value
elif name == 'projectManagerId':
self.projectManagerId = value
elif name == 'memberId':
self.memberIds.append(value)
else:
setattr(self, name, str(value))
class ProjectMember(object):
"""
Information about a Nova project member, as parsed through SAX.
**Fields include**
* memberId
"""
def __init__(self, connection=None):
self.connection = connection
self.memberId = None
def __repr__(self):
return 'ProjectMember:%s' % self.memberId
def startElement(self, name, attrs, connection):
return None
def endElement(self, name, value, connection):
if name == 'member':
self.memberId = value
else:
setattr(self, name, str(value))
class HostInfo(object):
"""
Information about a Nova Host, as parsed through SAX.
**Fields Include**
* Hostname
* Compute service status
* Volume service status
* Instance count
* Volume count
"""
def __init__(self, connection=None):
self.connection = connection
self.hostname = None
self.compute = None
self.volume = None
self.instance_count = 0
self.volume_count = 0
def __repr__(self):
return 'Host:%s' % self.hostname
# this is needed by the sax parser, so ignore the ugly name
def startElement(self, name, attrs, connection):
return None
# this is needed by the sax parser, so ignore the ugly name
def endElement(self, name, value, connection):
fixed_name = string.lower(re.sub(r'([A-Z])', r'_\1', name))
setattr(self, fixed_name, value)
class Vpn(object):
"""
Information about a Vpn, as parsed through SAX
**Fields Include**
* instance_id
* project_id
* public_ip
* public_port
* created_at
* internal_ip
* state
"""
def __init__(self, connection=None):
self.connection = connection
self.instance_id = None
self.project_id = None
def __repr__(self):
return 'Vpn:%s:%s' % (self.project_id, self.instance_id)
def startElement(self, name, attrs, connection):
return None
def endElement(self, name, value, connection):
fixed_name = string.lower(re.sub(r'([A-Z])', r'_\1', name))
setattr(self, fixed_name, value)
class InstanceType(object):
"""
Information about a Nova instance type, as parsed through SAX.
**Fields include**
* name
* vcpus
* disk_gb
* memory_mb
* flavor_id
"""
def __init__(self, connection=None):
self.connection = connection
self.name = None
self.vcpus = None
self.disk_gb = None
self.memory_mb = None
self.flavor_id = None
def __repr__(self):
return 'InstanceType:%s' % self.name
def startElement(self, name, attrs, connection):
return None
def endElement(self, name, value, connection):
if name == "memoryMb":
self.memory_mb = str(value)
elif name == "flavorId":
self.flavor_id = str(value)
elif name == "diskGb":
self.disk_gb = str(value)
else:
setattr(self, name, str(value))
class NovaAdminClient(object):
def __init__(
self,
clc_url=DEFAULT_CLC_URL,
region=DEFAULT_REGION,
access_key=None,
secret_key=None,
**kwargs):
parts = self.split_clc_url(clc_url)
self.clc_url = clc_url
self.region = region
self.access = access_key
self.secret = secret_key
self.apiconn = boto.connect_ec2(aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
is_secure=parts['is_secure'],
region=RegionInfo(None,
region,
parts['ip']),
port=parts['port'],
path='/services/Admin',
**kwargs)
self.apiconn.APIVersion = 'nova'
def connection_for(self, username, project, clc_url=None, region=None,
**kwargs):
"""Returns a boto ec2 connection for the given username."""
if not clc_url:
clc_url = self.clc_url
if not region:
region = self.region
parts = self.split_clc_url(clc_url)
user = self.get_user(username)
access_key = '%s:%s' % (user.accesskey, project)
return boto.connect_ec2(aws_access_key_id=access_key,
aws_secret_access_key=user.secretkey,
is_secure=parts['is_secure'],
region=RegionInfo(None,
self.region,
parts['ip']),
port=parts['port'],
path='/services/Cloud',
**kwargs)
def split_clc_url(self, clc_url):
"""Splits a cloud controller endpoint url."""
parts = httplib.urlsplit(clc_url)
is_secure = parts.scheme == 'https'
ip, port = parts.netloc.split(':')
return {'ip': ip, 'port': int(port), 'is_secure': is_secure}
def get_users(self):
"""Grabs the list of all users."""
return self.apiconn.get_list('DescribeUsers', {}, [('item', UserInfo)])
def get_user(self, name):
"""Grab a single user by name."""
user = self.apiconn.get_object('DescribeUser',
{'Name': name},
UserInfo)
if user.username != None:
return user
def has_user(self, username):
"""Determine if user exists."""
return self.get_user(username) != None
def create_user(self, username):
"""Creates a new user, returning the userinfo object with
access/secret."""
return self.apiconn.get_object('RegisterUser', {'Name': username},
UserInfo)
def delete_user(self, username):
"""Deletes a user."""
return self.apiconn.get_object('DeregisterUser', {'Name': username},
UserInfo)
def get_roles(self, project_roles=True):
"""Returns a list of available roles."""
return self.apiconn.get_list('DescribeRoles',
{'ProjectRoles': project_roles},
[('item', UserRole)])
def get_user_roles(self, user, project=None):
"""Returns a list of roles for the given user.
Omitting project will return any global roles that the user has.
Specifying project will return only project specific roles.
"""
params = {'User': user}
if project:
params['Project'] = project
return self.apiconn.get_list('DescribeUserRoles',
params,
[('item', UserRole)])
def add_user_role(self, user, role, project=None):
"""Add a role to a user either globally or for a specific project."""
return self.modify_user_role(user, role, project=project,
operation='add')
def remove_user_role(self, user, role, project=None):
"""Remove a role from a user either globally or for a specific
project."""
return self.modify_user_role(user, role, project=project,
operation='remove')
def modify_user_role(self, user, role, project=None, operation='add',
**kwargs):
"""Add or remove a role for a user and project."""
params = {'User': user,
'Role': role,
'Project': project,
'Operation': operation}
return self.apiconn.get_status('ModifyUserRole', params)
def get_projects(self, user=None):
"""Returns a list of all projects."""
if user:
params = {'User': user}
else:
params = {}
return self.apiconn.get_list('DescribeProjects',
params,
[('item', ProjectInfo)])
def get_project(self, name):
"""Returns a single project with the specified name."""
project = self.apiconn.get_object('DescribeProject',
{'Name': name},
ProjectInfo)
if project.projectname != None:
return project
def create_project(self, projectname, manager_user, description=None,
member_users=None):
"""Creates a new project."""
params = {'Name': projectname,
'ManagerUser': manager_user,
'Description': description,
'MemberUsers': member_users}
return self.apiconn.get_object('RegisterProject', params, ProjectInfo)
def modify_project(self, projectname, manager_user=None, description=None):
"""Modifies an existing project."""
params = {'Name': projectname,
'ManagerUser': manager_user,
'Description': description}
return self.apiconn.get_status('ModifyProject', params)
def delete_project(self, projectname):
"""Permanently deletes the specified project."""
return self.apiconn.get_object('DeregisterProject',
{'Name': projectname},
ProjectInfo)
def get_project_members(self, name):
"""Returns a list of members of a project."""
return self.apiconn.get_list('DescribeProjectMembers',
{'Name': name},
[('item', ProjectMember)])
def add_project_member(self, user, project):
"""Adds a user to a project."""
return self.modify_project_member(user, project, operation='add')
def remove_project_member(self, user, project):
"""Removes a user from a project."""
return self.modify_project_member(user, project, operation='remove')
def modify_project_member(self, user, project, operation='add'):
"""Adds or removes a user from a project."""
params = {'User': user,
'Project': project,
'Operation': operation}
return self.apiconn.get_status('ModifyProjectMember', params)
def get_zip(self, user, project):
"""Returns the content of a zip file containing novarc and access
credentials."""
params = {'Name': user, 'Project': project}
zip = self.apiconn.get_object('GenerateX509ForUser', params, UserInfo)
return zip.file
def start_vpn(self, project):
"""
Starts the vpn for a user
"""
return self.apiconn.get_object('StartVpn', {'Project': project}, Vpn)
def get_vpns(self):
"""Return a list of vpn with project name"""
return self.apiconn.get_list('DescribeVpns', {}, [('item', Vpn)])
def get_hosts(self):
return self.apiconn.get_list('DescribeHosts', {}, [('item', HostInfo)])
def get_instance_types(self):
"""Grabs the list of all users."""
return self.apiconn.get_list('DescribeInstanceTypes', {},
[('item', InstanceType)])

View File

@@ -74,7 +74,12 @@ class Connection(carrot_connection.BrokerConnection):
"""Recreates the connection instance """Recreates the connection instance
This is necessary to recover from some network errors/disconnects""" This is necessary to recover from some network errors/disconnects"""
del cls._instance try:
del cls._instance
except AttributeError, e:
# The _instance stuff is for testing purposes. Usually we don't use
# it. So don't freak out if it doesn't exist.
pass
return cls.instance() return cls.instance()
@@ -125,9 +130,9 @@ class Consumer(messaging.Consumer):
# NOTE(vish): This is catching all errors because we really don't # NOTE(vish): This is catching all errors because we really don't
# want exceptions to be logged 10 times a second if some # want exceptions to be logged 10 times a second if some
# persistent failure occurs. # persistent failure occurs.
except Exception: # pylint: disable=W0703 except Exception, e: # pylint: disable=W0703
if not self.failed_connection: if not self.failed_connection:
LOG.exception(_("Failed to fetch message from queue")) LOG.exception(_("Failed to fetch message from queue: %s" % e))
self.failed_connection = True self.failed_connection = True
def attach_to_eventlet(self): def attach_to_eventlet(self):

View File

@@ -72,7 +72,9 @@ class SimpleScheduler(chance.ChanceScheduler):
{'host': service['host'], {'host': service['host'],
'scheduled_at': now}) 'scheduled_at': now})
return service['host'] return service['host']
raise driver.NoValidHost(_("No hosts found")) raise driver.NoValidHost(_("Scheduler was unable to locate a host"
" for this request. Is the appropriate"
" service running?"))
def schedule_create_volume(self, context, volume_id, *_args, **_kwargs): def schedule_create_volume(self, context, volume_id, *_args, **_kwargs):
"""Picks a host that is up and has the fewest volumes.""" """Picks a host that is up and has the fewest volumes."""
@@ -107,7 +109,9 @@ class SimpleScheduler(chance.ChanceScheduler):
{'host': service['host'], {'host': service['host'],
'scheduled_at': now}) 'scheduled_at': now})
return service['host'] return service['host']
raise driver.NoValidHost(_("No hosts found")) raise driver.NoValidHost(_("Scheduler was unable to locate a host"
" for this request. Is the appropriate"
" service running?"))
def schedule_set_network_host(self, context, *_args, **_kwargs): def schedule_set_network_host(self, context, *_args, **_kwargs):
"""Picks a host that is up and has the fewest networks.""" """Picks a host that is up and has the fewest networks."""
@@ -119,4 +123,6 @@ class SimpleScheduler(chance.ChanceScheduler):
raise driver.NoValidHost(_("All hosts have too many networks")) raise driver.NoValidHost(_("All hosts have too many networks"))
if self.service_is_up(service): if self.service_is_up(service):
return service['host'] return service['host']
raise driver.NoValidHost(_("No hosts found")) raise driver.NoValidHost(_("Scheduler was unable to locate a host"
" for this request. Is the appropriate"
" service running?"))

View File

@@ -52,5 +52,8 @@ class ZoneScheduler(driver.Scheduler):
zone = _kwargs.get('availability_zone') zone = _kwargs.get('availability_zone')
hosts = self.hosts_up_with_zone(context, topic, zone) hosts = self.hosts_up_with_zone(context, topic, zone)
if not hosts: if not hosts:
raise driver.NoValidHost(_("No hosts found")) raise driver.NoValidHost(_("Scheduler was unable to locate a host"
" for this request. Is the appropriate"
" service running?"))
return hosts[int(random.random() * len(hosts))] return hosts[int(random.random() * len(hosts))]

View File

@@ -41,6 +41,7 @@ from nova.compute import power_state
from nova.api.ec2 import cloud from nova.api.ec2 import cloud
from nova.api.ec2 import ec2utils from nova.api.ec2 import ec2utils
from nova.image import local from nova.image import local
from nova.exception import NotFound
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
@@ -71,7 +72,8 @@ class CloudTestCase(test.TestCase):
host = self.network.get_network_host(self.context.elevated()) host = self.network.get_network_host(self.context.elevated())
def fake_show(meh, context, id): def fake_show(meh, context, id):
return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1}} return {'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine'}}
self.stubs.Set(local.LocalImageService, 'show', fake_show) self.stubs.Set(local.LocalImageService, 'show', fake_show)
self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show) self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show)
@@ -216,6 +218,35 @@ class CloudTestCase(test.TestCase):
db.service_destroy(self.context, comp1['id']) db.service_destroy(self.context, comp1['id'])
db.service_destroy(self.context, comp2['id']) db.service_destroy(self.context, comp2['id'])
def test_describe_images(self):
describe_images = self.cloud.describe_images
def fake_detail(meh, context):
return [{'id': 1, 'properties': {'kernel_id': 1, 'ramdisk_id': 1,
'type': 'machine'}}]
def fake_show_none(meh, context, id):
raise NotFound
self.stubs.Set(local.LocalImageService, 'detail', fake_detail)
# list all
result1 = describe_images(self.context)
result1 = result1['imagesSet'][0]
self.assertEqual(result1['imageId'], 'ami-00000001')
# provided a valid image_id
result2 = describe_images(self.context, ['ami-00000001'])
self.assertEqual(1, len(result2['imagesSet']))
# provide more than 1 valid image_id
result3 = describe_images(self.context, ['ami-00000001',
'ami-00000002'])
self.assertEqual(2, len(result3['imagesSet']))
# provide an non-existing image_id
self.stubs.UnsetAll()
self.stubs.Set(local.LocalImageService, 'show', fake_show_none)
self.stubs.Set(local.LocalImageService, 'show_by_name', fake_show_none)
self.assertRaises(NotFound, describe_images,
self.context, ['ami-fake'])
def test_console_output(self): def test_console_output(self):
instance_type = FLAGS.default_instance_type instance_type = FLAGS.default_instance_type
max_count = 1 max_count = 1

View File

@@ -188,7 +188,7 @@ class XenAPIVMTestCase(test.TestCase):
stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests) stubs.stubout_session(self.stubs, stubs.FakeSessionForVMTests)
stubs.stubout_get_this_vm_uuid(self.stubs) stubs.stubout_get_this_vm_uuid(self.stubs)
stubs.stubout_stream_disk(self.stubs) stubs.stubout_stream_disk(self.stubs)
stubs.stubout_is_vdi_pv(self.stubs) stubs.stubout_determine_is_pv_objectstore(self.stubs)
self.stubs.Set(VMOps, 'reset_network', reset_network) self.stubs.Set(VMOps, 'reset_network', reset_network)
stubs.stub_out_vm_methods(self.stubs) stubs.stub_out_vm_methods(self.stubs)
glance_stubs.stubout_glance_client(self.stubs, glance_stubs.stubout_glance_client(self.stubs,
@@ -327,18 +327,17 @@ class XenAPIVMTestCase(test.TestCase):
self.assertEquals(self.vm['HVM_boot_params'], {}) self.assertEquals(self.vm['HVM_boot_params'], {})
self.assertEquals(self.vm['HVM_boot_policy'], '') self.assertEquals(self.vm['HVM_boot_policy'], '')
def _check_no_unbound_vdi(self): def _list_vdis(self):
url = FLAGS.xenapi_connection_url url = FLAGS.xenapi_connection_url
username = FLAGS.xenapi_connection_username username = FLAGS.xenapi_connection_username
password = FLAGS.xenapi_connection_password password = FLAGS.xenapi_connection_password
session = xenapi_conn.XenAPISession(url, username, password) session = xenapi_conn.XenAPISession(url, username, password)
vdi_refs = session.call_xenapi('VDI.get_all') return session.call_xenapi('VDI.get_all')
for vdi_ref in vdi_refs:
vdi_rec = session.call_xenapi('VDI.get_record', vdi_ref) def _check_vdis(self, start_list, end_list):
if 'VBDs' in vdi_rec: for vdi_ref in end_list:
self.assertEquals(vdi_rec['VBDs'], {}) if not vdi_ref in start_list:
else: self.fail('Found unexpected VDI:%s' % vdi_ref)
self.fail('Found unexpected unbound VDI:%s' % vdi_rec['uuid'])
def _test_spawn(self, image_id, kernel_id, ramdisk_id, def _test_spawn(self, image_id, kernel_id, ramdisk_id,
instance_type="m1.large", os_type="linux", instance_type="m1.large", os_type="linux",
@@ -365,28 +364,32 @@ class XenAPIVMTestCase(test.TestCase):
1, 2, 3, "m1.xlarge") 1, 2, 3, "m1.xlarge")
def test_spawn_fail_cleanup_1(self): def test_spawn_fail_cleanup_1(self):
"""Simulates an error while downloading an image.
Verifies that VDIs created are properly cleaned up.
""" """
Simulates an error while downloading image vdi_recs_start = self._list_vdis()
Verifies VDI create are properly cleaned up.
"""
FLAGS.xenapi_image_service = 'glance' FLAGS.xenapi_image_service = 'glance'
stubs.stubout_fetch_image_glance_disk(self.stubs) stubs.stubout_fetch_image_glance_disk(self.stubs)
self.assertRaises(xenapi_fake.Failure, self.assertRaises(xenapi_fake.Failure,
self._test_spawn, 1, 2, 3) self._test_spawn, 1, 2, 3)
# ensure there is no VDI without a VBD # no new VDI should be found
self._check_no_unbound_vdi() vdi_recs_end = self._list_vdis()
self._check_vdis(vdi_recs_start, vdi_recs_end)
def test_spawn_fail_cleanup_2(self): def test_spawn_fail_cleanup_2(self):
"""Simulates an error while creating VM record. It
verifies that VDIs created are properly cleaned up.
""" """
Simulates an error while creating VM record. It vdi_recs_start = self._list_vdis()
verifies that VDI created are properly cleaned up.
"""
FLAGS.xenapi_image_service = 'glance' FLAGS.xenapi_image_service = 'glance'
stubs.stubout_create_vm(self.stubs) stubs.stubout_create_vm(self.stubs)
self.assertRaises(xenapi_fake.Failure, self.assertRaises(xenapi_fake.Failure,
self._test_spawn, 1, 2, 3) self._test_spawn, 1, 2, 3)
# ensure there is no VDI without a VBD # no new VDI should be found
self._check_no_unbound_vdi() vdi_recs_end = self._list_vdis()
self._check_vdis(vdi_recs_start, vdi_recs_end)
def test_spawn_raw_objectstore(self): def test_spawn_raw_objectstore(self):
FLAGS.xenapi_image_service = 'objectstore' FLAGS.xenapi_image_service = 'objectstore'

View File

@@ -132,10 +132,13 @@ def stubout_stream_disk(stubs):
stubs.Set(vm_utils, '_stream_disk', f) stubs.Set(vm_utils, '_stream_disk', f)
def stubout_is_vdi_pv(stubs): def stubout_determine_is_pv_objectstore(stubs):
def f(_1): """Assumes VMs never have PV kernels"""
@classmethod
def f(cls, *args):
return False return False
stubs.Set(vm_utils, '_is_vdi_pv', f) stubs.Set(vm_utils.VMHelper, '_determine_is_pv_objectstore', f)
def stubout_lookup_image(stubs): def stubout_lookup_image(stubs):
@@ -176,7 +179,10 @@ class FakeSessionForVMTests(fake.SessionBase):
def __init__(self, uri): def __init__(self, uri):
super(FakeSessionForVMTests, self).__init__(uri) super(FakeSessionForVMTests, self).__init__(uri)
def host_call_plugin(self, _1, _2, _3, _4, _5): def host_call_plugin(self, _1, _2, plugin, fn, args):
# copy_kernel_vdi returns nothing
if fn == 'copy_kernel_vdi':
return
sr_ref = fake.get_all('SR')[0] sr_ref = fake.get_all('SR')[0]
vdi_ref = fake.create_vdi('', False, sr_ref, False) vdi_ref = fake.create_vdi('', False, sr_ref, False)
vdi_rec = fake.get_record('VDI', vdi_ref) vdi_rec = fake.get_record('VDI', vdi_ref)