diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 9e3f8eb96..4b977eb00 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -365,111 +365,108 @@ class FanoutConsumer(ConsumerBase): class Publisher(object): - """Base Publisher class.""" + """Publisher that silently creates exchange but no queues.""" - def __init__(self, channel, exchange_name, routing_key, **kwargs): + passive = False + + def __init__(self, conf, exchange_name, routing_key, type, durable, + auto_delete): """Init the Publisher class with the exchange_name, routing_key, - and other options + type, durable auto_delete """ + self.queue_arguments = _get_queue_arguments(conf) self.exchange_name = exchange_name self.routing_key = routing_key - self.kwargs = kwargs - self.reconnect(channel) - - def reconnect(self, channel): - """Re-establish the Producer after a rabbit reconnection.""" + self.auto_delete = auto_delete + self.durable = durable self.exchange = kombu.entity.Exchange(name=self.exchange_name, - **self.kwargs) - self.producer = kombu.messaging.Producer(exchange=self.exchange, - channel=channel, - routing_key=self.routing_key) + type=type, + exclusive=False, + durable=durable, + auto_delete=auto_delete, + passive=self.passive) - def send(self, msg, timeout=None): - """Send a message.""" + def send(self, conn, msg, timeout=None): + """Send a message on an channel.""" + producer = kombu.messaging.Producer(exchange=self.exchange, + channel=conn.channel, + routing_key=self.routing_key) + + headers = {} if timeout: - # - # AMQP TTL is in milliseconds when set in the header. - # - self.producer.publish(msg, headers={'ttl': (timeout * 1000)}) - else: - self.producer.publish(msg) + # AMQP TTL is in milliseconds when set in the property. + # Details: http://www.rabbitmq.com/ttl.html#per-message-ttl + # NOTE(sileht): this amqp header doesn't exists ... LP#1444854 + headers['ttl'] = timeout * 1000 + + producer.publish(msg, headers=headers) -class DirectPublisher(Publisher): - """Publisher class for 'direct'.""" - def __init__(self, conf, channel, topic, **kwargs): - """Init a 'direct' publisher. +class DeclareQueuePublisher(Publisher): + """Publisher that declares a default queue - Kombu options may be passed as keyword args to override defaults - """ + When the exchange is missing instead of silently creating an exchange + not binded to a queue, this publisher creates a default queue + named with the routing_key. - options = {'durable': False, - 'auto_delete': True, - 'exclusive': False, - 'passive': True} - options.update(kwargs) - super(DirectPublisher, self).__init__(channel, topic, topic, - type='direct', **options) + This is mainly used to not miss notifications in case of nobody consumes + them yet. If the future consumer binds the default queue it can retrieve + missing messages. + """ + # FIXME(sileht): The side effect of this is that we declare again and + # again the same queue, and generate a lot of useless rabbit traffic. + # https://bugs.launchpad.net/oslo.messaging/+bug/1437902 - -class TopicPublisher(Publisher): - """Publisher class for 'topic'.""" - def __init__(self, conf, channel, exchange_name, topic, **kwargs): - """Init a 'topic' publisher. - - Kombu options may be passed as keyword args to override defaults - """ - options = {'durable': conf.amqp_durable_queues, - 'auto_delete': conf.amqp_auto_delete, - 'exclusive': False} - - options.update(kwargs) - super(TopicPublisher, self).__init__(channel, - exchange_name, - topic, - type='topic', - **options) - - -class FanoutPublisher(Publisher): - """Publisher class for 'fanout'.""" - def __init__(self, conf, channel, topic, **kwargs): - """Init a 'fanout' publisher. - - Kombu options may be passed as keyword args to override defaults - """ - options = {'durable': False, - 'auto_delete': True, - 'exclusive': False} - options.update(kwargs) - super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, - None, type='fanout', **options) - - -class NotifyPublisher(TopicPublisher): - """Publisher class for 'notify'.""" - - def __init__(self, conf, channel, exchange_name, topic, **kwargs): - self.durable = kwargs.pop('durable', conf.amqp_durable_queues) - self.auto_delete = kwargs.pop('auto_delete', conf.amqp_auto_delete) - self.queue_arguments = _get_queue_arguments(conf) - super(NotifyPublisher, self).__init__(conf, channel, exchange_name, - topic, **kwargs) - - def reconnect(self, channel): - super(NotifyPublisher, self).reconnect(channel) - - # NOTE(jerdfelt): Normally the consumer would create the queue, but - # we do this to ensure that messages don't get dropped if the - # consumer is started after we do - queue = kombu.entity.Queue(channel=channel, - exchange=self.exchange, - durable=self.durable, - auto_delete=self.auto_delete, - name=self.routing_key, - routing_key=self.routing_key, - queue_arguments=self.queue_arguments) + def send(self, conn, msg, timeout=None): + queue = kombu.entity.Queue( + channel=conn.channel, + exchange=self.exchange, + durable=self.durable, + auto_delete=self.auto_delete, + name=self.routing_key, + routing_key=self.routing_key, + queue_arguments=self.queue_arguments) queue.declare() + super(DeclareQueuePublisher, self).send( + conn, msg, timeout) + + +class RetryOnMissingExchangePublisher(Publisher): + """Publisher that retry during 60 seconds if the exchange is missing.""" + + passive = True + + def send(self, conn, msg, timeout=None): + # TODO(sileht): + # * use timeout parameter when available + # * use rpc_timeout if not instead of hardcoded 60 + # * use @retrying + timer = rpc_common.DecayingTimer(duration=60) + timer.start() + + while True: + try: + super(RetryOnMissingExchangePublisher, self).send(conn, msg, + timeout) + return + except conn.connection.channel_errors as exc: + # NOTE(noelbk/sileht): + # If rabbit dies, the consumer can be disconnected before the + # publisher sends, and if the consumer hasn't declared the + # queue, the publisher's will send a message to an exchange + # that's not bound to a queue, and the message wll be lost. + # So we set passive=True to the publisher exchange and catch + # the 404 kombu ChannelError and retry until the exchange + # appears + if exc.code == 404 and timer.check_return() > 0: + LOG.info(_LI("The exchange %(exchange)s to send to " + "%(routing_key)s doesn't exist yet, " + "retrying...") % { + 'exchange': self.exchange, + 'routing_key': self.routing_key}) + time.sleep(1) + continue + raise class DummyConnectionLock(object): @@ -1038,32 +1035,20 @@ class Connection(object): recoverable_error_callback=_recoverable_error_callback, error_callback=_error_callback) - @staticmethod - def _log_publisher_send_error(topic, exc): - log_info = {'topic': topic, 'err_str': exc} - LOG.error(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s"), log_info) - LOG.debug('Exception', exc_info=exc) - - default_marker = object() - - def publisher_send(self, cls, topic, msg, timeout=None, retry=None, - error_callback=default_marker, **kwargs): + def publisher_send(self, publisher, msg, timeout=None, retry=None): """Send to a publisher based on the publisher class.""" - def _default_error_callback(exc): - self._log_publisher_send_error(topic, exc) - - if error_callback is self.default_marker: - error_callback = _default_error_callback + def _error_callback(exc): + log_info = {'topic': publisher.exchange_name, 'err_str': exc} + LOG.error(_("Failed to publish message to topic " + "'%(topic)s': %(err_str)s"), log_info) + LOG.debug('Exception', exc_info=exc) def _publish(): - publisher = cls(self.driver_conf, self.channel, topic=topic, - **kwargs) - publisher.send(msg, timeout) + publisher.send(self, msg, timeout) with self._connection_lock: - self.ensure(_publish, retry=retry, error_callback=error_callback) + self.ensure(_publish, retry=retry, error_callback=_error_callback) def declare_direct_consumer(self, topic, callback): """Create a 'direct' queue. @@ -1088,49 +1073,48 @@ class Connection(object): def direct_send(self, msg_id, msg): """Send a 'direct' message.""" - timer = rpc_common.DecayingTimer(duration=60) - timer.start() - # NOTE(sileht): retry at least 60sec, after we have a good change - # that the caller is really dead too... + p = RetryOnMissingExchangePublisher(self.driver_conf, + exchange_name=msg_id, + routing_key=msg_id, + type='direct', + durable=False, + auto_delete=True) - while True: - try: - self.publisher_send(DirectPublisher, msg_id, msg, - error_callback=None) - return - except self.connection.channel_errors as exc: - # NOTE(noelbk/sileht): - # If rabbit dies, the consumer can be disconnected before the - # publisher sends, and if the consumer hasn't declared the - # queue, the publisher's will send a message to an exchange - # that's not bound to a queue, and the message wll be lost. - # So we set passive=True to the publisher exchange and catch - # the 404 kombu ChannelError and retry until the exchange - # appears - if exc.code == 404 and timer.check_return() > 0: - LOG.info(_LI("The exchange to reply to %s doesn't " - "exist yet, retrying...") % msg_id) - time.sleep(1) - continue - self._log_publisher_send_error(msg_id, exc) - raise - except Exception as exc: - self._log_publisher_send_error(msg_id, exc) - raise + self.publisher_send(p, msg) def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): """Send a 'topic' message.""" - self.publisher_send(TopicPublisher, topic, msg, timeout, - exchange_name=exchange_name, retry=retry) + p = Publisher(self.driver_conf, + exchange_name=exchange_name, + routing_key=topic, + type='topic', + durable=self.driver_conf.amqp_durable_queues, + auto_delete=self.driver_conf.amqp_auto_delete) + self.publisher_send(p, msg, timeout, retry=retry) def fanout_send(self, topic, msg, retry=None): """Send a 'fanout' message.""" - self.publisher_send(FanoutPublisher, topic, msg, retry=retry) + + p = Publisher(self.driver_conf, + exchange_name='%s_fanout' % topic, + routing_key=None, + type='fanout', + durable=False, + auto_delete=True) + + self.publisher_send(p, msg, retry=retry) def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs): """Send a notify message on a topic.""" - self.publisher_send(NotifyPublisher, topic, msg, timeout=None, - exchange_name=exchange_name, retry=retry, **kwargs) + p = DeclareQueuePublisher( + self.driver_conf, + exchange_name=exchange_name, + routing_key=topic, + type='topic', + durable=self.driver_conf.amqp_durable_queues, + auto_delete=self.driver_conf.amqp_auto_delete) + + self.publisher_send(p, msg, timeout=None, retry=retry) def stop_consuming(self): self._consume_loop_stopped = True