Improve the impl_rabbit logging

It would be better if we could log a client port and some additional
information about a connection, to understand where is problems occurs.
The client port especially helpful for associating events in a rabbitmq.log
with oslo.messaging logs.

Change-Id: I0dae619f5a1688fb2f64fd90623fde2444609f04
This commit is contained in:
Kirill Bespalov 2016-05-24 20:45:52 +03:00
parent fc446c3854
commit dc1309a003
2 changed files with 49 additions and 25 deletions

View File

@ -294,8 +294,8 @@ class Consumer(object):
queue_arguments=self.queue_arguments) queue_arguments=self.queue_arguments)
try: try:
LOG.trace('ConsumerBase.declare: ' LOG.debug('[%s] Queue.declare: %s',
'queue %s', self.queue_name) conn.connection_id, self.queue_name)
self.queue.declare() self.queue.declare()
except conn.connection.channel_errors as exc: except conn.connection.channel_errors as exc:
# NOTE(jrosenboom): This exception may be triggered by a race # NOTE(jrosenboom): This exception may be triggered by a race
@ -537,6 +537,10 @@ class Connection(object):
else: else:
self._connection_lock = DummyConnectionLock() self._connection_lock = DummyConnectionLock()
self.connection_id = str(uuid.uuid4())
self.name = "%s:%d:%s" % (os.path.basename(sys.argv[0]),
os.getpid(),
self.connection_id)
self.connection = kombu.connection.Connection( self.connection = kombu.connection.Connection(
self._url, ssl=self._fetch_ssl_params(), self._url, ssl=self._fetch_ssl_params(),
login_method=self.login_method, login_method=self.login_method,
@ -544,17 +548,21 @@ class Connection(object):
failover_strategy=self.kombu_failover_strategy, failover_strategy=self.kombu_failover_strategy,
transport_options={ transport_options={
'confirm_publish': True, 'confirm_publish': True,
'client_properties': {'capabilities': { 'client_properties': {
'authentication_failure_close': True, 'capabilities': {
'connection.blocked': True, 'authentication_failure_close': True,
'consumer_cancel_notify': True}}, 'connection.blocked': True,
'consumer_cancel_notify': True
},
'connection_name': self.name},
'on_blocked': self._on_connection_blocked, 'on_blocked': self._on_connection_blocked,
'on_unblocked': self._on_connection_unblocked, 'on_unblocked': self._on_connection_unblocked,
}, },
) )
LOG.debug('Connecting to AMQP server on %(hostname)s:%(port)s', LOG.debug('[%(connection_id)s] Connecting to AMQP server on'
self.connection.info()) ' %(hostname)s:%(port)s',
self._get_connection_info())
# NOTE(sileht): kombu recommend to run heartbeat_check every # NOTE(sileht): kombu recommend to run heartbeat_check every
# seconds, but we use a lock around the kombu connection # seconds, but we use a lock around the kombu connection
@ -579,9 +587,10 @@ class Connection(object):
if purpose == rpc_common.PURPOSE_SEND: if purpose == rpc_common.PURPOSE_SEND:
self._heartbeat_start() self._heartbeat_start()
LOG.debug('Connected to AMQP server on %(hostname)s:%(port)s ' LOG.debug('[%(connection_id)s] Connected to AMQP server on '
'via [%(transport)s] client', '%(hostname)s:%(port)s via [%(transport)s] client with'
self.connection.info()) ' port %(client_port)s.',
self._get_connection_info())
# NOTE(sileht): value chosen according the best practice from kombu # NOTE(sileht): value chosen according the best practice from kombu
# http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop # http://kombu.readthedocs.org/en/latest/reference/kombu.common.html#kombu.common.eventloop
@ -697,7 +706,8 @@ class Connection(object):
retry = None retry = None
def on_error(exc, interval): def on_error(exc, interval):
LOG.debug("Received recoverable error from kombu:", LOG.debug("[%s] Received recoverable error from kombu:"
% self.connection_id,
exc_info=True) exc_info=True)
recoverable_error_callback and recoverable_error_callback(exc) recoverable_error_callback and recoverable_error_callback(exc)
@ -707,16 +717,19 @@ class Connection(object):
else interval) else interval)
info = {'err_str': exc, 'sleep_time': interval} info = {'err_str': exc, 'sleep_time': interval}
info.update(self.connection.info()) info.update(self._get_connection_info())
if 'Socket closed' in six.text_type(exc): if 'Socket closed' in six.text_type(exc):
LOG.error(_LE('AMQP server %(hostname)s:%(port)s closed' LOG.error(_LE('[%(connection_id)s] AMQP server'
' %(hostname)s:%(port)s closed'
' the connection. Check login credentials:' ' the connection. Check login credentials:'
' %(err_str)s'), info) ' %(err_str)s'), info)
else: else:
LOG.error(_LE('AMQP server on %(hostname)s:%(port)s is ' LOG.error(_LE('[%(connection_id)s] AMQP server on '
'unreachable: %(err_str)s. Trying again in ' '%(hostname)s:%(port)s is unreachable: '
'%(sleep_time)d seconds.'), info) '%(err_str)s. Trying again in '
'%(sleep_time)d seconds. Client port: '
'%(client_port)s'), info)
# XXX(nic): when reconnecting to a RabbitMQ cluster # XXX(nic): when reconnecting to a RabbitMQ cluster
# with mirrored queues in use, the attempt to release the # with mirrored queues in use, the attempt to release the
@ -743,9 +756,10 @@ class Connection(object):
for consumer in self._consumers: for consumer in self._consumers:
consumer.declare(self) consumer.declare(self)
LOG.info(_LI('Reconnected to AMQP server on ' LOG.info(_LI('[%(connection_id)s] Reconnected to AMQP server on '
'%(hostname)s:%(port)s via [%(transport)s] client'), '%(hostname)s:%(port)s via [%(transport)s] client'
self.connection.info()) 'with port %(client_port)s.'),
self._get_connection_info())
def execute_method(channel): def execute_method(channel):
self._set_current_channel(channel) self._set_current_channel(channel)
@ -885,7 +899,8 @@ class Connection(object):
sock = self.channel.connection.sock sock = self.channel.connection.sock
except AttributeError as e: except AttributeError as e:
# Level is set to debug because otherwise we would spam the logs # Level is set to debug because otherwise we would spam the logs
LOG.debug('Failed to get socket attribute: %s' % str(e)) LOG.debug('[%s] Failed to get socket attribute: %s'
% (self.connection_id, str(e)))
else: else:
sock.settimeout(timeout) sock.settimeout(timeout)
# TCP_USER_TIMEOUT is not defined on Windows and Mac OS X # TCP_USER_TIMEOUT is not defined on Windows and Mac OS X
@ -1141,6 +1156,15 @@ class Connection(object):
with self._connection_lock: with self._connection_lock:
self.ensure(method, retry=retry, error_callback=_error_callback) self.ensure(method, retry=retry, error_callback=_error_callback)
def _get_connection_info(self):
info = self.connection.info()
client_port = None
if self.channel and hasattr(self.channel.connection, 'sock'):
client_port = self.channel.connection.sock.getsockname()[1]
info.update({'client_port': client_port,
'connection_id': self.connection_id})
return info
def _publish(self, exchange, msg, routing_key=None, timeout=None): def _publish(self, exchange, msg, routing_key=None, timeout=None):
"""Publish a message.""" """Publish a message."""

View File

@ -189,16 +189,16 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
'kombu+memory:////') 'kombu+memory:////')
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
transport._driver._get_connection() connection = transport._driver._get_connection()
connection_klass.assert_called_once_with( connection_klass.assert_called_once_with(
'memory:///', transport_options={ 'memory:///', transport_options={
'client_properties': { 'client_properties': {
'capabilities': { 'capabilities': {
'connection.blocked': True, 'connection.blocked': True,
'consumer_cancel_notify': True, 'consumer_cancel_notify': True,
'authentication_failure_close': True 'authentication_failure_close': True,
} },
}, 'connection_name': connection.name},
'confirm_publish': True, 'confirm_publish': True,
'on_blocked': mock.ANY, 'on_blocked': mock.ANY,
'on_unblocked': mock.ANY}, 'on_unblocked': mock.ANY},