Moving more tests to appropriate locations

Change-Id: I06aa8d8f31593cfea6e26560d6af2caf6a29194d
This commit is contained in:
Rick Harris
2013-05-24 23:31:01 +00:00
parent 9f8f3e1f63
commit d9e2e84d35
9 changed files with 0 additions and 7963 deletions

View File

@@ -1,615 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unit tests for the API endpoint."""
import random
import StringIO
import boto
import boto.connection
from boto.ec2 import regioninfo
from boto import exception as boto_exc
# newer versions of boto use their own wrapper on top of httplib.HTTPResponse
if hasattr(boto.connection, 'HTTPResponse'):
httplib = boto.connection
else:
import httplib
import fixtures
import webob
from nova.api import auth
from nova.api import ec2
from nova.api.ec2 import apirequest
from nova.api.ec2 import ec2utils
from nova import block_device
from nova import context
from nova import exception
from nova.openstack.common import timeutils
from nova import test
from nova.tests import matchers
class FakeHttplibSocket(object):
"""a fake socket implementation for httplib.HTTPResponse, trivial."""
def __init__(self, response_string):
self.response_string = response_string
self._buffer = StringIO.StringIO(response_string)
def makefile(self, _mode, _other):
"""Returns the socket's internal buffer."""
return self._buffer
class FakeHttplibConnection(object):
"""A fake httplib.HTTPConnection for boto to use
requests made via this connection actually get translated and routed into
our WSGI app, we then wait for the response and turn it back into
the HTTPResponse that boto expects.
"""
def __init__(self, app, host, is_secure=False):
self.app = app
self.host = host
def request(self, method, path, data, headers):
req = webob.Request.blank(path)
req.method = method
req.body = data
req.headers = headers
req.headers['Accept'] = 'text/html'
req.host = self.host
# Call the WSGI app, get the HTTP response
resp = str(req.get_response(self.app))
# For some reason, the response doesn't have "HTTP/1.0 " prepended; I
# guess that's a function the web server usually provides.
resp = "HTTP/1.0 %s" % resp
self.sock = FakeHttplibSocket(resp)
self.http_response = httplib.HTTPResponse(self.sock)
# NOTE(vish): boto is accessing private variables for some reason
self._HTTPConnection__response = self.http_response
self.http_response.begin()
def getresponse(self):
return self.http_response
def getresponsebody(self):
return self.sock.response_string
def close(self):
"""Required for compatibility with boto/tornado."""
pass
class XmlConversionTestCase(test.TestCase):
"""Unit test api xml conversion."""
def test_number_conversion(self):
conv = ec2utils._try_convert
self.assertEqual(conv('None'), None)
self.assertEqual(conv('True'), True)
self.assertEqual(conv('TRUE'), True)
self.assertEqual(conv('true'), True)
self.assertEqual(conv('False'), False)
self.assertEqual(conv('FALSE'), False)
self.assertEqual(conv('false'), False)
self.assertEqual(conv('0'), 0)
self.assertEqual(conv('42'), 42)
self.assertEqual(conv('3.14'), 3.14)
self.assertEqual(conv('-57.12'), -57.12)
self.assertEqual(conv('0x57'), 0x57)
self.assertEqual(conv('-0x57'), -0x57)
self.assertEqual(conv('-'), '-')
self.assertEqual(conv('-0'), 0)
self.assertEqual(conv('0.0'), 0.0)
self.assertEqual(conv('1e-8'), 0.0)
self.assertEqual(conv('-1e-8'), 0.0)
self.assertEqual(conv('0xDD8G'), '0xDD8G')
self.assertEqual(conv('0XDD8G'), '0XDD8G')
self.assertEqual(conv('-stringy'), '-stringy')
self.assertEqual(conv('stringy'), 'stringy')
self.assertEqual(conv('add'), 'add')
self.assertEqual(conv('remove'), 'remove')
self.assertEqual(conv(''), '')
class Ec2utilsTestCase(test.TestCase):
def test_ec2_id_to_id(self):
self.assertEqual(ec2utils.ec2_id_to_id('i-0000001e'), 30)
self.assertEqual(ec2utils.ec2_id_to_id('ami-1d'), 29)
self.assertEqual(ec2utils.ec2_id_to_id('snap-0000001c'), 28)
self.assertEqual(ec2utils.ec2_id_to_id('vol-0000001b'), 27)
def test_bad_ec2_id(self):
self.assertRaises(exception.InvalidEc2Id,
ec2utils.ec2_id_to_id,
'badone')
def test_id_to_ec2_id(self):
self.assertEqual(ec2utils.id_to_ec2_id(30), 'i-0000001e')
self.assertEqual(ec2utils.id_to_ec2_id(29, 'ami-%08x'), 'ami-0000001d')
self.assertEqual(ec2utils.id_to_ec2_snap_id(28), 'snap-0000001c')
self.assertEqual(ec2utils.id_to_ec2_vol_id(27), 'vol-0000001b')
def test_dict_from_dotted_str(self):
in_str = [('BlockDeviceMapping.1.DeviceName', '/dev/sda1'),
('BlockDeviceMapping.1.Ebs.SnapshotId', 'snap-0000001c'),
('BlockDeviceMapping.1.Ebs.VolumeSize', '80'),
('BlockDeviceMapping.1.Ebs.DeleteOnTermination', 'false'),
('BlockDeviceMapping.2.DeviceName', '/dev/sdc'),
('BlockDeviceMapping.2.VirtualName', 'ephemeral0')]
expected_dict = {
'block_device_mapping': {
'1': {'device_name': '/dev/sda1',
'ebs': {'snapshot_id': 'snap-0000001c',
'volume_size': 80,
'delete_on_termination': False}},
'2': {'device_name': '/dev/sdc',
'virtual_name': 'ephemeral0'}}}
out_dict = ec2utils.dict_from_dotted_str(in_str)
self.assertThat(out_dict, matchers.DictMatches(expected_dict))
def test_properties_root_defice_name(self):
mappings = [{"device": "/dev/sda1", "virtual": "root"}]
properties0 = {'mappings': mappings}
properties1 = {'root_device_name': '/dev/sdb', 'mappings': mappings}
root_device_name = block_device.properties_root_device_name(
properties0)
self.assertEqual(root_device_name, '/dev/sda1')
root_device_name = block_device.properties_root_device_name(
properties1)
self.assertEqual(root_device_name, '/dev/sdb')
def test_mapping_prepend_dev(self):
mappings = [
{'virtual': 'ami',
'device': 'sda1'},
{'virtual': 'root',
'device': '/dev/sda1'},
{'virtual': 'swap',
'device': 'sdb1'},
{'virtual': 'swap',
'device': '/dev/sdb2'},
{'virtual': 'ephemeral0',
'device': 'sdc1'},
{'virtual': 'ephemeral1',
'device': '/dev/sdc1'}]
expected_result = [
{'virtual': 'ami',
'device': 'sda1'},
{'virtual': 'root',
'device': '/dev/sda1'},
{'virtual': 'swap',
'device': '/dev/sdb1'},
{'virtual': 'swap',
'device': '/dev/sdb2'},
{'virtual': 'ephemeral0',
'device': '/dev/sdc1'},
{'virtual': 'ephemeral1',
'device': '/dev/sdc1'}]
self.assertThat(block_device.mappings_prepend_dev(mappings),
matchers.DictListMatches(expected_result))
class ApiEc2TestCase(test.TestCase):
"""Unit test for the cloud controller on an EC2 API."""
def setUp(self):
super(ApiEc2TestCase, self).setUp()
self.host = '127.0.0.1'
# NOTE(vish): skipping the Authorizer
roles = ['sysadmin', 'netadmin']
ctxt = context.RequestContext('fake', 'fake', roles=roles)
self.app = auth.InjectContext(ctxt, ec2.FaultWrapper(
ec2.RequestLogging(ec2.Requestify(ec2.Authorizer(ec2.Executor()
), 'nova.api.ec2.cloud.CloudController'))))
self.useFixture(fixtures.FakeLogger('boto'))
def expect_http(self, host=None, is_secure=False, api_version=None):
"""Returns a new EC2 connection."""
self.ec2 = boto.connect_ec2(
aws_access_key_id='fake',
aws_secret_access_key='fake',
is_secure=False,
region=regioninfo.RegionInfo(None, 'test', self.host),
port=8773,
path='/services/Cloud')
if api_version:
self.ec2.APIVersion = api_version
self.mox.StubOutWithMock(self.ec2, 'new_http_connection')
self.http = FakeHttplibConnection(
self.app, '%s:8773' % (self.host), False)
# pylint: disable=E1103
if boto.Version >= '2':
self.ec2.new_http_connection(host or '%s:8773' % (self.host),
is_secure).AndReturn(self.http)
else:
self.ec2.new_http_connection(host, is_secure).AndReturn(self.http)
return self.http
def test_return_valid_isoformat(self):
"""
Ensure that the ec2 api returns datetime in xs:dateTime
(which apparently isn't datetime.isoformat())
NOTE(ken-pepple): https://bugs.launchpad.net/nova/+bug/721297
"""
conv = apirequest._database_to_isoformat
# sqlite database representation with microseconds
time_to_convert = timeutils.parse_strtime("2011-02-21 20:14:10.634276",
"%Y-%m-%d %H:%M:%S.%f")
self.assertEqual(conv(time_to_convert), '2011-02-21T20:14:10.634Z')
# mysqlite database representation
time_to_convert = timeutils.parse_strtime("2011-02-21 19:56:18",
"%Y-%m-%d %H:%M:%S")
self.assertEqual(conv(time_to_convert), '2011-02-21T19:56:18.000Z')
def test_xmlns_version_matches_request_version(self):
self.expect_http(api_version='2010-10-30')
self.mox.ReplayAll()
# Any request should be fine
self.ec2.get_all_instances()
self.assertTrue(self.ec2.APIVersion in self.http.getresponsebody(),
'The version in the xmlns of the response does '
'not match the API version given in the request.')
def test_describe_instances(self):
"""Test that, after creating a user and a project, the describe
instances call to the API works properly"""
self.expect_http()
self.mox.ReplayAll()
self.assertEqual(self.ec2.get_all_instances(), [])
def test_terminate_invalid_instance(self):
# Attempt to terminate an invalid instance.
self.expect_http()
self.mox.ReplayAll()
self.assertRaises(boto_exc.EC2ResponseError,
self.ec2.terminate_instances, "i-00000005")
def test_get_all_key_pairs(self):
"""Test that, after creating a user and project and generating
a key pair, that the API call to list key pairs works properly"""
keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
for x in range(random.randint(4, 8)))
self.expect_http()
self.mox.ReplayAll()
self.ec2.create_key_pair(keyname)
rv = self.ec2.get_all_key_pairs()
results = [k for k in rv if k.name == keyname]
self.assertEquals(len(results), 1)
def test_create_duplicate_key_pair(self):
"""Test that, after successfully generating a keypair,
requesting a second keypair with the same name fails sanely"""
self.expect_http()
self.mox.ReplayAll()
self.ec2.create_key_pair('test')
try:
self.ec2.create_key_pair('test')
except boto_exc.EC2ResponseError as e:
if e.code == 'InvalidKeyPair.Duplicate':
pass
else:
self.assertEqual('InvalidKeyPair.Duplicate', e.code)
else:
self.fail('Exception not raised.')
def test_get_all_security_groups(self):
# Test that we can retrieve security groups.
self.expect_http()
self.mox.ReplayAll()
rv = self.ec2.get_all_security_groups()
self.assertEquals(len(rv), 1)
self.assertEquals(rv[0].name, 'default')
def test_create_delete_security_group(self):
# Test that we can create a security group.
self.expect_http()
self.mox.ReplayAll()
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
for x in range(random.randint(4, 8)))
self.ec2.create_security_group(security_group_name, 'test group')
self.expect_http()
self.mox.ReplayAll()
rv = self.ec2.get_all_security_groups()
self.assertEquals(len(rv), 2)
self.assertTrue(security_group_name in [group.name for group in rv])
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(security_group_name)
def test_group_name_valid_chars_security_group(self):
"""Test that we sanely handle invalid security group names.
EC2 API Spec states we should only accept alphanumeric characters,
spaces, dashes, and underscores. Amazon implementation
accepts more characters - so, [:print:] is ok. """
bad_strict_ec2 = "aa \t\x01\x02\x7f"
bad_amazon_ec2 = "aa #^% -=99"
test_raise = [
(True, bad_amazon_ec2, "test desc"),
(True, "test name", bad_amazon_ec2),
(False, bad_strict_ec2, "test desc"),
]
for test in test_raise:
self.expect_http()
self.mox.ReplayAll()
self.flags(ec2_strict_validation=test[0])
self.assertRaises(boto_exc.EC2ResponseError,
self.ec2.create_security_group,
test[1],
test[2])
test_accept = [
(False, bad_amazon_ec2, "test desc"),
(False, "test name", bad_amazon_ec2),
]
for test in test_accept:
self.expect_http()
self.mox.ReplayAll()
self.flags(ec2_strict_validation=test[0])
self.ec2.create_security_group(test[1], test[2])
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(test[1])
def test_group_name_valid_length_security_group(self):
"""Test that we sanely handle invalid security group names.
API Spec states that the length should not exceed 255 chars """
self.expect_http()
self.mox.ReplayAll()
# Test block group_name > 255 chars
security_group_name = "".join(random.choice("poiuytrewqasdfghjklmnbvc")
for x in range(random.randint(256, 266)))
self.assertRaises(boto_exc.EC2ResponseError,
self.ec2.create_security_group,
security_group_name,
'test group')
def test_authorize_revoke_security_group_cidr(self):
"""
Test that we can add and remove CIDR based rules
to a security group
"""
self.expect_http()
self.mox.ReplayAll()
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
for x in range(random.randint(4, 8)))
group = self.ec2.create_security_group(security_group_name,
'test group')
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.authorize('tcp', 80, 81, '0.0.0.0/0')
group.authorize('icmp', -1, -1, '0.0.0.0/0')
group.authorize('udp', 80, 81, '0.0.0.0/0')
group.authorize('tcp', 1, 65535, '0.0.0.0/0')
group.authorize('udp', 1, 65535, '0.0.0.0/0')
group.authorize('icmp', 1, 0, '0.0.0.0/0')
group.authorize('icmp', 0, 1, '0.0.0.0/0')
group.authorize('icmp', 0, 0, '0.0.0.0/0')
def _assert(message, *args):
try:
group.authorize(*args)
except boto_exc.EC2ResponseError as e:
self.assertEqual(e.status, 400, 'Expected status to be 400')
self.assertIn(message, e.error_message)
else:
raise self.failureException, 'EC2ResponseError not raised'
# Invalid CIDR address
_assert('Invalid CIDR', 'tcp', 80, 81, '0.0.0.0/0444')
# Missing ports
_assert('Not enough parameters', 'tcp', '0.0.0.0/0')
# from port cannot be greater than to port
_assert('Invalid port range', 'tcp', 100, 1, '0.0.0.0/0')
# For tcp, negative values are not allowed
_assert('Invalid port range', 'tcp', -1, 1, '0.0.0.0/0')
# For tcp, valid port range 1-65535
_assert('Invalid port range', 'tcp', 1, 65599, '0.0.0.0/0')
# Invalid Cidr for ICMP type
_assert('Invalid CIDR', 'icmp', -1, -1, '0.0.444.0/4')
# Invalid protocol
_assert('Invalid IP protocol', 'xyz', 1, 14, '0.0.0.0/0')
# Invalid port
_assert('An unknown error has occurred', 'tcp', " ", "81", '0.0.0.0/0')
# Invalid icmp port
_assert('An unknown error has occurred', 'icmp', " ", "81",
'0.0.0.0/0')
# Invalid CIDR Address
_assert('Invalid CIDR', 'icmp', -1, -1, '0.0.0.0')
# Invalid CIDR Address
_assert('Invalid CIDR', 'icmp', -1, -1, '0.0.0.0/')
# Invalid Cidr ports
_assert('Invalid port range', 'icmp', 1, 256, '0.0.0.0/0')
self.expect_http()
self.mox.ReplayAll()
rv = self.ec2.get_all_security_groups()
group = [grp for grp in rv if grp.name == security_group_name][0]
self.assertEquals(len(group.rules), 8)
self.assertEquals(int(group.rules[0].from_port), 80)
self.assertEquals(int(group.rules[0].to_port), 81)
self.assertEquals(len(group.rules[0].grants), 1)
self.assertEquals(str(group.rules[0].grants[0]), '0.0.0.0/0')
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.revoke('tcp', 80, 81, '0.0.0.0/0')
group.revoke('icmp', -1, -1, '0.0.0.0/0')
group.revoke('udp', 80, 81, '0.0.0.0/0')
group.revoke('tcp', 1, 65535, '0.0.0.0/0')
group.revoke('udp', 1, 65535, '0.0.0.0/0')
group.revoke('icmp', 1, 0, '0.0.0.0/0')
group.revoke('icmp', 0, 1, '0.0.0.0/0')
group.revoke('icmp', 0, 0, '0.0.0.0/0')
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(security_group_name)
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
rv = self.ec2.get_all_security_groups()
self.assertEqual(len(rv), 1)
self.assertEqual(rv[0].name, 'default')
def test_authorize_revoke_security_group_cidr_v6(self):
"""
Test that we can add and remove CIDR based rules
to a security group for IPv6
"""
self.expect_http()
self.mox.ReplayAll()
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
for x in range(random.randint(4, 8)))
group = self.ec2.create_security_group(security_group_name,
'test group')
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.authorize('tcp', 80, 81, '::/0')
self.expect_http()
self.mox.ReplayAll()
rv = self.ec2.get_all_security_groups()
group = [grp for grp in rv if grp.name == security_group_name][0]
self.assertEquals(len(group.rules), 1)
self.assertEquals(int(group.rules[0].from_port), 80)
self.assertEquals(int(group.rules[0].to_port), 81)
self.assertEquals(len(group.rules[0].grants), 1)
self.assertEquals(str(group.rules[0].grants[0]), '::/0')
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.revoke('tcp', 80, 81, '::/0')
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(security_group_name)
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
rv = self.ec2.get_all_security_groups()
self.assertEqual(len(rv), 1)
self.assertEqual(rv[0].name, 'default')
def test_authorize_revoke_security_group_foreign_group(self):
"""
Test that we can grant and revoke another security group access
to a security group
"""
self.expect_http()
self.mox.ReplayAll()
rand_string = 'sdiuisudfsdcnpaqwertasd'
security_group_name = "".join(random.choice(rand_string)
for x in range(random.randint(4, 8)))
other_security_group_name = "".join(random.choice(rand_string)
for x in range(random.randint(4, 8)))
group = self.ec2.create_security_group(security_group_name,
'test group')
self.expect_http()
self.mox.ReplayAll()
other_group = self.ec2.create_security_group(other_security_group_name,
'some other group')
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.authorize(src_group=other_group)
self.expect_http()
self.mox.ReplayAll()
rv = self.ec2.get_all_security_groups()
# I don't bother checkng that we actually find it here,
# because the create/delete unit test further up should
# be good enough for that.
for group in rv:
if group.name == security_group_name:
self.assertEquals(len(group.rules), 3)
self.assertEquals(len(group.rules[0].grants), 1)
self.assertEquals(str(group.rules[0].grants[0]), '%s-%s' %
(other_security_group_name, 'fake'))
self.expect_http()
self.mox.ReplayAll()
rv = self.ec2.get_all_security_groups()
for group in rv:
if group.name == security_group_name:
self.expect_http()
self.mox.ReplayAll()
group.connection = self.ec2
group.revoke(src_group=other_group)
self.expect_http()
self.mox.ReplayAll()
self.ec2.delete_security_group(security_group_name)
self.ec2.delete_security_group(other_security_group_name)

File diff suppressed because it is too large Load Diff

View File

@@ -1,238 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from nova import test
from nova import utils
from nova.virt import images
from nova.virt.libvirt import utils as libvirt_utils
class ImageUtilsTestCase(test.TestCase):
def test_disk_type(self):
# Seems like lvm detection
# if its in /dev ??
for p in ['/dev/b', '/dev/blah/blah']:
d_type = libvirt_utils.get_disk_type(p)
self.assertEquals('lvm', d_type)
# Try the other types
template_output = """image: %(path)s
file format: %(format)s
virtual size: 64M (67108864 bytes)
cluster_size: 65536
disk size: 96K
"""
path = '/myhome/disk.config'
for f in ['raw', 'qcow2']:
output = template_output % ({
'format': f,
'path': path,
})
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(utils, 'execute')
os.path.exists(path).AndReturn(True)
utils.execute('env', 'LC_ALL=C', 'LANG=C',
'qemu-img', 'info', path).AndReturn((output, ''))
self.mox.ReplayAll()
d_type = libvirt_utils.get_disk_type(path)
self.assertEquals(f, d_type)
self.mox.UnsetStubs()
def test_disk_backing(self):
path = '/myhome/disk.config'
template_output = """image: %(path)s
file format: raw
virtual size: 2K (2048 bytes)
cluster_size: 65536
disk size: 96K
"""
output = template_output % ({
'path': path,
})
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(utils, 'execute')
os.path.exists(path).AndReturn(True)
utils.execute('env', 'LC_ALL=C', 'LANG=C',
'qemu-img', 'info', path).AndReturn((output, ''))
self.mox.ReplayAll()
d_backing = libvirt_utils.get_disk_backing_file(path)
self.assertEquals(None, d_backing)
def test_disk_size(self):
path = '/myhome/disk.config'
template_output = """image: %(path)s
file format: raw
virtual size: %(v_size)s (%(vsize_b)s bytes)
cluster_size: 65536
disk size: 96K
"""
for i in range(0, 128):
bytes = i * 65336
kbytes = bytes / 1024
mbytes = kbytes / 1024
output = template_output % ({
'v_size': "%sM" % (mbytes),
'vsize_b': i,
'path': path,
})
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(utils, 'execute')
os.path.exists(path).AndReturn(True)
utils.execute('env', 'LC_ALL=C', 'LANG=C',
'qemu-img', 'info', path).AndReturn((output, ''))
self.mox.ReplayAll()
d_size = libvirt_utils.get_disk_size(path)
self.assertEquals(i, d_size)
self.mox.UnsetStubs()
output = template_output % ({
'v_size': "%sK" % (kbytes),
'vsize_b': i,
'path': path,
})
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(utils, 'execute')
os.path.exists(path).AndReturn(True)
utils.execute('env', 'LC_ALL=C', 'LANG=C',
'qemu-img', 'info', path).AndReturn((output, ''))
self.mox.ReplayAll()
d_size = libvirt_utils.get_disk_size(path)
self.assertEquals(i, d_size)
self.mox.UnsetStubs()
def test_qemu_info_canon(self):
path = "disk.config"
example_output = """image: disk.config
file format: raw
virtual size: 64M (67108864 bytes)
cluster_size: 65536
disk size: 96K
blah BLAH: bb
"""
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(utils, 'execute')
os.path.exists(path).AndReturn(True)
utils.execute('env', 'LC_ALL=C', 'LANG=C',
'qemu-img', 'info', path).AndReturn((example_output, ''))
self.mox.ReplayAll()
image_info = images.qemu_img_info(path)
self.assertEquals('disk.config', image_info.image)
self.assertEquals('raw', image_info.file_format)
self.assertEquals(67108864, image_info.virtual_size)
self.assertEquals(98304, image_info.disk_size)
self.assertEquals(65536, image_info.cluster_size)
def test_qemu_info_canon2(self):
path = "disk.config"
example_output = """image: disk.config
file format: QCOW2
virtual size: 67108844
cluster_size: 65536
disk size: 963434
backing file: /var/lib/nova/a328c7998805951a_2
"""
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(utils, 'execute')
os.path.exists(path).AndReturn(True)
utils.execute('env', 'LC_ALL=C', 'LANG=C',
'qemu-img', 'info', path).AndReturn((example_output, ''))
self.mox.ReplayAll()
image_info = images.qemu_img_info(path)
self.assertEquals('disk.config', image_info.image)
self.assertEquals('qcow2', image_info.file_format)
self.assertEquals(67108844, image_info.virtual_size)
self.assertEquals(963434, image_info.disk_size)
self.assertEquals(65536, image_info.cluster_size)
self.assertEquals('/var/lib/nova/a328c7998805951a_2',
image_info.backing_file)
def test_qemu_backing_file_actual(self):
path = "disk.config"
example_output = """image: disk.config
file format: raw
virtual size: 64M (67108864 bytes)
cluster_size: 65536
disk size: 96K
Snapshot list:
ID TAG VM SIZE DATE VM CLOCK
1 d9a9784a500742a7bb95627bb3aace38 0 2012-08-20 10:52:46 00:00:00.000
backing file: /var/lib/nova/a328c7998805951a_2 (actual path: /b/3a988059e51a_2)
"""
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(utils, 'execute')
os.path.exists(path).AndReturn(True)
utils.execute('env', 'LC_ALL=C', 'LANG=C',
'qemu-img', 'info', path).AndReturn((example_output, ''))
self.mox.ReplayAll()
image_info = images.qemu_img_info(path)
self.assertEquals('disk.config', image_info.image)
self.assertEquals('raw', image_info.file_format)
self.assertEquals(67108864, image_info.virtual_size)
self.assertEquals(98304, image_info.disk_size)
self.assertEquals(1, len(image_info.snapshots))
self.assertEquals('/b/3a988059e51a_2',
image_info.backing_file)
def test_qemu_info_convert(self):
path = "disk.config"
example_output = """image: disk.config
file format: raw
virtual size: 64M
disk size: 96K
Snapshot list:
ID TAG VM SIZE DATE VM CLOCK
1 d9a9784a500742a7bb95627bb3aace38 0 2012-08-20 10:52:46 00:00:00.000
3 d9a9784a500742a7bb95627bb3aace38 0 2012-08-20 10:52:46 00:00:00.000
4 d9a9784a500742a7bb95627bb3aace38 0 2012-08-20 10:52:46 00:00:00.000
junk stuff: bbb
"""
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(utils, 'execute')
os.path.exists(path).AndReturn(True)
utils.execute('env', 'LC_ALL=C', 'LANG=C',
'qemu-img', 'info', path).AndReturn((example_output, ''))
self.mox.ReplayAll()
image_info = images.qemu_img_info(path)
self.assertEquals('disk.config', image_info.image)
self.assertEquals('raw', image_info.file_format)
self.assertEquals(67108864, image_info.virtual_size)
self.assertEquals(98304, image_info.disk_size)
def test_qemu_info_snaps(self):
path = "disk.config"
example_output = """image: disk.config
file format: raw
virtual size: 64M (67108864 bytes)
disk size: 96K
Snapshot list:
ID TAG VM SIZE DATE VM CLOCK
1 d9a9784a500742a7bb95627bb3aace38 0 2012-08-20 10:52:46 00:00:00.000
3 d9a9784a500742a7bb95627bb3aace38 0 2012-08-20 10:52:46 00:00:00.000
4 d9a9784a500742a7bb95627bb3aace38 0 2012-08-20 10:52:46 00:00:00.000
"""
self.mox.StubOutWithMock(os.path, 'exists')
self.mox.StubOutWithMock(utils, 'execute')
os.path.exists(path).AndReturn(True)
utils.execute('env', 'LC_ALL=C', 'LANG=C',
'qemu-img', 'info', path).AndReturn((example_output, ''))
self.mox.ReplayAll()
image_info = images.qemu_img_info(path)
self.assertEquals('disk.config', image_info.image)
self.assertEquals('raw', image_info.file_format)
self.assertEquals(67108864, image_info.virtual_size)
self.assertEquals(98304, image_info.disk_size)
self.assertEquals(3, len(image_info.snapshots))

View File

@@ -1,516 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2013 Boris Pavlovic (boris@pavlovic.me).
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from migrate.changeset import UniqueConstraint
from sqlalchemy.dialects import mysql
from sqlalchemy import Boolean, Index, Integer, DateTime, String
from sqlalchemy import MetaData, Table, Column
from sqlalchemy.engine import reflection
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.exc import SAWarning
from sqlalchemy.sql import select
from sqlalchemy.types import UserDefinedType, NullType
from nova.db.sqlalchemy import api as db
from nova.db.sqlalchemy import utils
from nova import exception
from nova.tests import test_migrations
import warnings
class CustomType(UserDefinedType):
"""Dummy column type for testing unsupported types."""
def get_col_spec(self):
return "CustomType"
class TestMigrationUtils(test_migrations.BaseMigrationTestCase):
"""Class for testing utils that are used in db migrations."""
def test_utils_drop_unique_constraint(self):
table_name = "__test_tmp_table__"
uc_name = 'uniq_foo'
values = [
{'id': 1, 'a': 3, 'foo': 10},
{'id': 2, 'a': 2, 'foo': 20},
{'id': 3, 'a': 1, 'foo': 30}
]
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
test_table = Table(table_name, meta,
Column('id', Integer, primary_key=True,
nullable=False),
Column('a', Integer),
Column('foo', Integer),
UniqueConstraint('a', name='uniq_a'),
UniqueConstraint('foo', name=uc_name))
test_table.create()
engine.execute(test_table.insert(), values)
# NOTE(boris-42): This method is generic UC dropper.
utils.drop_unique_constraint(engine, table_name, uc_name, 'foo')
s = test_table.select().order_by(test_table.c.id)
rows = engine.execute(s).fetchall()
for i in xrange(0, len(values)):
v = values[i]
self.assertEqual((v['id'], v['a'], v['foo']), rows[i])
# NOTE(boris-42): Update data about Table from DB.
meta = MetaData()
meta.bind = engine
test_table = Table(table_name, meta, autoload=True)
constraints = filter(lambda c: c.name == uc_name,
test_table.constraints)
self.assertEqual(len(constraints), 0)
self.assertEqual(len(test_table.constraints), 1)
test_table.drop()
def test_util_drop_unique_constraint_with_not_supported_sqlite_type(self):
table_name = "__test_tmp_table__"
uc_name = 'uniq_foo'
values = [
{'id': 1, 'a': 3, 'foo': 10},
{'id': 2, 'a': 2, 'foo': 20},
{'id': 3, 'a': 1, 'foo': 30}
]
engine = self.engines['sqlite']
meta = MetaData(bind=engine)
test_table = Table(table_name, meta,
Column('id', Integer, primary_key=True,
nullable=False),
Column('a', Integer),
Column('foo', CustomType, default=0),
UniqueConstraint('a', name='uniq_a'),
UniqueConstraint('foo', name=uc_name))
test_table.create()
engine.execute(test_table.insert(), values)
warnings.simplefilter("ignore", SAWarning)
# NOTE(boris-42): Missing info about column `foo` that has
# unsupported type CustomType.
self.assertRaises(exception.NovaException,
utils.drop_unique_constraint,
engine, table_name, uc_name, 'foo')
# NOTE(boris-42): Wrong type of foo instance. it should be
# instance of sqlalchemy.Column.
self.assertRaises(exception.NovaException,
utils.drop_unique_constraint,
engine, table_name, uc_name, 'foo', foo=Integer())
foo = Column('foo', CustomType, default=0)
utils.drop_unique_constraint(engine, table_name, uc_name, 'foo',
foo=foo)
s = test_table.select().order_by(test_table.c.id)
rows = engine.execute(s).fetchall()
for i in xrange(0, len(values)):
v = values[i]
self.assertEqual((v['id'], v['a'], v['foo']), rows[i])
# NOTE(boris-42): Update data about Table from DB.
meta = MetaData(bind=engine)
test_table = Table(table_name, meta, autoload=True)
constraints = filter(lambda c: c.name == uc_name,
test_table.constraints)
self.assertEqual(len(constraints), 0)
self.assertEqual(len(test_table.constraints), 1)
test_table.drop()
def _populate_db_for_drop_duplicate_entries(self, engine, meta,
table_name):
values = [
{'id': 11, 'a': 3, 'b': 10, 'c': 'abcdef'},
{'id': 12, 'a': 5, 'b': 10, 'c': 'abcdef'},
{'id': 13, 'a': 6, 'b': 10, 'c': 'abcdef'},
{'id': 14, 'a': 7, 'b': 10, 'c': 'abcdef'},
{'id': 21, 'a': 1, 'b': 20, 'c': 'aa'},
{'id': 31, 'a': 1, 'b': 20, 'c': 'bb'},
{'id': 41, 'a': 1, 'b': 30, 'c': 'aef'},
{'id': 42, 'a': 2, 'b': 30, 'c': 'aef'},
{'id': 43, 'a': 3, 'b': 30, 'c': 'aef'}
]
test_table = Table(table_name, meta,
Column('id', Integer, primary_key=True,
nullable=False),
Column('a', Integer),
Column('b', Integer),
Column('c', String(255)),
Column('deleted', Integer, default=0),
Column('deleted_at', DateTime),
Column('updated_at', DateTime))
test_table.create()
engine.execute(test_table.insert(), values)
return test_table, values
def test_drop_old_duplicate_entries_from_table(self):
table_name = "__test_tmp_table__"
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
test_table, values = self.\
_populate_db_for_drop_duplicate_entries(engine, meta,
table_name)
utils.drop_old_duplicate_entries_from_table(engine, table_name,
False, 'b', 'c')
uniq_values = set()
expected_ids = []
for value in sorted(values, key=lambda x: x['id'], reverse=True):
uniq_value = (('b', value['b']), ('c', value['c']))
if uniq_value in uniq_values:
continue
uniq_values.add(uniq_value)
expected_ids.append(value['id'])
real_ids = [row[0] for row in
engine.execute(select([test_table.c.id])).fetchall()]
self.assertEqual(len(real_ids), len(expected_ids))
for id_ in expected_ids:
self.assertTrue(id_ in real_ids)
def test_drop_old_duplicate_entries_from_table_soft_delete(self):
table_name = "__test_tmp_table__"
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
table, values = self.\
_populate_db_for_drop_duplicate_entries(engine, meta,
table_name)
utils.drop_old_duplicate_entries_from_table(engine, table_name,
True, 'b', 'c')
uniq_values = set()
expected_values = []
soft_deleted_values = []
for value in sorted(values, key=lambda x: x['id'], reverse=True):
uniq_value = (('b', value['b']), ('c', value['c']))
if uniq_value in uniq_values:
soft_deleted_values.append(value)
continue
uniq_values.add(uniq_value)
expected_values.append(value)
base_select = table.select()
rows_select = base_select.\
where(table.c.deleted != table.c.id)
row_ids = [row['id'] for row in
engine.execute(rows_select).fetchall()]
self.assertEqual(len(row_ids), len(expected_values))
for value in expected_values:
self.assertTrue(value['id'] in row_ids)
deleted_rows_select = base_select.\
where(table.c.deleted == table.c.id)
deleted_rows_ids = [row['id'] for row in
engine.execute(deleted_rows_select).fetchall()]
self.assertEqual(len(deleted_rows_ids),
len(values) - len(row_ids))
for value in soft_deleted_values:
self.assertTrue(value['id'] in deleted_rows_ids)
def test_check_shadow_table(self):
table_name = 'abc'
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('a', Integer),
Column('c', String(256)))
table.create()
#check missing shadow table
self.assertRaises(NoSuchTableError,
utils.check_shadow_table, engine, table_name)
shadow_table = Table(db._SHADOW_TABLE_PREFIX + table_name, meta,
Column('id', Integer),
Column('a', Integer))
shadow_table.create()
# check missing column
self.assertRaises(exception.NovaException,
utils.check_shadow_table, engine, table_name)
# check when all is ok
c = Column('c', String(256))
shadow_table.create_column(c)
self.assertTrue(utils.check_shadow_table(engine, table_name))
# check extra column
d = Column('d', Integer)
shadow_table.create_column(d)
self.assertRaises(exception.NovaException,
utils.check_shadow_table, engine, table_name)
def test_check_shadow_table_different_types(self):
table_name = 'abc'
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('a', Integer))
table.create()
shadow_table = Table(db._SHADOW_TABLE_PREFIX + table_name, meta,
Column('id', Integer, primary_key=True),
Column('a', String(256)))
shadow_table.create()
self.assertRaises(exception.NovaException,
utils.check_shadow_table, engine, table_name)
def test_check_shadow_table_with_unsupported_type(self):
table_name = 'abc'
engine = self.engines['sqlite']
meta = MetaData(bind=engine)
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('a', Integer),
Column('c', CustomType))
table.create()
shadow_table = Table(db._SHADOW_TABLE_PREFIX + table_name, meta,
Column('id', Integer, primary_key=True),
Column('a', Integer),
Column('c', CustomType))
shadow_table.create()
self.assertTrue(utils.check_shadow_table(engine, table_name))
def test_create_shadow_table_by_table_instance(self):
table_name = 'abc'
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('a', Integer),
Column('b', String(256)))
table.create()
utils.create_shadow_table(engine, table=table)
self.assertTrue(utils.check_shadow_table(engine, table_name))
def test_create_shadow_table_by_name(self):
table_name = 'abc'
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('a', Integer),
Column('b', String(256)))
table.create()
utils.create_shadow_table(engine, table_name=table_name)
self.assertTrue(utils.check_shadow_table(engine, table_name))
def test_create_shadow_table_not_supported_type(self):
table_name = 'abc'
engine = self.engines['sqlite']
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('a', CustomType))
table.create()
self.assertRaises(exception.NovaException,
utils.create_shadow_table,
engine, table_name=table_name)
utils.create_shadow_table(engine, table_name=table_name,
a=Column('a', CustomType()))
self.assertTrue(utils.check_shadow_table(engine, table_name))
def test_create_shadow_both_table_and_table_name_are_none(self):
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
self.assertRaises(exception.NovaException,
utils.create_shadow_table, engine)
def test_create_shadow_both_table_and_table_name_are_specified(self):
table_name = 'abc'
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('a', Integer))
table.create()
self.assertRaises(exception.NovaException,
utils.create_shadow_table,
engine, table=table, table_name=table_name)
def test_create_duplicate_shadow_table(self):
table_name = 'abc'
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('a', Integer))
table.create()
utils.create_shadow_table(engine, table_name=table_name)
self.assertRaises(exception.ShadowTableExists,
utils.create_shadow_table,
engine, table_name=table_name)
def test_change_deleted_column_type_doesnt_drop_index(self):
table_name = 'abc'
for key, engine in self.engines.items():
meta = MetaData(bind=engine)
indexes = {
'idx_a_deleted': ['a', 'deleted'],
'idx_b_deleted': ['b', 'deleted'],
'idx_a': ['a']
}
index_instances = [Index(name, *columns)
for name, columns in indexes.iteritems()]
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('a', String(255)),
Column('b', String(255)),
Column('deleted', Boolean),
*index_instances)
table.create()
utils.change_deleted_column_type_to_id_type(engine, table_name)
utils.change_deleted_column_type_to_boolean(engine, table_name)
insp = reflection.Inspector.from_engine(engine)
real_indexes = insp.get_indexes(table_name)
self.assertEqual(len(real_indexes), 3)
for index in real_indexes:
name = index['name']
self.assertIn(name, indexes)
self.assertEqual(set(index['column_names']),
set(indexes[name]))
def test_change_deleted_column_type_to_id_type_integer(self):
table_name = 'abc'
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('deleted', Boolean))
table.create()
utils.change_deleted_column_type_to_id_type(engine, table_name)
table = utils.get_table(engine, table_name)
self.assertTrue(isinstance(table.c.deleted.type, Integer))
def test_change_deleted_column_type_to_id_type_string(self):
table_name = 'abc'
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', String(255), primary_key=True),
Column('deleted', Boolean))
table.create()
utils.change_deleted_column_type_to_id_type(engine, table_name)
table = utils.get_table(engine, table_name)
self.assertTrue(isinstance(table.c.deleted.type, String))
def test_change_deleted_column_type_to_id_type_custom(self):
table_name = 'abc'
engine = self.engines['sqlite']
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('foo', CustomType),
Column('deleted', Boolean))
table.create()
self.assertRaises(exception.NovaException,
utils.change_deleted_column_type_to_id_type,
engine, table_name)
fooColumn = Column('foo', CustomType())
utils.change_deleted_column_type_to_id_type(engine, table_name,
foo=fooColumn)
table = utils.get_table(engine, table_name)
# NOTE(boris-42): There is no way to check has foo type CustomType.
# but sqlalchemy will set it to NullType.
self.assertTrue(isinstance(table.c.foo.type, NullType))
self.assertTrue(isinstance(table.c.deleted.type, Integer))
def test_change_deleted_column_type_to_boolean(self):
table_name = 'abc'
for key, engine in self.engines.items():
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('deleted', Integer))
table.create()
utils.change_deleted_column_type_to_boolean(engine, table_name)
table = utils.get_table(engine, table_name)
expected_type = Boolean if key != "mysql" else mysql.TINYINT
self.assertTrue(isinstance(table.c.deleted.type, expected_type))
def test_change_deleted_column_type_to_boolean_type_custom(self):
table_name = 'abc'
engine = self.engines['sqlite']
meta = MetaData()
meta.bind = engine
table = Table(table_name, meta,
Column('id', Integer, primary_key=True),
Column('foo', CustomType),
Column('deleted', Integer))
table.create()
self.assertRaises(exception.NovaException,
utils.change_deleted_column_type_to_boolean,
engine, table_name)
fooColumn = Column('foo', CustomType())
utils.change_deleted_column_type_to_boolean(engine, table_name,
foo=fooColumn)
table = utils.get_table(engine, table_name)
# NOTE(boris-42): There is no way to check has foo type CustomType.
# but sqlalchemy will set it to NullType.
self.assertTrue(isinstance(table.c.foo.type, NullType))
self.assertTrue(isinstance(table.c.deleted.type, Boolean))

View File

@@ -1,9 +0,0 @@
[DEFAULT]
# Set up any number of migration data stores you want, one
# The "name" used in the test is the config variable key.
#sqlite=sqlite:///test_migrations.db
sqlite=sqlite://
#mysql=mysql://root:@localhost/test_migrations
#postgresql=postgresql://user:pass@localhost/test_migrations
[walk_style]
snake_walk=yes

File diff suppressed because it is too large Load Diff

View File

@@ -1,87 +0,0 @@
# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pkg_resources
from nova.api.openstack.compute import extensions as computeextensions
from nova.api.openstack import extensions
from nova.openstack.common.plugin import plugin
from nova import test
class StubController(object):
def i_am_the_stub(self):
pass
class StubControllerExtension(extensions.ExtensionDescriptor):
"""This is a docstring. We need it."""
name = 'stubextension'
alias = 'stubby'
def get_resources(self):
resources = []
res = extensions.ResourceExtension('testme',
StubController())
resources.append(res)
return resources
service_list = []
class TestPluginClass(plugin.Plugin):
def __init__(self, service_name):
super(TestPluginClass, self).__init__(service_name)
self._add_api_extension_descriptor(StubControllerExtension)
service_list.append(service_name)
class MockEntrypoint(pkg_resources.EntryPoint):
def load(self):
return TestPluginClass
class APITestCase(test.TestCase):
"""Test case for the plugin api extension interface."""
def test_add_extension(self):
def mock_load(_s):
return TestPluginClass()
def mock_iter_entry_points(_t):
return [MockEntrypoint("fake", "fake", ["fake"])]
self.stubs.Set(pkg_resources, 'iter_entry_points',
mock_iter_entry_points)
global service_list
service_list = []
# Marking out the default extension paths makes this test MUCH faster.
self.flags(osapi_compute_extension=[])
found = False
mgr = computeextensions.ExtensionManager()
for res in mgr.get_resources():
# We have to use this weird 'dir' check because
# the plugin framework muddies up the classname
# such that 'isinstance' doesn't work right.
if 'i_am_the_stub' in dir(res.controller):
found = True
self.assertTrue(found)
self.assertEqual(len(service_list), 1)
self.assertEqual(service_list[0], 'compute-extensions')

View File

@@ -1,49 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright (C) 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import exception
from nova import test
from nova.compute import vm_mode
class ComputeVMModeTest(test.TestCase):
def test_case(self):
inst = dict(vm_mode="HVM")
mode = vm_mode.get_from_instance(inst)
self.assertEqual(mode, "hvm")
def test_legacy_pv(self):
inst = dict(vm_mode="pv")
mode = vm_mode.get_from_instance(inst)
self.assertEqual(mode, "xen")
def test_legacy_hv(self):
inst = dict(vm_mode="hv")
mode = vm_mode.get_from_instance(inst)
self.assertEqual(mode, "hvm")
def test_bogus(self):
inst = dict(vm_mode="wibble")
self.assertRaises(exception.Invalid,
vm_mode.get_from_instance,
inst)
def test_good(self):
inst = dict(vm_mode="hvm")
mode = vm_mode.get_from_instance(inst)
self.assertEqual(mode, "hvm")