Rabbit: iterconsume must honor timeout
The iterconsume method of the rabbit driver must honor timeout parameter when reconnection to the broker occurs. Change-Id: I666d818449750c6bae9dde02f519842687a8e4fa
This commit is contained in:
parent
bcb3b23b8f
commit
2dd7de989f
|
@ -536,14 +536,6 @@ class Connection(object):
|
||||||
# Return the extended behavior or just have the default behavior
|
# Return the extended behavior or just have the default behavior
|
||||||
return ssl_params or None
|
return ssl_params or None
|
||||||
|
|
||||||
def _setup_new_channel(self, new_channel):
|
|
||||||
"""Callback invoked when the kombu connection have created
|
|
||||||
a new channel, we use it the reconfigure our consumers.
|
|
||||||
"""
|
|
||||||
self.consumer_num = itertools.count(1)
|
|
||||||
for consumer in self.consumers:
|
|
||||||
consumer.reconnect(new_channel)
|
|
||||||
|
|
||||||
def ensure(self, error_callback, method, retry=None,
|
def ensure(self, error_callback, method, retry=None,
|
||||||
timeout_is_error=True):
|
timeout_is_error=True):
|
||||||
"""Will retry up to retry number of times.
|
"""Will retry up to retry number of times.
|
||||||
|
@ -559,18 +551,23 @@ class Connection(object):
|
||||||
retry = None
|
retry = None
|
||||||
|
|
||||||
def on_error(exc, interval):
|
def on_error(exc, interval):
|
||||||
|
self.channel = None
|
||||||
|
|
||||||
error_callback and error_callback(exc)
|
error_callback and error_callback(exc)
|
||||||
|
|
||||||
|
interval = (self.conf.kombu_reconnect_delay + interval
|
||||||
|
if self.conf.kombu_reconnect_delay > 0 else interval)
|
||||||
|
|
||||||
info = {'hostname': self.connection.hostname,
|
info = {'hostname': self.connection.hostname,
|
||||||
'port': self.connection.port,
|
'port': self.connection.port,
|
||||||
'err_str': exc, 'sleep_time': interval}
|
'err_str': exc, 'sleep_time': interval}
|
||||||
|
|
||||||
if 'Socket closed' in six.text_type(exc):
|
if 'Socket closed' in six.text_type(exc):
|
||||||
LOG.error(_('AMQP server %(hostname)s:%(port)d closed'
|
LOG.error(_('AMQP server %(hostname)s:%(port)s closed'
|
||||||
' the connection. Check login credentials:'
|
' the connection. Check login credentials:'
|
||||||
' %(err_str)s'), info)
|
' %(err_str)s'), info)
|
||||||
else:
|
else:
|
||||||
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
|
LOG.error(_('AMQP server on %(hostname)s:%(port)s is '
|
||||||
'unreachable: %(err_str)s. Trying again in '
|
'unreachable: %(err_str)s. Trying again in '
|
||||||
'%(sleep_time)d seconds.'), info)
|
'%(sleep_time)d seconds.'), info)
|
||||||
|
|
||||||
|
@ -585,10 +582,16 @@ class Connection(object):
|
||||||
# should sufficient, because the underlying kombu transport
|
# should sufficient, because the underlying kombu transport
|
||||||
# connection object freed.
|
# connection object freed.
|
||||||
if self.conf.kombu_reconnect_delay > 0:
|
if self.conf.kombu_reconnect_delay > 0:
|
||||||
LOG.info(_("Delaying reconnect for %1.1f seconds...") %
|
|
||||||
self.conf.kombu_reconnect_delay)
|
|
||||||
time.sleep(self.conf.kombu_reconnect_delay)
|
time.sleep(self.conf.kombu_reconnect_delay)
|
||||||
|
|
||||||
|
def on_reconnection(new_channel):
|
||||||
|
"""Callback invoked when the kombu reconnects and creates
|
||||||
|
a new channel, we use it the reconfigure our consumers.
|
||||||
|
"""
|
||||||
|
self.consumer_num = itertools.count(1)
|
||||||
|
for consumer in self.consumers:
|
||||||
|
consumer.reconnect(new_channel)
|
||||||
|
|
||||||
recoverable_errors = (self.connection.recoverable_channel_errors +
|
recoverable_errors = (self.connection.recoverable_channel_errors +
|
||||||
self.connection.recoverable_connection_errors)
|
self.connection.recoverable_connection_errors)
|
||||||
try:
|
try:
|
||||||
|
@ -598,12 +601,13 @@ class Connection(object):
|
||||||
errback=on_error,
|
errback=on_error,
|
||||||
interval_start=self.interval_start or 1,
|
interval_start=self.interval_start or 1,
|
||||||
interval_step=self.interval_stepping,
|
interval_step=self.interval_stepping,
|
||||||
on_revive=self._setup_new_channel,
|
on_revive=on_reconnection,
|
||||||
)
|
)
|
||||||
ret, channel = autoretry_method()
|
ret, channel = autoretry_method()
|
||||||
self.channel = channel
|
self.channel = channel
|
||||||
return ret
|
return ret
|
||||||
except recoverable_errors as exc:
|
except recoverable_errors as exc:
|
||||||
|
self.channel = None
|
||||||
# NOTE(sileht): number of retry exceeded and the connection
|
# NOTE(sileht): number of retry exceeded and the connection
|
||||||
# is still broken
|
# is still broken
|
||||||
msg = _('Unable to connect to AMQP server on '
|
msg = _('Unable to connect to AMQP server on '
|
||||||
|
@ -624,10 +628,11 @@ class Connection(object):
|
||||||
|
|
||||||
def reset(self):
|
def reset(self):
|
||||||
"""Reset a connection so it can be used again."""
|
"""Reset a connection so it can be used again."""
|
||||||
self.channel.close()
|
if self.channel is not None:
|
||||||
self.channel = self.connection.channel()
|
self.channel.close()
|
||||||
|
self.channel = self.connection.channel()
|
||||||
self.consumers = []
|
self.consumers = []
|
||||||
self._setup_new_channel(self.channel)
|
self.consumer_num = itertools.count(1)
|
||||||
|
|
||||||
def declare_consumer(self, consumer_cls, topic, callback):
|
def declare_consumer(self, consumer_cls, topic, callback):
|
||||||
"""Create a Consumer using the class that was passed in and
|
"""Create a Consumer using the class that was passed in and
|
||||||
|
@ -650,10 +655,21 @@ class Connection(object):
|
||||||
def iterconsume(self, limit=None, timeout=None):
|
def iterconsume(self, limit=None, timeout=None):
|
||||||
"""Return an iterator that will consume from all queues/consumers."""
|
"""Return an iterator that will consume from all queues/consumers."""
|
||||||
|
|
||||||
|
if timeout is None:
|
||||||
|
deadline = None
|
||||||
|
else:
|
||||||
|
deadline = time.time() + timeout
|
||||||
|
|
||||||
|
def _raise_timeout_if_deadline_is_reached(exc):
|
||||||
|
if deadline is not None and deadline - time.time() < 0:
|
||||||
|
LOG.debug('Timed out waiting for RPC response: %s', exc)
|
||||||
|
raise rpc_common.Timeout()
|
||||||
|
|
||||||
def _error_callback(exc):
|
def _error_callback(exc):
|
||||||
|
self.do_consume = True
|
||||||
|
_raise_timeout_if_deadline_is_reached(exc)
|
||||||
LOG.exception(_('Failed to consume message from queue: %s'),
|
LOG.exception(_('Failed to consume message from queue: %s'),
|
||||||
exc)
|
exc)
|
||||||
self.do_consume = True
|
|
||||||
|
|
||||||
def _consume(channel):
|
def _consume(channel):
|
||||||
if self.do_consume:
|
if self.do_consume:
|
||||||
|
@ -663,11 +679,11 @@ class Connection(object):
|
||||||
queue.consume(nowait=True)
|
queue.consume(nowait=True)
|
||||||
queues_tail.consume(nowait=False)
|
queues_tail.consume(nowait=False)
|
||||||
self.do_consume = False
|
self.do_consume = False
|
||||||
try:
|
while True:
|
||||||
return self.connection.drain_events(timeout=timeout)
|
try:
|
||||||
except socket.timeout as exc:
|
return self.connection.drain_events(timeout=1)
|
||||||
LOG.debug('Timed out waiting for RPC response: %s', exc)
|
except socket.timeout as exc:
|
||||||
raise rpc_common.Timeout()
|
_raise_timeout_if_deadline_is_reached(exc)
|
||||||
|
|
||||||
for iteration in itertools.count(0):
|
for iteration in itertools.count(0):
|
||||||
if limit and iteration >= limit:
|
if limit and iteration >= limit:
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
import datetime
|
import datetime
|
||||||
import sys
|
import sys
|
||||||
import threading
|
import threading
|
||||||
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
import fixtures
|
import fixtures
|
||||||
|
@ -47,6 +48,28 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
|
||||||
self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
|
self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
|
||||||
|
|
||||||
|
|
||||||
|
class TestRabbitIterconsume(test_utils.BaseTestCase):
|
||||||
|
|
||||||
|
def test_iterconsume_timeout(self):
|
||||||
|
transport = messaging.get_transport(self.conf, 'kombu+memory:////')
|
||||||
|
self.addCleanup(transport.cleanup)
|
||||||
|
deadline = time.time() + 3
|
||||||
|
with transport._driver._get_connection() as conn:
|
||||||
|
conn.iterconsume(timeout=3)
|
||||||
|
# kombu memory transport doesn't really raise error
|
||||||
|
# so just simulate a real driver behavior
|
||||||
|
conn.connection.connection.recoverable_channel_errors = (IOError,)
|
||||||
|
conn.declare_fanout_consumer("notif.info", lambda msg: True)
|
||||||
|
with mock.patch('kombu.connection.Connection.drain_events',
|
||||||
|
side_effect=IOError):
|
||||||
|
try:
|
||||||
|
conn.consume(timeout=3)
|
||||||
|
except driver_common.Timeout:
|
||||||
|
pass
|
||||||
|
|
||||||
|
self.assertEqual(0, int(deadline - time.time()))
|
||||||
|
|
||||||
|
|
||||||
class TestRabbitTransportURL(test_utils.BaseTestCase):
|
class TestRabbitTransportURL(test_utils.BaseTestCase):
|
||||||
|
|
||||||
scenarios = [
|
scenarios = [
|
||||||
|
|
Loading…
Reference in New Issue