oslo.messaging/oslo_messaging/notify/dispatcher.py

136 lines
5.0 KiB
Python

# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import itertools
import logging
import sys
from oslo_messaging import localcontext
from oslo_messaging import serializer as msg_serializer
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Priority names recognised by the dispatcher. Endpoint methods are looked
# up by these exact names, and incoming messages whose (lower-cased)
# priority is not in this list are not delivered to any endpoint.
PRIORITIES = [
    'audit',
    'debug',
    'info',
    'warn',
    'error',
    'critical',
    'sample',
]
class NotificationResult(object):
    """String constants describing the outcome of handling a notification."""

    # Asks for the message to be put back on the queue; the dispatcher only
    # honours this value when it was created with requeueing allowed.
    REQUEUE = 'requeue'

    # The message was processed; the dispatcher acknowledges it.
    HANDLED = 'handled'
class NotificationDispatcher(object):
    """A message dispatcher which understands Notification messages.

    A MessageHandlingServer is constructed by passing a callable dispatcher
    which is invoked with context and message dictionaries each time a
    message is received.

    NotificationDispatcher is one such dispatcher; it passes a raw
    notification message to the endpoints.
    """

    def __init__(self, targets, endpoints, serializer, allow_requeue,
                 pool=None):
        """Index the endpoint callbacks by priority.

        :param targets: iterable of targets to listen on
        :param endpoints: iterable of endpoint objects; any attribute named
                          after an entry of PRIORITIES is registered as a
                          callback for that priority
        :param serializer: serializer used for the context and the payload;
                           a NoOpSerializer is used when this is falsy
        :param allow_requeue: whether a REQUEUE result returned by an
                              endpoint is honoured
        :param pool: passed through to the transport when listening
        """
        self.targets = targets
        self.endpoints = endpoints
        self.serializer = serializer or msg_serializer.NoOpSerializer()
        self.allow_requeue = allow_requeue
        self.pool = pool

        # priority name -> list of bound endpoint methods, in endpoint order.
        self._callbacks_by_priority = {}
        for endpoint, prio in itertools.product(endpoints, PRIORITIES):
            if hasattr(endpoint, prio):
                method = getattr(endpoint, prio)
                self._callbacks_by_priority.setdefault(prio, []).append(
                    method)

        # Only listen for priorities that at least one endpoint implements.
        priorities = self._callbacks_by_priority.keys()
        self._targets_priorities = set(itertools.product(self.targets,
                                                         priorities))

    def _listen(self, transport):
        """Start listening on ``transport`` for every (target, priority) pair.

        :param transport: object providing ``_listen_for_notifications``
        :returns: whatever ``transport._listen_for_notifications`` returns
        """
        return transport._listen_for_notifications(self._targets_priorities,
                                                   pool=self.pool)

    @contextlib.contextmanager
    def __call__(self, incoming, executor_callback=None):
        """Context manager yielding a callable that dispatches ``incoming``.

        The caller is expected to invoke the yielded callable inside the
        ``with`` block. On normal exit the message is acknowledged when the
        dispatch result is HANDLED and requeued otherwise.

        :param incoming: the incoming notification message
        :param executor_callback: optional callable used to invoke each
                                  endpoint method
        """
        # Filled in by the yielded callable. If that callable is never
        # invoked, the lookup below raises IndexError.
        result_wrapper = []

        yield lambda: result_wrapper.append(
            self._dispatch_and_handle_error(incoming, executor_callback))

        if result_wrapper[0] == NotificationResult.HANDLED:
            incoming.acknowledge()
        else:
            incoming.requeue()

    def _dispatch_and_handle_error(self, incoming, executor_callback):
        """Dispatch a notification message to the appropriate endpoint method.

        Any exception raised while dispatching is logged and the message is
        reported as HANDLED, so a failing message is not requeued.

        :param incoming: the incoming notification message
        :type incoming: IncomingMessage
        :param executor_callback: optional callable used to invoke each
                                  endpoint method
        :returns: the result of the dispatch, or NotificationResult.HANDLED
                  if dispatching raised
        """
        try:
            return self._dispatch(incoming.ctxt, incoming.message,
                                  executor_callback)
        except Exception:
            # Capture the active exception so that its traceback is logged
            # together with the error message.
            exc_info = sys.exc_info()
            LOG.error('Exception during message handling',
                      exc_info=exc_info)
            return NotificationResult.HANDLED

    def _dispatch(self, ctxt, message, executor_callback=None):
        """Dispatch a notification message to the appropriate endpoint method.

        :param ctxt: the request context
        :type ctxt: dict
        :param message: the message payload
        :type message: dict
        :param executor_callback: optional callable used to invoke each
                                  endpoint method
        :returns: NotificationResult.REQUEUE if requeueing is allowed and an
                  endpoint asked for it, NotificationResult.HANDLED otherwise
        """
        ctxt = self.serializer.deserialize_context(ctxt)

        publisher_id = message.get('publisher_id')
        event_type = message.get('event_type')
        metadata = {
            'message_id': message.get('message_id'),
            'timestamp': message.get('timestamp')
        }
        priority = message.get('priority', '').lower()
        if priority not in PRIORITIES:
            LOG.warning('Unknown priority "%s"', priority)
            # No endpoint can ever handle this message. Report it as handled
            # so the caller acknowledges it; returning None here would make
            # __call__ requeue the message, even when requeueing is not
            # allowed, and it could be redelivered again and again.
            return NotificationResult.HANDLED

        payload = self.serializer.deserialize_entity(ctxt,
                                                     message.get('payload'))

        for callback in self._callbacks_by_priority.get(priority, []):
            localcontext.set_local_context(ctxt)
            try:
                if executor_callback:
                    ret = executor_callback(callback, ctxt, publisher_id,
                                            event_type, payload, metadata)
                else:
                    ret = callback(ctxt, publisher_id, event_type, payload,
                                   metadata)
                # An endpoint returning nothing means the message is handled.
                ret = NotificationResult.HANDLED if ret is None else ret
                # Stop at the first endpoint that asks for a requeue; the
                # remaining callbacks are not invoked.
                if self.allow_requeue and ret == NotificationResult.REQUEUE:
                    return ret
            finally:
                localcontext.clear_local_context()
        return NotificationResult.HANDLED