Sync rpc and notifier from oslo-incubator.

This patch syncs the latest changes to rpc and notifier from
oslo-incubator.  The most significant changes are for adding support for
a message envelope for all messages, including notifications, sent via
rpc.

Related oslo-incubator reviews:

    https://review.openstack.org/#/c/17554/
    https://review.openstack.org/#/c/18057/

Note that a new notifier module was added: rpc_notifier2, which when
used, sends notifications out with a message envelope.

DocImpact.

Implements bp version-rpc-messages.

Change-Id: I495c224d0c502086ce3db6c078d48829b8d913a6
This commit is contained in:
Russell Bryant
2013-01-07 11:08:18 -05:00
parent 4290f943ee
commit c2b51dc574
10 changed files with 270 additions and 87 deletions

View File

@@ -137,10 +137,11 @@ def notify(context, publisher_id, event_type, priority, payload):
for driver in _get_drivers(): for driver in _get_drivers():
try: try:
driver.notify(context, msg) driver.notify(context, msg)
except Exception, e: except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to " LOG.exception(_("Problem '%(e)s' attempting to "
"send to notification system. " "send to notification system. "
"Payload=%(payload)s") % locals()) "Payload=%(payload)s")
% dict(e=e, payload=payload))
_drivers = None _drivers = None
@@ -166,7 +167,7 @@ def add_driver(notification_driver):
try: try:
driver = importutils.import_module(notification_driver) driver = importutils.import_module(notification_driver)
_drivers[notification_driver] = driver _drivers[notification_driver] = driver
except ImportError as e: except ImportError:
LOG.exception(_("Failed to load notifier %s. " LOG.exception(_("Failed to load notifier %s. "
"These notifications will not be sent.") % "These notifications will not be sent.") %
notification_driver) notification_driver)

View File

@@ -41,6 +41,6 @@ def notify(context, message):
topic = '%s.%s' % (topic, priority) topic = '%s.%s' % (topic, priority)
try: try:
rpc.notify(context, topic, message) rpc.notify(context, topic, message)
except Exception, e: except Exception:
LOG.exception(_("Could not send notification to %(topic)s. " LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals()) "Payload=%(message)s"), locals())

View File

@@ -0,0 +1,51 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
'''messaging based notification driver, with message envelopes'''
from nova.openstack.common import cfg
from nova.openstack.common import context as req_context
from nova.openstack.common.gettextutils import _
from nova.openstack.common import log as logging
from nova.openstack.common import rpc
LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt(
'topics', default=['notifications', ],
help='AMQP topic(s) used for openstack notifications')
opt_group = cfg.OptGroup(name='rpc_notifier2',
title='Options for rpc_notifier2')
CONF = cfg.CONF
CONF.register_group(opt_group)
CONF.register_opt(notification_topic_opt, opt_group)
def notify(context, message):
"""Sends a notification via RPC"""
if not context:
context = req_context.get_admin_context()
priority = message.get('priority',
CONF.default_notification_level)
priority = priority.lower()
for topic in CONF.rpc_notifier2.topics:
topic = '%s.%s' % (topic, priority)
try:
rpc.notify(context, topic, message, envelope=True)
except Exception:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())

View File

@@ -178,17 +178,18 @@ def multicall(context, topic, msg, timeout=None):
return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout)
def notify(context, topic, msg): def notify(context, topic, msg, envelope=False):
"""Send notification event. """Send notification event.
:param context: Information that identifies the user that has made this :param context: Information that identifies the user that has made this
request. request.
:param topic: The topic to send the notification to. :param topic: The topic to send the notification to.
:param msg: This is a dict of content of event. :param msg: This is a dict of content of event.
:param envelope: Set to True to enable message envelope for notifications.
:returns: None :returns: None
""" """
return _get_impl().notify(cfg.CONF, context, topic, msg) return _get_impl().notify(cfg.CONF, context, topic, msg, envelope)
def cleanup(): def cleanup():

View File

@@ -33,7 +33,6 @@ from eventlet import greenpool
from eventlet import pools from eventlet import pools
from eventlet import semaphore from eventlet import semaphore
from nova.openstack.common import cfg
from nova.openstack.common import excutils from nova.openstack.common import excutils
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _
from nova.openstack.common import local from nova.openstack.common import local
@@ -169,7 +168,7 @@ def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None,
'failure': failure} 'failure': failure}
if ending: if ending:
msg['ending'] = True msg['ending'] = True
conn.direct_send(msg_id, msg) conn.direct_send(msg_id, rpc_common.serialize_msg(msg))
class RpcContext(rpc_common.CommonRpcContext): class RpcContext(rpc_common.CommonRpcContext):
@@ -294,6 +293,10 @@ class ProxyCallback(object):
ctxt.reply(None, sys.exc_info(), ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool) connection_pool=self.connection_pool)
def wait(self):
"""Wait for all callback threads to exit."""
self.pool.waitall()
class MulticallWaiter(object): class MulticallWaiter(object):
def __init__(self, conf, connection, timeout): def __init__(self, conf, connection, timeout):
@@ -356,7 +359,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
# that will continue to use the connection. When it's done, # that will continue to use the connection. When it's done,
# connection.close() will get called which will put it back into # connection.close() will get called which will put it back into
# the pool # the pool
LOG.debug(_('Making asynchronous call on %s ...'), topic) LOG.debug(_('Making synchronous call on %s ...'), topic)
msg_id = uuid.uuid4().hex msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id}) msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id)) LOG.debug(_('MSG_ID is %s') % (msg_id))
@@ -365,7 +368,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
conn = ConnectionContext(conf, connection_pool) conn = ConnectionContext(conf, connection_pool)
wait_msg = MulticallWaiter(conf, conn, timeout) wait_msg = MulticallWaiter(conf, conn, timeout)
conn.declare_direct_consumer(msg_id, wait_msg) conn.declare_direct_consumer(msg_id, wait_msg)
conn.topic_send(topic, msg) conn.topic_send(topic, rpc_common.serialize_msg(msg))
return wait_msg return wait_msg
@@ -384,7 +387,7 @@ def cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous cast on %s...'), topic) LOG.debug(_('Making asynchronous cast on %s...'), topic)
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, msg) conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast(conf, context, topic, msg, connection_pool): def fanout_cast(conf, context, topic, msg, connection_pool):
@@ -392,7 +395,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
LOG.debug(_('Making asynchronous fanout cast...')) LOG.debug(_('Making asynchronous fanout cast...'))
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, msg) conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def cast_to_server(conf, context, server_params, topic, msg, connection_pool): def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
@@ -400,7 +403,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False, with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn: server_params=server_params) as conn:
conn.topic_send(topic, msg) conn.topic_send(topic, rpc_common.serialize_msg(msg))
def fanout_cast_to_server(conf, context, server_params, topic, msg, def fanout_cast_to_server(conf, context, server_params, topic, msg,
@@ -409,16 +412,18 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False, with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn: server_params=server_params) as conn:
conn.fanout_send(topic, msg) conn.fanout_send(topic, rpc_common.serialize_msg(msg))
def notify(conf, context, topic, msg, connection_pool): def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
LOG.debug(_('Sending %(event_type)s on %(topic)s'), LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'), dict(event_type=msg.get('event_type'),
topic=topic)) topic=topic))
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
if envelope:
msg = rpc_common.serialize_msg(msg, force_envelope=True)
conn.notify_send(topic, msg) conn.notify_send(topic, msg)

View File

@@ -21,6 +21,7 @@ import copy
import sys import sys
import traceback import traceback
from nova.openstack.common import cfg
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _
from nova.openstack.common import importutils from nova.openstack.common import importutils
from nova.openstack.common import jsonutils from nova.openstack.common import jsonutils
@@ -28,9 +29,50 @@ from nova.openstack.common import local
from nova.openstack.common import log as logging from nova.openstack.common import log as logging
CONF = cfg.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
'''RPC Envelope Version.
This version number applies to the top level structure of messages sent out.
It does *not* apply to the message payload, which must be versioned
independently. For example, when using rpc APIs, a version number is applied
for changes to the API being exposed over rpc. This version number is handled
in the rpc proxy and dispatcher modules.
This version number applies to the message envelope that is used in the
serialization done inside the rpc layer. See serialize_msg() and
deserialize_msg().
The current message format (version 2.0) is very simple. It is:
{
'nova.version': <RPC Envelope Version as a String>,
'nova.message': <Application Message Payload, JSON encoded>
}
Message format version '1.0' is just considered to be the messages we sent
without a message envelope.
So, the current message envelope just includes the envelope version. It may
eventually contain additional information, such as a signature for the message
payload.
We will JSON encode the application message payload. The message envelope,
which includes the JSON encoded application message body, will be passed down
to the messaging libraries as a dict.
'''
_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'nova.version'
_MESSAGE_KEY = 'nova.message'
# TODO(russellb) Turn this on after Grizzly.
_SEND_RPC_ENVELOPE = False
class RPCException(Exception): class RPCException(Exception):
message = _("An unknown RPC related exception occurred.") message = _("An unknown RPC related exception occurred.")
@@ -91,6 +133,11 @@ class UnsupportedRpcVersion(RPCException):
"this endpoint.") "this endpoint.")
class UnsupportedRpcEnvelopeVersion(RPCException):
message = _("Specified RPC envelope version, %(version)s, "
"not supported by this endpoint.")
class Connection(object): class Connection(object):
"""A connection, returned by rpc.create_connection(). """A connection, returned by rpc.create_connection().
@@ -165,8 +212,12 @@ class Connection(object):
def _safe_log(log_func, msg, msg_data): def _safe_log(log_func, msg, msg_data):
"""Sanitizes the msg_data field before logging.""" """Sanitizes the msg_data field before logging."""
SANITIZE = {'set_admin_password': ('new_pass',), SANITIZE = {'set_admin_password': [('args', 'new_pass')],
'run_instance': ('admin_password',), } 'run_instance': [('args', 'admin_password')],
'route_message': [('args', 'message', 'args', 'method_info',
'method_kwargs', 'password'),
('args', 'message', 'args', 'method_info',
'method_kwargs', 'admin_password')]}
has_method = 'method' in msg_data and msg_data['method'] in SANITIZE has_method = 'method' in msg_data and msg_data['method'] in SANITIZE
has_context_token = '_context_auth_token' in msg_data has_context_token = '_context_auth_token' in msg_data
@@ -178,14 +229,16 @@ def _safe_log(log_func, msg, msg_data):
msg_data = copy.deepcopy(msg_data) msg_data = copy.deepcopy(msg_data)
if has_method: if has_method:
method = msg_data['method'] for arg in SANITIZE.get(msg_data['method'], []):
if method in SANITIZE:
args_to_sanitize = SANITIZE[method]
for arg in args_to_sanitize:
try: try:
msg_data['args'][arg] = "<SANITIZED>" d = msg_data
except KeyError: for elem in arg[:-1]:
pass d = d[elem]
d[arg[-1]] = '<SANITIZED>'
except KeyError, e:
LOG.info(_('Failed to sanitize %(item)s. Key error %(err)s'),
{'item': arg,
'err': e})
if has_context_token: if has_context_token:
msg_data['_context_auth_token'] = '<SANITIZED>' msg_data['_context_auth_token'] = '<SANITIZED>'
@@ -344,3 +397,74 @@ def client_exceptions(*exceptions):
return catch_client_exception(exceptions, func, *args, **kwargs) return catch_client_exception(exceptions, func, *args, **kwargs)
return inner return inner
return outer return outer
def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible.
:param imp_version: The version implemented
:param version: The version requested by an incoming message.
"""
version_parts = version.split('.')
imp_version_parts = imp_version.split('.')
if int(version_parts[0]) != int(imp_version_parts[0]): # Major
return False
if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
return False
return True
def serialize_msg(raw_msg, force_envelope=False):
if not _SEND_RPC_ENVELOPE and not force_envelope:
return raw_msg
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
# information about this format.
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
_MESSAGE_KEY: jsonutils.dumps(raw_msg)}
return msg
def deserialize_msg(msg):
# NOTE(russellb): Hang on to your hats, this road is about to
# get a little bumpy.
#
# Robustness Principle:
# "Be strict in what you send, liberal in what you accept."
#
# At this point we have to do a bit of guessing about what it
# is we just received. Here is the set of possibilities:
#
# 1) We received a dict. This could be 2 things:
#
# a) Inspect it to see if it looks like a standard message envelope.
# If so, great!
#
# b) If it doesn't look like a standard message envelope, it could either
# be a notification, or a message from before we added a message
# envelope (referred to as version 1.0).
# Just return the message as-is.
#
# 2) It's any other non-dict type. Just return it and hope for the best.
# This case covers return values from rpc.call() from before message
# envelopes were used. (messages to call a method were always a dict)
if not isinstance(msg, dict):
# See #2 above.
return msg
base_envelope_keys = (_VERSION_KEY, _MESSAGE_KEY)
if not all(map(lambda key: key in msg, base_envelope_keys)):
# See #1.b above.
return msg
# At this point we think we have the message envelope
# format we were expecting. (#1.a above)
if not version_is_compatible(_RPC_ENVELOPE_VERSION, msg[_VERSION_KEY]):
raise UnsupportedRpcEnvelopeVersion(version=msg[_VERSION_KEY])
raw_msg = jsonutils.loads(msg[_MESSAGE_KEY])
return raw_msg

View File

@@ -103,21 +103,6 @@ class RpcDispatcher(object):
self.callbacks = callbacks self.callbacks = callbacks
super(RpcDispatcher, self).__init__() super(RpcDispatcher, self).__init__()
@staticmethod
def _is_compatible(mversion, version):
"""Determine whether versions are compatible.
:param mversion: The API version implemented by a callback.
:param version: The API version requested by an incoming message.
"""
version_parts = version.split('.')
mversion_parts = mversion.split('.')
if int(version_parts[0]) != int(mversion_parts[0]): # Major
return False
if int(version_parts[1]) > int(mversion_parts[1]): # Minor
return False
return True
def dispatch(self, ctxt, version, method, **kwargs): def dispatch(self, ctxt, version, method, **kwargs):
"""Dispatch a message based on a requested version. """Dispatch a message based on a requested version.
@@ -139,7 +124,8 @@ class RpcDispatcher(object):
rpc_api_version = proxyobj.RPC_API_VERSION rpc_api_version = proxyobj.RPC_API_VERSION
else: else:
rpc_api_version = '1.0' rpc_api_version = '1.0'
is_compatible = self._is_compatible(rpc_api_version, version) is_compatible = rpc_common.version_is_compatible(rpc_api_version,
version)
had_compatible = had_compatible or is_compatible had_compatible = had_compatible or is_compatible
if not hasattr(proxyobj, method): if not hasattr(proxyobj, method):
continue continue

View File

@@ -162,7 +162,8 @@ class ConsumerBase(object):
def _callback(raw_message): def _callback(raw_message):
message = self.channel.message_to_python(raw_message) message = self.channel.message_to_python(raw_message)
try: try:
callback(message.payload) msg = rpc_common.deserialize_msg(message.payload)
callback(msg)
message.ack() message.ack()
except Exception: except Exception:
LOG.exception(_("Failed to process message... skipping it.")) LOG.exception(_("Failed to process message... skipping it."))
@@ -196,7 +197,7 @@ class DirectConsumer(ConsumerBase):
# Default options # Default options
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange = kombu.entity.Exchange(name=msg_id, exchange = kombu.entity.Exchange(name=msg_id,
type='direct', type='direct',
@@ -269,7 +270,7 @@ class FanoutConsumer(ConsumerBase):
options = {'durable': False, options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf), 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', exchange = kombu.entity.Exchange(name=exchange_name, type='fanout',
durable=options['durable'], durable=options['durable'],
@@ -316,7 +317,7 @@ class DirectPublisher(Publisher):
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
super(DirectPublisher, self).__init__(channel, msg_id, msg_id, super(DirectPublisher, self).__init__(channel, msg_id, msg_id,
type='direct', **options) type='direct', **options)
@@ -350,7 +351,7 @@ class FanoutPublisher(Publisher):
""" """
options = {'durable': False, options = {'durable': False,
'auto_delete': True, 'auto_delete': True,
'exclusive': True} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic,
None, type='fanout', **options) None, type='fanout', **options)
@@ -387,6 +388,7 @@ class Connection(object):
def __init__(self, conf, server_params=None): def __init__(self, conf, server_params=None):
self.consumers = [] self.consumers = []
self.consumer_thread = None self.consumer_thread = None
self.proxy_callbacks = []
self.conf = conf self.conf = conf
self.max_retries = self.conf.rabbit_max_retries self.max_retries = self.conf.rabbit_max_retries
# Try forever? # Try forever?
@@ -469,7 +471,7 @@ class Connection(object):
LOG.info(_("Reconnecting to AMQP server on " LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params) "%(hostname)s:%(port)d") % params)
try: try:
self.connection.close() self.connection.release()
except self.connection_errors: except self.connection_errors:
pass pass
# Setting this in case the next statement fails, though # Setting this in case the next statement fails, though
@@ -573,12 +575,14 @@ class Connection(object):
def close(self): def close(self):
"""Close/release this connection""" """Close/release this connection"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.release() self.connection.release()
self.connection = None self.connection = None
def reset(self): def reset(self):
"""Reset a connection so it can be used again""" """Reset a connection so it can be used again"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.channel.close() self.channel.close()
self.channel = self.connection.channel() self.channel = self.connection.channel()
# work around 'memory' transport bug in 1.1.3 # work around 'memory' transport bug in 1.1.3
@@ -644,6 +648,11 @@ class Connection(object):
pass pass
self.consumer_thread = None self.consumer_thread = None
def wait_on_proxy_callbacks(self):
"""Wait for all proxy callback threads to exit."""
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg, **kwargs): def publisher_send(self, cls, topic, msg, **kwargs):
"""Send to a publisher based on the publisher class""" """Send to a publisher based on the publisher class"""
@@ -719,6 +728,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout: if fanout:
self.declare_fanout_consumer(topic, proxy_cb) self.declare_fanout_consumer(topic, proxy_cb)
@@ -730,6 +740,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
self.declare_topic_consumer(topic, proxy_cb, pool_name) self.declare_topic_consumer(topic, proxy_cb, pool_name)
@@ -782,11 +793,12 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg): def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify( return rpc_amqp.notify(
conf, context, topic, msg, conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection),
envelope)
def cleanup(): def cleanup():

View File

@@ -124,7 +124,8 @@ class ConsumerBase(object):
"""Fetch the message and pass it to the callback object""" """Fetch the message and pass it to the callback object"""
message = self.receiver.fetch() message = self.receiver.fetch()
try: try:
self.callback(message.content) msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception: except Exception:
LOG.exception(_("Failed to process message... skipping it.")) LOG.exception(_("Failed to process message... skipping it."))
finally: finally:
@@ -277,8 +278,16 @@ class Connection(object):
self.session = None self.session = None
self.consumers = {} self.consumers = {}
self.consumer_thread = None self.consumer_thread = None
self.proxy_callbacks = []
self.conf = conf self.conf = conf
if server_params and 'hostname' in server_params:
# NOTE(russellb) This enables support for cast_to_server.
server_params['qpid_hosts'] = [
'%s:%d' % (server_params['hostname'],
server_params.get('port', 5672))
]
params = { params = {
'qpid_hosts': self.conf.qpid_hosts, 'qpid_hosts': self.conf.qpid_hosts,
'username': self.conf.qpid_username, 'username': self.conf.qpid_username,
@@ -367,12 +376,14 @@ class Connection(object):
def close(self): def close(self):
"""Close/release this connection""" """Close/release this connection"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.connection.close() self.connection.close()
self.connection = None self.connection = None
def reset(self): def reset(self):
"""Reset a connection so it can be used again""" """Reset a connection so it can be used again"""
self.cancel_consumer_thread() self.cancel_consumer_thread()
self.wait_on_proxy_callbacks()
self.session.close() self.session.close()
self.session = self.connection.session() self.session = self.connection.session()
self.consumers = {} self.consumers = {}
@@ -427,6 +438,11 @@ class Connection(object):
pass pass
self.consumer_thread = None self.consumer_thread = None
def wait_on_proxy_callbacks(self):
"""Wait for all proxy callback threads to exit."""
for proxy_cb in self.proxy_callbacks:
proxy_cb.wait()
def publisher_send(self, cls, topic, msg): def publisher_send(self, cls, topic, msg):
"""Send to a publisher based on the publisher class""" """Send to a publisher based on the publisher class"""
@@ -502,6 +518,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
if fanout: if fanout:
consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb)
@@ -517,6 +534,7 @@ class Connection(object):
proxy_cb = rpc_amqp.ProxyCallback( proxy_cb = rpc_amqp.ProxyCallback(
self.conf, proxy, self.conf, proxy,
rpc_amqp.get_connection_pool(self.conf, Connection)) rpc_amqp.get_connection_pool(self.conf, Connection))
self.proxy_callbacks.append(proxy_cb)
consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb,
name=pool_name) name=pool_name)
@@ -575,10 +593,11 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg):
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection))
def notify(conf, context, topic, msg): def notify(conf, context, topic, msg, envelope):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
return rpc_amqp.notify(conf, context, topic, msg, return rpc_amqp.notify(conf, context, topic, msg,
rpc_amqp.get_connection_pool(conf, Connection)) rpc_amqp.get_connection_pool(conf, Connection),
envelope)
def cleanup(): def cleanup():

View File

@@ -28,7 +28,6 @@ import greenlet
from nova.openstack.common import cfg from nova.openstack.common import cfg
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _
from nova.openstack.common import importutils from nova.openstack.common import importutils
from nova.openstack.common import jsonutils
from nova.openstack.common.rpc import common as rpc_common from nova.openstack.common.rpc import common as rpc_common
@@ -77,27 +76,6 @@ ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object matchmaker = None # memoized matchmaker object
def _serialize(data):
"""
Serialization wrapper
We prefer using JSON, but it cannot encode all types.
Error if a developer passes us bad data.
"""
try:
return str(jsonutils.dumps(data, ensure_ascii=True))
except TypeError:
LOG.error(_("JSON serialization failed."))
raise
def _deserialize(data):
"""
Deserialization wrapper
"""
LOG.debug(_("Deserializing: %s"), data)
return jsonutils.loads(data)
class ZmqSocket(object): class ZmqSocket(object):
""" """
A tiny wrapper around ZeroMQ to simplify the send/recv protocol A tiny wrapper around ZeroMQ to simplify the send/recv protocol
@@ -205,9 +183,10 @@ class ZmqClient(object):
def __init__(self, addr, socket_type=zmq.PUSH, bind=False): def __init__(self, addr, socket_type=zmq.PUSH, bind=False):
self.outq = ZmqSocket(addr, socket_type, bind=bind) self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data): def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
self.outq.send([str(msg_id), str(topic), str('cast'), if serialize:
_serialize(data)]) data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send([str(msg_id), str(topic), str('cast'), data])
def close(self): def close(self):
self.outq.close() self.outq.close()
@@ -232,11 +211,11 @@ class RpcContext(rpc_common.CommonRpcContext):
@classmethod @classmethod
def marshal(self, ctx): def marshal(self, ctx):
ctx_data = ctx.to_dict() ctx_data = ctx.to_dict()
return _serialize(ctx_data) return rpc_common.serialize_msg(ctx_data)
@classmethod @classmethod
def unmarshal(self, data): def unmarshal(self, data):
return RpcContext.from_dict(_deserialize(data)) return RpcContext.from_dict(rpc_common.deserialize_msg(data))
class InternalContext(object): class InternalContext(object):
@@ -433,11 +412,11 @@ class ZmqProxy(ZmqBaseReactor):
sock_type = zmq.PUB sock_type = zmq.PUB
elif topic.startswith('zmq_replies'): elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB sock_type = zmq.PUB
inside = _deserialize(in_msg) inside = rpc_common.deserialize_msg(in_msg)
msg_id = inside[-1]['args']['msg_id'] msg_id = inside[-1]['args']['msg_id']
response = inside[-1]['args']['response'] response = inside[-1]['args']['response']
LOG.debug(_("->response->%s"), response) LOG.debug(_("->response->%s"), response)
data = [str(msg_id), _serialize(response)] data = [str(msg_id), rpc_common.serialize_msg(response)]
else: else:
sock_type = zmq.PUSH sock_type = zmq.PUSH
@@ -480,7 +459,7 @@ class ZmqReactor(ZmqBaseReactor):
msg_id, topic, style, in_msg = data msg_id, topic, style, in_msg = data
ctx, request = _deserialize(in_msg) ctx, request = rpc_common.deserialize_msg(in_msg)
ctx = RpcContext.unmarshal(ctx) ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock] proxy = self.proxies[sock]
@@ -531,7 +510,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread() self.reactor.consume_in_thread()
def _cast(addr, context, msg_id, topic, msg, timeout=None): def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
force_envelope=False):
timeout_cast = timeout or CONF.rpc_cast_timeout timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg] payload = [RpcContext.marshal(context), msg]
@@ -540,7 +520,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
conn = ZmqClient(addr) conn = ZmqClient(addr)
# assumes cast can't return an exception # assumes cast can't return an exception
conn.cast(msg_id, topic, payload) conn.cast(msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError: except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception") raise RPCException("Cast failed. ZMQ Socket Exception")
finally: finally:
@@ -590,7 +570,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
msg = msg_waiter.recv() msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response")) LOG.debug(_("Unpacking response"))
responses = _deserialize(msg[-1]) responses = rpc_common.deserialize_msg(msg[-1])
# ZMQError trumps the Timeout error. # ZMQError trumps the Timeout error.
except zmq.ZMQError: except zmq.ZMQError:
raise RPCException("ZMQ Socket Error") raise RPCException("ZMQ Socket Error")
@@ -609,7 +589,8 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
return responses[-1] return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None): def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False):
""" """
Wraps the sending of messages, Wraps the sending of messages,
dispatches to the matchmaker and sends dispatches to the matchmaker and sends
@@ -635,7 +616,8 @@ def _multi_send(method, context, topic, msg, timeout=None):
if method.__name__ == '_cast': if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context, eventlet.spawn_n(method, _addr, context,
_topic, _topic, msg, timeout) _topic, _topic, msg, timeout, serialize,
force_envelope)
return return
return method(_addr, context, _topic, _topic, msg, timeout) return method(_addr, context, _topic, _topic, msg, timeout)
@@ -676,6 +658,8 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not # NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions. # work with our assumptions.
topic.replace('.', '-') topic.replace('.', '-')
kwargs['serialize'] = kwargs.pop('envelope')
kwargs['force_envelope'] = True
cast(conf, context, topic, msg, **kwargs) cast(conf, context, topic, msg, **kwargs)