support kombu4

- kombu4 wraps recoverable errors as OperationalErrors rather than
raising amqp errors
- also, raise a recoverable error and redeclare if for some reason a
message is double acknowledged... previously, this was hidden.
- ensure socket is not none
- use connect method to ensure connection

Depends-On: I9f980b51901ac31599b9651633956ad2eea6a1ac
Change-Id: I73958c8057353a2eefe1baaa7a41148193d507f7
changes/94/409294/32
gord chung 6 years ago committed by gordon chung
parent f3cc165dba
commit 5bacea1f42

@ -335,12 +335,15 @@ class Consumer(object):
# bugs.launchpad.net/oslo.messaging/+bug/1609766
# bugs.launchpad.net/neutron/+bug/1318721
# 406 error code relates to messages that are doubled ack'd
# At any channel error, the RabbitMQ closes
# the channel, but the amqp-lib quietly re-open
# it. So, we must reset all tags and declare
# all consumers again.
conn._new_tags = set(conn._consumers.values())
if exc.code == 404:
if exc.code == 404 or (exc.code == 406 and
exc.method_name == 'Basic.ack'):
self.declare(conn)
self.queue.consume(callback=self._callback,
consumer_tag=six.text_type(tag),
@ -593,6 +596,24 @@ class Connection(object):
' %(hostname)s:%(port)s',
self._get_connection_info())
# FIXME(gordc): wrapper to catch both kombu v3 and v4 errors
# remove this and only catch OperationalError when >4.0.0
if hasattr(kombu.exceptions, 'OperationalError'):
self.recoverable_errors = kombu.exceptions.OperationalError
else:
# NOTE(sileht): Some dummy driver like the in-memory one doesn't
# have notion of recoverable connection, so we must raise the
# original exception like kombu does in this case.
has_modern_errors = hasattr(
self.connection.transport, 'recoverable_connection_errors',
)
if has_modern_errors:
self.recoverable_errors = (
self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
else:
self.recoverable_errors = ()
# NOTE(sileht): kombu recommend to run heartbeat_check every
# seconds, but we use a lock around the kombu connection
# so, to not lock to much this lock to most of the time do nothing
@ -707,7 +728,7 @@ class Connection(object):
# NOTE(sileht): we reset the channel and ensure
# the kombu underlying connection works
self._set_current_channel(None)
self.ensure(method=lambda: self.connection.connection)
self.ensure(method=self.connection.connect)
self.set_transport_socket_timeout()
def ensure(self, method, retry=None,
@ -792,19 +813,6 @@ class Connection(object):
self._set_current_channel(channel)
method()
# NOTE(sileht): Some dummy driver like the in-memory one doesn't
# have notion of recoverable connection, so we must raise the original
# exception like kombu does in this case.
has_modern_errors = hasattr(
self.connection.transport, 'recoverable_connection_errors',
)
if has_modern_errors:
recoverable_errors = (
self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
else:
recoverable_errors = ()
try:
autoretry_method = self.connection.autoretry(
execute_method, channel=self.channel,
@ -817,7 +825,7 @@ class Connection(object):
ret, channel = autoretry_method()
self._set_current_channel(channel)
return ret
except recoverable_errors as exc:
except self.recoverable_errors as exc:
LOG.debug("Received recoverable error from kombu:",
exc_info=True)
error_callback and error_callback(exc)
@ -883,13 +891,11 @@ class Connection(object):
def reset(self):
"""Reset a connection so it can be used again."""
recoverable_errors = (self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
with self._connection_lock:
try:
for consumer, tag in self._consumers.items():
consumer.cancel(tag=tag)
except recoverable_errors:
except self.recoverable_errors:
self.ensure_connection()
self._consumers.clear()
self._active_tags.clear()
@ -987,10 +993,6 @@ class Connection(object):
while not self._heartbeat_exit_event.is_set():
with self._connection_lock.for_heartbeat():
recoverable_errors = (
self.connection.recoverable_channel_errors +
self.connection.recoverable_connection_errors)
try:
try:
self._heartbeat_check()
@ -1004,7 +1006,7 @@ class Connection(object):
self.connection.drain_events(timeout=0.001)
except socket.timeout:
pass
except recoverable_errors as exc:
except self.recoverable_errors as exc:
LOG.info(_LI("A recoverable connection/channel error "
"occurred, trying to reconnect: %s"), exc)
self.ensure_connection()
@ -1091,6 +1093,12 @@ class Connection(object):
except socket.timeout as exc:
poll_timeout = timer.check_return(
_raise_timeout, exc, maximum=self._poll_timeout)
except self.connection.channel_errors as exc:
if exc.code == 406 and exc.method_name == 'Basic.ack':
# NOTE(gordc): occasionally multiple workers will grab
# same message and acknowledge it. if it happens, meh.
raise self.connection.recoverable_channel_errors[0]
raise
with self._connection_lock:
self.ensure(_consume,
@ -1172,7 +1180,8 @@ class Connection(object):
def _get_connection_info(self):
info = self.connection.info()
client_port = None
if self.channel and hasattr(self.channel.connection, 'sock'):
if (self.channel and hasattr(self.channel.connection, 'sock')
and self.channel.connection.sock):
client_port = self.channel.connection.sock.getsockname()[1]
info.update({'client_port': client_port,
'connection_id': self.connection_id})

@ -24,9 +24,7 @@ import kombu
import kombu.transport.memory
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import versionutils
from oslotest import mockpatch
import pkg_resources
import testscenarios
import oslo_messaging
@ -106,7 +104,7 @@ class TestHeartbeat(test_utils.BaseTestCase):
def test_test_heartbeat_sent_connection_fail(self):
self._do_test_heartbeat_sent(
heartbeat_side_effect=kombu.exceptions.ConnectionError,
heartbeat_side_effect=kombu.exceptions.OperationalError,
info='A recoverable connection/channel error occurred, '
'trying to reconnect: %s')
@ -219,23 +217,11 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
conn._publish(exchange_mock, 'msg', routing_key='routing_key',
timeout=1)
# NOTE(gcb) kombu accept TTL as seconds instead of millisecond since
# version 3.0.25, so do conversion according to kombu version.
# TODO(gcb) remove this workaround when all supported branches
# with requirement kombu >=3.0.25
kombu_version = pkg_resources.get_distribution('kombu').version
if versionutils.is_compatible('3.0.25', kombu_version):
fake_publish.assert_called_with(
'msg', expiration=1,
exchange=exchange_mock,
compression=self.conf.oslo_messaging_rabbit.kombu_compression,
routing_key='routing_key')
else:
fake_publish.assert_called_with(
'msg', expiration=1000,
exchange=exchange_mock,
compression=self.conf.oslo_messaging_rabbit.kombu_compression,
routing_key='routing_key')
fake_publish.assert_called_with(
'msg', expiration=1,
exchange=exchange_mock,
compression=self.conf.oslo_messaging_rabbit.kombu_compression,
routing_key='routing_key')
@mock.patch('kombu.messaging.Producer.publish')
def test_send_no_timeout(self, fake_publish):
@ -279,7 +265,8 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
with mock.patch('kombu.transport.virtual.Channel.close'):
# Ensure the exchange does not exists
self.assertRaises(exc, try_send, e_passive)
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
try_send, e_passive)
# Create it
try_send(e_active)
# Ensure it creates it
@ -287,12 +274,14 @@ class TestRabbitPublisher(test_utils.BaseTestCase):
with mock.patch('kombu.messaging.Producer.publish',
side_effect=exc):
# Ensure the exchange is already in cache
self.assertIn('foobar', conn._declared_exchanges)
# Reset connection
self.assertRaises(exc, try_send, e_passive)
# Ensure the cache is empty
self.assertEqual(0, len(conn._declared_exchanges))
with mock.patch('kombu.transport.virtual.Channel.close'):
# Ensure the exchange is already in cache
self.assertIn('foobar', conn._declared_exchanges)
# Reset connection
self.assertRaises(oslo_messaging.MessageDeliveryFailure,
try_send, e_passive)
# Ensure the cache is empty
self.assertEqual(0, len(conn._declared_exchanges))
try_send(e_active)
self.assertIn('foobar', conn._declared_exchanges)
@ -336,7 +325,7 @@ class TestRabbitConsume(test_utils.BaseTestCase):
conn.connection.connection.recoverable_connection_errors = ()
conn.connection.connection.recoverable_channel_errors = ()
self.assertEqual(1, declare.call_count)
conn.connection.connection.transport.drain_events = mock.Mock()
conn.connection.connection.drain_events = mock.Mock()
# Ensure that a queue will be re-declared if the consume method
# of kombu.Queue raise amqp.NotFound
conn.consume()
@ -360,7 +349,7 @@ class TestRabbitConsume(test_utils.BaseTestCase):
IOError,)
conn.connection.connection.recoverable_channel_errors = ()
self.assertEqual(1, declare.call_count)
conn.connection.connection.transport.drain_events = mock.Mock()
conn.connection.connection.drain_events = mock.Mock()
# Ensure that a queue will be re-declared after
# 'queue not found' exception despite on connection error.
conn.consume()
@ -963,10 +952,6 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
heartbeat_timeout_threshold=0,
group="oslo_messaging_rabbit")
self.kombu_connect = mock.Mock()
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.connect',
side_effect=self.kombu_connect))
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.connection'))
self.useFixture(mockpatch.Patch(
@ -976,6 +961,10 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
url = oslo_messaging.TransportURL.parse(self.conf, None)
self.connection = rabbit_driver.Connection(self.conf, url,
driver_common.PURPOSE_SEND)
self.kombu_connect = mock.Mock()
self.useFixture(mockpatch.Patch(
'kombu.connection.Connection.connect',
side_effect=self.kombu_connect))
self.addCleanup(self.connection.close)
def test_ensure_four_retry(self):

@ -27,8 +27,9 @@ PyYAML>=3.10.0 # MIT
# rabbit driver is the default
# we set the amqp version to ensure heartbeat works
amqp<2.0,>=1.4.0 # LGPL
kombu<4.0.0,>=3.0.25 # BSD
# FIXME(gordc): bump to amqp2 and kombu4 once requirements updated
amqp>=1.4.0 # LGPL
kombu>=3.0.25 # BSD
pika>=0.10.0 # BSD
pika-pool>=0.1.3 # BSD

@ -27,5 +27,8 @@ pip install -c$localfile openstack-requirements
edit-constraints $localfile -- $CLIENT_NAME
pip install -c$localfile -U $*
# NOTE(gordc): temporary override since kombu capped at <4.0.0
pip install -U 'amqp>=2.0.0'
pip install -U 'kombu>=4.0.0'
exit $?

@ -32,12 +32,16 @@ commands = python setup.py build_sphinx
setenv =
{[testenv]setenv}
TRANSPORT_DRIVER=rabbit
amqp>=2.0.0
kombu>=4.0.0
commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'
[testenv:py35-func-rabbit]
setenv =
{[testenv]setenv}
TRANSPORT_DRIVER=rabbit
amqp>=2.0.0
kombu>=4.0.0
basepython = python3.5
commands = pifpaf run rabbitmq -- python setup.py testr --slowest --testr-args='{posargs:oslo_messaging.tests.functional}'

Loading…
Cancel
Save