diff --git a/nova/rpc/__init__.py b/nova/rpc/__init__.py index 10b69c8b..2a47ba87 100644 --- a/nova/rpc/__init__.py +++ b/nova/rpc/__init__.py @@ -23,7 +23,7 @@ from nova import flags FLAGS = flags.FLAGS flags.DEFINE_string('rpc_backend', - 'carrot', + 'kombu', "The messaging module to use, defaults to carrot.") impl_table = {'kombu': 'nova.rpc.impl_kombu', @@ -42,7 +42,7 @@ def create_connection(new=True): def create_consumer(conn, topic, proxy, fanout=False): - return RPCIMPL.create_consumer(conn, topic, proxy, fanout) + RPCIMPL.create_consumer(conn, topic, proxy, fanout) def call(context, topic, msg): diff --git a/nova/rpc/common.py b/nova/rpc/common.py index 1d3065a8..b8c28063 100644 --- a/nova/rpc/common.py +++ b/nova/rpc/common.py @@ -1,8 +1,14 @@ from nova import exception +from nova import flags from nova import log as logging LOG = logging.getLogger('nova.rpc') +flags.DEFINE_integer('rpc_thread_pool_size', 1024, + 'Size of RPC thread pool') +flags.DEFINE_integer('rpc_conn_pool_size', 30, + 'Size of RPC connection pool') + class RemoteError(exception.Error): """Signifies that a remote class has raised an exception. diff --git a/nova/rpc/impl_carrot.py b/nova/rpc/impl_carrot.py index efff788a..07af0a11 100644 --- a/nova/rpc/impl_carrot.py +++ b/nova/rpc/impl_carrot.py @@ -49,10 +49,6 @@ from nova.rpc.common import RemoteError, LOG eventlet.monkey_patch() FLAGS = flags.FLAGS -flags.DEFINE_integer('rpc_thread_pool_size', 1024, - 'Size of RPC thread pool') -flags.DEFINE_integer('rpc_conn_pool_size', 30, - 'Size of RPC connection pool') class Connection(carrot_connection.BrokerConnection): diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index bd83bc52..49bca1d8 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -40,11 +40,6 @@ eventlet.monkey_patch() FLAGS = flags.FLAGS -flags.DEFINE_integer('rpc_conn_pool_size', 30, - 'Size of RPC connection pool') -flags.DEFINE_integer('rpc_thread_pool_size', 1024, - 'Size of RPC thread pool') - class ConsumerBase(object): """Consumer base class.""" @@ -328,6 +323,9 @@ class Connection(object): pass time.sleep(1) self.connection = kombu.connection.Connection(**self.params) + if FLAGS.fake_rabbit: + # Kludge to speed up tests. + self.connection.transport.polling_interval = 0.0 self.consumer_num = itertools.count(1) try: @@ -422,13 +420,13 @@ class Connection(object): self.consume() except greenlet.GreenletExit: return - if not self.consumer_thread: + if self.consumer_thread is None: self.consumer_thread = eventlet.spawn(_consumer_thread) return self.consumer_thread def cancel_consumer_thread(self): """Cancel a consumer thread""" - if self.consumer_thread: + if self.consumer_thread is not None: self.consumer_thread.kill() try: self.consumer_thread.wait() diff --git a/nova/tests/test_rpc_amqp.py b/nova/tests/test_rpc_amqp.py deleted file mode 100644 index 2215a908..00000000 --- a/nova/tests/test_rpc_amqp.py +++ /dev/null @@ -1,88 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (c) 2010 Openstack, LLC. -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Tests For RPC AMQP. -""" - -from nova import context -from nova import log as logging -from nova import rpc -from nova.rpc import amqp -from nova import test - - -LOG = logging.getLogger('nova.tests.rpc') - - -class RpcAMQPTestCase(test.TestCase): - def setUp(self): - super(RpcAMQPTestCase, self).setUp() - self.conn = rpc.create_connection(True) - self.receiver = TestReceiver() - self.consumer = rpc.create_consumer(self.conn, - 'test', - self.receiver, - False) - self.consumer.attach_to_eventlet() - self.context = context.get_admin_context() - - def test_connectionpool_single(self): - """Test that ConnectionPool recycles a single connection.""" - conn1 = amqp.ConnectionPool.get() - amqp.ConnectionPool.put(conn1) - conn2 = amqp.ConnectionPool.get() - amqp.ConnectionPool.put(conn2) - self.assertEqual(conn1, conn2) - - -class TestReceiver(object): - """Simple Proxy class so the consumer has methods to call. - - Uses static methods because we aren't actually storing any state. - - """ - - @staticmethod - def echo(context, value): - """Simply returns whatever value is sent in.""" - LOG.debug(_("Received %s"), value) - return value - - @staticmethod - def context(context, value): - """Returns dictionary version of context.""" - LOG.debug(_("Received %s"), context) - return context.to_dict() - - @staticmethod - def echo_three_times(context, value): - context.reply(value) - context.reply(value + 1) - context.reply(value + 2) - - @staticmethod - def echo_three_times_yield(context, value): - yield value - yield value + 1 - yield value + 2 - - @staticmethod - def fail(context, value): - """Raises an exception with the value sent in.""" - raise Exception(value) diff --git a/nova/tests/test_rpc_carrot.py b/nova/tests/test_rpc_carrot.py new file mode 100644 index 00000000..cf84980a --- /dev/null +++ b/nova/tests/test_rpc_carrot.py @@ -0,0 +1,202 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Unit Tests for remote procedure calls using queue +""" + +from nova import context +from nova import log as logging +from nova.rpc import impl_carrot as rpc +from nova import test + + +LOG = logging.getLogger('nova.tests.rpc') + + +class RpcCarrotTestCase(test.TestCase): + def setUp(self): + super(RpcCarrotTestCase, self).setUp() + self.conn = rpc.create_connection(True) + self.receiver = TestReceiver() + rpc.create_consumer(self.conn, + 'test', + self.receiver, + False) + self.conn.consume_in_thread() + self.context = context.get_admin_context() + + def tearDown(self): + self.conn.close() + super(RpcCarrotTestCase, self).tearDown() + + def test_connectionpool_single(self): + """Test that ConnectionPool recycles a single connection.""" + conn1 = rpc.ConnectionPool.get() + rpc.ConnectionPool.put(conn1) + conn2 = rpc.ConnectionPool.get() + rpc.ConnectionPool.put(conn2) + self.assertEqual(conn1, conn2) + + def test_call_succeed(self): + value = 42 + result = rpc.call(self.context, 'test', {"method": "echo", + "args": {"value": value}}) + self.assertEqual(value, result) + + def test_call_succeed_despite_multiple_returns(self): + value = 42 + result = rpc.call(self.context, 'test', {"method": "echo_three_times", + "args": {"value": value}}) + self.assertEqual(value + 2, result) + + def test_call_succeed_despite_multiple_returns_yield(self): + value = 42 + result = rpc.call(self.context, 'test', + {"method": "echo_three_times_yield", + "args": {"value": value}}) + self.assertEqual(value + 2, result) + + def test_multicall_succeed_once(self): + value = 42 + result = rpc.multicall(self.context, + 'test', + {"method": "echo", + "args": {"value": value}}) + for i, x in enumerate(result): + if i > 0: + self.fail('should only receive one response') + self.assertEqual(value + i, x) + + def test_multicall_succeed_three_times(self): + value = 42 + result = rpc.multicall(self.context, + 'test', + {"method": "echo_three_times", + "args": {"value": value}}) + for i, x in enumerate(result): + self.assertEqual(value + i, x) + + def test_multicall_succeed_three_times_yield(self): + value = 42 + result = rpc.multicall(self.context, + 'test', + {"method": "echo_three_times_yield", + "args": {"value": value}}) + for i, x in enumerate(result): + self.assertEqual(value + i, x) + + def test_context_passed(self): + """Makes sure a context is passed through rpc call.""" + value = 42 + result = rpc.call(self.context, + 'test', {"method": "context", + "args": {"value": value}}) + self.assertEqual(self.context.to_dict(), result) + + def test_call_exception(self): + """Test that exception gets passed back properly. + + rpc.call returns a RemoteError object. The value of the + exception is converted to a string, so we convert it back + to an int in the test. + + """ + value = 42 + self.assertRaises(rpc.RemoteError, + rpc.call, + self.context, + 'test', + {"method": "fail", + "args": {"value": value}}) + try: + rpc.call(self.context, + 'test', + {"method": "fail", + "args": {"value": value}}) + self.fail("should have thrown rpc.RemoteError") + except rpc.RemoteError as exc: + self.assertEqual(int(exc.value), value) + + def test_nested_calls(self): + """Test that we can do an rpc.call inside another call.""" + class Nested(object): + @staticmethod + def echo(context, queue, value): + """Calls echo in the passed queue""" + LOG.debug(_("Nested received %(queue)s, %(value)s") + % locals()) + # TODO: so, it will replay the context and use the same REQID? + # that's bizarre. + ret = rpc.call(context, + queue, + {"method": "echo", + "args": {"value": value}}) + LOG.debug(_("Nested return %s"), ret) + return value + + nested = Nested() + conn = rpc.create_connection(True) + rpc.create_consumer(conn, + 'nested', + nested, + False) + conn.consume_in_thread() + value = 42 + result = rpc.call(self.context, + 'nested', {"method": "echo", + "args": {"queue": "test", + "value": value}}) + conn.close() + self.assertEqual(value, result) + + +class TestReceiver(object): + """Simple Proxy class so the consumer has methods to call. + + Uses static methods because we aren't actually storing any state. + + """ + + @staticmethod + def echo(context, value): + """Simply returns whatever value is sent in.""" + LOG.debug(_("Received %s"), value) + return value + + @staticmethod + def context(context, value): + """Returns dictionary version of context.""" + LOG.debug(_("Received %s"), context) + return context.to_dict() + + @staticmethod + def echo_three_times(context, value): + context.reply(value) + context.reply(value + 1) + context.reply(value + 2) + + @staticmethod + def echo_three_times_yield(context, value): + yield value + yield value + 1 + yield value + 2 + + @staticmethod + def fail(context, value): + """Raises an exception with the value sent in.""" + raise Exception(value) diff --git a/nova/tests/test_rpc_kombu.py b/nova/tests/test_rpc_kombu.py new file mode 100644 index 00000000..457dfdec --- /dev/null +++ b/nova/tests/test_rpc_kombu.py @@ -0,0 +1,266 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +Unit Tests for remote procedure calls using queue +""" + +from nova import context +from nova import log as logging +from nova.rpc import impl_kombu as rpc +from nova import test + + +LOG = logging.getLogger('nova.tests.rpc') + + +class RpcKombuTestCase(test.TestCase): + def setUp(self): + super(RpcKombuTestCase, self).setUp() + self.conn = rpc.create_connection() + self.receiver = TestReceiver() + rpc.create_consumer(self.conn, + 'test', + self.receiver, + False) + self.conn.consume_in_thread() + self.context = context.get_admin_context() + + def tearDown(self): + self.conn.close() + super(RpcKombuTestCase, self).tearDown() + + def test_reusing_connection(self): + """Test that reusing a connection returns same one.""" + conn_context = rpc.create_connection(new=False) + conn1 = conn_context.connection + conn_context.close() + conn_context = rpc.create_connection(new=False) + conn2 = conn_context.connection + conn_context.close() + self.assertEqual(conn1, conn2) + + def test_topic_send_receive(self): + """Test sending to a topic exchange/queue""" + + conn = rpc.create_connection() + message = 'topic test message' + + self.received_message = None + + def _callback(message): + self.received_message = message + + conn.declare_topic_consumer('a_topic', _callback) + conn.topic_send('a_topic', message) + conn.consume(limit=1) + conn.close() + + self.assertEqual(self.received_message, message) + + def test_direct_send_receive(self): + """Test sending to a direct exchange/queue""" + conn = rpc.create_connection() + message = 'direct test message' + + self.received_message = None + + def _callback(message): + self.received_message = message + + conn.declare_direct_consumer('a_direct', _callback) + conn.direct_send('a_direct', message) + conn.consume(limit=1) + conn.close() + + self.assertEqual(self.received_message, message) + + @test.skip_test("kombu memory transport seems buggy with fanout queues " + "as this test passes when you use rabbit (fake_rabbit=False)") + def test_fanout_send_receive(self): + """Test sending to a fanout exchange and consuming from 2 queues""" + + conn = rpc.create_connection() + conn2 = rpc.create_connection() + message = 'fanout test message' + + self.received_message = None + + def _callback(message): + self.received_message = message + + conn.declare_fanout_consumer('a_fanout', _callback) + conn2.declare_fanout_consumer('a_fanout', _callback) + conn.fanout_send('a_fanout', message) + + conn.consume(limit=1) + conn.close() + self.assertEqual(self.received_message, message) + + self.received_message = None + conn2.consume(limit=1) + conn2.close() + self.assertEqual(self.received_message, message) + + def test_call_succeed(self): + value = 42 + result = rpc.call(self.context, 'test', {"method": "echo", + "args": {"value": value}}) + self.assertEqual(value, result) + + def test_call_succeed_despite_multiple_returns(self): + value = 42 + result = rpc.call(self.context, 'test', {"method": "echo_three_times", + "args": {"value": value}}) + self.assertEqual(value + 2, result) + + def test_call_succeed_despite_multiple_returns_yield(self): + value = 42 + result = rpc.call(self.context, 'test', + {"method": "echo_three_times_yield", + "args": {"value": value}}) + self.assertEqual(value + 2, result) + + def test_multicall_succeed_once(self): + value = 42 + result = rpc.multicall(self.context, + 'test', + {"method": "echo", + "args": {"value": value}}) + for i, x in enumerate(result): + if i > 0: + self.fail('should only receive one response') + self.assertEqual(value + i, x) + + def test_multicall_succeed_three_times(self): + value = 42 + result = rpc.multicall(self.context, + 'test', + {"method": "echo_three_times", + "args": {"value": value}}) + for i, x in enumerate(result): + self.assertEqual(value + i, x) + + def test_multicall_succeed_three_times_yield(self): + value = 42 + result = rpc.multicall(self.context, + 'test', + {"method": "echo_three_times_yield", + "args": {"value": value}}) + for i, x in enumerate(result): + self.assertEqual(value + i, x) + + def test_context_passed(self): + """Makes sure a context is passed through rpc call.""" + value = 42 + result = rpc.call(self.context, + 'test', {"method": "context", + "args": {"value": value}}) + self.assertEqual(self.context.to_dict(), result) + + def test_call_exception(self): + """Test that exception gets passed back properly. + + rpc.call returns a RemoteError object. The value of the + exception is converted to a string, so we convert it back + to an int in the test. + + """ + value = 42 + self.assertRaises(rpc.RemoteError, + rpc.call, + self.context, + 'test', + {"method": "fail", + "args": {"value": value}}) + try: + rpc.call(self.context, + 'test', + {"method": "fail", + "args": {"value": value}}) + self.fail("should have thrown rpc.RemoteError") + except rpc.RemoteError as exc: + self.assertEqual(int(exc.value), value) + + def test_nested_calls(self): + """Test that we can do an rpc.call inside another call.""" + class Nested(object): + @staticmethod + def echo(context, queue, value): + """Calls echo in the passed queue""" + LOG.debug(_("Nested received %(queue)s, %(value)s") + % locals()) + # TODO: so, it will replay the context and use the same REQID? + # that's bizarre. + ret = rpc.call(context, + queue, + {"method": "echo", + "args": {"value": value}}) + LOG.debug(_("Nested return %s"), ret) + return value + + nested = Nested() + conn = rpc.create_connection(True) + rpc.create_consumer(conn, + 'nested', + nested, + False) + conn.consume_in_thread() + value = 42 + result = rpc.call(self.context, + 'nested', {"method": "echo", + "args": {"queue": "test", + "value": value}}) + conn.close() + self.assertEqual(value, result) + + +class TestReceiver(object): + """Simple Proxy class so the consumer has methods to call. + + Uses static methods because we aren't actually storing any state. + + """ + + @staticmethod + def echo(context, value): + """Simply returns whatever value is sent in.""" + LOG.debug(_("Received %s"), value) + return value + + @staticmethod + def context(context, value): + """Returns dictionary version of context.""" + LOG.debug(_("Received %s"), context) + return context.to_dict() + + @staticmethod + def echo_three_times(context, value): + context.reply(value) + context.reply(value + 1) + context.reply(value + 2) + + @staticmethod + def echo_three_times_yield(context, value): + yield value + yield value + 1 + yield value + 2 + + @staticmethod + def fail(context, value): + """Raises an exception with the value sent in.""" + raise Exception(value)