Implements notification listener and dispatcher

This patch allows to quickly create a listener to receive
notification messages.

Example of the api:

class Endpoint(object):
    def warn(self, ctxt, publisher_id, event_type, payload):
        do_something(payload)

target = messaging.Target(topic='notifications', exchange='cinder')
listener = notify.get_notification_listener(transport, [target],
                                            [Endpoint()],
                                            executor,
                                            serializer)

Implements blueprint notification-subscriber-server

Change-Id: I434bc487c382a2048670df726d9bebd640150bb9
This commit is contained in:
Mehdi Abaakouk 2013-12-02 09:27:29 +01:00 committed by Mehdi Abaakouk
parent 70dbe6a7cc
commit 9f58e2c3fe
13 changed files with 530 additions and 1 deletions

View File

@ -15,6 +15,7 @@ Contents
server
rpcclient
notifier
notification_listener
serializer
exceptions
opts

View File

@ -0,0 +1,14 @@
---------------------
Notification Listener
---------------------
.. automodule:: oslo.messaging.notify.listener
.. currentmodule:: oslo.messaging
.. autofunction:: get_notification_listener
.. autoclass:: MessageHandlingServer
:members:
.. autofunction:: get_local_context

View File

@ -404,6 +404,16 @@ class AMQPDriverBase(base.BaseDriver):
return listener
def listen_for_notifications(self, targets_and_priorities):
conn = self._get_connection(pooled=False)
listener = AMQPListener(self, conn)
for target, priority in targets_and_priorities:
conn.declare_topic_consumer('%s.%s' % (target.topic, priority),
callback=listener,
exchange_name=target.exchange)
return listener
def cleanup(self):
if self._connection_pool:
self._connection_pool.empty()

View File

@ -73,6 +73,12 @@ class BaseDriver(object):
def listen(self, target):
"""Construct a Listener for the given target."""
@abc.abstractmethod
def listen_for_notifications(self, targets_and_priorities):
"""Construct a notification Listener for the given list of
tuple of (target, priority).
"""
@abc.abstractmethod
def cleanup(self):
"""Release all resources."""

View File

@ -161,5 +161,15 @@ class FakeDriver(base.BaseDriver):
messaging.Target(topic=target.topic)])
return listener
def listen_for_notifications(self, targets_and_priorities):
# TODO(sileht): Handle the target.exchange
exchange = self._get_exchange(self._default_exchange)
targets = [messaging.Target(topic='%s.%s' % (target.topic, priority))
for target, priority in targets_and_priorities]
listener = FakeListener(self, exchange, targets)
return listener
def cleanup(self):
pass

View File

@ -959,5 +959,19 @@ class ZmqDriver(base.BaseDriver):
return listener
def listen_for_notifications(self, targets_and_priorities):
conn = create_connection(self.conf)
listener = ZmqListener(self, None)
for target, priority in targets_and_priorities:
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
# NOTE(sileht): create_consumer doesn't support target.exchange
conn.create_consumer('%s-%s' % (target.topic, priority),
listener)
conn.consume_in_thread()
return listener
def cleanup(self):
cleanup()

View File

@ -14,7 +14,9 @@
# under the License.
__all__ = ['Notifier',
'LoggingNotificationHandler']
'LoggingNotificationHandler',
'get_notification_listener']
from .notifier import *
from .listener import *
from .logger import *

View File

@ -0,0 +1,83 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import logging
from oslo.messaging import localcontext
from oslo.messaging import serializer as msg_serializer
LOG = logging.getLogger(__name__)
PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']
class NotificationDispatcher(object):
"""A message dispatcher which understands Notification messages.
A MessageHandlingServer is constructed by passing a callable dispatcher
which is invoked with context and message dictionaries each time a message
is received.
NotifcationDispatcher is one such dispatcher which pass a raw notification
message to the endpoints
"""
def __init__(self, targets, endpoints, serializer):
self.targets = targets
self.endpoints = endpoints
self.serializer = serializer or msg_serializer.NoOpSerializer()
self._callbacks_by_priority = {}
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
if hasattr(endpoint, prio):
method = getattr(endpoint, prio)
self._callbacks_by_priority.setdefault(prio, []).append(method)
priorities = self._callbacks_by_priority.keys()
self._targets_priorities = set(itertools.product(self.targets,
priorities))
def _listen(self, transport):
return transport._listen_for_notifications(self._targets_priorities)
def __call__(self, ctxt, message):
"""Dispatch an RPC message to the appropriate endpoint method.
:param ctxt: the request context
:type ctxt: dict
:param message: the message payload
:type message: dict
"""
ctxt = self.serializer.deserialize_context(ctxt)
publisher_id = message.get('publisher_id')
event_type = message.get('event_type')
priority = message.get('priority', '').lower()
if priority not in PRIORITIES:
LOG.warning('Unknown priority "%s"' % priority)
return
payload = self.serializer.deserialize_entity(ctxt,
message.get('payload'))
for callback in self._callbacks_by_priority.get(priority, []):
localcontext.set_local_context(ctxt)
try:
callback(ctxt, publisher_id, event_type, payload)
finally:
localcontext.clear_local_context()

View File

@ -0,0 +1,105 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A notification listener exposes a number of endpoints, each of which
contain a set of methods. Each method corresponds to a notification priority.
To create a notification listener, you supply a transport, list of targets and
a list of endpoints.
A transport can be obtained simply by calling the get_transport() method::
transport = messaging.get_transport(conf)
which will load the appropriate transport driver according to the user's
messaging configuration configuration. See get_transport() for more details.
The target supplied when creating a notification listener expresses the topic
and - optionally - the exchange to listen on. See Target for more details
on these attributes.
Notification listener have start(), stop() and wait() messages to begin
handling requests, stop handling requests and wait for all in-process
requests to complete.
Each notification listener is associated with an executor which integrates the
listener with a specific I/O handling framework. Currently, there are blocking
and eventlet executors available.
A simple example of a notification listener with multiple endpoints might be::
from oslo.config import cfg
from oslo import messaging
class NotificationEndpoint(object):
def warn(self, ctxt, publisher_id, event_type, payload):
do_something(payload)
class ErrorEndpoint(object):
def error(self, ctxt, publisher_id, event_type, payload):
do_something(payload)
transport = messaging.get_transport(cfg.CONF)
targets = [
messaging.Target(topic='notifications')
messaging.Target(topic='notifications_bis')
]
endpoints = [
NotificationEndpoint(),
ErrorEndpoint(),
]
server = messaging.get_notification_listener(transport, targets, endpoints)
server.start()
server.wait()
A notifier sends a notification on a topic with a priority, the notification
listener will receive this notification if the topic of this one have been set
in one of the targets and if an endpoint implements the method named like the
priority
Parameters to endpoint methods are the request context supplied by the client,
the publisher_id of the notification message, the event_type, the payload.
By supplying a serializer object, a listener can deserialize a request context
and arguments from - and serialize return values to - primitive types.
"""
from oslo.messaging.notify import dispatcher as notify_dispatcher
from oslo.messaging import server as msg_server
def get_notification_listener(transport, targets, endpoints,
executor='blocking', serializer=None):
"""Construct a notification listener
The executor parameter controls how incoming messages will be received and
dispatched. By default, the most simple executor is used - the blocking
executor.
:param transport: the messaging transport
:type transport: Transport
:param targets: the exchanges and topics to listen on
:type targets: list of Target
:param endpoints: a list of endpoint objects
:type endpoints: list
:param executor: name of a message executor - e.g. 'eventlet', 'blocking'
:type executor: str
:param serializer: an optional entity serializer
:type serializer: Serializer
"""
dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints,
serializer)
return msg_server.MessageHandlingServer(transport, dispatcher, executor)

View File

@ -99,6 +99,14 @@ class Transport(object):
target)
return self._driver.listen(target)
def _listen_for_notifications(self, targets_and_priorities):
for target, priority in targets_and_priorities:
if not target.topic:
raise exceptions.InvalidTarget('A target must have '
'topic specified',
target)
return self._driver.listen_for_notifications(targets_and_priorities)
def cleanup(self):
"""Release all resources associated with this transport."""
self._driver.cleanup()

View File

@ -0,0 +1,98 @@
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import mock
import testscenarios
from oslo import messaging
from oslo.messaging.notify import dispatcher as notify_dispatcher
from oslo.messaging.openstack.common import timeutils
from tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
notification_msg = dict(
publisher_id="publisher_id",
event_type="compute.start",
payload={"info": "fuu"},
message_id="uuid",
timestamp=str(timeutils.utcnow())
)
class TestDispatcher(test_utils.BaseTestCase):
scenarios = [
('no_endpoints',
dict(endpoints=[],
endpoints_expect_calls=[],
priority='info')),
('one_endpoints',
dict(endpoints=[['warn']],
endpoints_expect_calls=['warn'],
priority='warn')),
('two_endpoints_only_one_match',
dict(endpoints=[['warn'], ['info']],
endpoints_expect_calls=[None, 'info'],
priority='info')),
('two_endpoints_both_match',
dict(endpoints=[['debug', 'info'], ['info', 'debug']],
endpoints_expect_calls=['debug', 'debug'],
priority='debug')),
]
def test_dispatcher(self):
endpoints = [mock.Mock(spec=endpoint_methods)
for endpoint_methods in self.endpoints]
msg = notification_msg.copy()
msg['priority'] = self.priority
targets = [messaging.Target(topic='notifications')]
dispatcher = notify_dispatcher.NotificationDispatcher(targets,
endpoints,
None)
# check it listen on wanted topics
self.assertEqual(sorted(dispatcher._targets_priorities),
sorted(set((targets[0], prio)
for prio in itertools.chain.from_iterable(
self.endpoints))))
dispatcher({}, msg)
# check endpoint callbacks are called or not
for i, endpoint_methods in enumerate(self.endpoints):
for m in endpoint_methods:
if m == self.endpoints_expect_calls[i]:
method = getattr(endpoints[i], m)
expected = [mock.call({}, msg['publisher_id'],
msg['event_type'],
msg['payload'])]
self.assertEqual(method.call_args_list, expected)
else:
self.assertEqual(endpoints[i].call_count, 0)
@mock.patch('oslo.messaging.notify.dispatcher.LOG')
def test_dispatcher_unknown_prio(self, mylog):
msg = notification_msg.copy()
msg['priority'] = 'what???'
dispatcher = notify_dispatcher.NotificationDispatcher([mock.Mock()],
[mock.Mock()],
None)
dispatcher({}, msg)
mylog.warning.assert_called_once_with('Unknown priority "what???"')

View File

@ -0,0 +1,173 @@
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import mock
from oslo.config import cfg
import testscenarios
from oslo import messaging
from oslo.messaging.notify import dispatcher
from tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
class ListenerSetupMixin(object):
class Listener(object):
def __init__(self, transport, topics, endpoints, expect_messages):
targets = [messaging.Target(topic=topic)
for topic in topics]
self._expect_messages = expect_messages
self._received_msgs = 0
self._listener = messaging.get_notification_listener(
transport, targets, endpoints + [self])
def info(self, ctxt, publisher_id, event_type, payload):
self._received_msgs += 1
if self._expect_messages == self._received_msgs:
# Check start() does nothing with a running listener
self._listener.start()
self._listener.stop()
self._listener.wait()
def start(self):
self._listener.start()
def _setup_listener(self, transport, endpoints, expect_messages,
topics=None):
listener = self.Listener(transport,
topics=topics or ['testtopic'],
expect_messages=expect_messages,
endpoints=endpoints)
thread = threading.Thread(target=listener.start)
thread.daemon = True
thread.start()
return thread
def _stop_listener(self, thread):
thread.join(timeout=5)
def _setup_notifier(self, transport, topic='testtopic',
publisher_id='testpublisher'):
return messaging.Notifier(transport, topic=topic,
driver='messaging',
publisher_id=publisher_id)
class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
def __init__(self, *args):
super(TestNotifyListener, self).__init__(*args)
ListenerSetupMixin.__init__(self)
def setUp(self):
super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
def test_constructor(self):
transport = messaging.get_transport(self.conf, url='fake:')
target = messaging.Target(topic='foo')
endpoints = [object()]
listener = messaging.get_notification_listener(transport, [target],
endpoints)
self.assertIs(listener.conf, self.conf)
self.assertIs(listener.transport, transport)
self.assertIsInstance(listener.dispatcher,
dispatcher.NotificationDispatcher)
self.assertIs(listener.dispatcher.endpoints, endpoints)
self.assertIs(listener.executor, 'blocking')
def test_no_target_topic(self):
transport = messaging.get_transport(self.conf, url='fake:')
listener = messaging.get_notification_listener(transport,
[messaging.Target()],
[mock.Mock()])
try:
listener.start()
except Exception as ex:
self.assertIsInstance(ex, messaging.InvalidTarget, ex)
else:
self.assertTrue(False)
def test_unknown_executor(self):
transport = messaging.get_transport(self.conf, url='fake:')
try:
messaging.get_notification_listener(transport, [], [],
executor='foo')
except Exception as ex:
self.assertIsInstance(ex, messaging.ExecutorLoadFailure)
self.assertEqual(ex.executor, 'foo')
else:
self.assertTrue(False)
def test_one_topic(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info = mock.Mock()
listener_thread = self._setup_listener(transport, [endpoint], 1)
notifier = self._setup_notifier(transport)
notifier.info({}, 'an_event.start', 'test message')
self._stop_listener(listener_thread)
endpoint.info.assert_called_once_with(
{}, 'testpublisher', 'an_event.start', 'test message')
def test_two_topics(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info = mock.Mock()
topics = ["topic1", "topic2"]
listener_thread = self._setup_listener(transport, [endpoint], 2,
topics=topics)
notifier = self._setup_notifier(transport, topic='topic1')
notifier.info({}, 'an_event.start1', 'test')
notifier = self._setup_notifier(transport, topic='topic2')
notifier.info({}, 'an_event.start2', 'test')
self._stop_listener(listener_thread)
expected = [mock.call({}, 'testpublisher', 'an_event.start1', 'test'),
mock.call({}, 'testpublisher', 'an_event.start2', 'test')]
self.assertEqual(sorted(endpoint.info.call_args_list), expected)
def test_two_endpoints(self):
transport = messaging.get_transport(self.conf, url='fake:')
endpoint1 = mock.Mock()
endpoint1.info = mock.Mock()
endpoint2 = mock.Mock()
endpoint2.info = mock.Mock()
listener_thread = self._setup_listener(transport,
[endpoint1, endpoint2], 1)
notifier = self._setup_notifier(transport)
notifier.info({}, 'an_event.start', 'test')
self._stop_listener(listener_thread)
endpoint1.info.assert_called_once_with(
{}, 'testpublisher', 'an_event.start', 'test')
endpoint2.info.assert_called_once_with(
{}, 'testpublisher', 'an_event.start', 'test')

View File

@ -108,6 +108,11 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
self._driver.listen(self._target)
self.assertEqual(self._server_params[0], self.expected)
def test_transport_url_listen_for_notification(self):
self._driver.listen_for_notifications(
[(messaging.Target(topic='topic'), 'info')])
self.assertEqual(self._server_params[0], self.expected)
def test_transport_url_send(self):
self._driver.send(self._target, {}, {})
self.assertEqual(self._server_params[0], self.expected)