add carrot/kombu tests... small thread fix for kombu

This commit is contained in:
Chris Behrens
2011-08-28 19:22:53 -07:00
parent 068138618f
commit d6cce6e0e4
7 changed files with 481 additions and 101 deletions

View File

@@ -23,7 +23,7 @@ from nova import flags
FLAGS = flags.FLAGS
flags.DEFINE_string('rpc_backend',
'carrot',
'kombu',
"The messaging module to use, defaults to carrot.")
impl_table = {'kombu': 'nova.rpc.impl_kombu',
@@ -42,7 +42,7 @@ def create_connection(new=True):
def create_consumer(conn, topic, proxy, fanout=False):
return RPCIMPL.create_consumer(conn, topic, proxy, fanout)
RPCIMPL.create_consumer(conn, topic, proxy, fanout)
def call(context, topic, msg):

View File

@@ -1,8 +1,14 @@
from nova import exception
from nova import flags
from nova import log as logging
LOG = logging.getLogger('nova.rpc')
flags.DEFINE_integer('rpc_thread_pool_size', 1024,
'Size of RPC thread pool')
flags.DEFINE_integer('rpc_conn_pool_size', 30,
'Size of RPC connection pool')
class RemoteError(exception.Error):
"""Signifies that a remote class has raised an exception.

View File

@@ -49,10 +49,6 @@ from nova.rpc.common import RemoteError, LOG
eventlet.monkey_patch()
FLAGS = flags.FLAGS
flags.DEFINE_integer('rpc_thread_pool_size', 1024,
'Size of RPC thread pool')
flags.DEFINE_integer('rpc_conn_pool_size', 30,
'Size of RPC connection pool')
class Connection(carrot_connection.BrokerConnection):

View File

@@ -40,11 +40,6 @@ eventlet.monkey_patch()
FLAGS = flags.FLAGS
flags.DEFINE_integer('rpc_conn_pool_size', 30,
'Size of RPC connection pool')
flags.DEFINE_integer('rpc_thread_pool_size', 1024,
'Size of RPC thread pool')
class ConsumerBase(object):
"""Consumer base class."""
@@ -328,6 +323,9 @@ class Connection(object):
pass
time.sleep(1)
self.connection = kombu.connection.Connection(**self.params)
if FLAGS.fake_rabbit:
# Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0
self.consumer_num = itertools.count(1)
try:
@@ -422,13 +420,13 @@ class Connection(object):
self.consume()
except greenlet.GreenletExit:
return
if not self.consumer_thread:
if self.consumer_thread is None:
self.consumer_thread = eventlet.spawn(_consumer_thread)
return self.consumer_thread
def cancel_consumer_thread(self):
"""Cancel a consumer thread"""
if self.consumer_thread:
if self.consumer_thread is not None:
self.consumer_thread.kill()
try:
self.consumer_thread.wait()

View File

@@ -1,88 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Openstack, LLC.
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For RPC AMQP.
"""
from nova import context
from nova import log as logging
from nova import rpc
from nova.rpc import amqp
from nova import test
LOG = logging.getLogger('nova.tests.rpc')
class RpcAMQPTestCase(test.TestCase):
def setUp(self):
super(RpcAMQPTestCase, self).setUp()
self.conn = rpc.create_connection(True)
self.receiver = TestReceiver()
self.consumer = rpc.create_consumer(self.conn,
'test',
self.receiver,
False)
self.consumer.attach_to_eventlet()
self.context = context.get_admin_context()
def test_connectionpool_single(self):
"""Test that ConnectionPool recycles a single connection."""
conn1 = amqp.ConnectionPool.get()
amqp.ConnectionPool.put(conn1)
conn2 = amqp.ConnectionPool.get()
amqp.ConnectionPool.put(conn2)
self.assertEqual(conn1, conn2)
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
Uses static methods because we aren't actually storing any state.
"""
@staticmethod
def echo(context, value):
"""Simply returns whatever value is sent in."""
LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context."""
LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
def echo_three_times(context, value):
context.reply(value)
context.reply(value + 1)
context.reply(value + 2)
@staticmethod
def echo_three_times_yield(context, value):
yield value
yield value + 1
yield value + 2
@staticmethod
def fail(context, value):
"""Raises an exception with the value sent in."""
raise Exception(value)

View File

@@ -0,0 +1,202 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using queue
"""
from nova import context
from nova import log as logging
from nova.rpc import impl_carrot as rpc
from nova import test
LOG = logging.getLogger('nova.tests.rpc')
class RpcCarrotTestCase(test.TestCase):
def setUp(self):
super(RpcCarrotTestCase, self).setUp()
self.conn = rpc.create_connection(True)
self.receiver = TestReceiver()
rpc.create_consumer(self.conn,
'test',
self.receiver,
False)
self.conn.consume_in_thread()
self.context = context.get_admin_context()
def tearDown(self):
self.conn.close()
super(RpcCarrotTestCase, self).tearDown()
def test_connectionpool_single(self):
"""Test that ConnectionPool recycles a single connection."""
conn1 = rpc.ConnectionPool.get()
rpc.ConnectionPool.put(conn1)
conn2 = rpc.ConnectionPool.get()
rpc.ConnectionPool.put(conn2)
self.assertEqual(conn1, conn2)
def test_call_succeed(self):
value = 42
result = rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
def test_call_succeed_despite_multiple_returns(self):
value = 42
result = rpc.call(self.context, 'test', {"method": "echo_three_times",
"args": {"value": value}})
self.assertEqual(value + 2, result)
def test_call_succeed_despite_multiple_returns_yield(self):
value = 42
result = rpc.call(self.context, 'test',
{"method": "echo_three_times_yield",
"args": {"value": value}})
self.assertEqual(value + 2, result)
def test_multicall_succeed_once(self):
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo",
"args": {"value": value}})
for i, x in enumerate(result):
if i > 0:
self.fail('should only receive one response')
self.assertEqual(value + i, x)
def test_multicall_succeed_three_times(self):
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo_three_times",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(value + i, x)
def test_multicall_succeed_three_times_yield(self):
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo_three_times_yield",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(value + i, x)
def test_context_passed(self):
"""Makes sure a context is passed through rpc call."""
value = 42
result = rpc.call(self.context,
'test', {"method": "context",
"args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
"""Test that exception gets passed back properly.
rpc.call returns a RemoteError object. The value of the
exception is converted to a string, so we convert it back
to an int in the test.
"""
value = 42
self.assertRaises(rpc.RemoteError,
rpc.call,
self.context,
'test',
{"method": "fail",
"args": {"value": value}})
try:
rpc.call(self.context,
'test',
{"method": "fail",
"args": {"value": value}})
self.fail("should have thrown rpc.RemoteError")
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
def test_nested_calls(self):
"""Test that we can do an rpc.call inside another call."""
class Nested(object):
@staticmethod
def echo(context, queue, value):
"""Calls echo in the passed queue"""
LOG.debug(_("Nested received %(queue)s, %(value)s")
% locals())
# TODO: so, it will replay the context and use the same REQID?
# that's bizarre.
ret = rpc.call(context,
queue,
{"method": "echo",
"args": {"value": value}})
LOG.debug(_("Nested return %s"), ret)
return value
nested = Nested()
conn = rpc.create_connection(True)
rpc.create_consumer(conn,
'nested',
nested,
False)
conn.consume_in_thread()
value = 42
result = rpc.call(self.context,
'nested', {"method": "echo",
"args": {"queue": "test",
"value": value}})
conn.close()
self.assertEqual(value, result)
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
Uses static methods because we aren't actually storing any state.
"""
@staticmethod
def echo(context, value):
"""Simply returns whatever value is sent in."""
LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context."""
LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
def echo_three_times(context, value):
context.reply(value)
context.reply(value + 1)
context.reply(value + 2)
@staticmethod
def echo_three_times_yield(context, value):
yield value
yield value + 1
yield value + 2
@staticmethod
def fail(context, value):
"""Raises an exception with the value sent in."""
raise Exception(value)

View File

@@ -0,0 +1,266 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit Tests for remote procedure calls using queue
"""
from nova import context
from nova import log as logging
from nova.rpc import impl_kombu as rpc
from nova import test
LOG = logging.getLogger('nova.tests.rpc')
class RpcKombuTestCase(test.TestCase):
def setUp(self):
super(RpcKombuTestCase, self).setUp()
self.conn = rpc.create_connection()
self.receiver = TestReceiver()
rpc.create_consumer(self.conn,
'test',
self.receiver,
False)
self.conn.consume_in_thread()
self.context = context.get_admin_context()
def tearDown(self):
self.conn.close()
super(RpcKombuTestCase, self).tearDown()
def test_reusing_connection(self):
"""Test that reusing a connection returns same one."""
conn_context = rpc.create_connection(new=False)
conn1 = conn_context.connection
conn_context.close()
conn_context = rpc.create_connection(new=False)
conn2 = conn_context.connection
conn_context.close()
self.assertEqual(conn1, conn2)
def test_topic_send_receive(self):
"""Test sending to a topic exchange/queue"""
conn = rpc.create_connection()
message = 'topic test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_topic_consumer('a_topic', _callback)
conn.topic_send('a_topic', message)
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
def test_direct_send_receive(self):
"""Test sending to a direct exchange/queue"""
conn = rpc.create_connection()
message = 'direct test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_direct_consumer('a_direct', _callback)
conn.direct_send('a_direct', message)
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
@test.skip_test("kombu memory transport seems buggy with fanout queues "
"as this test passes when you use rabbit (fake_rabbit=False)")
def test_fanout_send_receive(self):
"""Test sending to a fanout exchange and consuming from 2 queues"""
conn = rpc.create_connection()
conn2 = rpc.create_connection()
message = 'fanout test message'
self.received_message = None
def _callback(message):
self.received_message = message
conn.declare_fanout_consumer('a_fanout', _callback)
conn2.declare_fanout_consumer('a_fanout', _callback)
conn.fanout_send('a_fanout', message)
conn.consume(limit=1)
conn.close()
self.assertEqual(self.received_message, message)
self.received_message = None
conn2.consume(limit=1)
conn2.close()
self.assertEqual(self.received_message, message)
def test_call_succeed(self):
value = 42
result = rpc.call(self.context, 'test', {"method": "echo",
"args": {"value": value}})
self.assertEqual(value, result)
def test_call_succeed_despite_multiple_returns(self):
value = 42
result = rpc.call(self.context, 'test', {"method": "echo_three_times",
"args": {"value": value}})
self.assertEqual(value + 2, result)
def test_call_succeed_despite_multiple_returns_yield(self):
value = 42
result = rpc.call(self.context, 'test',
{"method": "echo_three_times_yield",
"args": {"value": value}})
self.assertEqual(value + 2, result)
def test_multicall_succeed_once(self):
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo",
"args": {"value": value}})
for i, x in enumerate(result):
if i > 0:
self.fail('should only receive one response')
self.assertEqual(value + i, x)
def test_multicall_succeed_three_times(self):
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo_three_times",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(value + i, x)
def test_multicall_succeed_three_times_yield(self):
value = 42
result = rpc.multicall(self.context,
'test',
{"method": "echo_three_times_yield",
"args": {"value": value}})
for i, x in enumerate(result):
self.assertEqual(value + i, x)
def test_context_passed(self):
"""Makes sure a context is passed through rpc call."""
value = 42
result = rpc.call(self.context,
'test', {"method": "context",
"args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
def test_call_exception(self):
"""Test that exception gets passed back properly.
rpc.call returns a RemoteError object. The value of the
exception is converted to a string, so we convert it back
to an int in the test.
"""
value = 42
self.assertRaises(rpc.RemoteError,
rpc.call,
self.context,
'test',
{"method": "fail",
"args": {"value": value}})
try:
rpc.call(self.context,
'test',
{"method": "fail",
"args": {"value": value}})
self.fail("should have thrown rpc.RemoteError")
except rpc.RemoteError as exc:
self.assertEqual(int(exc.value), value)
def test_nested_calls(self):
"""Test that we can do an rpc.call inside another call."""
class Nested(object):
@staticmethod
def echo(context, queue, value):
"""Calls echo in the passed queue"""
LOG.debug(_("Nested received %(queue)s, %(value)s")
% locals())
# TODO: so, it will replay the context and use the same REQID?
# that's bizarre.
ret = rpc.call(context,
queue,
{"method": "echo",
"args": {"value": value}})
LOG.debug(_("Nested return %s"), ret)
return value
nested = Nested()
conn = rpc.create_connection(True)
rpc.create_consumer(conn,
'nested',
nested,
False)
conn.consume_in_thread()
value = 42
result = rpc.call(self.context,
'nested', {"method": "echo",
"args": {"queue": "test",
"value": value}})
conn.close()
self.assertEqual(value, result)
class TestReceiver(object):
"""Simple Proxy class so the consumer has methods to call.
Uses static methods because we aren't actually storing any state.
"""
@staticmethod
def echo(context, value):
"""Simply returns whatever value is sent in."""
LOG.debug(_("Received %s"), value)
return value
@staticmethod
def context(context, value):
"""Returns dictionary version of context."""
LOG.debug(_("Received %s"), context)
return context.to_dict()
@staticmethod
def echo_three_times(context, value):
context.reply(value)
context.reply(value + 1)
context.reply(value + 2)
@staticmethod
def echo_three_times_yield(context, value):
yield value
yield value + 1
yield value + 2
@staticmethod
def fail(context, value):
"""Raises an exception with the value sent in."""
raise Exception(value)