add queue name argument to TopicConsumer

ceilometer is going to want to subscribe several worker processes
to the notifications.info topic queue. The pool of workers needs
to be assured of receiving all messages, without interference from
other clients listening for notifications. The TopicConsumer
class always assumes the topic and queue name should be the same,
but in the ceilometer case we want to use a separate named queue
with a different name. The only parameter that cannot be
controlled by the user of TopicConsumer is the queue name,
so this change adds the ability to set the queue name.

Change-Id: I41a525de2fd855ca30c24fafcfbfefd6ab615dd7
This commit is contained in:
Doug Hellmann
2012-05-18 15:13:20 -04:00
parent ce369c2825
commit 3ff328d177
6 changed files with 124 additions and 16 deletions

View File

@@ -132,6 +132,9 @@ class ConnectionContext(rpc_common.Connection):
def create_consumer(self, topic, proxy, fanout=False):
self.connection.create_consumer(topic, proxy, fanout)
def create_worker(self, topic, proxy, pool_name):
self.connection.create_worker(topic, proxy, pool_name)
def consume_in_thread(self):
self.connection.consume_in_thread()

View File

@@ -131,6 +131,25 @@ class Connection(object):
"""
raise NotImplementedError()
def create_worker(self, conf, topic, proxy, pool_name):
"""Create a worker on this connection.
A worker is like a regular consumer of messages directed to a
topic, except that it is part of a set of such consumers (the
"pool") which may run in parallel. Every pool of workers will
receive a given message, but only one worker in the pool will
be asked to process it. Load is distributed across the members
of the pool in round-robin fashion.
:param conf: An openstack.common.cfg configuration object.
:param topic: This is a name associated with what to consume from.
Multiple instances of a service may consume from the same
topic.
:param proxy: The object that will handle all incoming messages.
:param pool_name: String containing the name of the pool of workers
"""
raise NotImplementedError()
def consume_in_thread(self):
"""Spawn a thread to handle incoming messages.

View File

@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import itertools
import socket
import ssl
@@ -156,15 +157,19 @@ class DirectConsumer(ConsumerBase):
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
def __init__(self, conf, channel, topic, callback, tag, **kwargs):
def __init__(self, conf, channel, topic, callback, tag, name=None,
**kwargs):
"""Init a 'topic' queue.
'channel' is the amqp channel to use
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
'tag' is a unique ID for the consumer on the channel
:param channel: the amqp channel to use
:param topic: the topic to listen on
:paramtype topic: str
:param callback: the callback to call when messages are received
:param tag: a unique ID for the consumer on the channel
:param name: optional queue name, defaults to topic
:paramtype name: str
Other kombu options may be passed
Other kombu options may be passed as keyword arguments
"""
# Default options
options = {'durable': conf.rabbit_durable_queues,
@@ -180,7 +185,7 @@ class TopicConsumer(ConsumerBase):
channel,
callback,
tag,
name=topic,
name=name or topic,
exchange=exchange,
routing_key=topic,
**options)
@@ -602,9 +607,12 @@ class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None):
def declare_topic_consumer(self, topic, callback=None, queue_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(TopicConsumer, topic, callback)
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
),
topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
@@ -656,6 +664,12 @@ class Connection(object):
else:
self.declare_topic_consumer(topic, proxy_cb)
def create_worker(self, topic, proxy, pool_name):
"""Create a worker that calls a method in a proxy object"""
proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
rpc_amqp.get_connection_pool(self, Connection))
self.declare_topic_consumer(topic, proxy_cb, pool_name)
def create_connection(conf, new=True):
"""Create a connection"""

View File

@@ -15,6 +15,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
import itertools
import json
import time
@@ -161,17 +162,19 @@ class DirectConsumer(ConsumerBase):
class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'"""
def __init__(self, conf, session, topic, callback):
def __init__(self, conf, session, topic, callback, name=None):
"""Init a 'topic' queue.
'session' is the amqp session to use
'topic' is the topic to listen on
'callback' is the callback to call when messages are received
:param session: the amqp session to use
:param topic: is the topic to listen on
:paramtype topic: str
:param callback: the callback to call when messages are received
:param name: optional queue name, defaults to topic
"""
super(TopicConsumer, self).__init__(session, callback,
"%s/%s" % (conf.control_exchange, topic), {},
topic, {})
name or topic, {})
class FanoutConsumer(ConsumerBase):
@@ -448,9 +451,12 @@ class Connection(object):
"""
self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None):
def declare_topic_consumer(self, topic, callback=None, queue_name=None):
"""Create a 'topic' consumer."""
self.declare_consumer(TopicConsumer, topic, callback)
self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name,
),
topic, callback)
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer"""
@@ -506,6 +512,18 @@ class Connection(object):
return consumer
def create_worker(self, topic, proxy, pool_name):
"""Create a worker that calls a method in a proxy object"""
proxy_cb = rpc_amqp.ProxyCallback(self.conf, proxy,
rpc_amqp.get_connection_pool(self, Connection))
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name)
self._register_consumer(consumer)
return consumer
def create_connection(conf, new=True):
"""Create a connection"""

View File

@@ -101,6 +101,31 @@ class RpcKombuTestCase(common.BaseRpcAMQPTestCase):
self.assertEqual(self.received_message, message)
@test.skip_if(kombu is None, "Test requires kombu")
def test_topic_multiple_queues(self):
"""Test sending to a topic exchange with multiple queues"""
conn = self.rpc.create_connection(FLAGS)
message = 'topic test message'
self.received_message_1 = None
self.received_message_2 = None
def _callback1(message):
self.received_message_1 = message
def _callback2(message):
self.received_message_2 = message
conn.declare_topic_consumer('a_topic', _callback1, queue_name='queue1')
conn.declare_topic_consumer('a_topic', _callback2, queue_name='queue2')
conn.topic_send('a_topic', message)
conn.consume(limit=2)
conn.close()
self.assertEqual(self.received_message_1, message)
self.assertEqual(self.received_message_2, message)
@test.skip_if(kombu is None, "Test requires kombu")
def test_direct_send_receive(self):
"""Test sending to a direct exchange/queue"""

View File

@@ -147,6 +147,35 @@ class RpcQpidTestCase(test.TestCase):
def test_create_consumer_fanout(self):
self._test_create_consumer(fanout=True)
@test.skip_if(qpid is None, "Test requires qpid")
def test_create_worker(self):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)
self.mock_receiver = self.mox.CreateMock(self.orig_receiver)
self.mock_connection.opened().AndReturn(False)
self.mock_connection.open()
self.mock_connection.session().AndReturn(self.mock_session)
expected_address = (
'nova/impl_qpid_test ; {"node": {"x-declare": '
'{"auto-delete": true, "durable": true}, "type": "topic"}, '
'"create": "always", "link": {"x-declare": {"auto-delete": '
'true, "exclusive": false, "durable": false}, "durable": '
'true, "name": "impl.qpid.test.workers"}}')
self.mock_session.receiver(expected_address).AndReturn(
self.mock_receiver)
self.mock_receiver.capacity = 1
self.mock_connection.close()
self.mox.ReplayAll()
connection = impl_qpid.create_connection(FLAGS)
connection.create_worker("impl_qpid_test",
lambda *_x, **_y: None,
'impl.qpid.test.workers',
)
connection.close()
def _test_cast(self, fanout, server_params=None):
self.mock_connection = self.mox.CreateMock(self.orig_connection)
self.mock_session = self.mox.CreateMock(self.orig_session)