Move files out of the namespace package

Move the public API out of oslo.messaging to oslo_messaging. Retain
the ability to import from the old namespace package for backwards
compatibility for this release cycle.

bp/drop-namespace-packages

Co-authored-by: Mehdi Abaakouk <mehdi.abaakouk@enovance.com>
Change-Id: Ia562010c152a214f1c0fed767c82022c7c2c52e7
This commit is contained in:
Doug Hellmann 2015-01-02 14:24:57 -05:00
parent 8102f25dd2
commit e55a83e832
146 changed files with 10537 additions and 2494 deletions

View File

@ -2,7 +2,7 @@
AMQP 1.0 Protocol Support
-------------------------
.. currentmodule:: oslo.messaging
.. currentmodule:: oslo_messaging
============
Introduction

View File

@ -2,7 +2,7 @@
Testing Configurations
----------------------
.. currentmodule:: oslo.messaging.conffixture
.. currentmodule:: oslo_messaging.conffixture
.. autoclass:: ConfFixture
:members:

View File

@ -2,7 +2,7 @@
Exceptions
----------
.. currentmodule:: oslo.messaging
.. currentmodule:: oslo_messaging
.. autoexception:: ClientSendError
.. autoexception:: DriverLoadFailure

View File

@ -2,9 +2,9 @@
Executors
---------
.. automodule:: oslo.messaging._executors
.. automodule:: oslo_messaging._executors
.. currentmodule:: oslo.messaging
.. currentmodule:: oslo_messaging
==============
Executor types

View File

@ -2,9 +2,9 @@
Notification Listener
---------------------
.. automodule:: oslo.messaging.notify.listener
.. automodule:: oslo_messaging.notify.listener
.. currentmodule:: oslo.messaging
.. currentmodule:: oslo_messaging
.. autofunction:: get_notification_listener

View File

@ -2,7 +2,7 @@
Notifier
--------
.. currentmodule:: oslo.messaging
.. currentmodule:: oslo_messaging
.. autoclass:: Notifier
:members:

View File

@ -2,6 +2,6 @@
Configuration Options
----------------------
.. currentmodule:: oslo.messaging.opts
.. currentmodule:: oslo_messaging.opts
.. autofunction:: list_opts

View File

@ -2,7 +2,7 @@
RPC Client
----------
.. currentmodule:: oslo.messaging
.. currentmodule:: oslo_messaging
.. autoclass:: RPCClient
:members:

View File

@ -2,7 +2,7 @@
Serializer
----------
.. currentmodule:: oslo.messaging
.. currentmodule:: oslo_messaging
.. autoclass:: Serializer
:members:

View File

@ -2,9 +2,9 @@
Server
------
.. automodule:: oslo.messaging.rpc.server
.. automodule:: oslo_messaging.rpc.server
.. currentmodule:: oslo.messaging
.. currentmodule:: oslo_messaging
.. autofunction:: get_rpc_server

View File

@ -2,7 +2,7 @@
Target
------
.. currentmodule:: oslo.messaging
.. currentmodule:: oslo_messaging
.. autoclass:: Target

View File

@ -2,7 +2,7 @@
Transport
---------
.. currentmodule:: oslo.messaging
.. currentmodule:: oslo_messaging
.. autofunction:: get_transport

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import warnings
from .exceptions import *
from .localcontext import *
from .notify import *
@ -21,3 +23,16 @@ from .serializer import *
from .server import *
from .target import *
from .transport import *
def deprecated():
new_name = __name__.replace('.', '_')
warnings.warn(
('The oslo namespace package is deprecated. Please use %s instead.' %
new_name),
DeprecationWarning,
stacklevel=3,
)
deprecated()

View File

@ -1,6 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -13,66 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
__all__ = ['ConfFixture']
import sys
import fixtures
def _import_opts(conf, module, opts):
__import__(module)
conf.register_opts(getattr(sys.modules[module], opts))
class ConfFixture(fixtures.Fixture):
"""Tweak configuration options for unit testing.
oslo.messaging registers a number of configuration options, but rather than
directly referencing those options, users of the API should use this
interface for querying and overriding certain configuration options.
An example usage::
self.messaging_conf = self.useFixture(messaging.ConfFixture(cfg.CONF))
self.messaging_conf.transport_driver = 'fake'
:param conf: a ConfigOpts instance
:type conf: oslo.config.cfg.ConfigOpts
"""
def __init__(self, conf):
self.conf = conf
_import_opts(self.conf,
'oslo.messaging._drivers.impl_rabbit', 'rabbit_opts')
_import_opts(self.conf,
'oslo.messaging._drivers.impl_qpid', 'qpid_opts')
_import_opts(self.conf,
'oslo.messaging._drivers.amqp', 'amqp_opts')
_import_opts(self.conf, 'oslo.messaging.rpc.client', '_client_opts')
_import_opts(self.conf, 'oslo.messaging.transport', '_transport_opts')
_import_opts(self.conf,
'oslo.messaging.notify.notifier', '_notifier_opts')
def setUp(self):
super(ConfFixture, self).setUp()
self.addCleanup(self.conf.reset)
@property
def transport_driver(self):
"""The transport driver - for example 'rabbit', 'qpid' or 'fake'."""
return self.conf.rpc_backend
@transport_driver.setter
def transport_driver(self, value):
self.conf.set_override('rpc_backend', value)
@property
def response_timeout(self):
"""Default number of seconds to wait for a response from a call."""
return self.conf.rpc_response_timeout
@response_timeout.setter
def response_timeout(self, value):
self.conf.set_override('rpc_response_timeout', value)
from oslo_messaging.conffixture import * # noqa

View File

@ -1,6 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -13,28 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
__all__ = ['MessagingException', 'MessagingTimeout', 'MessageDeliveryFailure',
'InvalidTarget']
import six
class MessagingException(Exception):
"""Base class for exceptions."""
class MessagingTimeout(MessagingException):
"""Raised if message sending times out."""
class MessageDeliveryFailure(MessagingException):
"""Raised if message sending failed after the asked retry."""
class InvalidTarget(MessagingException, ValueError):
"""Raised if a target does not meet certain pre-conditions."""
def __init__(self, msg, target):
msg = msg + ":" + six.text_type(target)
super(InvalidTarget, self).__init__(msg)
self.target = target
from oslo_messaging.exceptions import * # noqa

View File

@ -1,6 +1,3 @@
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -13,43 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
'get_local_context',
'set_local_context',
'clear_local_context',
]
import threading
import uuid
_KEY = '_%s_%s' % (__name__.replace('.', '_'), uuid.uuid4().hex)
_STORE = threading.local()
def get_local_context(ctxt):
"""Retrieve the RPC endpoint request context for the current thread.
This method allows any code running in the context of a dispatched RPC
endpoint method to retrieve the context for this request.
This is commonly used for logging so that, for example, you can include the
request ID, user and tenant in every message logged from a RPC endpoint
method.
:returns: the context for the request dispatched in the current thread
"""
return getattr(_STORE, _KEY, None)
def set_local_context(ctxt):
"""Set the request context for the current thread.
:param ctxt: a deserialized request context
:type ctxt: dict
"""
setattr(_STORE, _KEY, ctxt)
def clear_local_context():
"""Clear the request context for the current thread."""
delattr(_STORE, _KEY)
from oslo_messaging.localcontext import * # noqa

View File

@ -1,7 +1,3 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -14,122 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import itertools
import logging
import sys
from oslo.messaging import localcontext
from oslo.messaging import serializer as msg_serializer
LOG = logging.getLogger(__name__)
PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']
class NotificationResult(object):
HANDLED = 'handled'
REQUEUE = 'requeue'
class NotificationDispatcher(object):
"""A message dispatcher which understands Notification messages.
A MessageHandlingServer is constructed by passing a callable dispatcher
which is invoked with context and message dictionaries each time a message
is received.
NotifcationDispatcher is one such dispatcher which pass a raw notification
message to the endpoints
"""
def __init__(self, targets, endpoints, serializer, allow_requeue,
pool=None):
self.targets = targets
self.endpoints = endpoints
self.serializer = serializer or msg_serializer.NoOpSerializer()
self.allow_requeue = allow_requeue
self.pool = pool
self._callbacks_by_priority = {}
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
if hasattr(endpoint, prio):
method = getattr(endpoint, prio)
self._callbacks_by_priority.setdefault(prio, []).append(method)
priorities = self._callbacks_by_priority.keys()
self._targets_priorities = set(itertools.product(self.targets,
priorities))
def _listen(self, transport):
return transport._listen_for_notifications(self._targets_priorities,
pool=self.pool)
@contextlib.contextmanager
def __call__(self, incoming, executor_callback=None):
result_wrapper = []
yield lambda: result_wrapper.append(
self._dispatch_and_handle_error(incoming, executor_callback))
if result_wrapper[0] == NotificationResult.HANDLED:
incoming.acknowledge()
else:
incoming.requeue()
def _dispatch_and_handle_error(self, incoming, executor_callback):
"""Dispatch a notification message to the appropriate endpoint method.
:param incoming: the incoming notification message
:type ctxt: IncomingMessage
"""
try:
return self._dispatch(incoming.ctxt, incoming.message,
executor_callback)
except Exception:
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
LOG.error('Exception during message handling',
exc_info=exc_info)
return NotificationResult.HANDLED
def _dispatch(self, ctxt, message, executor_callback=None):
"""Dispatch an RPC message to the appropriate endpoint method.
:param ctxt: the request context
:type ctxt: dict
:param message: the message payload
:type message: dict
"""
ctxt = self.serializer.deserialize_context(ctxt)
publisher_id = message.get('publisher_id')
event_type = message.get('event_type')
metadata = {
'message_id': message.get('message_id'),
'timestamp': message.get('timestamp')
}
priority = message.get('priority', '').lower()
if priority not in PRIORITIES:
LOG.warning('Unknown priority "%s"', priority)
return
payload = self.serializer.deserialize_entity(ctxt,
message.get('payload'))
for callback in self._callbacks_by_priority.get(priority, []):
localcontext.set_local_context(ctxt)
try:
if executor_callback:
ret = executor_callback(callback, ctxt, publisher_id,
event_type, payload, metadata)
else:
ret = callback(ctxt, publisher_id, event_type, payload,
metadata)
ret = NotificationResult.HANDLED if ret is None else ret
if self.allow_requeue and ret == NotificationResult.REQUEUE:
return ret
finally:
localcontext.clear_local_context()
return NotificationResult.HANDLED
from oslo_messaging.notify.dispatcher import * # noqa

View File

@ -1,7 +1,3 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -13,125 +9,5 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A notification listener exposes a number of endpoints, each of which
contain a set of methods. Each method corresponds to a notification priority.
To create a notification listener, you supply a transport, list of targets and
a list of endpoints.
A transport can be obtained simply by calling the get_transport() method::
transport = messaging.get_transport(conf)
which will load the appropriate transport driver according to the user's
messaging configuration configuration. See get_transport() for more details.
The target supplied when creating a notification listener expresses the topic
and - optionally - the exchange to listen on. See Target for more details
on these attributes.
Notification listener have start(), stop() and wait() messages to begin
handling requests, stop handling requests and wait for all in-process
requests to complete.
Each notification listener is associated with an executor which integrates the
listener with a specific I/O handling framework. Currently, there are blocking
and eventlet executors available.
A simple example of a notification listener with multiple endpoints might be::
from oslo.config import cfg
from oslo import messaging
class NotificationEndpoint(object):
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
do_something(payload)
class ErrorEndpoint(object):
def error(self, ctxt, publisher_id, event_type, payload, metadata):
do_something(payload)
transport = messaging.get_transport(cfg.CONF)
targets = [
messaging.Target(topic='notifications')
messaging.Target(topic='notifications_bis')
]
endpoints = [
NotificationEndpoint(),
ErrorEndpoint(),
]
pool = "listener-workers"
server = messaging.get_notification_listener(transport, targets, endpoints,
pool)
server.start()
server.wait()
A notifier sends a notification on a topic with a priority, the notification
listener will receive this notification if the topic of this one have been set
in one of the targets and if an endpoint implements the method named like the
priority
Parameters to endpoint methods are the request context supplied by the client,
the publisher_id of the notification message, the event_type, the payload and
metadata. The metadata parameter is a mapping containing a unique message_id
and a timestamp.
By supplying a serializer object, a listener can deserialize a request context
and arguments from - and serialize return values to - primitive types.
By supplying a pool name you can create multiple groups of listeners consuming
notifications and that each group only receives one copy of each
notification.
An endpoint method can explicitly return messaging.NotificationResult.HANDLED
to acknowledge a message or messaging.NotificationResult.REQUEUE to requeue the
message.
The message is acknowledged only if all endpoints either return
messaging.NotificationResult.HANDLED or None.
Note that not all transport drivers implement support for requeueing. In order
to use this feature, applications should assert that the feature is available
by passing allow_requeue=True to get_notification_listener(). If the driver
does not support requeueing, it will raise NotImplementedError at this point.
"""
from oslo.messaging.notify import dispatcher as notify_dispatcher
from oslo.messaging import server as msg_server
def get_notification_listener(transport, targets, endpoints,
executor='blocking', serializer=None,
allow_requeue=False, pool=None):
"""Construct a notification listener
The executor parameter controls how incoming messages will be received and
dispatched. By default, the most simple executor is used - the blocking
executor.
If the eventlet executor is used, the threading and time library need to be
monkeypatched.
:param transport: the messaging transport
:type transport: Transport
:param targets: the exchanges and topics to listen on
:type targets: list of Target
:param endpoints: a list of endpoint objects
:type endpoints: list
:param executor: name of a message executor - for example
'eventlet', 'blocking'
:type executor: str
:param serializer: an optional entity serializer
:type serializer: Serializer
:param allow_requeue: whether NotificationResult.REQUEUE support is needed
:type allow_requeue: bool
:param pool: the pool name
:type pool: str
:raises: NotImplementedError
"""
transport._require_driver_features(requeue=allow_requeue)
dispatcher = notify_dispatcher.NotificationDispatcher(targets, endpoints,
serializer,
allow_requeue, pool)
return msg_server.MessageHandlingServer(transport, dispatcher, executor)
from oslo_messaging.notify.listener import * # noqa

View File

@ -1,41 +1,13 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo.config import cfg
class LoggingErrorNotificationHandler(logging.Handler):
def __init__(self, *args, **kwargs):
# NOTE(dhellmann): Avoid a cyclical import by doing this one
# at runtime.
from oslo import messaging
logging.Handler.__init__(self, *args, **kwargs)
self._transport = messaging.get_transport(cfg.CONF)
self._notifier = messaging.Notifier(self._transport,
publisher_id='error.publisher')
def emit(self, record):
# NOTE(bnemec): Notifier registers this opt with the transport.
if ('log' in self._transport.conf.notification_driver):
# NOTE(lbragstad): If we detect that log is one of the
# notification drivers, then return. This protects from infinite
# recursion where something bad happens, it gets logged, the log
# handler sends a notification, and the log_notifier sees the
# notification and logs it.
return
self._notifier.error(None, 'error_notification',
dict(error=record.msg))
PublishErrorsHandler = LoggingErrorNotificationHandler
from oslo_messaging.notify.log_handler import * # noqa

View File

@ -1,5 +1,3 @@
# Copyright 2013 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -11,71 +9,5 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Driver for the Python logging package that sends log records as a notification.
"""
import logging
from oslo.config import cfg
from oslo.messaging.notify import notifier
from oslo.messaging import transport
class LoggingNotificationHandler(logging.Handler):
"""Handler for logging to the messaging notification system.
Each time the application logs a message using the :py:mod:`logging`
module, it will be sent as a notification. The severity used for the
notification will be the same as the one used for the log record.
This can be used into a Python logging configuration this way::
[handler_notifier]
class=oslo.messaging.LoggingNotificationHandler
level=ERROR
args=('qpid:///')
"""
CONF = cfg.CONF
"""Default configuration object used, subclass this class if you want to
use another one.
"""
def __init__(self, url, publisher_id=None, driver=None,
topic=None, serializer=None):
self.notifier = notifier.Notifier(
transport.get_transport(self.CONF, url),
publisher_id, driver,
topic,
serializer() if serializer else None)
logging.Handler.__init__(self)
def emit(self, record):
"""Emit the log record to the messaging notification system.
:param record: A log record to emit.
"""
method = getattr(self.notifier, record.levelname.lower(), None)
if not method:
return
method(None,
'logrecord',
{
'name': record.name,
'levelno': record.levelno,
'levelname': record.levelname,
'exc_info': record.exc_info,
'pathname': record.pathname,
'lineno': record.lineno,
'msg': record.getMessage(),
'funcName': record.funcName,
'thread': record.thread,
'processName': record.processName,
'process': record.process,
'extra': getattr(record, 'extra', None),
})
from oslo_messaging.notify.logger import * # noqa

View File

@ -1,5 +1,3 @@
# Copyright (c) 2013-2014 eNovance
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -12,117 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
Send notifications on request
"""
import logging
import os.path
import sys
import traceback as tb
import six
import webob.dec
from oslo.config import cfg
from oslo import messaging
from oslo.messaging._i18n import _LE
from oslo.messaging import notify
from oslo.messaging.openstack.common import context
from oslo.middleware import base
LOG = logging.getLogger(__name__)
def log_and_ignore_error(fn):
def wrapped(*args, **kwargs):
try:
return fn(*args, **kwargs)
except Exception as e:
LOG.exception(_LE('An exception occurred processing '
'the API call: %s ') % e)
return wrapped
class RequestNotifier(base.Middleware):
"""Send notification on request."""
@classmethod
def factory(cls, global_conf, **local_conf):
"""Factory method for paste.deploy."""
conf = global_conf.copy()
conf.update(local_conf)
def _factory(app):
return cls(app, **conf)
return _factory
def __init__(self, app, **conf):
self.notifier = notify.Notifier(
messaging.get_transport(cfg.CONF, conf.get('url')),
publisher_id=conf.get('publisher_id',
os.path.basename(sys.argv[0])))
self.service_name = conf.get('service_name')
self.ignore_req_list = [x.upper().strip() for x in
conf.get('ignore_req_list', '').split(',')]
super(RequestNotifier, self).__init__(app)
@staticmethod
def environ_to_dict(environ):
"""Following PEP 333, server variables are lower case, so don't
include them.
"""
return dict((k, v) for k, v in six.iteritems(environ)
if k.isupper() and k != 'HTTP_X_AUTH_TOKEN')
@log_and_ignore_error
def process_request(self, request):
request.environ['HTTP_X_SERVICE_NAME'] = \
self.service_name or request.host
payload = {
'request': self.environ_to_dict(request.environ),
}
self.notifier.info(context.get_admin_context(),
'http.request',
payload)
@log_and_ignore_error
def process_response(self, request, response,
exception=None, traceback=None):
payload = {
'request': self.environ_to_dict(request.environ),
}
if response:
payload['response'] = {
'status': response.status,
'headers': response.headers,
}
if exception:
payload['exception'] = {
'value': repr(exception),
'traceback': tb.format_tb(traceback)
}
self.notifier.info(context.get_admin_context(),
'http.response',
payload)
@webob.dec.wsgify
def __call__(self, req):
if req.method in self.ignore_req_list:
return req.get_response(self.application)
else:
self.process_request(req)
try:
response = req.get_response(self.application)
except Exception:
exc_type, value, traceback = sys.exc_info()
self.process_response(req, None, value, traceback)
raise
else:
self.process_response(req, response)
return response
from oslo_messaging.notify.middleware import * # noqa

View File

@ -1,8 +1,3 @@
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -15,301 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import uuid
import six
from stevedore import named
from oslo.config import cfg
from oslo.messaging import serializer as msg_serializer
from oslo.utils import timeutils
_notifier_opts = [
cfg.MultiStrOpt('notification_driver',
default=[],
help='Driver or drivers to handle sending notifications.'),
cfg.ListOpt('notification_topics',
default=['notifications', ],
deprecated_name='topics',
deprecated_group='rpc_notifier2',
help='AMQP topic used for OpenStack notifications.'),
]
_LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class _Driver(object):
def __init__(self, conf, topics, transport):
self.conf = conf
self.topics = topics
self.transport = transport
@abc.abstractmethod
def notify(self, ctxt, msg, priority, retry):
pass
class Notifier(object):
"""Send notification messages.
The Notifier class is used for sending notification messages over a
messaging transport or other means.
Notification messages follow the following format::
{'message_id': six.text_type(uuid.uuid4()),
'publisher_id': 'compute.host1',
'timestamp': timeutils.utcnow(),
'priority': 'WARN',
'event_type': 'compute.create_instance',
'payload': {'instance_id': 12, ... }}
A Notifier object can be instantiated with a transport object and a
publisher ID:
notifier = messaging.Notifier(get_transport(CONF), 'compute')
and notifications are sent via drivers chosen with the notification_driver
config option and on the topics chosen with the notification_topics config
option.
Alternatively, a Notifier object can be instantiated with a specific
driver or topic::
notifier = notifier.Notifier(RPC_TRANSPORT,
'compute.host',
driver='messaging',
topic='notifications')
Notifier objects are relatively expensive to instantiate (mostly the cost
of loading notification drivers), so it is possible to specialize a given
Notifier object with a different publisher id using the prepare() method::
notifier = notifier.prepare(publisher_id='compute')
notifier.info(ctxt, event_type, payload)
"""
def __init__(self, transport, publisher_id=None,
driver=None, topic=None,
serializer=None, retry=None):
"""Construct a Notifier object.
:param transport: the transport to use for sending messages
:type transport: oslo.messaging.Transport
:param publisher_id: field in notifications sent, for example
'compute.host1'
:type publisher_id: str
:param driver: a driver to lookup from oslo.messaging.notify.drivers
:type driver: str
:param topic: the topic which to send messages on
:type topic: str
:param serializer: an optional entity serializer
:type serializer: Serializer
:param retry: an connection retries configuration
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
"""
transport.conf.register_opts(_notifier_opts)
self.transport = transport
self.publisher_id = publisher_id
self.retry = retry
self._driver_names = ([driver] if driver is not None
else transport.conf.notification_driver)
self._topics = ([topic] if topic is not None
else transport.conf.notification_topics)
self._serializer = serializer or msg_serializer.NoOpSerializer()
self._driver_mgr = named.NamedExtensionManager(
'oslo.messaging.notify.drivers',
names=self._driver_names,
invoke_on_load=True,
invoke_args=[transport.conf],
invoke_kwds={
'topics': self._topics,
'transport': self.transport,
}
)
_marker = object()
def prepare(self, publisher_id=_marker, retry=_marker):
"""Return a specialized Notifier instance.
Returns a new Notifier instance with the supplied publisher_id. Allows
sending notifications from multiple publisher_ids without the overhead
of notification driver loading.
:param publisher_id: field in notifications sent, for example
'compute.host1'
:type publisher_id: str
:param retry: an connection retries configuration
None or -1 means to retry forever
0 means no retry
N means N retries
:type retry: int
"""
return _SubNotifier._prepare(self, publisher_id, retry=retry)
def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
retry=None):
payload = self._serializer.serialize_entity(ctxt, payload)
ctxt = self._serializer.serialize_context(ctxt)
msg = dict(message_id=six.text_type(uuid.uuid4()),
publisher_id=publisher_id or self.publisher_id,
event_type=event_type,
priority=priority,
payload=payload,
timestamp=six.text_type(timeutils.utcnow()))
def do_notify(ext):
try:
ext.obj.notify(ctxt, msg, priority, retry or self.retry)
except Exception as e:
_LOG.exception("Problem '%(e)s' attempting to send to "
"notification system. Payload=%(payload)s",
dict(e=e, payload=payload))
if self._driver_mgr.extensions:
self._driver_mgr.map(do_notify)
def audit(self, ctxt, event_type, payload):
"""Send a notification at audit level.
:param ctxt: a request context dict
:type ctxt: dict
:param event_type: describes the event, for example
'compute.create_instance'
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'AUDIT')
def debug(self, ctxt, event_type, payload):
"""Send a notification at debug level.
:param ctxt: a request context dict
:type ctxt: dict
:param event_type: describes the event, for example
'compute.create_instance'
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'DEBUG')
def info(self, ctxt, event_type, payload):
"""Send a notification at info level.
:param ctxt: a request context dict
:type ctxt: dict
:param event_type: describes the event, for example
'compute.create_instance'
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'INFO')
def warn(self, ctxt, event_type, payload):
"""Send a notification at warning level.
:param ctxt: a request context dict
:type ctxt: dict
:param event_type: describes the event, for example
'compute.create_instance'
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'WARN')
warning = warn
def error(self, ctxt, event_type, payload):
"""Send a notification at error level.
:param ctxt: a request context dict
:type ctxt: dict
:param event_type: describes the event, for example
'compute.create_instance'
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'ERROR')
def critical(self, ctxt, event_type, payload):
"""Send a notification at critical level.
:param ctxt: a request context dict
:type ctxt: dict
:param event_type: describes the event, for example
'compute.create_instance'
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'CRITICAL')
def sample(self, ctxt, event_type, payload):
"""Send a notification at sample level.
Sample notifications are for high-frequency events
that typically contain small payloads. eg: "CPU = 70%"
Not all drivers support the sample level
(log, for example) so these could be dropped.
:param ctxt: a request context dict
:type ctxt: dict
:param event_type: describes the event, for example
'compute.create_instance'
:type event_type: str
:param payload: the notification payload
:type payload: dict
:raises: MessageDeliveryFailure
"""
self._notify(ctxt, event_type, payload, 'SAMPLE')
class _SubNotifier(Notifier):
_marker = Notifier._marker
def __init__(self, base, publisher_id, retry):
self._base = base
self.transport = base.transport
self.publisher_id = publisher_id
self.retry = retry
self._serializer = self._base._serializer
self._driver_mgr = self._base._driver_mgr
def _notify(self, ctxt, event_type, payload, priority):
super(_SubNotifier, self)._notify(ctxt, event_type, payload, priority)
@classmethod
def _prepare(cls, base, publisher_id=_marker, retry=_marker):
if publisher_id is cls._marker:
publisher_id = base.publisher_id
if retry is cls._marker:
retry = base.retry
return cls(base, publisher_id, retry=retry)
from oslo_messaging.notify.notifier import * # noqa

View File

@ -1,9 +1,3 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -16,382 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
'ClientSendError',
'RPCClient',
'RPCVersionCapError',
'RemoteError',
]
import six
from oslo.config import cfg
from oslo.messaging._drivers import base as driver_base
from oslo.messaging import _utils as utils
from oslo.messaging import exceptions
from oslo.messaging import serializer as msg_serializer
_client_opts = [
cfg.IntOpt('rpc_response_timeout',
default=60,
help='Seconds to wait for a response from a call.'),
]
class RemoteError(exceptions.MessagingException):
"""Signifies that a remote endpoint method has raised an exception.
Contains a string representation of the type of the original exception,
the value of the original exception, and the traceback. These are
sent to the parent as a joined string so printing the exception
contains all of the relevant info.
"""
def __init__(self, exc_type=None, value=None, traceback=None):
self.exc_type = exc_type
self.value = value
self.traceback = traceback
msg = ("Remote error: %(exc_type)s %(value)s\n%(traceback)s." %
dict(exc_type=self.exc_type, value=self.value,
traceback=self.traceback))
super(RemoteError, self).__init__(msg)
class RPCVersionCapError(exceptions.MessagingException):
def __init__(self, version, version_cap):
self.version = version
self.version_cap = version_cap
msg = ("Requested message version, %(version)s is too high. It needs "
"to be lower than the specified version cap %(version_cap)s." %
dict(version=self.version, version_cap=self.version_cap))
super(RPCVersionCapError, self).__init__(msg)
class ClientSendError(exceptions.MessagingException):
"""Raised if we failed to send a message to a target."""
def __init__(self, target, ex):
msg = 'Failed to send to target "%s": %s' % (target, ex)
super(ClientSendError, self).__init__(msg)
self.target = target
self.ex = ex
class _CallContext(object):
_marker = object()
def __init__(self, transport, target, serializer,
timeout=None, version_cap=None, retry=None):
self.conf = transport.conf
self.transport = transport
self.target = target
self.serializer = serializer
self.timeout = timeout
self.retry = retry
self.version_cap = version_cap
super(_CallContext, self).__init__()
def _make_message(self, ctxt, method, args):
msg = dict(method=method)
msg['args'] = dict()
for argname, arg in six.iteritems(args):
msg['args'][argname] = self.serializer.serialize_entity(ctxt, arg)
if self.target.namespace is not None:
msg['namespace'] = self.target.namespace
if self.target.version is not None:
msg['version'] = self.target.version
return msg
def _check_version_cap(self, version):
if not utils.version_is_compatible(self.version_cap, version):
raise RPCVersionCapError(version=version,
version_cap=self.version_cap)
def can_send_version(self, version=_marker):
"""Check to see if a version is compatible with the version cap."""
version = self.target.version if version is self._marker else version
return (not self.version_cap or
utils.version_is_compatible(self.version_cap,
self.target.version))
def cast(self, ctxt, method, **kwargs):
"""Invoke a method and return immediately. See RPCClient.cast()."""
msg = self._make_message(ctxt, method, kwargs)
ctxt = self.serializer.serialize_context(ctxt)
if self.version_cap:
self._check_version_cap(msg.get('version'))
try:
self.transport._send(self.target, ctxt, msg, retry=self.retry)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
def call(self, ctxt, method, **kwargs):
"""Invoke a method and wait for a reply. See RPCClient.call()."""
if self.target.fanout:
raise exceptions.InvalidTarget('A call cannot be used with fanout',
self.target)
msg = self._make_message(ctxt, method, kwargs)
msg_ctxt = self.serializer.serialize_context(ctxt)
timeout = self.timeout
if self.timeout is None:
timeout = self.conf.rpc_response_timeout
if self.version_cap:
self._check_version_cap(msg.get('version'))
try:
result = self.transport._send(self.target, msg_ctxt, msg,
wait_for_reply=True, timeout=timeout,
retry=self.retry)
except driver_base.TransportDriverError as ex:
raise ClientSendError(self.target, ex)
return self.serializer.deserialize_entity(ctxt, result)
@classmethod
def _prepare(cls, base,
exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker):
"""Prepare a method invocation context. See RPCClient.prepare()."""
kwargs = dict(
exchange=exchange,
topic=topic,
namespace=namespace,
version=version,
server=server,
fanout=fanout)
kwargs = dict([(k, v) for k, v in kwargs.items()
if v is not cls._marker])
target = base.target(**kwargs)
if timeout is cls._marker:
timeout = base.timeout
if retry is cls._marker:
retry = base.retry
if version_cap is cls._marker:
version_cap = base.version_cap
return _CallContext(base.transport, target,
base.serializer,
timeout, version_cap, retry)
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
version=_marker, server=_marker, fanout=_marker,
timeout=_marker, version_cap=_marker, retry=_marker):
"""Prepare a method invocation context. See RPCClient.prepare()."""
return self._prepare(self,
exchange, topic, namespace,
version, server, fanout,