Declare DirectPublisher exchanges with passive=True

If rabbit dies, the consumer can be disconnected before the publisher
sends, and if the consumer hasn't declared the queue, the publisher's
will send a message to an exchange that's not bound to a queue, and
the message wll be lost.  Setting passive=True will cause the
publisher to fail and retry if the consumer hasn't declared the
receiving queue yet.

Co-Authored-By: Noel Burton-Krahn <noel@burton-krahn.com>
Closes-Bug: #1338732
Change-Id: I5ba4d311b97236d3a85a9f5badff61f12b08c12d
This commit is contained in:
Mehdi Abaakouk 2014-07-22 09:42:52 -07:00
parent eb9251173c
commit 434b5c8781
4 changed files with 74 additions and 10 deletions

View File

@ -341,8 +341,9 @@ class DecayingTimer(object):
if self._duration is not None: if self._duration is not None:
self._ends_at = time.time() + max(0, self._duration) self._ends_at = time.time() + max(0, self._duration)
def check_return(self, timeout_callback, *args, **kwargs): def check_return(self, timeout_callback=None, *args, **kwargs):
maximum = kwargs.pop('maximum', None) maximum = kwargs.pop('maximum', None)
if self._duration is None: if self._duration is None:
return None if maximum is None else maximum return None if maximum is None else maximum
if self._ends_at is None: if self._ends_at is None:
@ -350,7 +351,7 @@ class DecayingTimer(object):
" that has not been started.")) " that has not been started."))
left = self._ends_at - time.time() left = self._ends_at - time.time()
if left <= 0: if left <= 0 and timeout_callback is not None:
timeout_callback(*args, **kwargs) timeout_callback(*args, **kwargs)
return left if maximum is None else min(left, maximum) return left if maximum is None else min(left, maximum)

View File

@ -373,7 +373,8 @@ class DirectPublisher(Publisher):
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': False} 'exclusive': False,
'passive': True}
options.update(kwargs) options.update(kwargs)
super(DirectPublisher, self).__init__(channel, topic, topic, super(DirectPublisher, self).__init__(channel, topic, topic,
type='direct', **options) type='direct', **options)
@ -389,6 +390,7 @@ class TopicPublisher(Publisher):
options = {'durable': conf.amqp_durable_queues, options = {'durable': conf.amqp_durable_queues,
'auto_delete': conf.amqp_auto_delete, 'auto_delete': conf.amqp_auto_delete,
'exclusive': False} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
super(TopicPublisher, self).__init__(channel, super(TopicPublisher, self).__init__(channel,
exchange_name, exchange_name,
@ -780,7 +782,31 @@ class Connection(object):
def direct_send(self, msg_id, msg): def direct_send(self, msg_id, msg):
"""Send a 'direct' message.""" """Send a 'direct' message."""
self.publisher_send(DirectPublisher, msg_id, msg)
timer = rpc_common.DecayingTimer(duration=60)
timer.start()
# NOTE(sileht): retry at least 60sec, after we have a good change
# that the caller is really dead too...
while True:
try:
self.publisher_send(DirectPublisher, msg_id, msg)
except self.connection.channel_errors as exc:
# NOTE(noelbk/sileht):
# If rabbit dies, the consumer can be disconnected before the
# publisher sends, and if the consumer hasn't declared the
# queue, the publisher's will send a message to an exchange
# that's not bound to a queue, and the message wll be lost.
# So we set passive=True to the publisher exchange and catch
# the 404 kombu ChannelError and retry until the exchange
# appears
if exc.code == 404 and timer.check_return() > 0:
LOG.info(_LI("The exchange to reply to %s doesn't "
"exist yet, retrying...") % msg_id)
time.sleep(1)
continue
raise
return
def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None): def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None):
"""Send a 'topic' message.""" """Send a 'topic' message."""

View File

@ -13,6 +13,10 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import time
import mock
from oslo_messaging._drivers import common from oslo_messaging._drivers import common
from oslo_messaging import _utils as utils from oslo_messaging import _utils as utils
from oslo_messaging.tests import utils as test_utils from oslo_messaging.tests import utils as test_utils
@ -51,14 +55,28 @@ class VersionIsCompatibleTestCase(test_utils.BaseTestCase):
class TimerTestCase(test_utils.BaseTestCase): class TimerTestCase(test_utils.BaseTestCase):
def test_duration_is_none(self): def test_no_duration_no_callback(self):
t = common.DecayingTimer() t = common.DecayingTimer()
t.start() t.start()
remaining = t.check_return(None) remaining = t.check_return()
self.assertEqual(None, remaining) self.assertEqual(None, remaining)
def test_duration_is_none_and_maximun_set(self): def test_no_duration_but_maximun(self):
t = common.DecayingTimer() t = common.DecayingTimer()
t.start() t.start()
remaining = t.check_return(None, maximum=2) remaining = t.check_return(maximum=2)
self.assertEqual(2, remaining) self.assertEqual(2, remaining)
def test_duration_expired_no_callback(self):
t = common.DecayingTimer(2)
t._ends_at = time.time() - 10
remaining = t.check_return()
self.assertAlmostEqual(-10, remaining, 0)
def test_duration_callback(self):
t = common.DecayingTimer(2)
t._ends_at = time.time() - 10
callback = mock.Mock()
remaining = t.check_return(callback)
self.assertAlmostEqual(-10, remaining, 0)
callback.assert_called_once

View File

@ -247,8 +247,27 @@ class TestSendReceive(test_utils.BaseTestCase):
raise ZeroDivisionError raise ZeroDivisionError
except Exception: except Exception:
failure = sys.exc_info() failure = sys.exc_info()
msgs[i].reply(failure=failure,
log_failure=not self.expected) # NOTE(noelbk) confirm that Publisher exchanges
# are always declared with passive=True
outer_self = self
test_exchange_was_called = [False]
old_init = kombu.entity.Exchange.__init__
def new_init(self, *args, **kwargs):
test_exchange_was_called[0] = True
outer_self.assertTrue(kwargs['passive'])
old_init(self, *args, **kwargs)
kombu.entity.Exchange.__init__ = new_init
try:
msgs[i].reply(failure=failure,
log_failure=not self.expected)
finally:
kombu.entity.Exchange.__init__ = old_init
self.assertTrue(test_exchange_was_called[0])
elif self.rx_id: elif self.rx_id:
msgs[i].reply({'rx_id': i}) msgs[i].reply({'rx_id': i})
else: else: