|
|
|
@ -154,6 +154,11 @@ rabbit_opts = [
|
|
|
|
|
default=2,
|
|
|
|
|
help='How often times during the heartbeat_timeout_threshold '
|
|
|
|
|
'we check the heartbeat.'),
|
|
|
|
|
cfg.BoolOpt('enable_cancel_on_failover',
|
|
|
|
|
default=False,
|
|
|
|
|
help="Enable x-cancel-on-ha-failover flag so that "
|
|
|
|
|
"rabbitmq server will cancel and notify consumers"
|
|
|
|
|
"when queue is down")
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
@ -215,7 +220,8 @@ class Consumer(object):
|
|
|
|
|
|
|
|
|
|
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
|
|
|
|
|
exchange_auto_delete, queue_auto_delete, callback,
|
|
|
|
|
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0):
|
|
|
|
|
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
|
|
|
|
|
enable_cancel_on_failover=False):
|
|
|
|
|
"""Init the Consumer class with the exchange_name, routing_key,
|
|
|
|
|
type, durable auto_delete
|
|
|
|
|
"""
|
|
|
|
@ -237,10 +243,16 @@ class Consumer(object):
|
|
|
|
|
type=type,
|
|
|
|
|
durable=self.durable,
|
|
|
|
|
auto_delete=self.exchange_auto_delete)
|
|
|
|
|
self.enable_cancel_on_failover = enable_cancel_on_failover
|
|
|
|
|
|
|
|
|
|
def declare(self, conn):
|
|
|
|
|
"""Re-declare the queue after a rabbit (re)connect."""
|
|
|
|
|
|
|
|
|
|
consumer_arguments = None
|
|
|
|
|
if self.enable_cancel_on_failover:
|
|
|
|
|
consumer_arguments = {
|
|
|
|
|
"x-cancel-on-ha-failover": True}
|
|
|
|
|
|
|
|
|
|
self.queue = kombu.entity.Queue(
|
|
|
|
|
name=self.queue_name,
|
|
|
|
|
channel=conn.channel,
|
|
|
|
@ -248,7 +260,9 @@ class Consumer(object):
|
|
|
|
|
durable=self.durable,
|
|
|
|
|
auto_delete=self.queue_auto_delete,
|
|
|
|
|
routing_key=self.routing_key,
|
|
|
|
|
queue_arguments=self.queue_arguments)
|
|
|
|
|
queue_arguments=self.queue_arguments,
|
|
|
|
|
consumer_arguments=consumer_arguments
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
LOG.debug('[%s] Queue.declare: %s',
|
|
|
|
@ -451,6 +465,7 @@ class Connection(object):
|
|
|
|
|
driver_conf.kombu_missing_consumer_retry_timeout
|
|
|
|
|
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
|
|
|
|
|
self.kombu_compression = driver_conf.kombu_compression
|
|
|
|
|
self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover
|
|
|
|
|
|
|
|
|
|
if self.ssl:
|
|
|
|
|
self.ssl_version = driver_conf.ssl_version
|
|
|
|
@ -1065,31 +1080,35 @@ class Connection(object):
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
# TODO(obondarev): use default exchange since T release
|
|
|
|
|
consumer = Consumer(exchange_name=topic,
|
|
|
|
|
queue_name=topic,
|
|
|
|
|
routing_key=topic,
|
|
|
|
|
type='direct',
|
|
|
|
|
durable=False,
|
|
|
|
|
exchange_auto_delete=True,
|
|
|
|
|
queue_auto_delete=False,
|
|
|
|
|
callback=callback,
|
|
|
|
|
rabbit_ha_queues=self.rabbit_ha_queues,
|
|
|
|
|
rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
|
|
|
|
|
consumer = Consumer(
|
|
|
|
|
exchange_name=topic,
|
|
|
|
|
queue_name=topic,
|
|
|
|
|
routing_key=topic,
|
|
|
|
|
type='direct',
|
|
|
|
|
durable=False,
|
|
|
|
|
exchange_auto_delete=True,
|
|
|
|
|
queue_auto_delete=False,
|
|
|
|
|
callback=callback,
|
|
|
|
|
rabbit_ha_queues=self.rabbit_ha_queues,
|
|
|
|
|
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
|
|
|
|
|
enable_cancel_on_failover=self.enable_cancel_on_failover)
|
|
|
|
|
|
|
|
|
|
self.declare_consumer(consumer)
|
|
|
|
|
|
|
|
|
|
def declare_topic_consumer(self, exchange_name, topic, callback=None,
|
|
|
|
|
queue_name=None):
|
|
|
|
|
"""Create a 'topic' consumer."""
|
|
|
|
|
consumer = Consumer(exchange_name=exchange_name,
|
|
|
|
|
queue_name=queue_name or topic,
|
|
|
|
|
routing_key=topic,
|
|
|
|
|
type='topic',
|
|
|
|
|
durable=self.amqp_durable_queues,
|
|
|
|
|
exchange_auto_delete=self.amqp_auto_delete,
|
|
|
|
|
queue_auto_delete=self.amqp_auto_delete,
|
|
|
|
|
callback=callback,
|
|
|
|
|
rabbit_ha_queues=self.rabbit_ha_queues)
|
|
|
|
|
consumer = Consumer(
|
|
|
|
|
exchange_name=exchange_name,
|
|
|
|
|
queue_name=queue_name or topic,
|
|
|
|
|
routing_key=topic,
|
|
|
|
|
type='topic',
|
|
|
|
|
durable=self.amqp_durable_queues,
|
|
|
|
|
exchange_auto_delete=self.amqp_auto_delete,
|
|
|
|
|
queue_auto_delete=self.amqp_auto_delete,
|
|
|
|
|
callback=callback,
|
|
|
|
|
rabbit_ha_queues=self.rabbit_ha_queues,
|
|
|
|
|
enable_cancel_on_failover=self.enable_cancel_on_failover)
|
|
|
|
|
|
|
|
|
|
self.declare_consumer(consumer)
|
|
|
|
|
|
|
|
|
@ -1100,16 +1119,18 @@ class Connection(object):
|
|
|
|
|
exchange_name = '%s_fanout' % topic
|
|
|
|
|
queue_name = '%s_fanout_%s' % (topic, unique)
|
|
|
|
|
|
|
|
|
|
consumer = Consumer(exchange_name=exchange_name,
|
|
|
|
|
queue_name=queue_name,
|
|
|
|
|
routing_key=topic,
|
|
|
|
|
type='fanout',
|
|
|
|
|
durable=False,
|
|
|
|
|
exchange_auto_delete=True,
|
|
|
|
|
queue_auto_delete=False,
|
|
|
|
|
callback=callback,
|
|
|
|
|
rabbit_ha_queues=self.rabbit_ha_queues,
|
|
|
|
|
rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
|
|
|
|
|
consumer = Consumer(
|
|
|
|
|
exchange_name=exchange_name,
|
|
|
|
|
queue_name=queue_name,
|
|
|
|
|
routing_key=topic,
|
|
|
|
|
type='fanout',
|
|
|
|
|
durable=False,
|
|
|
|
|
exchange_auto_delete=True,
|
|
|
|
|
queue_auto_delete=False,
|
|
|
|
|
callback=callback,
|
|
|
|
|
rabbit_ha_queues=self.rabbit_ha_queues,
|
|
|
|
|
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
|
|
|
|
|
enable_cancel_on_failover=self.enable_cancel_on_failover)
|
|
|
|
|
|
|
|
|
|
self.declare_consumer(consumer)
|
|
|
|
|
|
|
|
|
|