From e7e5506b92606d1ea160da4a22be6bd66353d1cc Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Tue, 6 Jan 2015 15:11:55 +0100 Subject: [PATCH] Ensure kombu channels are closed Kombu doesn't always close channel we created for us. So we must ensure they are closed correctly. Closes-bug: #1406629 Change-Id: I7c1b31c37ac75dd4fded4d86f046c18f9c2dd7b8 --- oslo_messaging/_drivers/impl_rabbit.py | 35 +++++++++++++++----------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index f7b932e3b..dc154c519 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -510,7 +510,7 @@ class Connection(object): 'port': self.connection.port}) # NOTE(sileht): just ensure the connection is setuped at startup self.ensure(error_callback=None, - method=lambda channel: True) + method=lambda: True) LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'), {'hostname': self.connection.hostname, 'port': self.connection.port}) @@ -592,8 +592,6 @@ class Connection(object): retry = None def on_error(exc, interval): - self.channel = None - error_callback and error_callback(exc) interval = (self.conf.kombu_reconnect_delay + interval @@ -629,6 +627,7 @@ class Connection(object): """Callback invoked when the kombu reconnects and creates a new channel, we use it the reconfigure our consumers. """ + self._set_current_channel(new_channel) self.consumer_num = itertools.count(1) for consumer in self.consumers: consumer.reconnect(new_channel) @@ -638,11 +637,15 @@ class Connection(object): {'hostname': self.connection.hostname, 'port': self.connection.port}) + def execute_method(channel): + self._set_current_channel(channel) + method() + recoverable_errors = (self.connection.recoverable_channel_errors + self.connection.recoverable_connection_errors) try: autoretry_method = self.connection.autoretry( - method, channel=self.channel, + execute_method, channel=self.channel, max_retries=retry, errback=on_error, interval_start=self.interval_start or 1, @@ -650,10 +653,10 @@ class Connection(object): on_revive=on_reconnection, ) ret, channel = autoretry_method() - self.channel = channel + self._set_current_channel(channel) return ret except recoverable_errors as exc: - self.channel = None + self._set_current_channel(None) # NOTE(sileht): number of retry exceeded and the connection # is still broken msg = _('Unable to connect to AMQP server on ' @@ -666,17 +669,21 @@ class Connection(object): LOG.error(msg) raise exceptions.MessageDeliveryFailure(msg) + def _set_current_channel(self, new_channel): + if self.channel is not None and new_channel != self.channel: + self.connection.maybe_close_channel(self.channel) + self.channel = new_channel + def close(self): """Close/release this connection.""" if self.connection: + self._set_current_channel(None) self.connection.release() self.connection = None def reset(self): """Reset a connection so it can be used again.""" - if self.channel is not None: - self.channel.close() - self.channel = self.connection.channel() + self._set_current_channel(self.connection.channel()) self.consumers = [] self.consumer_num = itertools.count(1) @@ -690,8 +697,8 @@ class Connection(object): LOG.error(_("Failed to declare consumer for topic '%(topic)s': " "%(err_str)s"), log_info) - def _declare_consumer(channel): - consumer = consumer_cls(self.conf, channel, topic, callback, + def _declare_consumer(): + consumer = consumer_cls(self.conf, self.channel, topic, callback, six.next(self.consumer_num)) self.consumers.append(consumer) return consumer @@ -714,7 +721,7 @@ class Connection(object): LOG.exception(_('Failed to consume message from queue: %s'), exc) - def _consume(channel): + def _consume(): if self.do_consume: queues_head = self.consumers[:-1] # not fanout. queues_tail = self.consumers[-1] # fanout @@ -752,8 +759,8 @@ class Connection(object): LOG.exception(_("Failed to publish message to topic " "'%(topic)s': %(err_str)s"), log_info) - def _publish(channel): - publisher = cls(self.conf, channel, topic=topic, **kwargs) + def _publish(): + publisher = cls(self.conf, self.channel, topic=topic, **kwargs) publisher.send(msg, timeout) self.ensure(_error_callback, _publish, retry=retry)