first cut of unittest framework for xenapi

This commit is contained in:
Armando Migliaccio 2010-12-14 14:27:56 +00:00
commit e7562217ed
11 changed files with 569 additions and 209 deletions

View File

@ -30,9 +30,6 @@ from nova.virt.xenapi import volume_utils
FLAGS = flags.FLAGS
flags.DECLARE('instances_path', 'nova.compute.manager')
# Those are XenAPI related
flags.DECLARE('target_host', 'nova.virt.xenapi_conn')
FLAGS.target_host = '127.0.0.1'
class LibvirtConnTestCase(test.TrialTestCase):
@ -262,74 +259,3 @@ class NWFilterTestCase(test.TrialTestCase):
d.addCallback(lambda _: self.teardown_security_group())
return d
class XenAPIVolumeTestCase(test.TrialTestCase):
def setUp(self):
super(XenAPIVolumeTestCase, self).setUp()
self.flags(xenapi_use_fake_session=True)
self.session = fake.FakeXenAPISession()
self.helper = volume_utils.VolumeHelper
self.helper.late_import()
def _create_volume(self, size='0'):
"""Create a volume object."""
vol = {}
vol['size'] = size
vol['user_id'] = 'fake'
vol['project_id'] = 'fake'
vol['host'] = 'localhost'
vol['availability_zone'] = FLAGS.storage_availability_zone
vol['status'] = "creating"
vol['attach_status'] = "detached"
return db.volume_create(context.get_admin_context(), vol)
def test_create_iscsi_storage_raise_no_exception(self):
vol = self._create_volume()
info = yield self.helper.parse_volume_info(vol['ec2_id'], '/dev/sdc')
label = None # For testing new SRs
description = 'Test-SR'
self.session.fail_next_call = False
sr_ref = self.helper.create_iscsi_storage_blocking(self.session,
info,
label,
description)
self.assertEqual(sr_ref, self.session.SR.FAKE_REF)
db.volume_destroy(context.get_admin_context(), vol['id'])
def test_create_iscsi_storage_raise_unable_to_create_sr_exception(self):
vol = self._create_volume()
info = yield self.helper.parse_volume_info(vol['ec2_id'], '/dev/sdc')
label = None # For testing new SRs
description = None
self.session.fail_next_call = True
self.assertRaises(volume_utils.StorageError,
self.helper.create_iscsi_storage_blocking,
self.session,
info,
label,
description)
def test_find_sr_from_vbd_raise_no_exception(self):
sr_ref = yield self.helper.find_sr_from_vbd(self.session,
self.session.VBD.FAKE_REF)
self.assertEqual(sr_ref, self.session.SR.FAKE_REF)
def test_destroy_iscsi_storage(self):
sr_ref = self.session.SR.FAKE_REF
self.helper.destroy_iscsi_storage_blocking(self.session, sr_ref)
def test_introduce_vdi_raise_no_exception(self):
sr_ref = self.session.SR.FAKE_REF
self.helper.introduce_vdi_blocking(self.session, sr_ref)
def test_introduce_vdi_raise_unable_get_vdi_record_exception(self):
sr_ref = self.session.SR.FAKE_REF
self.session.fail_next_call = True
self.assertRaises(volume_utils.StorageError,
self.helper.introduce_vdi_blocking,
self.session, sr_ref)
def tearDown(self):
super(XenAPIVolumeTestCase, self).tearDown()

View File

@ -0,0 +1,188 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from twisted.internet import defer
from twisted.internet import threads
from nova import db
from nova import context
from nova import flags
from nova import test
from nova import utils
from nova.auth import manager
from nova.compute import instance_types
from nova.compute import power_state
from nova.virt import xenapi_conn
from nova.virt.xenapi import fake
from nova.virt.xenapi import volume_utils
FLAGS = flags.FLAGS
class XenAPIVolumeTestCase(test.TrialTestCase):
def setUp(self):
super(XenAPIVolumeTestCase, self).setUp()
FLAGS.xenapi_use_fake_session = True
FLAGS.target_host = '127.0.0.1'
FLAGS.xenapi_connection_url = 'test_url'
FLAGS.xenapi_connection_password = 'test_pass'
fake.reset()
def _create_volume(self, size='0'):
"""Create a volume object."""
vol = {}
vol['size'] = size
vol['user_id'] = 'fake'
vol['project_id'] = 'fake'
vol['host'] = 'localhost'
vol['availability_zone'] = FLAGS.storage_availability_zone
vol['status'] = "creating"
vol['attach_status'] = "detached"
return db.volume_create(context.get_admin_context(), vol)
def test_create_iscsi_storage_raise_no_exception(self):
session = xenapi_conn.XenAPISession('test_url', 'root', 'test_pass')
helper = volume_utils.VolumeHelper
helper.late_import(FLAGS)
vol = self._create_volume()
info = yield helper.parse_volume_info(vol['ec2_id'], '/dev/sdc')
label = 'SR-%s' % vol['ec2_id']
description = 'Test-SR'
sr_ref = helper.create_iscsi_storage_blocking(session,
info,
label,
description)
db.volume_destroy(context.get_admin_context(), vol['id'])
def test_attach_volume(self):
conn = xenapi_conn.get_connection(False)
volume = self._create_volume()
instance = FakeInstance(1, 'fake', 'fake', 1, 2, 3,
'm1.large', 'aa:bb:cc:dd:ee:ff')
fake.create_vm(instance.name, 'Running')
result = conn.attach_volume(instance.name, volume['ec2_id'],
'/dev/sdc')
def check(_):
# check that
# 1. the SR has been created
# 2. the instance has a VBD attached to it
pass
result.addCallback(check)
return result
def tearDown(self):
super(XenAPIVolumeTestCase, self).tearDown()
class XenAPIVMTestCase(test.TrialTestCase):
def setUp(self):
super(XenAPIVMTestCase, self).setUp()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
self.project = self.manager.create_project('fake', 'fake', 'fake')
self.network = utils.import_object(FLAGS.network_manager)
FLAGS.xenapi_use_fake_session = True
FLAGS.xenapi_connection_url = 'test_url'
FLAGS.xenapi_connection_password = 'test_pass'
fake.reset()
fake.create_network('fake', FLAGS.flat_network_bridge)
def test_list_instances_0(self):
conn = xenapi_conn.get_connection(False)
instances = conn.list_instances()
self.assertEquals(instances, [])
test_list_instances_0.skip = "E"
def test_spawn(self):
conn = xenapi_conn.get_connection(False)
instance = FakeInstance(1, self.project.id, self.user.id, 1, 2, 3,
'm1.large', 'aa:bb:cc:dd:ee:ff')
result = conn.spawn(instance)
def check(_):
instances = conn.list_instances()
self.assertEquals(instances, [1])
# Get Nova record for VM
vm_info = conn.get_info(1)
# Get XenAPI record for VM
vms = fake.get_all('VM')
vm = fake.get_record('VM', vms[0])
# Check that m1.large above turned into the right thing.
instance_type = instance_types.INSTANCE_TYPES['m1.large']
mem_kib = long(instance_type['memory_mb']) << 10
mem_bytes = str(mem_kib << 10)
vcpus = instance_type['vcpus']
self.assertEquals(vm_info['max_mem'], mem_kib)
self.assertEquals(vm_info['mem'], mem_kib)
self.assertEquals(vm['memory_static_max'], mem_bytes)
self.assertEquals(vm['memory_dynamic_max'], mem_bytes)
self.assertEquals(vm['memory_dynamic_min'], mem_bytes)
self.assertEquals(vm['VCPUs_max'], str(vcpus))
self.assertEquals(vm['VCPUs_at_startup'], str(vcpus))
# Check that the VM is running according to Nova
self.assertEquals(vm_info['state'], power_state.RUNNING)
# Check that the VM is running according to XenAPI.
self.assertEquals(vm['power_state'], 'Running')
result.addCallback(check)
return result
def tearDown(self):
super(XenAPIVMTestCase, self).tearDown()
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
class FakeInstance():
def __init__(self, name, project_id, user_id, image_id, kernel_id,
ramdisk_id, instance_type, mac_address):
self.name = name
self.project_id = project_id
self.user_id = user_id
self.image_id = image_id
self.kernel_id = kernel_id
self.ramdisk_id = ramdisk_id
self.instance_type = instance_type
self.mac_address = mac_address

View File

@ -18,3 +18,33 @@
:mod:`xenapi` -- Nova support for XenServer and XCP through XenAPI
==================================================================
"""
def load_sdk(flags):
"""
This method is used for loading the XenAPI SDK (fake or real)
"""
xenapi_module = \
flags.xenapi_use_fake_session and 'nova.virt.xenapi.fake' or 'XenAPI'
from_list = \
flags.xenapi_use_fake_session and ['fake'] or []
return __import__(xenapi_module, globals(), locals(), from_list, -1)
class HelperBase():
"""
The class that wraps the helper methods together.
"""
XenAPI = None
def __init__(self):
return
@classmethod
def late_import(cls, FLAGS):
"""
Load XenAPI module in for helper class
"""
if cls.XenAPI is None:
cls.XenAPI = load_sdk(FLAGS)

View File

@ -1,6 +1,6 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Copyright (c) 2010 Citrix Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -13,116 +13,354 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
#============================================================================
#
# Parts of this file are based upon xmlrpclib.py, the XML-RPC client
# interface included in the Python distribution.
#
# Copyright (c) 1999-2002 by Secret Labs AB
# Copyright (c) 1999-2002 by Fredrik Lundh
#
# By obtaining, using, and/or copying this software and/or its
# associated documentation, you agree that you have read, understood,
# and will comply with the following terms and conditions:
#
# Permission to use, copy, modify, and distribute this software and
# its associated documentation for any purpose and without fee is
# hereby granted, provided that the above copyright notice appears in
# all copies, and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of
# Secret Labs AB or the author not be used in advertising or publicity
# pertaining to distribution of the software without specific, written
# prior permission.
#
# SECRET LABS AB AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD
# TO THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANT-
# ABILITY AND FITNESS. IN NO EVENT SHALL SECRET LABS AB OR THE AUTHOR
# BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY
# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
# OF THIS SOFTWARE.
# --------------------------------------------------------------------
"""
A fake XenAPI SDK.
Allows for xenapi helper classes testing.
"""
import datetime
import logging
import uuid
from nova import exception
_CLASSES = ['host', 'network', 'session', 'SR', 'VBD',\
'VDI', 'VIF', 'VM', 'task']
_db_content = {}
def reset():
for c in _CLASSES:
_db_content[c] = {}
create_host('fake')
def create_host(name_label):
return _create_object('host', {
'name_label': name_label,
})
def create_network(name_label, bridge):
return _create_object('network', {
'name_label': name_label,
'bridge': bridge,
})
def create_vm(name_label, status):
return _create_object('VM', {
'name_label': name_label,
'power-state': status,
})
def create_task(name_label):
return _create_object('task', {
'name_label': name_label,
'status': 'pending',
})
def _create_object(table, obj):
ref = str(uuid.uuid4())
obj['uuid'] = str(uuid.uuid4())
_db_content[table][ref] = obj
return ref
def get_all(table):
return _db_content[table].keys()
def get_all_records(table):
return _db_content[table]
def get_record(table, ref):
if ref in _db_content[table]:
return _db_content[table].get(ref)
else:
raise Failure(['HANDLE_INVALID', table, ref])
def check_for_session_leaks():
if len(_db_content['session']) > 0:
raise exception.Error('Sessions have leaked: %s' %
_db_content['session'])
class Failure(Exception):
def __init__(self, message=None):
super(Failure, self).__init__(message)
self.details = []
def __init__(self, details):
self.details = details
def __str__(self):
return 'Fake XenAPI Exception'
try:
return str(self.details)
except Exception, exn:
return "XenAPI Fake Failure: %s" % str(self.details)
def _details_map(self):
return dict([(str(i), self.details[i])
for i in range(len(self.details))])
class FakeXenAPISession(object):
""" The session to invoke XenAPI SDK calls """
def __init__(self):
self.fail_next_call = False
class SessionBase(object):
"""
Base class for Fake Sessions
"""
def get_xenapi(self):
""" Return the xenapi object """
return self
def __init__(self, uri):
self._session = None
def get_xenapi_host(self):
""" Return the xenapi host """
return 'FAKE_XENAPI_HOST'
def call_xenapi(self, method, *args):
"""Call the specified XenAPI method on a background thread. Returns
a Deferred for the result."""
raise NotImplementedError()
def async_call_plugin(self, plugin, fn, args):
"""Call Async.host.call_plugin on a background thread. Returns a
Deferred with the task reference."""
raise NotImplementedError()
def wait_for_task(self, task):
"""Return a Deferred that will give the result of the given task.
The task is polled until it completes."""
raise NotImplementedError()
def __getattr__(self, name):
return FakeXenAPIObject(name, self)
class FakeXenAPIObject(object):
def __init__(self, name, session):
self.name = name
self.session = session
self.FAKE_REF = 'FAKE_REFERENCE_%s' % name
def get_by_name_label(self, label):
if label is None:
return '' # 'No object found'
def xenapi_request(self, methodname, params):
if methodname.startswith('login'):
self._login(methodname, params)
return None
elif methodname == 'logout' or methodname == 'session.logout':
self._logout()
return None
else:
return 'FAKE_OBJECT_%s_%s' % (self.name, label)
full_params = (self._session,) + params
meth = getattr(self, methodname, None)
if meth is None:
logging.warn('Raising NotImplemented')
raise NotImplementedError(
'xenapi.fake does not have an implementation for %s' %
methodname)
return meth(*full_params)
def getter(self, *args):
self._check_fail()
return self.FAKE_REF
def _login(self, method, params):
self._session = str(uuid.uuid4())
_db_content['session'][self._session] = {
'uuid': str(uuid.uuid4()),
'this_host': _db_content['host'].keys()[0],
}
def ref_list(self, *args):
self._check_fail()
return [FakeXenAPIRecord()]
def _logout(self):
s = self._session
self._session = None
if s not in _db_content['session']:
raise exception.Error(
"Logging out a session that is invalid or already logged "
"out: %s" % s)
del _db_content['session'][s]
def __getattr__(self, name):
if name == 'create':
return self._create
elif name == 'get_record':
return self._record
elif name == 'introduce' or\
name == 'forget' or\
name == 'unplug':
return self._fake_action
elif name.startswith('get_'):
getter = 'get_%s' % self.name
if name == getter:
return self.getter
else:
child = name[name.find('_') + 1:]
if child.endswith('s'):
return FakeXenAPIObject(child[:-1], self.session).ref_list
else:
return FakeXenAPIObject(child, self.session).getter
if name == 'handle':
return self._session
elif name == 'xenapi':
return _Dispatcher(self.xenapi_request, None)
elif name.startswith('login') or name.startswith('slave_local'):
return lambda *params: self._login(name, params)
elif name.startswith('Async'):
return lambda *params: self._async(name, params)
elif '.' in name:
impl = getattr(self, name.replace('.', '_'))
if impl is not None:
def callit(*params):
logging.warn('Calling %s %s', name, impl)
self._check_session(params)
return impl(*params)
return callit
if self._is_gettersetter(name, True):
logging.warn('Calling getter %s', name)
return lambda *params: self._getter(name, params)
elif self._is_create(name):
return lambda *params: self._create(name, params)
else:
return None
def _create(self, *args):
self._check_fail()
return self.FAKE_REF
def _is_gettersetter(self, name, getter):
bits = name.split('.')
return (len(bits) == 2 and
bits[0] in _CLASSES and
bits[1].startswith(getter and 'get_' or 'set_'))
def _record(self, *args):
self._check_fail()
return FakeXenAPIRecord()
def _is_create(self, name):
bits = name.split('.')
return (len(bits) == 2 and
bits[0] in _CLASSES and
bits[1] == 'create')
def _fake_action(self, *args):
self._check_fail()
pass
def _getter(self, name, params):
self._check_session(params)
(cls, func) = name.split('.')
def _check_fail(self):
if self.session.fail_next_call:
self.session.fail_next_call = False # Reset!
raise Failure('Unable to create %s' % self.name)
if func == 'get_all':
self._check_arg_count(params, 1)
return get_all(cls)
if func == 'get_all_records':
self._check_arg_count(params, 1)
return get_all_records(cls)
if func == 'get_record':
self._check_arg_count(params, 2)
return get_record(cls, params[1])
if (func == 'get_by_name_label' or
func == 'get_by_uuid'):
self._check_arg_count(params, 2)
return self._get_by_field(
_db_content[cls], func[len('get_by_'):], params[1])
if len(params) == 2:
field = func[len('get_'):]
ref = params[1]
if (ref in _db_content[cls] and
field in _db_content[cls][ref]):
return _db_content[cls][ref][field]
logging.error('Raising NotImplemented')
raise NotImplementedError(
'xenapi.fake does not have an implementation for %s or it has '
'been called with the wrong number of arguments' % name)
def _setter(self, name, params):
self._check_session(params)
(cls, func) = name.split('.')
if len(params) == 3:
field = func[len('set_'):]
ref = params[1]
val = params[2]
if (ref in _db_content[cls] and
field in _db_content[cls][ref]):
_db_content[cls][ref][field] = val
logging.warn('Raising NotImplemented')
raise NotImplementedError(
'xenapi.fake does not have an implementation for %s or it has '
'been called with the wrong number of arguments or the database '
'is missing that field' % name)
def _create(self, name, params):
self._check_session(params)
expected = 2
if name == 'SR.create':
expected = 10
self._check_arg_count(params, expected)
(cls, _) = name.split('.')
if name == 'SR.create':
ref = _create_object(cls, params[2])
else:
ref = _create_object(cls, params[1])
obj = get_record(cls, ref)
# Add RO fields
if cls == 'VM':
obj['power_state'] = 'Halted'
return ref
def _async(self, name, params):
task_ref = create_task(name)
task = _db_content['task'][task_ref]
func = name[len('Async.'):]
try:
task['result'] = self.xenapi_request(func, params[1:])
task['status'] = 'success'
except Failure, exn:
task['error_info'] = exn.details
task['status'] = 'failed'
task['finished'] = datetime.datetime.now()
return task_ref
def _check_session(self, params):
if (self._session is None or
self._session not in _db_content['session']):
raise Failure(['HANDLE_INVALID', 'session', self._session])
if len(params) == 0 or params[0] != self._session:
logging.warn('Raising NotImplemented')
raise NotImplementedError('Call to XenAPI without using .xenapi')
def _check_arg_count(self, params, expected):
actual = len(params)
if actual != expected:
raise Failure(['MESSAGE_PARAMETER_COUNT_MISMATCH',
expected, actual])
def _get_by_field(self, recs, k, v):
result = []
for ref, rec in recs.iteritems():
if rec.get(k) == v:
result.append(ref)
return result
class FakeXenAPIRecord(dict):
def __init__(self):
pass
# Based upon _Method from xmlrpclib.
class _Dispatcher:
def __init__(self, send, name):
self.__send = send
self.__name = name
def __getitem__(self, attr):
def __repr__(self):
if self.__name:
return '<xenapi.fake._Dispatcher for %s>' % self.__name
else:
return '<xenapi.fake._Dispatcher>'
def __getattr__(self, name):
if self.__name is None:
return _Dispatcher(self.__send, name)
else:
return _Dispatcher(self.__send, "%s.%s" % (self.__name, name))
def __call__(self, *args):
return self.__send(self.__name, args)
class FakeSession(SessionBase):
def __init__(self, uri):
super(FakeSession, self).__init__(uri)
def network_get_all_records_where(self, _1, _2):
return self.xenapi.network.get_all_records()
def host_call_plugin(self, _1, _2, _3, _4, _5):
return ''
def VM_start(self, _1, ref, _2, _3):
vm = get_record('VM', ref)
if vm['power_state'] != 'Halted':
raise Failure(['VM_BAD_POWER_STATE', ref, 'Halted',
vm['power_state']])
vm['power_state'] = 'Running'

View File

@ -21,9 +21,10 @@ their lookup functions.
"""
from twisted.internet import defer
from nova.virt.xenapi import HelperBase
class NetworkHelper():
class NetworkHelper(HelperBase):
"""
The class that wraps the helper methods together.
"""

View File

@ -32,6 +32,7 @@ from nova.auth.manager import AuthManager
from nova.compute import instance_types
from nova.compute import power_state
from nova.virt import images
from nova.virt.xenapi import HelperBase
from nova.virt.xenapi.volume_utils import StorageError
FLAGS = flags.FLAGS
@ -44,29 +45,13 @@ XENAPI_POWER_STATE = {
'Crashed': power_state.CRASHED}
class VMHelper():
class VMHelper(HelperBase):
"""
The class that wraps the helper methods together.
"""
XenAPI = None
def __init__(self):
return
@classmethod
def late_import(cls):
"""
Load XenAPI module in for helper class
"""
xenapi_module = \
FLAGS.xenapi_use_fake_session and 'nova.virt.xenapi.fake' or 'XenAPI'
from_list = \
FLAGS.xenapi_use_fake_session and ['fake'] or []
if cls.XenAPI is None:
cls.XenAPI = __import__(xenapi_module,
globals(), locals(), from_list, -1)
@classmethod
@defer.inlineCallbacks
def create_vm(cls, session, instance, kernel, ramdisk):

View File

@ -24,9 +24,11 @@ from twisted.internet import defer
from nova import db
from nova import context
from nova import flags
from nova import exception
from nova.auth.manager import AuthManager
from nova.virt.xenapi import load_sdk
from nova.virt.xenapi.network_utils import NetworkHelper
from nova.virt.xenapi.vm_utils import VMHelper
@ -36,10 +38,10 @@ class VMOps(object):
Management class for VM-related tasks
"""
def __init__(self, session):
self.XenAPI = __import__('XenAPI')
self.XenAPI = load_sdk(flags.FLAGS)
self._session = session
# Load XenAPI module in the helper class
VMHelper.late_import()
VMHelper.late_import(flags.FLAGS)
def list_instances(self):
""" List VM instances """

View File

@ -30,7 +30,7 @@ from nova import context
from nova import flags
from nova import process
from nova import utils
from nova.virt.xenapi import HelperBase
FLAGS = flags.FLAGS
@ -41,29 +41,13 @@ class StorageError(Exception):
super(StorageError, self).__init__(message)
class VolumeHelper():
class VolumeHelper(HelperBase):
"""
The class that wraps the helper methods together.
"""
XenAPI = None
def __init__(self):
return
@classmethod
def late_import(cls):
"""
Load XenAPI module in for helper class
"""
xenapi_module = \
FLAGS.xenapi_use_fake_session and 'nova.virt.xenapi.fake' or 'XenAPI'
from_list = \
FLAGS.xenapi_use_fake_session and ['fake'] or []
if cls.XenAPI is None:
cls.XenAPI = __import__(xenapi_module,
globals(), locals(), from_list, -1)
@classmethod
@utils.deferredToThread
def create_iscsi_storage(cls, session, info, label, description):

View File

@ -21,6 +21,8 @@ import logging
from twisted.internet import defer
from nova import flags
from nova.virt.xenapi import load_sdk
from nova.virt.xenapi.vm_utils import VMHelper
from nova.virt.xenapi.volume_utils import VolumeHelper
from nova.virt.xenapi.volume_utils import StorageError
@ -31,11 +33,11 @@ class VolumeOps(object):
Management class for Volume-related tasks
"""
def __init__(self, session):
self.XenAPI = __import__('XenAPI')
self.XenAPI = load_sdk(flags.FLAGS)
self._session = session
# Load XenAPI module in the helper classes respectively
VolumeHelper.late_import()
VMHelper.late_import()
VolumeHelper.late_import(flags.FLAGS)
VMHelper.late_import(flags.FLAGS)
@defer.inlineCallbacks
def attach_volume(self, instance_name, device_path, mountpoint):

View File

@ -59,6 +59,7 @@ from twisted.internet import reactor
from nova import utils
from nova import flags
from nova.virt.xenapi import load_sdk
from nova.virt.xenapi.vmops import VMOps
from nova.virt.xenapi.volumeops import VolumeOps
@ -91,7 +92,7 @@ flags.DEFINE_string('target_port',
'3260',
'iSCSI Target Port, 3260 Default')
flags.DEFINE_string('iqn_prefix',
'iqn.2010-12.org.openstack',
'iqn.2010-10.org.openstack',
'IQN Prefix')
@ -122,7 +123,7 @@ class XenAPIConnection(object):
def spawn(self, instance):
""" Create VM instance """
self._vmops.spawn(instance)
return self._vmops.spawn(instance)
def reboot(self, instance):
""" Reboot VM instance """
@ -156,8 +157,11 @@ class XenAPISession(object):
def __init__(self, url, user, pw):
# This is loaded late so that there's no need to install this
# library when not using XenAPI.
self.XenAPI = __import__('XenAPI')
self._session = self.XenAPI.Session(url)
self.XenAPI = load_sdk(FLAGS)
if FLAGS.xenapi_use_fake_session:
self._session = self.XenAPI.FakeSession(url)
else:
self._session = self.XenAPI.Session(url)
self._session.login_with_password(user, pw)
def get_xenapi(self):

View File

@ -65,8 +65,8 @@ from nova.tests.service_unittest import *
from nova.tests.twistd_unittest import *
from nova.tests.validator_unittest import *
from nova.tests.virt_unittest import *
from nova.tests.virt_unittest import *
from nova.tests.volume_unittest import *
from nova.tests.xenapi_unittest import *
FLAGS = flags.FLAGS