From dc1309a003cb1059101294c4c74dd75e9c326f7d Mon Sep 17 00:00:00 2001 From: Kirill Bespalov Date: Tue, 24 May 2016 20:45:52 +0300 Subject: [PATCH] Improve the impl_rabbit logging It would be better if we could log the client port and some additional information about a connection, to understand where problems occur. The client port is especially helpful for associating events in rabbitmq.log with oslo.messaging logs. Change-Id: I0dae619f5a1688fb2f64fd90623fde2444609f04 --- oslo_messaging/_drivers/impl_rabbit.py | 66 +++++++++++++------ .../tests/drivers/test_impl_rabbit.py | 8 +-- 2 files changed, 49 insertions(+), 25 deletions(-) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 9c44465d0..d35693827 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -294,8 +294,8 @@ class Consumer(object): queue_arguments=self.queue_arguments) try: - LOG.trace('ConsumerBase.declare: ' - 'queue %s', self.queue_name) + LOG.debug('[%s] Queue.declare: %s', + conn.connection_id, self.queue_name) self.queue.declare() except conn.connection.channel_errors as exc: # NOTE(jrosenboom): This exception may be triggered by a race @@ -537,6 +537,10 @@ class Connection(object): else: self._connection_lock = DummyConnectionLock() + self.connection_id = str(uuid.uuid4()) + self.name = "%s:%d:%s" % (os.path.basename(sys.argv[0]), + os.getpid(), + self.connection_id) self.connection = kombu.connection.Connection( self._url, ssl=self._fetch_ssl_params(), login_method=self.login_method, @@ -544,17 +548,21 @@ class Connection(object): failover_strategy=self.kombu_failover_strategy, transport_options={ 'confirm_publish': True, - 'client_properties': {'capabilities': { - 'authentication_failure_close': True, - 'connection.blocked': True, - 'consumer_cancel_notify': True}}, + 'client_properties': { + 'capabilities': { + 'authentication_failure_close': True, + 'connection.blocked': True, 
'consumer_cancel_notify': True + }, + 'connection_name': self.name}, 'on_blocked': self._on_connection_blocked, 'on_unblocked': self._on_connection_unblocked, }, ) - LOG.debug('Connecting to AMQP server on %(hostname)s:%(port)s', - self.connection.info()) + LOG.debug('[%(connection_id)s] Connecting to AMQP server on' + ' %(hostname)s:%(port)s', + self._get_connection_info()) # NOTE(sileht): kombu recommend to run heartbeat_check every # seconds, but we use a lock around the kombu connection @@ -579,9 +587,10 @@ class Connection(object): if purpose == rpc_common.PURPOSE_SEND: self._heartbeat_start() - LOG.debug('Connected to AMQP server on %(hostname)s:%(port)s ' - 'via [%(transport)s] client', - self.connection.info()) + LOG.debug('[%(connection_id)s] Connected to AMQP server on ' + '%(hostname)s:%(port)s via [%(transport)s] client with' + ' port %(client_port)s.', + self._get_connection_info()) # NOTE(sileht): value chosen according the best practice from kombu # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop @@ -697,7 +706,8 @@ class Connection(object): retry = None def on_error(exc, interval): - LOG.debug("Received recoverable error from kombu:", + LOG.debug("[%s] Received recoverable error from kombu:" + % self.connection_id, exc_info=True) recoverable_error_callback and recoverable_error_callback(exc) @@ -707,16 +717,19 @@ class Connection(object): else interval) info = {'err_str': exc, 'sleep_time': interval} - info.update(self.connection.info()) + info.update(self._get_connection_info()) if 'Socket closed' in six.text_type(exc): - LOG.error(_LE('AMQP server %(hostname)s:%(port)s closed' + LOG.error(_LE('[%(connection_id)s] AMQP server' + ' %(hostname)s:%(port)s closed' ' the connection. Check login credentials:' ' %(err_str)s'), info) else: - LOG.error(_LE('AMQP server on %(hostname)s:%(port)s is ' - 'unreachable: %(err_str)s. 
Trying again in ' - '%(sleep_time)d seconds.'), info) + LOG.error(_LE('[%(connection_id)s] AMQP server on ' + '%(hostname)s:%(port)s is unreachable: ' + '%(err_str)s. Trying again in ' + '%(sleep_time)d seconds. Client port: ' + '%(client_port)s'), info) # XXX(nic): when reconnecting to a RabbitMQ cluster # with mirrored queues in use, the attempt to release the @@ -743,9 +756,10 @@ class Connection(object): for consumer in self._consumers: consumer.declare(self) - LOG.info(_LI('Reconnected to AMQP server on ' - '%(hostname)s:%(port)s via [%(transport)s] client'), - self.connection.info()) + LOG.info(_LI('[%(connection_id)s] Reconnected to AMQP server on ' + '%(hostname)s:%(port)s via [%(transport)s] client' + 'with port %(client_port)s.'), + self._get_connection_info()) def execute_method(channel): self._set_current_channel(channel) @@ -885,7 +899,8 @@ class Connection(object): sock = self.channel.connection.sock except AttributeError as e: # Level is set to debug because otherwise we would spam the logs - LOG.debug('Failed to get socket attribute: %s' % str(e)) + LOG.debug('[%s] Failed to get socket attribute: %s' + % (self.connection_id, str(e))) else: sock.settimeout(timeout) # TCP_USER_TIMEOUT is not defined on Windows and Mac OS X @@ -1141,6 +1156,15 @@ class Connection(object): with self._connection_lock: self.ensure(method, retry=retry, error_callback=_error_callback) + def _get_connection_info(self): + info = self.connection.info() + client_port = None + if self.channel and hasattr(self.channel.connection, 'sock'): + client_port = self.channel.connection.sock.getsockname()[1] + info.update({'client_port': client_port, + 'connection_id': self.connection_id}) + return info + def _publish(self, exchange, msg, routing_key=None, timeout=None): """Publish a message.""" diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index ac8e9cbb1..4717ad7fd 100644 --- 
a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -189,16 +189,16 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase): 'kombu+memory:////') self.addCleanup(transport.cleanup) - transport._driver._get_connection() + connection = transport._driver._get_connection() connection_klass.assert_called_once_with( 'memory:///', transport_options={ 'client_properties': { 'capabilities': { 'connection.blocked': True, 'consumer_cancel_notify': True, - 'authentication_failure_close': True - } - }, + 'authentication_failure_close': True, + }, + 'connection_name': connection.name}, 'confirm_publish': True, 'on_blocked': mock.ANY, 'on_unblocked': mock.ANY},