This change creates a minimalist API abstraction for the nova/rpc.py code so that it's possible to use other queue mechanisms besides Rabbit and/or AMQP, and even use other drivers for AMQP rather than Rabbit. The change is intended to give the least amount of interference with the rest of the code, fixes several bugs in the tests, and works with the current branch. I also have a small demo driver+server for using 0MQ which I'll submit after this patch is merged.
This commit is contained in:
1
Authors
1
Authors
@@ -105,3 +105,4 @@ Yoshiaki Tamura <yoshi@midokura.jp>
|
||||
Youcef Laribi <Youcef.Laribi@eu.citrix.com>
|
||||
Yuriy Taraday <yorik.sar@gmail.com>
|
||||
Zhixue Wu <Zhixue.Wu@citrix.com>
|
||||
Zed Shaw <zedshaw@zedshaw.com>
|
||||
|
||||
66
nova/rpc/__init__.py
Normal file
66
nova/rpc/__init__.py
Normal file
@@ -0,0 +1,66 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
from nova.utils import import_object
|
||||
from nova.rpc.common import RemoteError, LOG
|
||||
from nova import flags
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DEFINE_string('rpc_backend',
|
||||
'nova.rpc.amqp',
|
||||
"The messaging module to use, defaults to AMQP.")
|
||||
|
||||
RPCIMPL = import_object(FLAGS.rpc_backend)
|
||||
|
||||
|
||||
def create_connection(new=True):
|
||||
return RPCIMPL.Connection.instance(new=True)
|
||||
|
||||
|
||||
def create_consumer(conn, topic, proxy, fanout=False):
|
||||
if fanout:
|
||||
return RPCIMPL.FanoutAdapterConsumer(
|
||||
connection=conn,
|
||||
topic=topic,
|
||||
proxy=proxy)
|
||||
else:
|
||||
return RPCIMPL.TopicAdapterConsumer(
|
||||
connection=conn,
|
||||
topic=topic,
|
||||
proxy=proxy)
|
||||
|
||||
|
||||
def create_consumer_set(conn, consumers):
|
||||
return RPCIMPL.ConsumerSet(connection=conn, consumer_list=consumers)
|
||||
|
||||
|
||||
def call(context, topic, msg):
|
||||
return RPCIMPL.call(context, topic, msg)
|
||||
|
||||
|
||||
def cast(context, topic, msg):
|
||||
return RPCIMPL.cast(context, topic, msg)
|
||||
|
||||
|
||||
def fanout_cast(context, topic, msg):
|
||||
return RPCIMPL.fanout_cast(context, topic, msg)
|
||||
|
||||
|
||||
def multicall(context, topic, msg):
|
||||
return RPCIMPL.multicall(context, topic, msg)
|
||||
@@ -44,9 +44,7 @@ from nova import fakerabbit
|
||||
from nova import flags
|
||||
from nova import log as logging
|
||||
from nova import utils
|
||||
|
||||
|
||||
LOG = logging.getLogger('nova.rpc')
|
||||
from nova.rpc.common import RemoteError, LOG
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
@@ -418,25 +416,6 @@ def msg_reply(msg_id, reply=None, failure=None):
|
||||
publisher.close()
|
||||
|
||||
|
||||
class RemoteError(exception.Error):
|
||||
"""Signifies that a remote class has raised an exception.
|
||||
|
||||
Containes a string representation of the type of the original exception,
|
||||
the value of the original exception, and the traceback. These are
|
||||
sent to the parent as a joined string so printing the exception
|
||||
contains all of the relevent info.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, exc_type, value, traceback):
|
||||
self.exc_type = exc_type
|
||||
self.value = value
|
||||
self.traceback = traceback
|
||||
super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
|
||||
value,
|
||||
traceback))
|
||||
|
||||
|
||||
def _unpack_context(msg):
|
||||
"""Unpack context from msg."""
|
||||
context_dict = {}
|
||||
23
nova/rpc/common.py
Normal file
23
nova/rpc/common.py
Normal file
@@ -0,0 +1,23 @@
|
||||
from nova import exception
|
||||
from nova import log as logging
|
||||
|
||||
LOG = logging.getLogger('nova.rpc')
|
||||
|
||||
|
||||
class RemoteError(exception.Error):
|
||||
"""Signifies that a remote class has raised an exception.
|
||||
|
||||
Containes a string representation of the type of the original exception,
|
||||
the value of the original exception, and the traceback. These are
|
||||
sent to the parent as a joined string so printing the exception
|
||||
contains all of the relevent info.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, exc_type, value, traceback):
|
||||
self.exc_type = exc_type
|
||||
self.value = value
|
||||
self.traceback = traceback
|
||||
super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
|
||||
value,
|
||||
traceback))
|
||||
@@ -39,7 +39,7 @@ class AdminApiTestCase(test.TestCase):
|
||||
super(AdminApiTestCase, self).setUp()
|
||||
self.flags(connection_type='fake')
|
||||
|
||||
self.conn = rpc.Connection.instance()
|
||||
self.conn = rpc.create_connection()
|
||||
|
||||
# set up our cloud
|
||||
self.api = admin.AdminController()
|
||||
|
||||
@@ -50,7 +50,7 @@ class CloudTestCase(test.TestCase):
|
||||
self.flags(connection_type='fake',
|
||||
stub_network=True)
|
||||
|
||||
self.conn = rpc.Connection.instance()
|
||||
self.conn = rpc.create_connection()
|
||||
|
||||
# set up our cloud
|
||||
self.cloud = cloud.CloudController()
|
||||
@@ -326,22 +326,15 @@ class CloudTestCase(test.TestCase):
|
||||
revoke = self.cloud.revoke_security_group_ingress
|
||||
self.assertTrue(revoke(self.context, group_name=sec['name'], **kwargs))
|
||||
|
||||
def test_revoke_security_group_ingress_by_id(self):
|
||||
kwargs = {'project_id': self.context.project_id, 'name': 'test'}
|
||||
sec = db.security_group_create(self.context, kwargs)
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||
authz(self.context, group_id=sec['id'], **kwargs)
|
||||
revoke = self.cloud.revoke_security_group_ingress
|
||||
self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
|
||||
|
||||
def test_authorize_security_group_ingress_by_id(self):
|
||||
def test_authorize_revoke_security_group_ingress_by_id(self):
|
||||
sec = db.security_group_create(self.context,
|
||||
{'project_id': self.context.project_id,
|
||||
'name': 'test'})
|
||||
authz = self.cloud.authorize_security_group_ingress
|
||||
kwargs = {'to_port': '999', 'from_port': '999', 'ip_protocol': 'tcp'}
|
||||
self.assertTrue(authz(self.context, group_id=sec['id'], **kwargs))
|
||||
authz(self.context, group_id=sec['id'], **kwargs)
|
||||
revoke = self.cloud.revoke_security_group_ingress
|
||||
self.assertTrue(revoke(self.context, group_id=sec['id'], **kwargs))
|
||||
|
||||
def test_authorize_security_group_ingress_missing_protocol_params(self):
|
||||
sec = db.security_group_create(self.context,
|
||||
@@ -961,21 +954,6 @@ class CloudTestCase(test.TestCase):
|
||||
self._wait_for_running(ec2_instance_id)
|
||||
return ec2_instance_id
|
||||
|
||||
def test_rescue_unrescue_instance(self):
|
||||
instance_id = self._run_instance(
|
||||
image_id='ami-1',
|
||||
instance_type=FLAGS.default_instance_type,
|
||||
max_count=1)
|
||||
self.cloud.rescue_instance(context=self.context,
|
||||
instance_id=instance_id)
|
||||
# NOTE(vish): This currently does no validation, it simply makes sure
|
||||
# that the code path doesn't throw an exception.
|
||||
self.cloud.unrescue_instance(context=self.context,
|
||||
instance_id=instance_id)
|
||||
# TODO(soren): We need this until we can stop polling in the rpc code
|
||||
# for unit tests.
|
||||
self.cloud.terminate_instances(self.context, [instance_id])
|
||||
|
||||
def test_console_output(self):
|
||||
instance_id = self._run_instance(
|
||||
image_id='ami-1',
|
||||
|
||||
@@ -33,11 +33,12 @@ LOG = logging.getLogger('nova.tests.rpc')
|
||||
class RpcTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
super(RpcTestCase, self).setUp()
|
||||
self.conn = rpc.Connection.instance(True)
|
||||
self.conn = rpc.create_connection(True)
|
||||
self.receiver = TestReceiver()
|
||||
self.consumer = rpc.TopicAdapterConsumer(connection=self.conn,
|
||||
topic='test',
|
||||
proxy=self.receiver)
|
||||
self.consumer = rpc.create_consumer(self.conn,
|
||||
'test',
|
||||
self.receiver,
|
||||
False)
|
||||
self.consumer.attach_to_eventlet()
|
||||
self.context = context.get_admin_context()
|
||||
|
||||
@@ -129,6 +130,8 @@ class RpcTestCase(test.TestCase):
|
||||
"""Calls echo in the passed queue"""
|
||||
LOG.debug(_("Nested received %(queue)s, %(value)s")
|
||||
% locals())
|
||||
# TODO: so, it will replay the context and use the same REQID?
|
||||
# that's bizarre.
|
||||
ret = rpc.call(context,
|
||||
queue,
|
||||
{"method": "echo",
|
||||
@@ -137,10 +140,11 @@ class RpcTestCase(test.TestCase):
|
||||
return value
|
||||
|
||||
nested = Nested()
|
||||
conn = rpc.Connection.instance(True)
|
||||
consumer = rpc.TopicAdapterConsumer(connection=conn,
|
||||
topic='nested',
|
||||
proxy=nested)
|
||||
conn = rpc.create_connection(True)
|
||||
consumer = rpc.create_consumer(conn,
|
||||
'nested',
|
||||
nested,
|
||||
False)
|
||||
consumer.attach_to_eventlet()
|
||||
value = 42
|
||||
result = rpc.call(self.context,
|
||||
@@ -149,47 +153,6 @@ class RpcTestCase(test.TestCase):
|
||||
"value": value}})
|
||||
self.assertEqual(value, result)
|
||||
|
||||
def test_connectionpool_single(self):
|
||||
"""Test that ConnectionPool recycles a single connection."""
|
||||
conn1 = rpc.ConnectionPool.get()
|
||||
rpc.ConnectionPool.put(conn1)
|
||||
conn2 = rpc.ConnectionPool.get()
|
||||
rpc.ConnectionPool.put(conn2)
|
||||
self.assertEqual(conn1, conn2)
|
||||
|
||||
def test_connectionpool_double(self):
|
||||
"""Test that ConnectionPool returns and reuses separate connections.
|
||||
|
||||
When called consecutively we should get separate connections and upon
|
||||
returning them those connections should be reused for future calls
|
||||
before generating a new connection.
|
||||
|
||||
"""
|
||||
conn1 = rpc.ConnectionPool.get()
|
||||
conn2 = rpc.ConnectionPool.get()
|
||||
|
||||
self.assertNotEqual(conn1, conn2)
|
||||
rpc.ConnectionPool.put(conn1)
|
||||
rpc.ConnectionPool.put(conn2)
|
||||
|
||||
conn3 = rpc.ConnectionPool.get()
|
||||
conn4 = rpc.ConnectionPool.get()
|
||||
self.assertEqual(conn1, conn3)
|
||||
self.assertEqual(conn2, conn4)
|
||||
|
||||
def test_connectionpool_limit(self):
|
||||
"""Test connection pool limit and connection uniqueness."""
|
||||
max_size = FLAGS.rpc_conn_pool_size
|
||||
conns = []
|
||||
|
||||
for i in xrange(max_size):
|
||||
conns.append(rpc.ConnectionPool.get())
|
||||
|
||||
self.assertFalse(rpc.ConnectionPool.free_items)
|
||||
self.assertEqual(rpc.ConnectionPool.current_size,
|
||||
rpc.ConnectionPool.max_size)
|
||||
self.assertEqual(len(set(conns)), max_size)
|
||||
|
||||
|
||||
class TestReceiver(object):
|
||||
"""Simple Proxy class so the consumer has methods to call.
|
||||
|
||||
68
nova/tests/test_rpc_amqp.py
Normal file
68
nova/tests/test_rpc_amqp.py
Normal file
@@ -0,0 +1,68 @@
|
||||
from nova import context
|
||||
from nova import flags
|
||||
from nova import log as logging
|
||||
from nova import rpc
|
||||
from nova.rpc import amqp
|
||||
from nova import test
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
LOG = logging.getLogger('nova.tests.rpc')
|
||||
|
||||
|
||||
class RpcAMQPTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
super(RpcAMQPTestCase, self).setUp()
|
||||
self.conn = rpc.create_connection(True)
|
||||
self.receiver = TestReceiver()
|
||||
self.consumer = rpc.create_consumer(self.conn,
|
||||
'test',
|
||||
self.receiver,
|
||||
False)
|
||||
self.consumer.attach_to_eventlet()
|
||||
self.context = context.get_admin_context()
|
||||
|
||||
def test_connectionpool_single(self):
|
||||
"""Test that ConnectionPool recycles a single connection."""
|
||||
conn1 = amqp.ConnectionPool.get()
|
||||
amqp.ConnectionPool.put(conn1)
|
||||
conn2 = amqp.ConnectionPool.get()
|
||||
amqp.ConnectionPool.put(conn2)
|
||||
self.assertEqual(conn1, conn2)
|
||||
|
||||
|
||||
class TestReceiver(object):
|
||||
"""Simple Proxy class so the consumer has methods to call.
|
||||
|
||||
Uses static methods because we aren't actually storing any state.
|
||||
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def echo(context, value):
|
||||
"""Simply returns whatever value is sent in."""
|
||||
LOG.debug(_("Received %s"), value)
|
||||
return value
|
||||
|
||||
@staticmethod
|
||||
def context(context, value):
|
||||
"""Returns dictionary version of context."""
|
||||
LOG.debug(_("Received %s"), context)
|
||||
return context.to_dict()
|
||||
|
||||
@staticmethod
|
||||
def echo_three_times(context, value):
|
||||
context.reply(value)
|
||||
context.reply(value + 1)
|
||||
context.reply(value + 2)
|
||||
|
||||
@staticmethod
|
||||
def echo_three_times_yield(context, value):
|
||||
yield value
|
||||
yield value + 1
|
||||
yield value + 2
|
||||
|
||||
@staticmethod
|
||||
def fail(context, value):
|
||||
"""Raises an exception with the value sent in."""
|
||||
raise Exception(value)
|
||||
Reference in New Issue
Block a user