moving xenapi unittests changes into another branch

This commit is contained in:
Armando Migliaccio
2010-12-13 20:31:33 +00:00
parent 478fb2844e
commit 9ec2c03b22

View File

@@ -25,14 +25,9 @@ from nova import utils
from nova.api.ec2 import cloud
from nova.auth import manager
from nova.virt import libvirt_conn
from nova.virt.xenapi import fake
from nova.virt.xenapi import volume_utils
FLAGS = flags.FLAGS
flags.DECLARE('instances_path', 'nova.compute.manager')
# Those are XenAPI related
flags.DECLARE('target_host', 'nova.virt.xenapi_conn')
FLAGS.target_host = '127.0.0.1'
class LibvirtConnTestCase(test.TrialTestCase):
@@ -262,74 +257,3 @@ class NWFilterTestCase(test.TrialTestCase):
d.addCallback(lambda _: self.teardown_security_group())
return d
class XenAPIVolumeTestCase(test.TrialTestCase):
def setUp(self):
super(XenAPIVolumeTestCase, self).setUp()
self.flags(xenapi_use_fake_session=True)
self.session = fake.FakeXenAPISession()
self.helper = volume_utils.VolumeHelper
self.helper.late_import()
def _create_volume(self, size='0'):
"""Create a volume object."""
vol = {}
vol['size'] = size
vol['user_id'] = 'fake'
vol['project_id'] = 'fake'
vol['host'] = 'localhost'
vol['availability_zone'] = FLAGS.storage_availability_zone
vol['status'] = "creating"
vol['attach_status'] = "detached"
return db.volume_create(context.get_admin_context(), vol)
def test_create_iscsi_storage_raise_no_exception(self):
vol = self._create_volume()
info = yield self.helper.parse_volume_info(vol['ec2_id'], '/dev/sdc')
label = None # For testing new SRs
description = 'Test-SR'
self.session.fail_next_call = False
sr_ref = self.helper.create_iscsi_storage_blocking(self.session,
info,
label,
description)
self.assertEqual(sr_ref, self.session.SR.FAKE_REF)
db.volume_destroy(context.get_admin_context(), vol['id'])
def test_create_iscsi_storage_raise_unable_to_create_sr_exception(self):
vol = self._create_volume()
info = yield self.helper.parse_volume_info(vol['ec2_id'], '/dev/sdc')
label = None # For testing new SRs
description = None
self.session.fail_next_call = True
self.assertRaises(volume_utils.StorageError,
self.helper.create_iscsi_storage_blocking,
self.session,
info,
label,
description)
def test_find_sr_from_vbd_raise_no_exception(self):
sr_ref = yield self.helper.find_sr_from_vbd(self.session,
self.session.VBD.FAKE_REF)
self.assertEqual(sr_ref, self.session.SR.FAKE_REF)
def test_destroy_iscsi_storage(self):
sr_ref = self.session.SR.FAKE_REF
self.helper.destroy_iscsi_storage_blocking(self.session, sr_ref)
def test_introduce_vdi_raise_no_exception(self):
sr_ref = self.session.SR.FAKE_REF
self.helper.introduce_vdi_blocking(self.session, sr_ref)
def test_introduce_vdi_raise_unable_get_vdi_record_exception(self):
sr_ref = self.session.SR.FAKE_REF
self.session.fail_next_call = True
self.assertRaises(volume_utils.StorageError,
self.helper.introduce_vdi_blocking,
self.session, sr_ref)
def tearDown(self):
super(XenAPIVolumeTestCase, self).tearDown()