Merge "Transport reconnection retries for notification"

This commit is contained in:
Jenkins 2014-06-20 10:20:24 +00:00 committed by Gerrit Code Review
commit 88800137b4
15 changed files with 106 additions and 44 deletions

View File

@ -389,9 +389,9 @@ class AMQPDriverBase(base.BaseDriver):
return self._send(target, ctxt, message, wait_for_reply, timeout,
retry=retry)
def send_notification(self, target, ctxt, message, version):
def send_notification(self, target, ctxt, message, version, retry=None):
return self._send(target, ctxt, message,
envelope=(version == 2.0), notify=True)
envelope=(version == 2.0), notify=True, retry=retry)
def listen(self, target):
conn = self._get_connection(pooled=False)

View File

@ -168,7 +168,9 @@ class FakeDriver(base.BaseDriver):
# transport always works
return self._send(target, ctxt, message, wait_for_reply, timeout)
def send_notification(self, target, ctxt, message, version):
def send_notification(self, target, ctxt, message, version, retry=None):
# NOTE(sileht): retry doesn't need to be implemented, the fake
# transport always works
self._send(target, ctxt, message)
def listen(self, target):

View File

@ -697,10 +697,10 @@ class Connection(object):
"""Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic=topic, msg=msg, retry=retry)
def notify_send(self, exchange_name, topic, msg, **kwargs):
def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
"""Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic=topic, msg=msg,
exchange_name=exchange_name)
exchange_name=exchange_name, retry=retry)
def consume(self, limit=None, timeout=None):
"""Consume from all queues/consumers."""

View File

@ -788,10 +788,10 @@ class Connection(object):
"""Send a 'fanout' message."""
self.publisher_send(FanoutPublisher, topic, msg, retry=retry)
def notify_send(self, exchange_name, topic, msg, **kwargs):
def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
"""Send a notify message on a topic."""
self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
exchange_name=exchange_name, **kwargs)
exchange_name=exchange_name, retry=retry, **kwargs)
def consume(self, limit=None, timeout=None):
"""Consume from all queues/consumers."""

View File

@ -945,9 +945,11 @@ class ZmqDriver(base.BaseDriver):
# retry anything
return self._send(target, ctxt, message, wait_for_reply, timeout)
def send_notification(self, target, ctxt, message, version):
def send_notification(self, target, ctxt, message, version, retry=None):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
# NOTE(sileht): retry is not implemented because this driver never
# retry anything
target = target(topic=target.topic.replace('.', '-'))
return self._send(target, ctxt, message, envelope=(version == 2.0))

View File

@ -27,7 +27,7 @@ class LogDriver(notifier._Driver):
LOGGER_BASE = 'oslo.messaging.notification'
def notify(self, ctxt, message, priority):
def notify(self, ctxt, message, priority, retry):
logger = logging.getLogger('%s.%s' % (self.LOGGER_BASE,
message['event_type']))
method = getattr(logger, priority.lower(), None)

View File

@ -38,13 +38,14 @@ class MessagingDriver(notifier._Driver):
super(MessagingDriver, self).__init__(conf, topics, transport)
self.version = version
def notify(self, ctxt, message, priority):
def notify(self, ctxt, message, priority, retry):
priority = priority.lower()
for topic in self.topics:
target = messaging.Target(topic='%s.%s' % (topic, priority))
try:
self.transport._send_notification(target, ctxt, message,
version=self.version)
version=self.version,
retry=retry)
except Exception:
LOG.exception("Could not send notification to %(topic)s. "
"Payload=%(message)s",

View File

@ -20,5 +20,5 @@ from oslo.messaging.notify import notifier
class NoOpDriver(notifier._Driver):
def notify(self, ctxt, message, priority):
def notify(self, ctxt, message, priority, retry):
pass

View File

@ -104,21 +104,23 @@ class RoutingDriver(notifier._Driver):
return list(accepted_drivers)
def _filter_func(self, ext, context, message, priority, accepted_drivers):
def _filter_func(self, ext, context, message, priority, retry,
accepted_drivers):
"""True/False if the driver should be called for this message.
"""
# context is unused here, but passed in by map()
return ext.name in accepted_drivers
def _call_notify(self, ext, context, message, priority, accepted_drivers):
def _call_notify(self, ext, context, message, priority, retry,
accepted_drivers):
"""Emit the notification.
"""
# accepted_drivers is passed in as a result of the map() function
LOG.info(_("Routing '%(event)s' notification to '%(driver)s' driver") %
{'event': message.get('event_type'), 'driver': ext.name})
ext.obj.notify(context, message, priority)
ext.obj.notify(context, message, priority, retry)
def notify(self, context, message, priority):
def notify(self, context, message, priority, retry):
if not self.plugin_manager:
self._load_notifiers()
@ -131,4 +133,5 @@ class RoutingDriver(notifier._Driver):
self._get_drivers_for_message(group, event_type,
priority.lower()))
self.plugin_manager.map(self._filter_func, self._call_notify, context,
message, priority, list(accepted_drivers))
message, priority, retry,
list(accepted_drivers))

View File

@ -30,5 +30,5 @@ class TestDriver(notifier._Driver):
"Store notifications in memory for test verification."
def notify(self, ctxt, message, priority):
NOTIFICATIONS.append((ctxt, message, priority))
def notify(self, ctxt, message, priority, retry):
NOTIFICATIONS.append((ctxt, message, priority, retry))

View File

@ -49,7 +49,7 @@ class _Driver(object):
self.transport = transport
@abc.abstractmethod
def notify(self, ctxt, msg, priority):
def notify(self, ctxt, msg, priority, retry):
pass
@ -96,7 +96,7 @@ class Notifier(object):
def __init__(self, transport, publisher_id=None,
driver=None, topic=None,
serializer=None):
serializer=None, retry=None):
"""Construct a Notifier object.
:param transport: the transport to use for sending messages
@ -109,11 +109,17 @@ class Notifier(object):
:type topic: str
:param serializer: an optional entity serializer
:type serializer: Serializer
:param retry: an connection retries configuration
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
"""
transport.conf.register_opts(_notifier_opts)
self.transport = transport
self.publisher_id = publisher_id
self.retry = retry
self._driver_names = ([driver] if driver is not None
else transport.conf.notification_driver)
@ -130,12 +136,12 @@ class Notifier(object):
invoke_kwds={
'topics': self._topics,
'transport': self.transport,
},
}
)
_marker = object()
def prepare(self, publisher_id=_marker):
def prepare(self, publisher_id=_marker, retry=_marker):
"""Return a specialized Notifier instance.
Returns a new Notifier instance with the supplied publisher_id. Allows
@ -144,10 +150,16 @@ class Notifier(object):
:param publisher_id: field in notifications sent, e.g. 'compute.host1'
:type publisher_id: str
:param retry: an connection retries configuration
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
"""
return _SubNotifier._prepare(self, publisher_id)
return _SubNotifier._prepare(self, publisher_id, retry=retry)
def _notify(self, ctxt, event_type, payload, priority, publisher_id=None):
def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
retry=None):
payload = self._serializer.serialize_entity(ctxt, payload)
ctxt = self._serializer.serialize_context(ctxt)
@ -160,7 +172,7 @@ class Notifier(object):
def do_notify(ext):
try:
ext.obj.notify(ctxt, msg, priority)
ext.obj.notify(ctxt, msg, priority, retry or self.retry)
except Exception as e:
_LOG.exception("Problem '%(e)s' attempting to send to "
"notification system. Payload=%(payload)s",
@ -178,6 +190,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'AUDIT')
@ -190,6 +203,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'DEBUG')
@ -202,6 +216,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'INFO')
@ -214,6 +229,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'WARN')
@ -228,6 +244,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'ERROR')
@ -240,6 +257,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'CRITICAL')
@ -258,6 +276,7 @@ class Notifier(object):
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'SAMPLE')
@ -266,10 +285,11 @@ class _SubNotifier(Notifier):
_marker = Notifier._marker
def __init__(self, base, publisher_id):
def __init__(self, base, publisher_id, retry):
self._base = base
self.transport = base.transport
self.publisher_id = publisher_id
self.retry = retry
self._serializer = self._base._serializer
self._driver_mgr = self._base._driver_mgr
@ -278,7 +298,9 @@ class _SubNotifier(Notifier):
super(_SubNotifier, self)._notify(ctxt, event_type, payload, priority)
@classmethod
def _prepare(cls, base, publisher_id=_marker):
def _prepare(cls, base, publisher_id=_marker, retry=_marker):
if publisher_id is cls._marker:
publisher_id = base.publisher_id
return cls(base, publisher_id)
if retry is cls._marker:
retry = base.retry
return cls(base, publisher_id, retry=retry)

View File

@ -89,11 +89,12 @@ class Transport(object):
wait_for_reply=wait_for_reply,
timeout=timeout, retry=retry)
def _send_notification(self, target, ctxt, message, version):
def _send_notification(self, target, ctxt, message, version, retry=None):
if not target.topic:
raise exceptions.InvalidTarget('A topic is required to send',
target)
self._driver.send_notification(target, ctxt, message, version)
self._driver.send_notification(target, ctxt, message, version,
retry=retry)
def _listen(self, target):
if not (target.topic and target.server):

View File

@ -44,7 +44,7 @@ class _FakeTransport(object):
def __init__(self, conf):
self.conf = conf
def _send_notification(self, target, ctxt, message, version):
def _send_notification(self, target, ctxt, message, version, retry=None):
pass
@ -123,6 +123,13 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
('ctxt', dict(ctxt={'user': 'bob'})),
]
_retry = [
('unconfigured', dict()),
('None', dict(retry=None)),
('0', dict(retry=0)),
('5', dict(retry=5)),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(cls._v1,
@ -131,7 +138,8 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
cls._topics,
cls._priority,
cls._payload,
cls._context)
cls._context,
cls._retry)
def setUp(self):
super(TestMessagingNotifier, self).setUp()
@ -159,8 +167,13 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
else:
notifier = messaging.Notifier(transport)
prepare_kwds = {}
if hasattr(self, 'retry'):
prepare_kwds['retry'] = self.retry
if hasattr(self, 'prep_pub_id'):
notifier = notifier.prepare(publisher_id=self.prep_pub_id)
prepare_kwds['publisher_id'] = self.prep_pub_id
if prepare_kwds:
notifier = notifier.prepare(**prepare_kwds)
self.mox.StubOutWithMock(transport, '_send_notification')
@ -187,6 +200,10 @@ class TestMessagingNotifier(test_utils.BaseTestCase):
for send_kwargs in sends:
for topic in self.topics:
if hasattr(self, 'retry'):
send_kwargs['retry'] = self.retry
else:
send_kwargs['retry'] = None
target = messaging.Target(topic='%s.%s' % (topic,
self.priority))
transport._send_notification(target, self.ctxt, message,
@ -244,7 +261,7 @@ class TestSerializer(test_utils.BaseTestCase):
'timestamp': str(timeutils.utcnow()),
}
self.assertEqual([(dict(user='alice'), message, 'INFO')],
self.assertEqual([(dict(user='alice'), message, 'INFO', None)],
_impl_test.NOTIFICATIONS)
@ -299,7 +316,7 @@ class TestLogNotifier(test_utils.BaseTestCase):
self.mox.ReplayAll()
msg = {'event_type': 'foo'}
driver.notify(None, msg, "sample")
driver.notify(None, msg, "sample", None)
class TestRoutingNotifier(test_utils.BaseTestCase):
@ -467,11 +484,11 @@ group_1:
# Good ...
self.assertTrue(self.router._filter_func(ext, {}, {}, 'info',
['foo', 'rpc']))
None, ['foo', 'rpc']))
# Bad
self.assertFalse(self.router._filter_func(ext, {}, {}, 'info',
['foo']))
None, ['foo']))
def test_notify(self):
self.router.routing_groups = {'group_1': None, 'group_2': None}
@ -482,7 +499,7 @@ group_1:
with mock.patch.object(self.router, '_get_drivers_for_message',
drivers_mock):
self.notifier.info({}, 'my_event', {})
self.assertEqual(['rpc', 'foo'], pm.map.call_args[0][5])
self.assertEqual(['rpc', 'foo'], pm.map.call_args[0][6])
def test_notify_filtered(self):
self.config(routing_notifier_config="routing_notifier.yaml")
@ -518,6 +535,7 @@ group_1:
return_value=pm)):
self.notifier.info({}, 'my_event', {})
self.assertFalse(bar_driver.info.called)
rpc_driver.notify.assert_called_once_with({}, mock.ANY, 'INFO')
rpc_driver.notify.assert_called_once_with(
{}, mock.ANY, 'INFO', None)
rpc2_driver.notify.assert_called_once_with(
{}, mock.ANY, 'INFO')
{}, mock.ANY, 'INFO', None)

View File

@ -174,10 +174,11 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
notifier = self._setup_notifier(transport, topic="topic")
def mock_notifier_exchange(name):
def side_effect(target, ctxt, message, version):
def side_effect(target, ctxt, message, version, retry):
target.exchange = name
return transport._driver.send_notification(target, ctxt,
message, version)
message, version,
retry=retry)
transport._send_notification = mock.MagicMock(
side_effect=side_effect)

View File

@ -292,11 +292,23 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
t = transport.Transport(_FakeDriver(cfg.CONF))
self.mox.StubOutWithMock(t._driver, 'send_notification')
t._driver.send_notification(self._target, 'ctxt', 'message', 1.0)
t._driver.send_notification(self._target, 'ctxt', 'message', 1.0,
retry=None)
self.mox.ReplayAll()
t._send_notification(self._target, 'ctxt', 'message', version=1.0)
def test_send_notification_all_args(self):
t = transport.Transport(_FakeDriver(cfg.CONF))
self.mox.StubOutWithMock(t._driver, 'send_notification')
t._driver.send_notification(self._target, 'ctxt', 'message', 1.0,
retry=5)
self.mox.ReplayAll()
t._send_notification(self._target, 'ctxt', 'message', version=1.0,
retry=5)
def test_listen(self):
t = transport.Transport(_FakeDriver(cfg.CONF))