Merge "Ensure kombu channels are closed"

This commit is contained in:
Jenkins 2015-01-30 18:32:53 +00:00 committed by Gerrit Code Review
commit f5b9defce1
1 changed files with 21 additions and 14 deletions

View File

@ -512,7 +512,7 @@ class Connection(object):
'port': self.connection.port})
# NOTE(sileht): just ensure the connection is setuped at startup
self.ensure(error_callback=None,
method=lambda channel: True)
method=lambda: True)
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d'),
{'hostname': self.connection.hostname,
'port': self.connection.port})
@ -600,8 +600,6 @@ class Connection(object):
retry = None
def on_error(exc, interval):
self.channel = None
error_callback and error_callback(exc)
interval = (self.conf.kombu_reconnect_delay + interval
@ -637,6 +635,7 @@ class Connection(object):
"""Callback invoked when the kombu reconnects and creates
a new channel, we use it the reconfigure our consumers.
"""
self._set_current_channel(new_channel)
self.consumer_num = itertools.count(1)
for consumer in self.consumers:
consumer.reconnect(new_channel)
@ -646,11 +645,15 @@ class Connection(object):
{'hostname': self.connection.hostname,
'port': self.connection.port})
def execute_method(channel):
self._set_current_channel(channel)
method()
recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
try:
autoretry_method = self.connection.autoretry(
method, channel=self.channel,
execute_method, channel=self.channel,
max_retries=retry,
errback=on_error,
interval_start=self.interval_start or 1,
@ -658,10 +661,10 @@ class Connection(object):
on_revive=on_reconnection,
)
ret, channel = autoretry_method()
self.channel = channel
self._set_current_channel(channel)
return ret
except recoverable_errors as exc:
self.channel = None
self._set_current_channel(None)
# NOTE(sileht): number of retry exceeded and the connection
# is still broken
msg = _('Unable to connect to AMQP server on '
@ -674,17 +677,21 @@ class Connection(object):
LOG.error(msg)
raise exceptions.MessageDeliveryFailure(msg)
def _set_current_channel(self, new_channel):
if self.channel is not None and new_channel != self.channel:
self.connection.maybe_close_channel(self.channel)
self.channel = new_channel
def close(self):
"""Close/release this connection."""
if self.connection:
self._set_current_channel(None)
self.connection.release()
self.connection = None
def reset(self):
"""Reset a connection so it can be used again."""
if self.channel is not None:
self.channel.close()
self.channel = self.connection.channel()
self._set_current_channel(self.connection.channel())
self.consumers = []
self.consumer_num = itertools.count(1)
@ -698,8 +705,8 @@ class Connection(object):
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s"), log_info)
def _declare_consumer(channel):
consumer = consumer_cls(self.conf, channel, topic, callback,
def _declare_consumer():
consumer = consumer_cls(self.conf, self.channel, topic, callback,
six.next(self.consumer_num))
self.consumers.append(consumer)
return consumer
@ -722,7 +729,7 @@ class Connection(object):
LOG.exception(_('Failed to consume message from queue: %s'),
exc)
def _consume(channel):
def _consume():
if self.do_consume:
queues_head = self.consumers[:-1] # not fanout.
queues_tail = self.consumers[-1] # fanout
@ -758,8 +765,8 @@ class Connection(object):
LOG.exception(_("Failed to publish message to topic "
"'%(topic)s': %(err_str)s"), log_info)
def _publish(channel):
publisher = cls(self.conf, channel, topic=topic, **kwargs)
def _publish():
publisher = cls(self.conf, self.channel, topic=topic, **kwargs)
publisher.send(msg, timeout)
self.ensure(_error_callback, _publish, retry=retry)