merged trunk

This commit is contained in:
Vishvananda Ishaya
2011-07-29 19:36:37 +00:00
11 changed files with 253 additions and 106 deletions

View File

@@ -105,3 +105,4 @@ Yoshiaki Tamura <yoshi@midokura.jp>
Youcef Laribi <Youcef.Laribi@eu.citrix.com> Youcef Laribi <Youcef.Laribi@eu.citrix.com>
Yuriy Taraday <yorik.sar@gmail.com> Yuriy Taraday <yorik.sar@gmail.com>
Zhixue Wu <Zhixue.Wu@citrix.com> Zhixue Wu <Zhixue.Wu@citrix.com>
Zed Shaw <zedshaw@zedshaw.com>

66
nova/rpc/__init__.py Normal file
View File

@@ -0,0 +1,66 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.utils import import_object
from nova.rpc.common import RemoteError, LOG
from nova import flags
FLAGS = flags.FLAGS
flags.DEFINE_string('rpc_backend',
'nova.rpc.amqp',
"The messaging module to use, defaults to AMQP.")
RPCIMPL = import_object(FLAGS.rpc_backend)
def create_connection(new=True):
return RPCIMPL.Connection.instance(new=True)
def create_consumer(conn, topic, proxy, fanout=False):
if fanout:
return RPCIMPL.FanoutAdapterConsumer(
connection=conn,
topic=topic,
proxy=proxy)
else:
return RPCIMPL.TopicAdapterConsumer(
connection=conn,
topic=topic,
proxy=proxy)
def create_consumer_set(conn, consumers):
return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers)
def call(context, topic, msg):
return RPCIMPL.call(context, topic, msg)
def cast(context, topic, msg):
return RPCIMPL.cast(context, topic, msg)
def fanout_cast(context, topic, msg):
return RPCIMPL.fanout_cast(context, topic, msg)
def multicall(context, topic, msg):
return RPCIMPL.multicall(context, topic, msg)

View File

@@ -44,9 +44,7 @@ from nova import fakerabbit
from nova import flags from nova import flags
from nova import log as logging from nova import log as logging
from nova import utils from nova import utils
from nova.rpc.common import RemoteError, LOG
LOG = logging.getLogger('nova.rpc')
FLAGS = flags.FLAGS FLAGS = flags.FLAGS
@@ -418,25 +416,6 @@ def msg_reply(msg_id, reply=None, failure=None):
publisher.close() publisher.close()
class RemoteError(exception.Error):
"""Signifies that a remote class has raised an exception.
Containes a string representation of the type of the original exception,
the value of the original exception, and the traceback. These are
sent to the parent as a joined string so printing the exception
contains all of the relevent info.
"""
def __init__(self, exc_type, value, traceback):
self.exc_type = exc_type
self.value = value
self.traceback = traceback
super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
value,
traceback))
def _unpack_context(msg): def _unpack_context(msg):
"""Unpack context from msg.""" """Unpack context from msg."""
context_dict = {} context_dict = {}

23
nova/rpc/common.py Normal file
View File

@@ -0,0 +1,23 @@
from nova import exception
from nova import log as logging
LOG = logging.getLogger('nova.rpc')
class RemoteError(exception.Error):
"""Signifies that a remote class has raised an exception.
Containes a string representation of the type of the original exception,
the value of the original exception, and the traceback. These are
sent to the parent as a joined string so printing the exception
contains all of the relevent info.
"""
def __init__(self, exc_type, value, traceback):
self.exc_type = exc_type
self.value = value
self.traceback = traceback
super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
value,
traceback))

View File

@@ -38,7 +38,7 @@ class AdminApiTestCase(test.TestCase):
super(AdminApiTestCase, self).setUp() super(AdminApiTestCase, self).setUp()
self.flags(connection_type='fake') self.flags(connection_type='fake')
self.conn = rpc.Connection.instance() self.conn = rpc.create_connection()
# set up our cloud # set up our cloud
self.api = admin.AdminController() self.api = admin.AdminController()

View File

@@ -49,7 +49,7 @@ class CloudTestCase(test.TestCase):
self.flags(connection_type='fake', self.flags(connection_type='fake',
stub_network=True) stub_network=True)
self.conn = rpc.Connection.instance() self.conn = rpc.create_connection()
# set up our cloud # set up our cloud
self.cloud = cloud.CloudController() self.cloud = cloud.CloudController()
@@ -322,22 +322,15 @@ class CloudTestCase(test.TestCase):
revoke = self.cloud.revoke_security_group_ingress revoke = self.cloud.revoke_security_group_ingress
self.assertTrue(revoke(self.context, group_name=sec['name'], **kwargs)) self.assertTrue(revoke(self.context, group_name=sec['name'], **kwargs))
def test_revoke_security_group_ingress_by_id(self): def test_authorize_revoke_security_group_ingress_by_id(self):
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
sec = db.security_group_create(self.context, kwargs)
authz = self.cloud.authorize_security_group_ingress
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
authz(self.context, group_id=sec['id'], **kwargs)
revoke = self.cloud.revoke_security_group_ingress
self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
def test_authorize_security_group_ingress_by_id(self):
sec = db.security_group_create(self.context, sec = db.security_group_create(self.context,
{'project_id': self.context.project_id, {'project_id': self.context.project_id,
'name': 'test'}) 'name': 'test'})
authz = self.cloud.authorize_security_group_ingress authz = self.cloud.authorize_security_group_ingress
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'} kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
self.assertTrue(authz(self.context, group_id=sec['id'], **kwargs)) authz(self.context, group_id=sec['id'], **kwargs)
revoke = self.cloud.revoke_security_group_ingress
self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
def test_authorize_security_group_ingress_missing_protocol_params(self): def test_authorize_security_group_ingress_missing_protocol_params(self):
sec = db.security_group_create(self.context, sec = db.security_group_create(self.context,
@@ -957,21 +950,6 @@ class CloudTestCase(test.TestCase):
self._wait_for_running(ec2_instance_id) self._wait_for_running(ec2_instance_id)
return ec2_instance_id return ec2_instance_id
def test_rescue_unrescue_instance(self):
instance_id = self._run_instance(
image_id='ami-1',
instance_type=FLAGS.default_instance_type,
max_count=1)
self.cloud.rescue_instance(context=self.context,
instance_id=instance_id)
# NOTE(vish): This currently does no validation, it simply makes sure
# that the code path doesn't throw an exception.
self.cloud.unrescue_instance(context=self.context,
instance_id=instance_id)
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
self.cloud.terminate_instances(self.context, [instance_id])
def test_console_output(self): def test_console_output(self):
instance_id = self._run_instance( instance_id = self._run_instance(
image_id='ami-1', image_id='ami-1',

View File

@@ -410,7 +410,7 @@ class ComputeTestCase(test.TestCase):
def fake(*args, **kwargs): def fake(*args, **kwargs):
pass pass
self.stubs.Set(self.compute.driver, 'finish_resize', fake) self.stubs.Set(self.compute.driver, 'finish_migration', fake)
self.stubs.Set(self.compute.network_api, 'get_instance_nw_info', fake) self.stubs.Set(self.compute.network_api, 'get_instance_nw_info', fake)
context = self.context.elevated() context = self.context.elevated()
instance_id = self._create_instance() instance_id = self._create_instance()
@@ -521,8 +521,8 @@ class ComputeTestCase(test.TestCase):
def fake(*args, **kwargs): def fake(*args, **kwargs):
pass pass
self.stubs.Set(self.compute.driver, 'finish_resize', fake) self.stubs.Set(self.compute.driver, 'finish_migration', fake)
self.stubs.Set(self.compute.driver, 'revert_resize', fake) self.stubs.Set(self.compute.driver, 'revert_migration', fake)
self.stubs.Set(self.compute.network_api, 'get_instance_nw_info', fake) self.stubs.Set(self.compute.network_api, 'get_instance_nw_info', fake)
self.compute.run_instance(self.context, instance_id) self.compute.run_instance(self.context, instance_id)

View File

@@ -217,7 +217,7 @@ class LibvirtConnTestCase(test.TestCase):
'mac_address': 'fake', 'mac_address': 'fake',
'ip_address': 'fake', 'ip_address': 'fake',
'dhcp_server': 'fake', 'dhcp_server': 'fake',
'extra_params': 'fake' 'extra_params': 'fake',
} }
# Creating mocks # Creating mocks

View File

@@ -33,11 +33,12 @@ LOG = logging.getLogger('nova.tests.rpc')
class RpcTestCase(test.TestCase): class RpcTestCase(test.TestCase):
def setUp(self): def setUp(self):
super(RpcTestCase, self).setUp() super(RpcTestCase, self).setUp()
self.conn = rpc.Connection.instance(True) self.conn = rpc.create_connection(True)
self.receiver = TestReceiver() self.receiver = TestReceiver()
self.consumer = rpc.TopicAdapterConsumer(connection=self.conn, self.consumer = rpc.create_consumer(self.conn,
topic='test', 'test',
proxy=self.receiver) self.receiver,
False)
self.consumer.attach_to_eventlet() self.consumer.attach_to_eventlet()
self.context = context.get_admin_context() self.context = context.get_admin_context()
@@ -129,6 +130,8 @@ class RpcTestCase(test.TestCase):
"""Calls echo in the passed queue""" """Calls echo in the passed queue"""
LOG.debug(_("Nested received %(queue)s, %(value)s") LOG.debug(_("Nested received %(queue)s, %(value)s")
% locals()) % locals())
# TODO: so, it will replay the context and use the same REQID?
# that's bizarre.
ret = rpc.call(context, ret = rpc.call(context,
queue, queue,
{"method": "echo", {"method": "echo",
@@ -137,10 +140,11 @@ class RpcTestCase(test.TestCase):
return value return value
nested = Nested() nested = Nested()
conn = rpc.Connection.instance(True) conn = rpc.create_connection(True)
consumer = rpc.TopicAdapterConsumer(connection=conn, consumer = rpc.create_consumer(conn,
topic='nested', 'nested',
proxy=nested) nested,
False)
consumer.attach_to_eventlet() consumer.attach_to_eventlet()
value = 42 value = 42
result = rpc.call(self.context, result = rpc.call(self.context,
@@ -149,47 +153,6 @@ class RpcTestCase(test.TestCase):
"value": value}}) "value": value}})
self.assertEqual(value, result) self.assertEqual(value, result)
def test_connectionpool_single(self):
"""Test that ConnectionPool recycles a single connection."""
conn1 = rpc.ConnectionPool.get()
rpc.ConnectionPool.put(conn1)
conn2 = rpc.ConnectionPool.get()
rpc.ConnectionPool.put(conn2)
self.assertEqual(conn1, conn2)
def test_connectionpool_double(self):
"""Test that ConnectionPool returns and reuses separate connections.
When called consecutively we should get separate connections and upon
returning them those connections should be reused for future calls
before generating a new connection.
"""
conn1 = rpc.ConnectionPool.get()
conn2 = rpc.ConnectionPool.get()
self.assertNotEqual(conn1, conn2)
rpc.ConnectionPool.put(conn1)
rpc.ConnectionPool.put(conn2)
conn3 = rpc.ConnectionPool.get()
conn4 = rpc.ConnectionPool.get()
self.assertEqual(conn1, conn3)
self.assertEqual(conn2, conn4)
def test_connectionpool_limit(self):
"""Test connection pool limit and connection uniqueness."""
max_size = FLAGS.rpc_conn_pool_size
conns = []
for i in xrange(max_size):
conns.append(rpc.ConnectionPool.get())
self.assertFalse(rpc.ConnectionPool.free_items)
self.assertEqual(rpc.ConnectionPool.current_size,
rpc.ConnectionPool.max_size)
self.assertEqual(len(set(conns)), max_size)
class TestReceiver(object): class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call. """Simple Proxy class so the consumer has methods to call.

View File

@@ -0,0 +1,68 @@
from nova import context
from nova import flags
from nova import log as logging
from nova import rpc
from nova.rpc import amqp
from nova import test
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.tests.rpc')
class RpcAMQPTestCase(test.TestCase):
def setUp(self):
super(RpcAMQPTestCase, self).setUp()
self.conn = rpc.create_connection(True)
self.receiver = TestReceiver()
self.consumer = rpc.create_consumer(self.conn,
'test',
self.receiver,
False)
self.consumer.attach_to_eventlet()
self.context = context.get_admin_context()
def test_connectionpool_single(self):
"""Test that ConnectionPool recycles a single connection."""
conn1 = amqp.ConnectionPool.get()
amqp.ConnectionPool.put(conn1)
conn2 = amqp.ConnectionPool.get()
amqp.ConnectionPool.put(conn2)
self.assertEqual(conn1, conn2)
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
Uses static methods because we aren't actually storing any state.
"""
@staticmethod
def echo(context, value):
"""Simply returns whatever value is sent in."""
LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context."""
LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
def echo_three_times(context, value):
context.reply(value)
context.reply(value + 1)
context.reply(value + 2)
@staticmethod
def echo_three_times_yield(context, value):
yield value
yield value + 1
yield value + 2
@staticmethod
def fail(context, value):
"""Raises an exception with the value sent in."""
raise Exception(value)

View File

@@ -786,8 +786,15 @@ class XenAPIMigrateInstance(test.TestCase):
conn = xenapi_conn.get_connection(False) conn = xenapi_conn.get_connection(False)
conn.migrate_disk_and_power_off(instance, '127.0.0.1') conn.migrate_disk_and_power_off(instance, '127.0.0.1')
def test_finish_resize(self): def test_finish_migrate(self):
instance = db.instance_create(self.context, self.values) instance = db.instance_create(self.context, self.values)
self.called = False
def fake_vdi_resize(*args, **kwargs):
self.called = True
self.stubs.Set(stubs.FakeSessionForMigrationTests,
"VDI_resize_online", fake_vdi_resize)
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests) stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
stubs.stubout_loopingcall_start(self.stubs) stubs.stubout_loopingcall_start(self.stubs)
conn = xenapi_conn.get_connection(False) conn = xenapi_conn.get_connection(False)
@@ -805,8 +812,70 @@ class XenAPIMigrateInstance(test.TestCase):
'label': 'fake', 'label': 'fake',
'mac': 'DE:AD:BE:EF:00:00', 'mac': 'DE:AD:BE:EF:00:00',
'rxtx_cap': 3})] 'rxtx_cap': 3})]
conn.finish_resize(instance, dict(base_copy='hurr', cow='durr'), conn.finish_migration(instance, dict(base_copy='hurr', cow='durr'),
network_info) network_info, resize_instance=True)
self.assertEqual(self.called, True)
def test_finish_migrate_no_local_storage(self):
tiny_type_id = \
instance_types.get_instance_type_by_name('m1.tiny')['id']
self.values.update({'instance_type_id': tiny_type_id, 'local_gb': 0})
instance = db.instance_create(self.context, self.values)
def fake_vdi_resize(*args, **kwargs):
raise Exception("This shouldn't be called")
self.stubs.Set(stubs.FakeSessionForMigrationTests,
"VDI_resize_online", fake_vdi_resize)
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
stubs.stubout_loopingcall_start(self.stubs)
conn = xenapi_conn.get_connection(False)
network_info = [({'bridge': 'fa0', 'id': 0, 'injected': False},
{'broadcast': '192.168.0.255',
'dns': ['192.168.0.1'],
'gateway': '192.168.0.1',
'gateway6': 'dead:beef::1',
'ip6s': [{'enabled': '1',
'ip': 'dead:beef::dcad:beff:feef:0',
'netmask': '64'}],
'ips': [{'enabled': '1',
'ip': '192.168.0.100',
'netmask': '255.255.255.0'}],
'label': 'fake',
'mac': 'DE:AD:BE:EF:00:00',
'rxtx_cap': 3})]
conn.finish_migration(instance, dict(base_copy='hurr', cow='durr'),
network_info, resize_instance=True)
def test_finish_migrate_no_resize_vdi(self):
instance = db.instance_create(self.context, self.values)
def fake_vdi_resize(*args, **kwargs):
raise Exception("This shouldn't be called")
self.stubs.Set(stubs.FakeSessionForMigrationTests,
"VDI_resize_online", fake_vdi_resize)
stubs.stubout_session(self.stubs, stubs.FakeSessionForMigrationTests)
stubs.stubout_loopingcall_start(self.stubs)
conn = xenapi_conn.get_connection(False)
network_info = [({'bridge': 'fa0', 'id': 0, 'injected': False},
{'broadcast': '192.168.0.255',
'dns': ['192.168.0.1'],
'gateway': '192.168.0.1',
'gateway6': 'dead:beef::1',
'ip6s': [{'enabled': '1',
'ip': 'dead:beef::dcad:beff:feef:0',
'netmask': '64'}],
'ips': [{'enabled': '1',
'ip': '192.168.0.100',
'netmask': '255.255.255.0'}],
'label': 'fake',
'mac': 'DE:AD:BE:EF:00:00',
'rxtx_cap': 3})]
# Resize instance would be determined by the compute call
conn.finish_migration(instance, dict(base_copy='hurr', cow='durr'),
network_info, resize_instance=False)
class XenAPIDetermineDiskImageTestCase(test.TestCase): class XenAPIDetermineDiskImageTestCase(test.TestCase):