From a966a416ff0b867b6cc9a863e464b8917ef477c5 Mon Sep 17 00:00:00 2001 From: Chris Behrens Date: Sun, 28 Aug 2011 17:33:11 -0700 Subject: [PATCH] start to rework some consumer stuff --- nova/rpc/impl_kombu.py | 127 ++++++++++++++++++++++++----------------- 1 file changed, 76 insertions(+), 51 deletions(-) diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index db839dd2..01871606 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -35,11 +35,11 @@ flags.DEFINE_integer('rpc_thread_pool_size', 1024, 'Size of RPC thread pool') -class QueueBase(object): - """Queue base class.""" +class ConsumerBase(object): + """Consumer base class.""" def __init__(self, channel, callback, tag, **kwargs): - """Init the queue. + """Declare a queue on an amqp channel. 'channel' is the amqp channel to use 'callback' is the callback to call when messages are received @@ -55,20 +55,21 @@ class QueueBase(object): self.reconnect(channel) def reconnect(self, channel): - """Re-create the queue after a rabbit reconnect""" + """Re-declare the queue after a rabbit reconnect""" self.channel = channel self.kwargs['channel'] = channel self.queue = kombu.entity.Queue(**self.kwargs) self.queue.declare() def consume(self, *args, **kwargs): - """Consume from this queue. + """Actually declare the consumer on the amqp channel. This will + start the flow of messages from the queue. Using the + Connection.iterconsume() iterator will process the messages, + calling the appropriate callback. + If a callback is specified in kwargs, use that. Otherwise, use the callback passed during __init__() - The callback will be called if a message was read off of the - queue. - If kwargs['nowait'] is True, then this call will block until a message is read. @@ -100,7 +101,7 @@ class QueueBase(object): self.queue = None -class DirectQueue(QueueBase): +class DirectConsumer(ConsumerBase): """Queue/consumer class for 'direct'""" def __init__(self, channel, msg_id, callback, tag, **kwargs): @@ -123,7 +124,7 @@ class DirectQueue(QueueBase): type='direct', durable=options['durable'], auto_delete=options['auto_delete']) - super(DirectQueue, self).__init__( + super(DirectConsumer, self).__init__( channel, callback, tag, @@ -133,8 +134,8 @@ class DirectQueue(QueueBase): **options) -class TopicQueue(QueueBase): - """Queue/consumer class for 'topic'""" +class TopicConsumer(ConsumerBase): + """Consumer class for 'topic'""" def __init__(self, channel, topic, callback, tag, **kwargs): """Init a 'topic' queue. @@ -156,7 +157,7 @@ class TopicQueue(QueueBase): type='topic', durable=options['durable'], auto_delete=options['auto_delete']) - super(TopicQueue, self).__init__( + super(TopicConsumer, self).__init__( channel, callback, tag, @@ -166,8 +167,8 @@ class TopicQueue(QueueBase): **options) -class FanoutQueue(QueueBase): - """Queue/consumer class for 'fanout'""" +class FanoutConsumer(ConsumerBase): + """Consumer class for 'fanout'""" def __init__(self, channel, topic, callback, tag, **kwargs): """Init a 'fanout' queue. @@ -193,7 +194,7 @@ class FanoutQueue(QueueBase): type='fanout', durable=options['durable'], auto_delete=options['auto_delete']) - super(FanoutQueue, self).__init__( + super(FanoutConsumer, self).__init__( channel, callback, tag, @@ -286,7 +287,8 @@ class Connection(object): """Connection instance object.""" def __init__(self): - self.queues = [] + self.consumers = [] + self.consumer_thread = None self.max_retries = FLAGS.rabbit_max_retries # Try forever? if self.max_retries <= 0: @@ -334,9 +336,9 @@ class Connection(object): LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' % self.params)) self.channel = self.connection.channel() - for consumer in self.queues: + for consumer in self.consumers: consumer.reconnect(self.channel) - if self.queues: + if self.consumers: LOG.debug(_("Re-established AMQP queues")) def get_channel(self): @@ -354,30 +356,32 @@ class Connection(object): def close(self): """Close/release this connection""" + self.cancel_consumer_thread() self.connection.release() self.connection = None def reset(self): """Reset a connection so it can be used again""" + self.cancel_consumer_thread() self.channel.close() self.channel = self.connection.channel() - self.queues = [] + self.consumers = [] - def create_queue(self, queue_cls, topic, callback): - """Create a queue using the class that was passed in and - add it to our list of queues used for consuming + def declare_consumer(self, consumer_cls, topic, callback): + """Create a Consumer using the class that was passed in and + add it to our list of consumers """ - queue = queue_cls(self.channel, topic, callback, - self.queue_num.next()) - self.queues.append(queue) - return queue + consumer = consumer_cls(self.channel, topic, callback, + self.consumer_num.next()) + self.consumers.append(consumer) + return consumer - def consume(self, limit=None): - """Consume from all queues""" + def iterconsume(self, limit=None): + """Return an iterator that will consume from all queues/consumers""" while True: try: - queues_head = self.queues[:-1] - queues_tail = self.queues[-1] + queues_head = self.consumers[:-1] + queues_tail = self.consumers[-1] for queue in queues_head: queue.consume(nowait=True) queues_tail.consume(nowait=False) @@ -391,6 +395,36 @@ class Connection(object): '%s' % str(e))) self.reconnect() + def consume(self, limit=None): + """Consume from all queues/consumers""" + it = self.iterconsume(limit=limit) + while True: + try: + it.next() + except StopIteration: + return + + def consume_in_thread(self): + """Consumer from all queues/consumers in a greenthread""" + def _consumer_thread(): + try: + self.consume() + except greenlet.GreenletExit: + return + if not self.consumer_thread: + self.consumer_thread = eventlet.spawn(_consumer_thread) + return self.consumer_thread + + def cancel_consumer_thread(self): + """Cancel a consumer thread""" + if self.consumer_thread: + self.consumer_thread.kill() + try: + self.consumer_thread.wait() + except greenlet.GreenletExit: + pass + self.consumer_thread = None + def publisher_send(self, cls, topic, msg): """Send to a publisher based on the publisher class""" while True: @@ -408,20 +442,20 @@ class Connection(object): except self.connection.connection_errors, e: pass - def direct_consumer(self, topic, callback): + def declare_direct_consumer(self, topic, callback): """Create a 'direct' queue. In nova's use, this is generally a msg_id queue used for responses for call/multicall """ - return self.create_queue(DirectQueue, topic, callback) + self.declare_consumer(DirectConsumer, topic, callback) - def topic_consumer(self, topic, callback=None): - """Create a 'topic' queue.""" - return self.create_queue(TopicQueue, topic, callback) + def declare_topic_consumer(self, topic, callback=None): + """Create a 'topic' consumer.""" + self.declare_consumer(TopicConsumer, topic, callback) - def fanout_consumer(self, topic, callback): - """Create a 'fanout' queue""" - return self.create_queue(FanoutQueue, topic, callback) + def declare_fanout_consumer(self, topic, callback): + """Create a 'fanout' consumer""" + self.declare_consumer(FanoutConsumer, topic, callback) def direct_send(self, msg_id, msg): """Send a 'direct' message""" @@ -638,18 +672,9 @@ def create_connection(new=True): def create_consumer(conn, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" if fanout: - return conn.fanout_consumer(topic, ProxyCallback(proxy)) + conn.declare_fanout_consumer(topic, ProxyCallback(proxy)) else: - return conn.topic_consumer(topic, ProxyCallback(proxy)) - - -def create_consumer_set(conn, consumers): - # FIXME(comstud): Replace this however necessary - # Returns an object that you can call .wait() on to consume - # all queues? - # Needs to have a .close() which will stop consuming? - # Needs to also have an method for tests? - raise NotImplemented + conn.declare_topic_consumer(topic, ProxyCallback(proxy)) def multicall(context, topic, msg): @@ -666,7 +691,7 @@ def multicall(context, topic, msg): conn = ConnectionContext() wait_msg = MulticallWaiter(conn) - conn.direct_consumer(msg_id, wait_msg) + conn.declare_direct_consumer(msg_id, wait_msg) conn.topic_send(topic, msg) return wait_msg