diff --git a/oslo_messaging/_drivers/common.py b/oslo_messaging/_drivers/common.py index c6e337bbe..b2d97b37c 100644 --- a/oslo_messaging/_drivers/common.py +++ b/oslo_messaging/_drivers/common.py @@ -341,8 +341,9 @@ class DecayingTimer(object): if self._duration is not None: self._ends_at = time.time() + max(0, self._duration) - def check_return(self, timeout_callback, *args, **kwargs): + def check_return(self, timeout_callback=None, *args, **kwargs): maximum = kwargs.pop('maximum', None) + if self._duration is None: return None if maximum is None else maximum if self._ends_at is None: @@ -350,7 +351,7 @@ class DecayingTimer(object): " that has not been started.")) left = self._ends_at - time.time() - if left <= 0: + if left <= 0 and timeout_callback is not None: timeout_callback(*args, **kwargs) return left if maximum is None else min(left, maximum) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 76332c9b3..cd176c06d 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -373,7 +373,8 @@ class DirectPublisher(Publisher): options = {'durable': False, 'auto_delete': True, - 'exclusive': False} + 'exclusive': False, + 'passive': True} options.update(kwargs) super(DirectPublisher, self).__init__(channel, topic, topic, type='direct', **options) @@ -389,6 +390,7 @@ class TopicPublisher(Publisher): options = {'durable': conf.amqp_durable_queues, 'auto_delete': conf.amqp_auto_delete, 'exclusive': False} + options.update(kwargs) super(TopicPublisher, self).__init__(channel, exchange_name, @@ -780,7 +782,31 @@ class Connection(object): def direct_send(self, msg_id, msg): """Send a 'direct' message.""" - self.publisher_send(DirectPublisher, msg_id, msg) + + timer = rpc_common.DecayingTimer(duration=60) + timer.start() + # NOTE(sileht): retry at least 60sec, after we have a good change + # that the caller is really dead too... + + while True: + try: + self.publisher_send(DirectPublisher, msg_id, msg) + except self.connection.channel_errors as exc: + # NOTE(noelbk/sileht): + # If rabbit dies, the consumer can be disconnected before the + # publisher sends, and if the consumer hasn't declared the + # queue, the publisher's will send a message to an exchange + # that's not bound to a queue, and the message wll be lost. + # So we set passive=True to the publisher exchange and catch + # the 404 kombu ChannelError and retry until the exchange + # appears + if exc.code == 404 and timer.check_return() > 0: + LOG.info(_LI("The exchange to reply to %s doesn't " + "exist yet, retrying...") % msg_id) + time.sleep(1) + continue + raise + return def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): """Send a 'topic' message.""" diff --git a/oslo_messaging/tests/test_utils.py b/oslo_messaging/tests/test_utils.py index d57e32ed2..5754205e5 100644 --- a/oslo_messaging/tests/test_utils.py +++ b/oslo_messaging/tests/test_utils.py @@ -13,6 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +import time + +import mock + from oslo_messaging._drivers import common from oslo_messaging import _utils as utils from oslo_messaging.tests import utils as test_utils @@ -51,14 +55,28 @@ class VersionIsCompatibleTestCase(test_utils.BaseTestCase): class TimerTestCase(test_utils.BaseTestCase): - def test_duration_is_none(self): + def test_no_duration_no_callback(self): t = common.DecayingTimer() t.start() - remaining = t.check_return(None) + remaining = t.check_return() self.assertEqual(None, remaining) - def test_duration_is_none_and_maximun_set(self): + def test_no_duration_but_maximun(self): t = common.DecayingTimer() t.start() - remaining = t.check_return(None, maximum=2) + remaining = t.check_return(maximum=2) self.assertEqual(2, remaining) + + def test_duration_expired_no_callback(self): + t = common.DecayingTimer(2) + t._ends_at = time.time() - 10 + remaining = t.check_return() + self.assertAlmostEqual(-10, remaining, 0) + + def test_duration_callback(self): + t = common.DecayingTimer(2) + t._ends_at = time.time() - 10 + callback = mock.Mock() + remaining = t.check_return(callback) + self.assertAlmostEqual(-10, remaining, 0) + callback.assert_called_once diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py index 1fe005732..8b7b708d5 100644 --- a/tests/drivers/test_impl_rabbit.py +++ b/tests/drivers/test_impl_rabbit.py @@ -247,8 +247,27 @@ class TestSendReceive(test_utils.BaseTestCase): raise ZeroDivisionError except Exception: failure = sys.exc_info() - msgs[i].reply(failure=failure, - log_failure=not self.expected) + + # NOTE(noelbk) confirm that Publisher exchanges + # are always declared with passive=True + outer_self = self + test_exchange_was_called = [False] + old_init = kombu.entity.Exchange.__init__ + + def new_init(self, *args, **kwargs): + test_exchange_was_called[0] = True + outer_self.assertTrue(kwargs['passive']) + old_init(self, *args, **kwargs) + kombu.entity.Exchange.__init__ = new_init + + try: + msgs[i].reply(failure=failure, + log_failure=not self.expected) + finally: + kombu.entity.Exchange.__init__ = old_init + + self.assertTrue(test_exchange_was_called[0]) + elif self.rx_id: msgs[i].reply({'rx_id': i}) else: