diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 21d4609b3..aa7cffa7d 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -15,7 +15,6 @@ import collections import contextlib import functools -import logging import os import socket import ssl @@ -28,6 +27,7 @@ import kombu.connection import kombu.entity import kombu.messaging from oslo_config import cfg +from oslo_log import log as logging from oslo_utils import netutils import six from six.moves.urllib import parse @@ -175,12 +175,15 @@ class RabbitMessage(dict): def __init__(self, raw_message): super(RabbitMessage, self).__init__( rpc_common.deserialize_msg(raw_message.payload)) + LOG.trace('RabbitMessage.Init: message %s', self) self._raw_message = raw_message def acknowledge(self): + LOG.trace('RabbitMessage.acknowledge: message %s', self) self._raw_message.ack() def requeue(self): + LOG.trace('RabbitMessage.requeue: message %s', self) self._raw_message.requeue() @@ -221,6 +224,8 @@ class Consumer(object): queue_arguments=self.queue_arguments) try: + LOG.trace('ConsumerBase.declare: ' + 'queue %s', self.queue_name) self.queue.declare() except conn.connection.channel_errors as exc: # NOTE(jrosenboom): This exception may be triggered by a race @@ -245,6 +250,7 @@ class Consumer(object): nowait=self.nowait) def cancel(self, tag): + LOG.trace('ConsumerBase.cancel: canceling %s', tag) self.queue.cancel(six.text_type(tag)) def _callback(self, message): @@ -637,6 +643,8 @@ class Connection(object): # should sufficient, because the underlying kombu transport # connection object freed. if self.kombu_reconnect_delay > 0: + LOG.trace('Delaying reconnect for %1.1f seconds ...', + self.kombu_reconnect_delay) time.sleep(self.kombu_reconnect_delay) def on_reconnection(new_channel): @@ -982,6 +990,11 @@ class Connection(object): # disconnect us, so raise timeout earlier ourself transport_timeout = heartbeat_timeout + log_info = {'msg': msg, + 'who': exchange or 'default', + 'key': routing_key} + LOG.trace('Connection._publish: sending message %(msg)s to' + ' %(who)s with routing key %(key)s', log_info) with self._transport_socket_timeout(transport_timeout): producer.publish(msg, expiration=expiration) @@ -1018,6 +1031,10 @@ class Connection(object): name=routing_key, routing_key=routing_key, queue_arguments=_get_queue_arguments(self.rabbit_ha_queues)) + log_info = {'key': routing_key, 'exchange': exchange} + LOG.trace( + 'Connection._publish_and_creates_default_queue: ' + 'declare queue %(key)s on %(exchange)s exchange', log_info) queue.declare() self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier)