Sync rpc from oslo-incubator.
Updates include: * Enabling the rpc message envelope. * Improved logging for timeouts * fixes in the zmq driver Change-Id: I0b2ab1acff1fea9c07a33da176cf6a11ed8a3f2c
This commit is contained in:
@@ -661,7 +661,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
|
|||||||
pack_context(msg, context)
|
pack_context(msg, context)
|
||||||
with ConnectionContext(conf, connection_pool) as conn:
|
with ConnectionContext(conf, connection_pool) as conn:
|
||||||
if envelope:
|
if envelope:
|
||||||
msg = rpc_common.serialize_msg(msg, force_envelope=True)
|
msg = rpc_common.serialize_msg(msg)
|
||||||
conn.notify_send(topic, msg)
|
conn.notify_send(topic, msg)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -70,10 +70,6 @@ _VERSION_KEY = 'oslo.version'
|
|||||||
_MESSAGE_KEY = 'oslo.message'
|
_MESSAGE_KEY = 'oslo.message'
|
||||||
|
|
||||||
|
|
||||||
# TODO(russellb) Turn this on after Grizzly.
|
|
||||||
_SEND_RPC_ENVELOPE = False
|
|
||||||
|
|
||||||
|
|
||||||
class RPCException(Exception):
|
class RPCException(Exception):
|
||||||
message = _("An unknown RPC related exception occurred.")
|
message = _("An unknown RPC related exception occurred.")
|
||||||
|
|
||||||
@@ -122,7 +118,25 @@ class Timeout(RPCException):
|
|||||||
This exception is raised if the rpc_response_timeout is reached while
|
This exception is raised if the rpc_response_timeout is reached while
|
||||||
waiting for a response from the remote side.
|
waiting for a response from the remote side.
|
||||||
"""
|
"""
|
||||||
message = _("Timeout while waiting on RPC response.")
|
message = _('Timeout while waiting on RPC response - '
|
||||||
|
'topic: "%(topic)s", RPC method: "%(method)s" '
|
||||||
|
'info: "%(info)s"')
|
||||||
|
|
||||||
|
def __init__(self, info=None, topic=None, method=None):
|
||||||
|
"""
|
||||||
|
:param info: Extra info to convey to the user
|
||||||
|
:param topic: The topic that the rpc call was sent to
|
||||||
|
:param rpc_method_name: The name of the rpc method being
|
||||||
|
called
|
||||||
|
"""
|
||||||
|
self.info = info
|
||||||
|
self.topic = topic
|
||||||
|
self.method = method
|
||||||
|
super(Timeout, self).__init__(
|
||||||
|
None,
|
||||||
|
info=info or _('<unknown>'),
|
||||||
|
topic=topic or _('<unknown>'),
|
||||||
|
method=method or _('<unknown>'))
|
||||||
|
|
||||||
|
|
||||||
class DuplicateMessageError(RPCException):
|
class DuplicateMessageError(RPCException):
|
||||||
@@ -441,10 +455,7 @@ def version_is_compatible(imp_version, version):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def serialize_msg(raw_msg, force_envelope=False):
|
def serialize_msg(raw_msg):
|
||||||
if not _SEND_RPC_ENVELOPE and not force_envelope:
|
|
||||||
return raw_msg
|
|
||||||
|
|
||||||
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
|
# NOTE(russellb) See the docstring for _RPC_ENVELOPE_VERSION for more
|
||||||
# information about this format.
|
# information about this format.
|
||||||
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
|
msg = {_VERSION_KEY: _RPC_ENVELOPE_VERSION,
|
||||||
|
|||||||
@@ -16,6 +16,7 @@
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import pprint
|
import pprint
|
||||||
|
import re
|
||||||
import socket
|
import socket
|
||||||
import sys
|
import sys
|
||||||
import types
|
import types
|
||||||
@@ -220,7 +221,7 @@ class ZmqClient(object):
|
|||||||
def cast(self, msg_id, topic, data, envelope=False):
|
def cast(self, msg_id, topic, data, envelope=False):
|
||||||
msg_id = msg_id or 0
|
msg_id = msg_id or 0
|
||||||
|
|
||||||
if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
|
if not envelope:
|
||||||
self.outq.send(map(bytes,
|
self.outq.send(map(bytes,
|
||||||
(msg_id, topic, 'cast', _serialize(data))))
|
(msg_id, topic, 'cast', _serialize(data))))
|
||||||
return
|
return
|
||||||
@@ -294,11 +295,16 @@ class InternalContext(object):
|
|||||||
def reply(self, ctx, proxy,
|
def reply(self, ctx, proxy,
|
||||||
msg_id=None, context=None, topic=None, msg=None):
|
msg_id=None, context=None, topic=None, msg=None):
|
||||||
"""Reply to a casted call."""
|
"""Reply to a casted call."""
|
||||||
# Our real method is curried into msg['args']
|
# NOTE(ewindisch): context kwarg exists for Grizzly compat.
|
||||||
|
# this may be able to be removed earlier than
|
||||||
|
# 'I' if ConsumerBase.process were refactored.
|
||||||
|
if type(msg) is list:
|
||||||
|
payload = msg[-1]
|
||||||
|
else:
|
||||||
|
payload = msg
|
||||||
|
|
||||||
child_ctx = RpcContext.unmarshal(msg[0])
|
|
||||||
response = ConsumerBase.normalize_reply(
|
response = ConsumerBase.normalize_reply(
|
||||||
self._get_response(child_ctx, proxy, topic, msg[1]),
|
self._get_response(ctx, proxy, topic, payload),
|
||||||
ctx.replies)
|
ctx.replies)
|
||||||
|
|
||||||
LOG.debug(_("Sending reply"))
|
LOG.debug(_("Sending reply"))
|
||||||
@@ -431,6 +437,8 @@ class ZmqProxy(ZmqBaseReactor):
|
|||||||
|
|
||||||
def __init__(self, conf):
|
def __init__(self, conf):
|
||||||
super(ZmqProxy, self).__init__(conf)
|
super(ZmqProxy, self).__init__(conf)
|
||||||
|
pathsep = set((os.path.sep or '', os.path.altsep or '', '/', '\\'))
|
||||||
|
self.badchars = re.compile(r'[%s]' % re.escape(''.join(pathsep)))
|
||||||
|
|
||||||
self.topic_proxy = {}
|
self.topic_proxy = {}
|
||||||
|
|
||||||
@@ -456,6 +464,13 @@ class ZmqProxy(ZmqBaseReactor):
|
|||||||
LOG.info(_("Creating proxy for topic: %s"), topic)
|
LOG.info(_("Creating proxy for topic: %s"), topic)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# The topic is received over the network,
|
||||||
|
# don't trust this input.
|
||||||
|
if self.badchars.search(topic) is not None:
|
||||||
|
emsg = _("Topic contained dangerous characters.")
|
||||||
|
LOG.warn(emsg)
|
||||||
|
raise RPCException(emsg)
|
||||||
|
|
||||||
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
|
out_sock = ZmqSocket("ipc://%s/zmq_topic_%s" %
|
||||||
(ipc_dir, topic),
|
(ipc_dir, topic),
|
||||||
sock_type, bind=True)
|
sock_type, bind=True)
|
||||||
@@ -675,8 +690,8 @@ def _call(addr, context, topic, msg, timeout=None,
|
|||||||
'method': '-reply',
|
'method': '-reply',
|
||||||
'args': {
|
'args': {
|
||||||
'msg_id': msg_id,
|
'msg_id': msg_id,
|
||||||
'context': mcontext,
|
|
||||||
'topic': reply_topic,
|
'topic': reply_topic,
|
||||||
|
# TODO(ewindisch): safe to remove mcontext in I.
|
||||||
'msg': [mcontext, msg]
|
'msg': [mcontext, msg]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -68,16 +68,21 @@ class RpcProxy(object):
|
|||||||
:param context: The request context
|
:param context: The request context
|
||||||
:param msg: The message to send, including the method and args.
|
:param msg: The message to send, including the method and args.
|
||||||
:param topic: Override the topic for this message.
|
:param topic: Override the topic for this message.
|
||||||
|
:param version: (Optional) Override the requested API version in this
|
||||||
|
message.
|
||||||
:param timeout: (Optional) A timeout to use when waiting for the
|
:param timeout: (Optional) A timeout to use when waiting for the
|
||||||
response. If no timeout is specified, a default timeout will be
|
response. If no timeout is specified, a default timeout will be
|
||||||
used that is usually sufficient.
|
used that is usually sufficient.
|
||||||
:param version: (Optional) Override the requested API version in this
|
|
||||||
message.
|
|
||||||
|
|
||||||
:returns: The return value from the remote method.
|
:returns: The return value from the remote method.
|
||||||
"""
|
"""
|
||||||
self._set_version(msg, version)
|
self._set_version(msg, version)
|
||||||
return rpc.call(context, self._get_topic(topic), msg, timeout)
|
real_topic = self._get_topic(topic)
|
||||||
|
try:
|
||||||
|
return rpc.call(context, real_topic, msg, timeout)
|
||||||
|
except rpc.common.Timeout as exc:
|
||||||
|
raise rpc.common.Timeout(
|
||||||
|
exc.info, real_topic, msg.get('method'))
|
||||||
|
|
||||||
def multicall(self, context, msg, topic=None, version=None, timeout=None):
|
def multicall(self, context, msg, topic=None, version=None, timeout=None):
|
||||||
"""rpc.multicall() a remote method.
|
"""rpc.multicall() a remote method.
|
||||||
@@ -85,17 +90,22 @@ class RpcProxy(object):
|
|||||||
:param context: The request context
|
:param context: The request context
|
||||||
:param msg: The message to send, including the method and args.
|
:param msg: The message to send, including the method and args.
|
||||||
:param topic: Override the topic for this message.
|
:param topic: Override the topic for this message.
|
||||||
|
:param version: (Optional) Override the requested API version in this
|
||||||
|
message.
|
||||||
:param timeout: (Optional) A timeout to use when waiting for the
|
:param timeout: (Optional) A timeout to use when waiting for the
|
||||||
response. If no timeout is specified, a default timeout will be
|
response. If no timeout is specified, a default timeout will be
|
||||||
used that is usually sufficient.
|
used that is usually sufficient.
|
||||||
:param version: (Optional) Override the requested API version in this
|
|
||||||
message.
|
|
||||||
|
|
||||||
:returns: An iterator that lets you process each of the returned values
|
:returns: An iterator that lets you process each of the returned values
|
||||||
from the remote method as they arrive.
|
from the remote method as they arrive.
|
||||||
"""
|
"""
|
||||||
self._set_version(msg, version)
|
self._set_version(msg, version)
|
||||||
return rpc.multicall(context, self._get_topic(topic), msg, timeout)
|
real_topic = self._get_topic(topic)
|
||||||
|
try:
|
||||||
|
return rpc.multicall(context, real_topic, msg, timeout)
|
||||||
|
except rpc.common.Timeout as exc:
|
||||||
|
raise rpc.common.Timeout(
|
||||||
|
exc.info, real_topic, msg.get('method'))
|
||||||
|
|
||||||
def cast(self, context, msg, topic=None, version=None):
|
def cast(self, context, msg, topic=None, version=None):
|
||||||
"""rpc.cast() a remote method.
|
"""rpc.cast() a remote method.
|
||||||
|
|||||||
Reference in New Issue
Block a user