Ensure kombu channels are closed

Kombu doesn't always close channel we created for us.
So we must ensure they are closed correctly.

Closes-bug: #1406629

Change-Id: I7c1b31c37ac75dd4fded4d86f046c18f9c2dd7b8
This commit is contained in:
Mehdi Abaakouk 2015-01-06 15:11:55 +01:00
parent be9fca7f6f
commit e7e5506b92
1 changed files with 21 additions and 14 deletions

View File

@ -510,7 +510,7 @@ class Connection(object):
'port': self.connection.port}) 'port': self.connection.port})
# NOTE(sileht): just ensure the connection is setuped at startup # NOTE(sileht): just ensure the connection is setuped at startup
self.ensure(error_callback=None, self.ensure(error_callback=None,
method=lambda channel: True) method=lambda: True)
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'), LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'),
{'hostname': self.connection.hostname, {'hostname': self.connection.hostname,
'port': self.connection.port}) 'port': self.connection.port})
@ -592,8 +592,6 @@ class Connection(object):
retry = None retry = None
def on_error(exc, interval): def on_error(exc, interval):
self.channel = None
error_callback and error_callback(exc) error_callback and error_callback(exc)
interval = (self.conf.kombu_reconnect_delay + interval interval = (self.conf.kombu_reconnect_delay + interval
@ -629,6 +627,7 @@ class Connection(object):
"""Callback invoked when the kombu reconnects and creates """Callback invoked when the kombu reconnects and creates
a new channel, we use it the reconfigure our consumers. a new channel, we use it the reconfigure our consumers.
""" """
self._set_current_channel(new_channel)
self.consumer_num = itertools.count(1) self.consumer_num = itertools.count(1)
for consumer in self.consumers: for consumer in self.consumers:
consumer.reconnect(new_channel) consumer.reconnect(new_channel)
@ -638,11 +637,15 @@ class Connection(object):
{'hostname': self.connection.hostname, {'hostname': self.connection.hostname,
'port': self.connection.port}) 'port': self.connection.port})
def execute_method(channel):
self._set_current_channel(channel)
method()
recoverable_errors = (self.connection.recoverable_channel_errors + recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors) self.connection.recoverable_connection_errors)
try: try:
autoretry_method = self.connection.autoretry( autoretry_method = self.connection.autoretry(
method, channel=self.channel, execute_method, channel=self.channel,
max_retries=retry, max_retries=retry,
errback=on_error, errback=on_error,
interval_start=self.interval_start or 1, interval_start=self.interval_start or 1,
@ -650,10 +653,10 @@ class Connection(object):
on_revive=on_reconnection, on_revive=on_reconnection,
) )
ret, channel = autoretry_method() ret, channel = autoretry_method()
self.channel = channel self._set_current_channel(channel)
return ret return ret
except recoverable_errors as exc: except recoverable_errors as exc:
self.channel = None self._set_current_channel(None)
# NOTE(sileht): number of retry exceeded and the connection # NOTE(sileht): number of retry exceeded and the connection
# is still broken # is still broken
msg = _('Unable to connect to AMQP server on ' msg = _('Unable to connect to AMQP server on '
@ -666,17 +669,21 @@ class Connection(object):
LOG.error(msg) LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg) raise exceptions.MessageDeliveryFailure(msg)
def _set_current_channel(self, new_channel):
if self.channel is not None and new_channel != self.channel:
self.connection.maybe_close_channel(self.channel)
self.channel = new_channel
def close(self): def close(self):
"""Close/release this connection.""" """Close/release this connection."""
if self.connection: if self.connection:
self._set_current_channel(None)
self.connection.release() self.connection.release()
self.connection = None self.connection = None
def reset(self): def reset(self):
"""Reset a connection so it can be used again.""" """Reset a connection so it can be used again."""
if self.channel is not None: self._set_current_channel(self.connection.channel())
self.channel.close()
self.channel = self.connection.channel()
self.consumers = [] self.consumers = []
self.consumer_num = itertools.count(1) self.consumer_num = itertools.count(1)
@ -690,8 +697,8 @@ class Connection(object):
LOG.error(_("Failed to declare consumer for topic '%(topic)s': " LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s"), log_info) "%(err_str)s"), log_info)
def _declare_consumer(channel): def _declare_consumer():
consumer = consumer_cls(self.conf, channel, topic, callback, consumer = consumer_cls(self.conf, self.channel, topic, callback,
six.next(self.consumer_num)) six.next(self.consumer_num))
self.consumers.append(consumer) self.consumers.append(consumer)
return consumer return consumer
@ -714,7 +721,7 @@ class Connection(object):
LOG.exception(_('Failed to consume message from queue: %s'), LOG.exception(_('Failed to consume message from queue: %s'),
exc) exc)
def _consume(channel): def _consume():
if self.do_consume: if self.do_consume:
queues_head = self.consumers[:-1] # not fanout. queues_head = self.consumers[:-1] # not fanout.
queues_tail = self.consumers[-1] # fanout queues_tail = self.consumers[-1] # fanout
@ -752,8 +759,8 @@ class Connection(object):
LOG.exception(_("Failed to publish message to topic " LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info) "'%(topic)s': %(err_str)s"), log_info)
def _publish(channel): def _publish():
publisher = cls(self.conf, channel, topic=topic, **kwargs) publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
publisher.send(msg, timeout) publisher.send(msg, timeout)
self.ensure(_error_callback, _publish, retry=retry) self.ensure(_error_callback, _publish, retry=retry)