PEP8 cleanup in nova/tests, except for tests. There should be no functional changes here, just style changes to get violations down.

This commit is contained in:
Eric Day
2010-10-22 00:48:27 -07:00
parent 56a2eb07b2
commit 8a3d262250
16 changed files with 159 additions and 116 deletions

View File

@@ -29,9 +29,12 @@ from nova.auth import manager
FLAGS = flags.FLAGS
class Context(object):
pass
class AccessTestCase(test.TrialTestCase):
def setUp(self):
super(AccessTestCase, self).setUp()
@@ -56,9 +59,11 @@ class AccessTestCase(test.TrialTestCase):
self.project.add_role(self.testnet, 'netadmin')
self.project.add_role(self.testsys, 'sysadmin')
#user is set in each test
def noopWSGIApp(environ, start_response):
start_response('200 OK', [])
return ['']
self.mw = ec2.Authorizer(noopWSGIApp)
self.mw.action_roles = {'str': {
'_allow_all': ['all'],
@@ -80,7 +85,7 @@ class AccessTestCase(test.TrialTestCase):
def response_status(self, user, methodName):
ctxt = context.RequestContext(user, self.project)
environ = {'ec2.context' : ctxt,
environ = {'ec2.context': ctxt,
'ec2.controller': 'some string',
'ec2.action': methodName}
req = webob.Request.blank('/', environ)

View File

@@ -28,16 +28,17 @@ CLC_IP = '127.0.0.1'
CLC_PORT = 8773
REGION = 'test'
def get_connection():
return boto.connect_ec2 (
return boto.connect_ec2(
aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY,
is_secure=False,
region=RegionInfo(None, REGION, CLC_IP),
port=CLC_PORT,
path='/services/Cloud',
debug=99
)
debug=99)
class APIIntegrationTests(unittest.TestCase):
def test_001_get_all_images(self):
@@ -51,4 +52,3 @@ if __name__ == '__main__':
#print conn.get_all_key_pairs()
#print conn.create_key_pair
#print conn.create_security_group('name', 'description')

View File

@@ -99,6 +99,7 @@ class XmlConversionTestCase(test.BaseTestCase):
self.assertEqual(conv('-'), '-')
self.assertEqual(conv('-0'), 0)
class ApiEc2TestCase(test.BaseTestCase):
"""Unit test for the cloud controller on an EC2 API"""
def setUp(self):
@@ -138,7 +139,6 @@ class ApiEc2TestCase(test.BaseTestCase):
self.manager.delete_project(project)
self.manager.delete_user(user)
def test_get_all_key_pairs(self):
"""Test that, after creating a user and project and generating
a key pair, that the API call to list key pairs works properly"""
@@ -183,7 +183,7 @@ class ApiEc2TestCase(test.BaseTestCase):
self.manager.add_role('fake', 'netadmin')
project.add_role('fake', 'netadmin')
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
for x in range(random.randint(4, 8)))
self.ec2.create_security_group(security_group_name, 'test group')
@@ -217,10 +217,11 @@ class ApiEc2TestCase(test.BaseTestCase):
self.manager.add_role('fake', 'netadmin')
project.add_role('fake', 'netadmin')
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd")
for x in range(random.randint(4, 8)))
group = self.ec2.create_security_group(security_group_name, 'test group')
group = self.ec2.create_security_group(security_group_name,
'test group')
self.expect_http()
self.mox.ReplayAll()
@@ -282,12 +283,14 @@ class ApiEc2TestCase(test.BaseTestCase):
self.manager.add_role('fake', 'netadmin')
project.add_role('fake', 'netadmin')
security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \
rand_string = 'sdiuisudfsdcnpaqwertasd'
security_group_name = "".join(random.choice(rand_string)
for x in range(random.randint(4, 8)))
other_security_group_name = "".join(random.choice(rand_string)
for x in range(random.randint(4, 8)))
other_security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") \
for x in range(random.randint(4, 8)))
group = self.ec2.create_security_group(security_group_name, 'test group')
group = self.ec2.create_security_group(security_group_name,
'test group')
self.expect_http()
self.mox.ReplayAll()
@@ -313,9 +316,8 @@ class ApiEc2TestCase(test.BaseTestCase):
if group.name == security_group_name:
self.assertEquals(len(group.rules), 1)
self.assertEquals(len(group.rules[0].grants), 1)
self.assertEquals(str(group.rules[0].grants[0]),
'%s-%s' % (other_security_group_name, 'fake'))
self.assertEquals(str(group.rules[0].grants[0]), '%s-%s' %
(other_security_group_name, 'fake'))
self.expect_http()
self.mox.ReplayAll()

View File

@@ -28,6 +28,7 @@ from nova.api.ec2 import cloud
FLAGS = flags.FLAGS
class user_generator(object):
def __init__(self, manager, **user_state):
if 'name' not in user_state:
@@ -41,6 +42,7 @@ class user_generator(object):
def __exit__(self, value, type, trace):
self.manager.delete_user(self.user)
class project_generator(object):
def __init__(self, manager, **project_state):
if 'name' not in project_state:
@@ -56,6 +58,7 @@ class project_generator(object):
def __exit__(self, value, type, trace):
self.manager.delete_project(self.project)
class user_and_project_generator(object):
def __init__(self, manager, user_state={}, project_state={}):
self.manager = manager
@@ -75,6 +78,7 @@ class user_and_project_generator(object):
self.manager.delete_user(self.user)
self.manager.delete_project(self.project)
class AuthManagerTestCase(object):
def setUp(self):
FLAGS.auth_driver = self.auth_driver
@@ -96,7 +100,7 @@ class AuthManagerTestCase(object):
self.assertEqual('private-party', u.access)
def test_004_signature_is_valid(self):
#self.assertTrue(self.manager.authenticate( **boto.generate_url ... ? ? ? ))
#self.assertTrue(self.manager.authenticate(**boto.generate_url ...? ))
pass
#raise NotImplementedError
@@ -127,7 +131,7 @@ class AuthManagerTestCase(object):
self.assertFalse(self.manager.has_role('test1', 'itsec'))
def test_can_create_and_get_project(self):
with user_and_project_generator(self.manager) as (u,p):
with user_and_project_generator(self.manager) as (u, p):
self.assert_(self.manager.get_user('test1'))
self.assert_(self.manager.get_user('test1'))
self.assert_(self.manager.get_project('testproj'))
@@ -321,6 +325,7 @@ class AuthManagerTestCase(object):
self.assertEqual('secret', user.secret)
self.assertTrue(user.is_admin())
class AuthManagerLdapTestCase(AuthManagerTestCase, test.TrialTestCase):
auth_driver = 'nova.auth.ldapdriver.FakeLdapDriver'
@@ -337,6 +342,7 @@ class AuthManagerLdapTestCase(AuthManagerTestCase, test.TrialTestCase):
except:
self.skip = True
class AuthManagerDbTestCase(AuthManagerTestCase, test.TrialTestCase):
auth_driver = 'nova.auth.dbdriver.DbDriver'

View File

@@ -46,13 +46,13 @@ from nova.objectstore import image
FLAGS = flags.FLAGS
# Temp dirs for working with image attributes through the cloud controller
# (stole this from objectstore_unittest.py)
OSS_TEMPDIR = tempfile.mkdtemp(prefix='test_oss-')
IMAGES_PATH = os.path.join(OSS_TEMPDIR, 'images')
os.makedirs(IMAGES_PATH)
class CloudTestCase(test.TrialTestCase):
def setUp(self):
super(CloudTestCase, self).setUp()
@@ -97,17 +97,17 @@ class CloudTestCase(test.TrialTestCase):
max_count = 1
kwargs = {'image_id': image_id,
'instance_type': instance_type,
'max_count': max_count }
'max_count': max_count}
rv = yield self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
output = yield self.cloud.get_console_output(context=self.context, instance_id=[instance_id])
output = yield self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE OUTPUT')
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
greenthread.sleep(0.3)
rv = yield self.cloud.terminate_instances(self.context, [instance_id])
def test_key_generation(self):
result = self._create_key('test')
private_key = result['private_key']
@@ -146,8 +146,10 @@ class CloudTestCase(test.TrialTestCase):
'max_count': max_count}
rv = yield self.cloud.run_instances(self.context, **kwargs)
# TODO: check for proper response
instance = rv['reservationSet'][0][rv['reservationSet'][0].keys()[0]][0]
logging.debug("Need to watch instance %s until it's running..." % instance['instance_id'])
instance_id = rv['reservationSet'][0].keys()[0]
instance = rv['reservationSet'][0][instance_id][0]
logging.debug("Need to watch instance %s until it's running..." %
instance['instance_id'])
while True:
rv = yield defer.succeed(time.sleep(1))
info = self.cloud._get_instance(instance['instance_id'])
@@ -157,14 +159,15 @@ class CloudTestCase(test.TrialTestCase):
self.assert_(rv)
if connection_type != 'fake':
time.sleep(45) # Should use boto for polling here
time.sleep(45) # Should use boto for polling here
for reservations in rv['reservationSet']:
# for res_id in reservations.keys():
# logging.debug(reservations[res_id])
# for instance in reservations[res_id]:
for instance in reservations[reservations.keys()[0]]:
logging.debug("Terminating instance %s" % instance['instance_id'])
rv = yield self.compute.terminate_instance(instance['instance_id'])
# logging.debug(reservations[res_id])
# for instance in reservations[res_id]:
for instance in reservations[reservations.keys()[0]]:
instance_id = instance['instance_id']
logging.debug("Terminating instance %s" % instance_id)
rv = yield self.compute.terminate_instance(instance_id)
def test_instance_update_state(self):
def instance(num):
@@ -183,8 +186,7 @@ class CloudTestCase(test.TrialTestCase):
'groups': ['default'],
'product_codes': None,
'state': 0x01,
'user_data': ''
}
'user_data': ''}
rv = self.cloud._format_describe_instances(self.context)
self.assert_(len(rv['reservationSet']) == 0)
@@ -199,7 +201,9 @@ class CloudTestCase(test.TrialTestCase):
#self.assert_(len(rv['reservationSet'][0]['instances_set']) == 5)
# report 4 nodes each having 1 of the instances
#for i in xrange(4):
# self.cloud.update_state('instances', {('node-%s' % i): {('i-%s' % i): instance(i)}})
# self.cloud.update_state('instances',
# {('node-%s' % i): {('i-%s' % i):
# instance(i)}})
# one instance should be pending still
#self.assert_(len(self.cloud.instances['pending'].keys()) == 1)
@@ -217,8 +221,10 @@ class CloudTestCase(test.TrialTestCase):
@staticmethod
def _fake_set_image_description(ctxt, image_id, description):
from nova.objectstore import handler
class req:
pass
request = req()
request.context = ctxt
request.args = {'image_id': [image_id],

View File

@@ -23,7 +23,9 @@ from nova import test
FLAGS = flags.FLAGS
flags.DEFINE_string('flags_unittest', 'foo', 'for testing purposes only')
class FlagsTestCase(test.TrialTestCase):
def setUp(self):
super(FlagsTestCase, self).setUp()
self.FLAGS = flags.FlagValues()
@@ -35,7 +37,8 @@ class FlagsTestCase(test.TrialTestCase):
self.assert_('false' not in self.FLAGS)
self.assert_('true' not in self.FLAGS)
flags.DEFINE_string('string', 'default', 'desc', flag_values=self.FLAGS)
flags.DEFINE_string('string', 'default', 'desc',
flag_values=self.FLAGS)
flags.DEFINE_integer('int', 1, 'desc', flag_values=self.FLAGS)
flags.DEFINE_bool('false', False, 'desc', flag_values=self.FLAGS)
flags.DEFINE_bool('true', True, 'desc', flag_values=self.FLAGS)

View File

@@ -98,7 +98,6 @@ class NetworkTestCase(test.TrialTestCase):
self.context.project_id = self.projects[project_num].id
self.network.deallocate_fixed_ip(self.context, address)
def test_public_network_association(self):
"""Makes sure that we can allocaate a public ip"""
# TODO(vish): better way of adding floating ips
@@ -118,10 +117,12 @@ class NetworkTestCase(test.TrialTestCase):
lease_ip(fix_addr)
self.assertEqual(float_addr, str(pubnet[0]))
self.network.associate_floating_ip(self.context, float_addr, fix_addr)
address = db.instance_get_floating_address(context.get_admin_context(), self.instance_id)
address = db.instance_get_floating_address(context.get_admin_context(),
self.instance_id)
self.assertEqual(address, float_addr)
self.network.disassociate_floating_ip(self.context, float_addr)
address = db.instance_get_floating_address(context.get_admin_context(), self.instance_id)
address = db.instance_get_floating_address(context.get_admin_context(),
self.instance_id)
self.assertEqual(address, None)
self.network.deallocate_floating_ip(self.context, float_addr)
self.network.deallocate_fixed_ip(self.context, fix_addr)
@@ -254,18 +255,24 @@ class NetworkTestCase(test.TrialTestCase):
There are ips reserved at the bottom and top of the range.
services (network, gateway, CloudPipe, broadcast)
"""
network = db.project_get_network(context.get_admin_context(), self.projects[0].id)
network = db.project_get_network(context.get_admin_context(),
self.projects[0].id)
net_size = flags.FLAGS.network_size
total_ips = (db.network_count_available_ips(context.get_admin_context(), network['id']) +
db.network_count_reserved_ips(context.get_admin_context(), network['id']) +
db.network_count_allocated_ips(context.get_admin_context(), network['id']))
admin_context = context.get_admin_context()
total_ips = (db.network_count_available_ips(admin_context,
network['id']) +
db.network_count_reserved_ips(admin_context,
network['id']) +
db.network_count_allocated_ips(admin_context,
network['id']))
self.assertEqual(total_ips, net_size)
def test_too_many_addresses(self):
"""Test for a NoMoreAddresses exception when all fixed ips are used.
"""
network = db.project_get_network(context.get_admin_context(), self.projects[0].id)
num_available_ips = db.network_count_available_ips(context.get_admin_context(),
admin_context = context.get_admin_context()
network = db.project_get_network(admin_context, self.projects[0].id)
num_available_ips = db.network_count_available_ips(admin_context,
network['id'])
addresses = []
instance_ids = []
@@ -276,8 +283,9 @@ class NetworkTestCase(test.TrialTestCase):
addresses.append(address)
lease_ip(address)
self.assertEqual(db.network_count_available_ips(context.get_admin_context(),
network['id']), 0)
ip_count = db.network_count_available_ips(context.get_admin_context(),
network['id'])
self.assertEqual(ip_count, 0)
self.assertRaises(db.NoMoreAddresses,
self.network.allocate_fixed_ip,
self.context,
@@ -287,14 +295,15 @@ class NetworkTestCase(test.TrialTestCase):
self.network.deallocate_fixed_ip(self.context, addresses[i])
release_ip(addresses[i])
db.instance_destroy(context.get_admin_context(), instance_ids[i])
self.assertEqual(db.network_count_available_ips(context.get_admin_context(),
network['id']),
num_available_ips)
ip_count = db.network_count_available_ips(context.get_admin_context(),
network['id'])
self.assertEqual(ip_count, num_available_ips)
def is_allocated_in_project(address, project_id):
"""Returns true if address is in specified project"""
project_net = db.project_get_network(context.get_admin_context(), project_id)
project_net = db.project_get_network(context.get_admin_context(),
project_id)
network = db.fixed_ip_get_network(context.get_admin_context(), address)
instance = db.fixed_ip_get_instance(context.get_admin_context(), address)
# instance exists until release
@@ -308,8 +317,10 @@ def binpath(script):
def lease_ip(private_ip):
"""Run add command on dhcpbridge"""
network_ref = db.fixed_ip_get_network(context.get_admin_context(), private_ip)
instance_ref = db.fixed_ip_get_instance(context.get_admin_context(), private_ip)
network_ref = db.fixed_ip_get_network(context.get_admin_context(),
private_ip)
instance_ref = db.fixed_ip_get_instance(context.get_admin_context(),
private_ip)
cmd = "%s add %s %s fake" % (binpath('nova-dhcpbridge'),
instance_ref['mac_address'],
private_ip)
@@ -322,8 +333,10 @@ def lease_ip(private_ip):
def release_ip(private_ip):
"""Run del command on dhcpbridge"""
network_ref = db.fixed_ip_get_network(context.get_admin_context(), private_ip)
instance_ref = db.fixed_ip_get_instance(context.get_admin_context(), private_ip)
network_ref = db.fixed_ip_get_network(context.get_admin_context(),
private_ip)
instance_ref = db.fixed_ip_get_instance(context.get_admin_context(),
private_ip)
cmd = "%s del %s %s fake" % (binpath('nova-dhcpbridge'),
instance_ref['mac_address'],
private_ip)

View File

@@ -181,7 +181,7 @@ class ObjectStoreTestCase(test.TrialTestCase):
class TestHTTPChannel(http.HTTPChannel):
"""Dummy site required for twisted.web"""
def checkPersistence(self, _, __): # pylint: disable-msg=C0103
def checkPersistence(self, _, __): # pylint: disable-msg=C0103
"""Otherwise we end up with an unclean reactor."""
return False
@@ -217,7 +217,6 @@ class S3APITestCase(test.TrialTestCase):
# pylint: enable-msg=E1101
self.tcp_port = self.listening_port.getHost().port
if not boto.config.has_section('Boto'):
boto.config.add_section('Boto')
boto.config.set('Boto', 'num_retries', '0')
@@ -234,11 +233,11 @@ class S3APITestCase(test.TrialTestCase):
self.conn.get_http_connection = get_http_connection
def _ensure_no_buckets(self, buckets): # pylint: disable-msg=C0111
def _ensure_no_buckets(self, buckets): # pylint: disable-msg=C0111
self.assertEquals(len(buckets), 0, "Bucket list was not empty")
return True
def _ensure_one_bucket(self, buckets, name): # pylint: disable-msg=C0111
def _ensure_one_bucket(self, buckets, name): # pylint: disable-msg=C0111
self.assertEquals(len(buckets), 1,
"Bucket list didn't have exactly one element in it")
self.assertEquals(buckets[0].name, name, "Wrong name")

View File

@@ -38,6 +38,7 @@ class ProcessTestCase(test.TrialTestCase):
def test_execute_stdout(self):
pool = process.ProcessPool(2)
d = pool.simple_execute('echo test')
def _check(rv):
self.assertEqual(rv[0], 'test\n')
self.assertEqual(rv[1], '')
@@ -49,6 +50,7 @@ class ProcessTestCase(test.TrialTestCase):
def test_execute_stderr(self):
pool = process.ProcessPool(2)
d = pool.simple_execute('cat BAD_FILE', check_exit_code=False)
def _check(rv):
self.assertEqual(rv[0], '')
self.assert_('No such file' in rv[1])
@@ -72,6 +74,7 @@ class ProcessTestCase(test.TrialTestCase):
d4 = pool.simple_execute('sleep 0.005')
called = []
def _called(rv, name):
called.append(name)

View File

@@ -141,12 +141,13 @@ class QuotaTestCase(test.TrialTestCase):
try:
db.floating_ip_get_by_address(context.get_admin_context(), address)
except exception.NotFound:
db.floating_ip_create(context.get_admin_context(), {'address': address,
'host': FLAGS.host})
db.floating_ip_create(context.get_admin_context(),
{'address': address, 'host': FLAGS.host})
float_addr = self.network.allocate_floating_ip(self.context,
self.project.id)
# NOTE(vish): This assert never fails. When cloud attempts to
# make an rpc.call, the test just finishes with OK. It
# appears to be something in the magic inline callbacks
# that is breaking.
self.assertRaises(cloud.QuotaError, self.cloud.allocate_address, self.context)
self.assertRaises(cloud.QuotaError, self.cloud.allocate_address,
self.context)

View File

@@ -41,7 +41,7 @@ class RpcTestCase(test.TrialTestCase):
topic='test',
proxy=self.receiver)
self.consumer.attach_to_twisted()
self.context= context.get_admin_context()
self.context = context.get_admin_context()
def test_call_succeed(self):
"""Get a value through rpc call"""
@@ -67,9 +67,9 @@ class RpcTestCase(test.TrialTestCase):
to an int in the test.
"""
value = 42
self.assertFailure(rpc.call_twisted(self.context,
'test', {"method": "fail",
"args": {"value": value}}),
self.assertFailure(rpc.call_twisted(self.context, 'test',
{"method": "fail",
"args": {"value": value}}),
rpc.RemoteError)
try:
yield rpc.call_twisted(self.context,
@@ -101,4 +101,3 @@ class TestReceiver(object):
def fail(context, value):
"""Raises an exception with the value sent in"""
raise Exception(value)

View File

@@ -34,6 +34,7 @@ from nova.scheduler import driver
FLAGS = flags.FLAGS
flags.DECLARE('max_cores', 'nova.scheduler.simple')
class TestDriver(driver.Scheduler):
"""Scheduler Driver for Tests"""
def schedule(context, topic, *args, **kwargs):
@@ -42,6 +43,7 @@ class TestDriver(driver.Scheduler):
def schedule_named_method(context, topic, num):
return 'named_host'
class SchedulerTestCase(test.TrialTestCase):
"""Test case for scheduler"""
def setUp(self):

View File

@@ -179,7 +179,8 @@ class ServiceTestCase(test.BaseTestCase):
binary).AndRaise(exception.NotFound())
service.db.service_create(self.context,
service_create).AndReturn(service_ref)
service.db.service_get(self.context, service_ref['id']).AndReturn(service_ref)
service.db.service_get(self.context,
service_ref['id']).AndReturn(service_ref)
service.db.service_update(self.context, service_ref['id'],
mox.ContainsKeyValue('report_count', 1))
@@ -227,4 +228,3 @@ class ServiceTestCase(test.BaseTestCase):
rv = yield s.report_state(host, binary)
self.assert_(not s.model_disconnected)

View File

@@ -35,7 +35,8 @@ class ValidationTestCase(test.TrialTestCase):
self.assertTrue(type_case("foo", 5, 1))
self.assertRaises(TypeError, type_case, "bar", "5", 1)
self.assertRaises(TypeError, type_case, None, 5, 1)
@validate.typetest(instanceid=str, size=int, number_of_instances=int)
def type_case(instanceid, size, number_of_instances):
return True

View File

@@ -29,11 +29,13 @@ from nova.virt import libvirt_conn
FLAGS = flags.FLAGS
flags.DECLARE('instances_path', 'nova.compute.manager')
class LibvirtConnTestCase(test.TrialTestCase):
def setUp(self):
super(LibvirtConnTestCase, self).setUp()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake', admin=True)
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
self.project = self.manager.create_project('fake', 'fake', 'fake')
self.network = utils.import_object(FLAGS.network_manager)
FLAGS.instances_path = ''
@@ -41,15 +43,15 @@ class LibvirtConnTestCase(test.TrialTestCase):
def test_get_uri_and_template(self):
ip = '10.11.12.13'
instance = { 'internal_id' : 1,
'memory_kb' : '1024000',
'basepath' : '/some/path',
'bridge_name' : 'br100',
'mac_address' : '02:12:34:46:56:67',
'vcpus' : 2,
'project_id' : 'fake',
'bridge' : 'br101',
'instance_type' : 'm1.small'}
instance = {'internal_id': 1,
'memory_kb': '1024000',
'basepath': '/some/path',
'bridge_name': 'br100',
'mac_address': '02:12:34:46:56:67',
'vcpus': 2,
'project_id': 'fake',
'bridge': 'br101',
'instance_type': 'm1.small'}
user_context = context.RequestContext(project=self.project,
user=self.user)
@@ -58,36 +60,34 @@ class LibvirtConnTestCase(test.TrialTestCase):
self.network.set_network_host(context.get_admin_context(),
network_ref['id'])
fixed_ip = { 'address' : ip,
'network_id' : network_ref['id'] }
fixed_ip = {'address': ip,
'network_id': network_ref['id']}
ctxt = context.get_admin_context()
fixed_ip_ref = db.fixed_ip_create(ctxt, fixed_ip)
db.fixed_ip_update(ctxt, ip, {'allocated': True,
'instance_id': instance_ref['id'] })
'instance_id': instance_ref['id']})
type_uri_map = { 'qemu' : ('qemu:///system',
[(lambda t: t.find('.').get('type'), 'qemu'),
(lambda t: t.find('./os/type').text, 'hvm'),
(lambda t: t.find('./devices/emulator'), None)]),
'kvm' : ('qemu:///system',
[(lambda t: t.find('.').get('type'), 'kvm'),
(lambda t: t.find('./os/type').text, 'hvm'),
(lambda t: t.find('./devices/emulator'), None)]),
'uml' : ('uml:///system',
[(lambda t: t.find('.').get('type'), 'uml'),
(lambda t: t.find('./os/type').text, 'uml')]),
}
type_uri_map = {'qemu': ('qemu:///system',
[(lambda t: t.find('.').get('type'), 'qemu'),
(lambda t: t.find('./os/type').text, 'hvm'),
(lambda t: t.find('./devices/emulator'), None)]),
'kvm': ('qemu:///system',
[(lambda t: t.find('.').get('type'), 'kvm'),
(lambda t: t.find('./os/type').text, 'hvm'),
(lambda t: t.find('./devices/emulator'), None)]),
'uml': ('uml:///system',
[(lambda t: t.find('.').get('type'), 'uml'),
(lambda t: t.find('./os/type').text, 'uml')])}
common_checks = [(lambda t: t.find('.').tag, 'domain'),
(lambda t: \
t.find('./devices/interface/filterref/parameter') \
.get('name'), 'IP'),
(lambda t: \
t.find('./devices/interface/filterref/parameter') \
.get('value'), '10.11.12.13')]
common_checks = [
(lambda t: t.find('.').tag, 'domain'),
(lambda t: t.find('./devices/interface/filterref/parameter').\
get('name'), 'IP'),
(lambda t: t.find('./devices/interface/filterref/parameter').\
get('value'), '10.11.12.13')]
for (libvirt_type,(expected_uri, checks)) in type_uri_map.iteritems():
for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems():
FLAGS.libvirt_type = libvirt_type
conn = libvirt_conn.LibvirtConnection(True)
@@ -111,19 +111,20 @@ class LibvirtConnTestCase(test.TrialTestCase):
# implementation doesn't fiddle around with the FLAGS.
testuri = 'something completely different'
FLAGS.libvirt_uri = testuri
for (libvirt_type,(expected_uri, checks)) in type_uri_map.iteritems():
for (libvirt_type, (expected_uri, checks)) in type_uri_map.iteritems():
FLAGS.libvirt_type = libvirt_type
conn = libvirt_conn.LibvirtConnection(True)
uri, template = conn.get_uri_and_template()
self.assertEquals(uri, testuri)
def tearDown(self):
super(LibvirtConnTestCase, self).tearDown()
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
class NWFilterTestCase(test.TrialTestCase):
def setUp(self):
super(NWFilterTestCase, self).setUp()
@@ -131,7 +132,8 @@ class NWFilterTestCase(test.TrialTestCase):
pass
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake', admin=True)
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
self.project = self.manager.create_project('fake', 'fake', 'fake')
self.context = context.RequestContext(self.user, self.project)
@@ -143,7 +145,6 @@ class NWFilterTestCase(test.TrialTestCase):
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)
def test_cidr_rule_nwfilter_xml(self):
cloud_controller = cloud.CloudController()
cloud_controller.create_security_group(self.context,
@@ -156,7 +157,6 @@ class NWFilterTestCase(test.TrialTestCase):
ip_protocol='tcp',
cidr_ip='0.0.0.0/0')
security_group = db.security_group_get_by_name(self.context,
'fake',
'testgroup')
@@ -182,15 +182,12 @@ class NWFilterTestCase(test.TrialTestCase):
self.assertEqual(ip_conditions[0].getAttribute('srcipmask'), '0.0.0.0')
self.assertEqual(ip_conditions[0].getAttribute('dstportstart'), '80')
self.assertEqual(ip_conditions[0].getAttribute('dstportend'), '81')
self.teardown_security_group()
def teardown_security_group(self):
cloud_controller = cloud.CloudController()
cloud_controller.delete_security_group(self.context, 'testgroup')
def setup_and_return_security_group(self):
cloud_controller = cloud.CloudController()
cloud_controller.create_security_group(self.context,
@@ -244,16 +241,19 @@ class NWFilterTestCase(test.TrialTestCase):
for required in [secgroup_filter, 'allow-dhcp-server',
'no-arp-spoofing', 'no-ip-spoofing',
'no-mac-spoofing']:
self.assertTrue(required in self.recursive_depends[instance_filter],
"Instance's filter does not include %s" % required)
self.assertTrue(required in
self.recursive_depends[instance_filter],
"Instance's filter does not include %s" %
required)
self.security_group = self.setup_and_return_security_group()
db.instance_add_security_group(self.context, inst_id, self.security_group.id)
db.instance_add_security_group(self.context, inst_id,
self.security_group.id)
instance = db.instance_get(self.context, inst_id)
d = self.fw.setup_nwfilters_for_instance(instance)
d.addCallback(_ensure_all_called)
d.addCallback(lambda _:self.teardown_security_group())
d.addCallback(lambda _: self.teardown_security_group())
return d

View File

@@ -59,7 +59,8 @@ class VolumeTestCase(test.TrialTestCase):
"""Test volume can be created and deleted"""
volume_id = self._create_volume()
yield self.volume.create_volume(self.context, volume_id)
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(), volume_id).id)
self.assertEqual(volume_id, db.volume_get(context.get_admin_context(),
volume_id).id)
yield self.volume.delete_volume(self.context, volume_id)
self.assertRaises(exception.NotFound,
@@ -114,7 +115,8 @@ class VolumeTestCase(test.TrialTestCase):
volume_id = self._create_volume()
yield self.volume.create_volume(self.context, volume_id)
if FLAGS.fake_tests:
db.volume_attached(self.context, volume_id, instance_id, mountpoint)
db.volume_attached(self.context, volume_id, instance_id,
mountpoint)
else:
yield self.compute.attach_volume(self.context,
instance_id,
@@ -154,7 +156,8 @@ class VolumeTestCase(test.TrialTestCase):
def _check(volume_id):
"""Make sure blades aren't duplicated"""
volume_ids.append(volume_id)
(shelf_id, blade_id) = db.volume_get_shelf_and_blade(context.get_admin_context(),
admin_context = context.get_admin_context()
(shelf_id, blade_id) = db.volume_get_shelf_and_blade(admin_context,
volume_id)
shelf_blade = '%s.%s' % (shelf_id, blade_id)
self.assert_(shelf_blade not in shelf_blades)