Merge "rabbit: remove unused consumer interfaces"
This commit is contained in:
commit
e1e2abaed8
@ -13,8 +13,6 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import contextlib
|
import contextlib
|
||||||
import functools
|
|
||||||
import itertools
|
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import socket
|
import socket
|
||||||
@ -171,30 +169,42 @@ class RabbitMessage(dict):
|
|||||||
self._raw_message.requeue()
|
self._raw_message.requeue()
|
||||||
|
|
||||||
|
|
||||||
class ConsumerBase(object):
|
class Consumer(object):
|
||||||
"""Consumer base class."""
|
"""Consumer class."""
|
||||||
|
|
||||||
def __init__(self, channel, callback, tag, **kwargs):
|
def __init__(self, conf, exchange_name, queue_name, routing_key, type,
|
||||||
"""Declare a queue on an amqp channel.
|
durable, auto_delete, callback, nowait=True):
|
||||||
|
"""Init the Publisher class with the exchange_name, routing_key,
|
||||||
'channel' is the amqp channel to use
|
type, durable auto_delete
|
||||||
'callback' is the callback to call when messages are received
|
|
||||||
'tag' is a unique ID for the consumer on the channel
|
|
||||||
|
|
||||||
queue name, exchange name, and other kombu options are
|
|
||||||
passed in here as a dictionary.
|
|
||||||
"""
|
"""
|
||||||
|
self.queue_name = queue_name
|
||||||
|
self.exchange_name = exchange_name
|
||||||
|
self.routing_key = routing_key
|
||||||
|
self.auto_delete = auto_delete
|
||||||
|
self.durable = durable
|
||||||
self.callback = callback
|
self.callback = callback
|
||||||
self.tag = six.text_type(tag)
|
self.type = type
|
||||||
self.kwargs = kwargs
|
self.nowait = nowait
|
||||||
self.queue = None
|
self.queue_arguments = _get_queue_arguments(conf)
|
||||||
self.reconnect(channel)
|
|
||||||
|
self.queue = None
|
||||||
|
self.exchange = kombu.entity.Exchange(
|
||||||
|
name=exchange_name,
|
||||||
|
type=type,
|
||||||
|
durable=self.durable,
|
||||||
|
auto_delete=self.auto_delete)
|
||||||
|
|
||||||
|
def declare(self, channel):
|
||||||
|
"""Re-declare the queue after a rabbit (re)connect."""
|
||||||
|
self.queue = kombu.entity.Queue(
|
||||||
|
name=self.queue_name,
|
||||||
|
channel=channel,
|
||||||
|
exchange=self.exchange,
|
||||||
|
durable=self.durable,
|
||||||
|
auto_delete=self.auto_delete,
|
||||||
|
routing_key=self.routing_key,
|
||||||
|
queue_arguments=self.queue_arguments)
|
||||||
|
|
||||||
def reconnect(self, channel):
|
|
||||||
"""Re-declare the queue after a rabbit reconnect."""
|
|
||||||
self.channel = channel
|
|
||||||
self.kwargs['channel'] = channel
|
|
||||||
self.queue = kombu.entity.Queue(**self.kwargs)
|
|
||||||
try:
|
try:
|
||||||
self.queue.declare()
|
self.queue.declare()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@ -210,149 +220,34 @@ class ConsumerBase(object):
|
|||||||
LOG.error(_("Declaring queue failed with (%s), retrying"), e)
|
LOG.error(_("Declaring queue failed with (%s), retrying"), e)
|
||||||
self.queue.declare()
|
self.queue.declare()
|
||||||
|
|
||||||
def _callback_handler(self, message, callback):
|
def consume(self, tag):
|
||||||
|
"""Actually declare the consumer on the amqp channel. This will
|
||||||
|
start the flow of messages from the queue. Using the
|
||||||
|
Connection.consume() will process the messages,
|
||||||
|
calling the appropriate callback.
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.queue.consume(callback=self._callback,
|
||||||
|
consumer_tag=six.text_type(tag),
|
||||||
|
nowait=self.nowait)
|
||||||
|
|
||||||
|
def _callback(self, message):
|
||||||
"""Call callback with deserialized message.
|
"""Call callback with deserialized message.
|
||||||
|
|
||||||
Messages that are processed and ack'ed.
|
Messages that are processed and ack'ed.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
m2p = getattr(self.queue.channel, 'message_to_python', None)
|
||||||
|
if m2p:
|
||||||
|
message = m2p(message)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
callback(RabbitMessage(message))
|
self.callback(RabbitMessage(message))
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception(_("Failed to process message"
|
LOG.exception(_("Failed to process message"
|
||||||
" ... skipping it."))
|
" ... skipping it."))
|
||||||
message.ack()
|
message.ack()
|
||||||
|
|
||||||
def consume(self, *args, **kwargs):
|
|
||||||
"""Actually declare the consumer on the amqp channel. This will
|
|
||||||
start the flow of messages from the queue. Using the
|
|
||||||
Connection.consume() will process the messages,
|
|
||||||
calling the appropriate callback.
|
|
||||||
|
|
||||||
If a callback is specified in kwargs, use that. Otherwise,
|
|
||||||
use the callback passed during __init__()
|
|
||||||
|
|
||||||
If kwargs['nowait'] is True, then this call will block until
|
|
||||||
a message is read.
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
options = {'consumer_tag': self.tag}
|
|
||||||
options['nowait'] = kwargs.get('nowait', False)
|
|
||||||
callback = kwargs.get('callback', self.callback)
|
|
||||||
if not callback:
|
|
||||||
raise ValueError("No callback defined")
|
|
||||||
|
|
||||||
def _callback(message):
|
|
||||||
m2p = getattr(self.channel, 'message_to_python', None)
|
|
||||||
if m2p:
|
|
||||||
message = m2p(message)
|
|
||||||
self._callback_handler(message, callback)
|
|
||||||
|
|
||||||
self.queue.consume(*args, callback=_callback, **options)
|
|
||||||
|
|
||||||
|
|
||||||
class DirectConsumer(ConsumerBase):
|
|
||||||
"""Queue/consumer class for 'direct'."""
|
|
||||||
|
|
||||||
def __init__(self, conf, channel, msg_id, callback, tag, **kwargs):
|
|
||||||
"""Init a 'direct' queue.
|
|
||||||
|
|
||||||
'channel' is the amqp channel to use
|
|
||||||
'msg_id' is the msg_id to listen on
|
|
||||||
'callback' is the callback to call when messages are received
|
|
||||||
'tag' is a unique ID for the consumer on the channel
|
|
||||||
|
|
||||||
Other kombu options may be passed
|
|
||||||
"""
|
|
||||||
# Default options
|
|
||||||
options = {'durable': False,
|
|
||||||
'queue_arguments': _get_queue_arguments(conf),
|
|
||||||
'auto_delete': True,
|
|
||||||
'exclusive': False}
|
|
||||||
options.update(kwargs)
|
|
||||||
exchange = kombu.entity.Exchange(name=msg_id,
|
|
||||||
type='direct',
|
|
||||||
durable=options['durable'],
|
|
||||||
auto_delete=options['auto_delete'])
|
|
||||||
super(DirectConsumer, self).__init__(channel,
|
|
||||||
callback,
|
|
||||||
tag,
|
|
||||||
name=msg_id,
|
|
||||||
exchange=exchange,
|
|
||||||
routing_key=msg_id,
|
|
||||||
**options)
|
|
||||||
|
|
||||||
|
|
||||||
class TopicConsumer(ConsumerBase):
|
|
||||||
"""Consumer class for 'topic'."""
|
|
||||||
|
|
||||||
def __init__(self, conf, channel, topic, callback, tag, exchange_name,
|
|
||||||
name=None, **kwargs):
|
|
||||||
"""Init a 'topic' queue.
|
|
||||||
|
|
||||||
:param channel: the amqp channel to use
|
|
||||||
:param topic: the topic to listen on
|
|
||||||
:paramtype topic: str
|
|
||||||
:param callback: the callback to call when messages are received
|
|
||||||
:param tag: a unique ID for the consumer on the channel
|
|
||||||
:param exchange_name: the exchange name to use
|
|
||||||
:param name: optional queue name, defaults to topic
|
|
||||||
:paramtype name: str
|
|
||||||
|
|
||||||
Other kombu options may be passed as keyword arguments
|
|
||||||
"""
|
|
||||||
# Default options
|
|
||||||
options = {'durable': conf.amqp_durable_queues,
|
|
||||||
'queue_arguments': _get_queue_arguments(conf),
|
|
||||||
'auto_delete': conf.amqp_auto_delete,
|
|
||||||
'exclusive': False}
|
|
||||||
options.update(kwargs)
|
|
||||||
exchange = kombu.entity.Exchange(name=exchange_name,
|
|
||||||
type='topic',
|
|
||||||
durable=options['durable'],
|
|
||||||
auto_delete=options['auto_delete'])
|
|
||||||
super(TopicConsumer, self).__init__(channel,
|
|
||||||
callback,
|
|
||||||
tag,
|
|
||||||
name=name or topic,
|
|
||||||
exchange=exchange,
|
|
||||||
routing_key=topic,
|
|
||||||
**options)
|
|
||||||
|
|
||||||
|
|
||||||
class FanoutConsumer(ConsumerBase):
|
|
||||||
"""Consumer class for 'fanout'."""
|
|
||||||
|
|
||||||
def __init__(self, conf, channel, topic, callback, tag, **kwargs):
|
|
||||||
"""Init a 'fanout' queue.
|
|
||||||
|
|
||||||
'channel' is the amqp channel to use
|
|
||||||
'topic' is the topic to listen on
|
|
||||||
'callback' is the callback to call when messages are received
|
|
||||||
'tag' is a unique ID for the consumer on the channel
|
|
||||||
|
|
||||||
Other kombu options may be passed
|
|
||||||
"""
|
|
||||||
unique = uuid.uuid4().hex
|
|
||||||
exchange_name = '%s_fanout' % topic
|
|
||||||
queue_name = '%s_fanout_%s' % (topic, unique)
|
|
||||||
|
|
||||||
# Default options
|
|
||||||
options = {'durable': False,
|
|
||||||
'queue_arguments': _get_queue_arguments(conf),
|
|
||||||
'auto_delete': True,
|
|
||||||
'exclusive': False}
|
|
||||||
options.update(kwargs)
|
|
||||||
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
|
|
||||||
durable=options['durable'],
|
|
||||||
auto_delete=options['auto_delete'])
|
|
||||||
super(FanoutConsumer, self).__init__(channel, callback, tag,
|
|
||||||
name=queue_name,
|
|
||||||
exchange=exchange,
|
|
||||||
routing_key=topic,
|
|
||||||
**options)
|
|
||||||
|
|
||||||
|
|
||||||
class Publisher(object):
|
class Publisher(object):
|
||||||
"""Publisher that silently creates exchange but no queues."""
|
"""Publisher that silently creates exchange but no queues."""
|
||||||
@ -371,7 +266,6 @@ class Publisher(object):
|
|||||||
self.durable = durable
|
self.durable = durable
|
||||||
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
|
self.exchange = kombu.entity.Exchange(name=self.exchange_name,
|
||||||
type=type,
|
type=type,
|
||||||
exclusive=False,
|
|
||||||
durable=durable,
|
durable=durable,
|
||||||
auto_delete=auto_delete,
|
auto_delete=auto_delete,
|
||||||
passive=self.passive)
|
passive=self.passive)
|
||||||
@ -573,7 +467,6 @@ class Connection(object):
|
|||||||
|
|
||||||
def __init__(self, conf, url, purpose):
|
def __init__(self, conf, url, purpose):
|
||||||
self.consumers = []
|
self.consumers = []
|
||||||
self.consumer_num = itertools.count(1)
|
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
self.driver_conf = self.conf.oslo_messaging_rabbit
|
self.driver_conf = self.conf.oslo_messaging_rabbit
|
||||||
self.max_retries = self.driver_conf.rabbit_max_retries
|
self.max_retries = self.driver_conf.rabbit_max_retries
|
||||||
@ -812,9 +705,8 @@ class Connection(object):
|
|||||||
a new channel, we use it the reconfigure our consumers.
|
a new channel, we use it the reconfigure our consumers.
|
||||||
"""
|
"""
|
||||||
self._set_current_channel(new_channel)
|
self._set_current_channel(new_channel)
|
||||||
self.consumer_num = itertools.count(1)
|
|
||||||
for consumer in self.consumers:
|
for consumer in self.consumers:
|
||||||
consumer.reconnect(new_channel)
|
consumer.declare(new_channel)
|
||||||
|
|
||||||
LOG.info(_LI('Reconnected to AMQP server on '
|
LOG.info(_LI('Reconnected to AMQP server on '
|
||||||
'%(hostname)s:%(port)d'),
|
'%(hostname)s:%(port)d'),
|
||||||
@ -889,7 +781,6 @@ class Connection(object):
|
|||||||
self._set_current_channel(None)
|
self._set_current_channel(None)
|
||||||
self.ensure_connection()
|
self.ensure_connection()
|
||||||
self.consumers = []
|
self.consumers = []
|
||||||
self.consumer_num = itertools.count(1)
|
|
||||||
|
|
||||||
def _heartbeat_supported_and_enabled(self):
|
def _heartbeat_supported_and_enabled(self):
|
||||||
if self.driver_conf.heartbeat_timeout_threshold <= 0:
|
if self.driver_conf.heartbeat_timeout_threshold <= 0:
|
||||||
@ -956,19 +847,18 @@ class Connection(object):
|
|||||||
timeout=self._heartbeat_wait_timeout)
|
timeout=self._heartbeat_wait_timeout)
|
||||||
self._heartbeat_exit_event.clear()
|
self._heartbeat_exit_event.clear()
|
||||||
|
|
||||||
def declare_consumer(self, consumer_cls, topic, callback):
|
def declare_consumer(self, consumer):
|
||||||
"""Create a Consumer using the class that was passed in and
|
"""Create a Consumer using the class that was passed in and
|
||||||
add it to our list of consumers
|
add it to our list of consumers
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def _connect_error(exc):
|
def _connect_error(exc):
|
||||||
log_info = {'topic': topic, 'err_str': exc}
|
log_info = {'topic': consumer.routing_key, 'err_str': exc}
|
||||||
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
|
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
|
||||||
"%(err_str)s"), log_info)
|
"%(err_str)s"), log_info)
|
||||||
|
|
||||||
def _declare_consumer():
|
def _declare_consumer():
|
||||||
consumer = consumer_cls(self.driver_conf, self.channel, topic,
|
consumer.declare(self.channel)
|
||||||
callback, six.next(self.consumer_num))
|
|
||||||
self.consumers.append(consumer)
|
self.consumers.append(consumer)
|
||||||
return consumer
|
return consumer
|
||||||
|
|
||||||
@ -997,11 +887,8 @@ class Connection(object):
|
|||||||
|
|
||||||
def _consume():
|
def _consume():
|
||||||
if self.do_consume:
|
if self.do_consume:
|
||||||
queues_head = self.consumers[:-1] # not fanout.
|
for tag, consumer in enumerate(self.consumers):
|
||||||
queues_tail = self.consumers[-1] # fanout
|
consumer.consume(tag=tag)
|
||||||
for queue in queues_head:
|
|
||||||
queue.consume(nowait=True)
|
|
||||||
queues_tail.consume(nowait=False)
|
|
||||||
self.do_consume = False
|
self.do_consume = False
|
||||||
|
|
||||||
poll_timeout = (self._poll_timeout if timeout is None
|
poll_timeout = (self._poll_timeout if timeout is None
|
||||||
@ -1045,20 +932,50 @@ class Connection(object):
|
|||||||
In nova's use, this is generally a msg_id queue used for
|
In nova's use, this is generally a msg_id queue used for
|
||||||
responses for call/multicall
|
responses for call/multicall
|
||||||
"""
|
"""
|
||||||
self.declare_consumer(DirectConsumer, topic, callback)
|
|
||||||
|
consumer = Consumer(self.driver_conf,
|
||||||
|
exchange_name=topic,
|
||||||
|
queue_name=topic,
|
||||||
|
routing_key=topic,
|
||||||
|
type='direct',
|
||||||
|
durable=False,
|
||||||
|
auto_delete=True,
|
||||||
|
callback=callback)
|
||||||
|
|
||||||
|
self.declare_consumer(consumer)
|
||||||
|
|
||||||
def declare_topic_consumer(self, exchange_name, topic, callback=None,
|
def declare_topic_consumer(self, exchange_name, topic, callback=None,
|
||||||
queue_name=None):
|
queue_name=None):
|
||||||
"""Create a 'topic' consumer."""
|
"""Create a 'topic' consumer."""
|
||||||
self.declare_consumer(functools.partial(TopicConsumer,
|
consumer = Consumer(self.driver_conf,
|
||||||
name=queue_name,
|
exchange_name=exchange_name,
|
||||||
exchange_name=exchange_name,
|
queue_name=queue_name or topic,
|
||||||
),
|
routing_key=topic,
|
||||||
topic, callback)
|
type='topic',
|
||||||
|
durable=self.driver_conf.amqp_durable_queues,
|
||||||
|
auto_delete=self.driver_conf.amqp_auto_delete,
|
||||||
|
callback=callback)
|
||||||
|
|
||||||
|
self.declare_consumer(consumer)
|
||||||
|
|
||||||
def declare_fanout_consumer(self, topic, callback):
|
def declare_fanout_consumer(self, topic, callback):
|
||||||
"""Create a 'fanout' consumer."""
|
"""Create a 'fanout' consumer."""
|
||||||
self.declare_consumer(FanoutConsumer, topic, callback)
|
|
||||||
|
unique = uuid.uuid4().hex
|
||||||
|
exchange_name = '%s_fanout' % topic
|
||||||
|
queue_name = '%s_fanout_%s' % (topic, unique)
|
||||||
|
|
||||||
|
consumer = Consumer(self.driver_conf,
|
||||||
|
exchange_name=exchange_name,
|
||||||
|
queue_name=queue_name,
|
||||||
|
routing_key=topic,
|
||||||
|
type='fanout',
|
||||||
|
durable=False,
|
||||||
|
auto_delete=True,
|
||||||
|
callback=callback,
|
||||||
|
nowait=False)
|
||||||
|
|
||||||
|
self.declare_consumer(consumer)
|
||||||
|
|
||||||
def direct_send(self, msg_id, msg):
|
def direct_send(self, msg_id, msg):
|
||||||
"""Send a 'direct' message."""
|
"""Send a 'direct' message."""
|
||||||
|
Loading…
x
Reference in New Issue
Block a user