start to rework some consumer stuff
This commit is contained in:
@@ -35,11 +35,11 @@ flags.DEFINE_integer('rpc_thread_pool_size', 1024,
|
||||
'Size of RPC thread pool')
|
||||
|
||||
|
||||
class QueueBase(object):
|
||||
"""Queue base class."""
|
||||
class ConsumerBase(object):
|
||||
"""Consumer base class."""
|
||||
|
||||
def __init__(self, channel, callback, tag, **kwargs):
|
||||
"""Init the queue.
|
||||
"""Declare a queue on an amqp channel.
|
||||
|
||||
'channel' is the amqp channel to use
|
||||
'callback' is the callback to call when messages are received
|
||||
@@ -55,20 +55,21 @@ class QueueBase(object):
|
||||
self.reconnect(channel)
|
||||
|
||||
def reconnect(self, channel):
|
||||
"""Re-create the queue after a rabbit reconnect"""
|
||||
"""Re-declare the queue after a rabbit reconnect"""
|
||||
self.channel = channel
|
||||
self.kwargs['channel'] = channel
|
||||
self.queue = kombu.entity.Queue(**self.kwargs)
|
||||
self.queue.declare()
|
||||
|
||||
def consume(self, *args, **kwargs):
|
||||
"""Consume from this queue.
|
||||
"""Actually declare the consumer on the amqp channel. This will
|
||||
start the flow of messages from the queue. Using the
|
||||
Connection.iterconsume() iterator will process the messages,
|
||||
calling the appropriate callback.
|
||||
|
||||
If a callback is specified in kwargs, use that. Otherwise,
|
||||
use the callback passed during __init__()
|
||||
|
||||
The callback will be called if a message was read off of the
|
||||
queue.
|
||||
|
||||
If kwargs['nowait'] is True, then this call will block until
|
||||
a message is read.
|
||||
|
||||
@@ -100,7 +101,7 @@ class QueueBase(object):
|
||||
self.queue = None
|
||||
|
||||
|
||||
class DirectQueue(QueueBase):
|
||||
class DirectConsumer(ConsumerBase):
|
||||
"""Queue/consumer class for 'direct'"""
|
||||
|
||||
def __init__(self, channel, msg_id, callback, tag, **kwargs):
|
||||
@@ -123,7 +124,7 @@ class DirectQueue(QueueBase):
|
||||
type='direct',
|
||||
durable=options['durable'],
|
||||
auto_delete=options['auto_delete'])
|
||||
super(DirectQueue, self).__init__(
|
||||
super(DirectConsumer, self).__init__(
|
||||
channel,
|
||||
callback,
|
||||
tag,
|
||||
@@ -133,8 +134,8 @@ class DirectQueue(QueueBase):
|
||||
**options)
|
||||
|
||||
|
||||
class TopicQueue(QueueBase):
|
||||
"""Queue/consumer class for 'topic'"""
|
||||
class TopicConsumer(ConsumerBase):
|
||||
"""Consumer class for 'topic'"""
|
||||
|
||||
def __init__(self, channel, topic, callback, tag, **kwargs):
|
||||
"""Init a 'topic' queue.
|
||||
@@ -156,7 +157,7 @@ class TopicQueue(QueueBase):
|
||||
type='topic',
|
||||
durable=options['durable'],
|
||||
auto_delete=options['auto_delete'])
|
||||
super(TopicQueue, self).__init__(
|
||||
super(TopicConsumer, self).__init__(
|
||||
channel,
|
||||
callback,
|
||||
tag,
|
||||
@@ -166,8 +167,8 @@ class TopicQueue(QueueBase):
|
||||
**options)
|
||||
|
||||
|
||||
class FanoutQueue(QueueBase):
|
||||
"""Queue/consumer class for 'fanout'"""
|
||||
class FanoutConsumer(ConsumerBase):
|
||||
"""Consumer class for 'fanout'"""
|
||||
|
||||
def __init__(self, channel, topic, callback, tag, **kwargs):
|
||||
"""Init a 'fanout' queue.
|
||||
@@ -193,7 +194,7 @@ class FanoutQueue(QueueBase):
|
||||
type='fanout',
|
||||
durable=options['durable'],
|
||||
auto_delete=options['auto_delete'])
|
||||
super(FanoutQueue, self).__init__(
|
||||
super(FanoutConsumer, self).__init__(
|
||||
channel,
|
||||
callback,
|
||||
tag,
|
||||
@@ -286,7 +287,8 @@ class Connection(object):
|
||||
"""Connection instance object."""
|
||||
|
||||
def __init__(self):
|
||||
self.queues = []
|
||||
self.consumers = []
|
||||
self.consumer_thread = None
|
||||
self.max_retries = FLAGS.rabbit_max_retries
|
||||
# Try forever?
|
||||
if self.max_retries <= 0:
|
||||
@@ -334,9 +336,9 @@ class Connection(object):
|
||||
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' %
|
||||
self.params))
|
||||
self.channel = self.connection.channel()
|
||||
for consumer in self.queues:
|
||||
for consumer in self.consumers:
|
||||
consumer.reconnect(self.channel)
|
||||
if self.queues:
|
||||
if self.consumers:
|
||||
LOG.debug(_("Re-established AMQP queues"))
|
||||
|
||||
def get_channel(self):
|
||||
@@ -354,30 +356,32 @@ class Connection(object):
|
||||
|
||||
def close(self):
|
||||
"""Close/release this connection"""
|
||||
self.cancel_consumer_thread()
|
||||
self.connection.release()
|
||||
self.connection = None
|
||||
|
||||
def reset(self):
|
||||
"""Reset a connection so it can be used again"""
|
||||
self.cancel_consumer_thread()
|
||||
self.channel.close()
|
||||
self.channel = self.connection.channel()
|
||||
self.queues = []
|
||||
self.consumers = []
|
||||
|
||||
def create_queue(self, queue_cls, topic, callback):
|
||||
"""Create a queue using the class that was passed in and
|
||||
add it to our list of queues used for consuming
|
||||
def declare_consumer(self, consumer_cls, topic, callback):
|
||||
"""Create a Consumer using the class that was passed in and
|
||||
add it to our list of consumers
|
||||
"""
|
||||
queue = queue_cls(self.channel, topic, callback,
|
||||
self.queue_num.next())
|
||||
self.queues.append(queue)
|
||||
return queue
|
||||
consumer = consumer_cls(self.channel, topic, callback,
|
||||
self.consumer_num.next())
|
||||
self.consumers.append(consumer)
|
||||
return consumer
|
||||
|
||||
def consume(self, limit=None):
|
||||
"""Consume from all queues"""
|
||||
def iterconsume(self, limit=None):
|
||||
"""Return an iterator that will consume from all queues/consumers"""
|
||||
while True:
|
||||
try:
|
||||
queues_head = self.queues[:-1]
|
||||
queues_tail = self.queues[-1]
|
||||
queues_head = self.consumers[:-1]
|
||||
queues_tail = self.consumers[-1]
|
||||
for queue in queues_head:
|
||||
queue.consume(nowait=True)
|
||||
queues_tail.consume(nowait=False)
|
||||
@@ -391,6 +395,36 @@ class Connection(object):
|
||||
'%s' % str(e)))
|
||||
self.reconnect()
|
||||
|
||||
def consume(self, limit=None):
|
||||
"""Consume from all queues/consumers"""
|
||||
it = self.iterconsume(limit=limit)
|
||||
while True:
|
||||
try:
|
||||
it.next()
|
||||
except StopIteration:
|
||||
return
|
||||
|
||||
def consume_in_thread(self):
|
||||
"""Consumer from all queues/consumers in a greenthread"""
|
||||
def _consumer_thread():
|
||||
try:
|
||||
self.consume()
|
||||
except greenlet.GreenletExit:
|
||||
return
|
||||
if not self.consumer_thread:
|
||||
self.consumer_thread = eventlet.spawn(_consumer_thread)
|
||||
return self.consumer_thread
|
||||
|
||||
def cancel_consumer_thread(self):
|
||||
"""Cancel a consumer thread"""
|
||||
if self.consumer_thread:
|
||||
self.consumer_thread.kill()
|
||||
try:
|
||||
self.consumer_thread.wait()
|
||||
except greenlet.GreenletExit:
|
||||
pass
|
||||
self.consumer_thread = None
|
||||
|
||||
def publisher_send(self, cls, topic, msg):
|
||||
"""Send to a publisher based on the publisher class"""
|
||||
while True:
|
||||
@@ -408,20 +442,20 @@ class Connection(object):
|
||||
except self.connection.connection_errors, e:
|
||||
pass
|
||||
|
||||
def direct_consumer(self, topic, callback):
|
||||
def declare_direct_consumer(self, topic, callback):
|
||||
"""Create a 'direct' queue.
|
||||
In nova's use, this is generally a msg_id queue used for
|
||||
responses for call/multicall
|
||||
"""
|
||||
return self.create_queue(DirectQueue, topic, callback)
|
||||
self.declare_consumer(DirectConsumer, topic, callback)
|
||||
|
||||
def topic_consumer(self, topic, callback=None):
|
||||
"""Create a 'topic' queue."""
|
||||
return self.create_queue(TopicQueue, topic, callback)
|
||||
def declare_topic_consumer(self, topic, callback=None):
|
||||
"""Create a 'topic' consumer."""
|
||||
self.declare_consumer(TopicConsumer, topic, callback)
|
||||
|
||||
def fanout_consumer(self, topic, callback):
|
||||
"""Create a 'fanout' queue"""
|
||||
return self.create_queue(FanoutQueue, topic, callback)
|
||||
def declare_fanout_consumer(self, topic, callback):
|
||||
"""Create a 'fanout' consumer"""
|
||||
self.declare_consumer(FanoutConsumer, topic, callback)
|
||||
|
||||
def direct_send(self, msg_id, msg):
|
||||
"""Send a 'direct' message"""
|
||||
@@ -638,18 +672,9 @@ def create_connection(new=True):
|
||||
def create_consumer(conn, topic, proxy, fanout=False):
|
||||
"""Create a consumer that calls a method in a proxy object"""
|
||||
if fanout:
|
||||
return conn.fanout_consumer(topic, ProxyCallback(proxy))
|
||||
conn.declare_fanout_consumer(topic, ProxyCallback(proxy))
|
||||
else:
|
||||
return conn.topic_consumer(topic, ProxyCallback(proxy))
|
||||
|
||||
|
||||
def create_consumer_set(conn, consumers):
|
||||
# FIXME(comstud): Replace this however necessary
|
||||
# Returns an object that you can call .wait() on to consume
|
||||
# all queues?
|
||||
# Needs to have a .close() which will stop consuming?
|
||||
# Needs to also have an method for tests?
|
||||
raise NotImplemented
|
||||
conn.declare_topic_consumer(topic, ProxyCallback(proxy))
|
||||
|
||||
|
||||
def multicall(context, topic, msg):
|
||||
@@ -666,7 +691,7 @@ def multicall(context, topic, msg):
|
||||
|
||||
conn = ConnectionContext()
|
||||
wait_msg = MulticallWaiter(conn)
|
||||
conn.direct_consumer(msg_id, wait_msg)
|
||||
conn.declare_direct_consumer(msg_id, wait_msg)
|
||||
conn.topic_send(topic, msg)
|
||||
|
||||
return wait_msg
|
||||
|
||||
Reference in New Issue
Block a user