Fix serialization in impl_zmq.
Sync rpc from oslo-incubator to include some fixes to impl_zmq:
https://review.openstack.org/#/c/18913/
Change-Id: I5f5c157e81026c108df04bd385776a9985d4a497
This commit is contained in:
@@ -28,6 +28,7 @@ import greenlet
|
|||||||
from nova.openstack.common import cfg
|
from nova.openstack.common import cfg
|
||||||
from nova.openstack.common.gettextutils import _
|
from nova.openstack.common.gettextutils import _
|
||||||
from nova.openstack.common import importutils
|
from nova.openstack.common import importutils
|
||||||
|
from nova.openstack.common import jsonutils
|
||||||
from nova.openstack.common.rpc import common as rpc_common
|
from nova.openstack.common.rpc import common as rpc_common
|
||||||
|
|
||||||
|
|
||||||
@@ -76,6 +77,27 @@ ZMQ_CTX = None # ZeroMQ Context, must be global.
|
|||||||
matchmaker = None # memoized matchmaker object
|
matchmaker = None # memoized matchmaker object
|
||||||
|
|
||||||
|
|
||||||
|
def _serialize(data):
|
||||||
|
"""
|
||||||
|
Serialization wrapper
|
||||||
|
We prefer using JSON, but it cannot encode all types.
|
||||||
|
Error if a developer passes us bad data.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
return str(jsonutils.dumps(data, ensure_ascii=True))
|
||||||
|
except TypeError:
|
||||||
|
LOG.error(_("JSON serialization failed."))
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
def _deserialize(data):
|
||||||
|
"""
|
||||||
|
Deserialization wrapper
|
||||||
|
"""
|
||||||
|
LOG.debug(_("Deserializing: %s"), data)
|
||||||
|
return jsonutils.loads(data)
|
||||||
|
|
||||||
|
|
||||||
class ZmqSocket(object):
|
class ZmqSocket(object):
|
||||||
"""
|
"""
|
||||||
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
|
A tiny wrapper around ZeroMQ to simplify the send/recv protocol
|
||||||
@@ -186,7 +208,8 @@ class ZmqClient(object):
|
|||||||
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
|
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
|
||||||
if serialize:
|
if serialize:
|
||||||
data = rpc_common.serialize_msg(data, force_envelope)
|
data = rpc_common.serialize_msg(data, force_envelope)
|
||||||
self.outq.send([str(msg_id), str(topic), str('cast'), data])
|
self.outq.send([str(msg_id), str(topic), str('cast'),
|
||||||
|
_serialize(data)])
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.outq.close()
|
self.outq.close()
|
||||||
@@ -211,11 +234,11 @@ class RpcContext(rpc_common.CommonRpcContext):
|
|||||||
@classmethod
|
@classmethod
|
||||||
def marshal(self, ctx):
|
def marshal(self, ctx):
|
||||||
ctx_data = ctx.to_dict()
|
ctx_data = ctx.to_dict()
|
||||||
return rpc_common.serialize_msg(ctx_data)
|
return _serialize(ctx_data)
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def unmarshal(self, data):
|
def unmarshal(self, data):
|
||||||
return RpcContext.from_dict(rpc_common.deserialize_msg(data))
|
return RpcContext.from_dict(_deserialize(data))
|
||||||
|
|
||||||
|
|
||||||
class InternalContext(object):
|
class InternalContext(object):
|
||||||
@@ -229,7 +252,7 @@ class InternalContext(object):
|
|||||||
"""Process a curried message and cast the result to topic."""
|
"""Process a curried message and cast the result to topic."""
|
||||||
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
|
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
|
||||||
data.setdefault('version', None)
|
data.setdefault('version', None)
|
||||||
data.setdefault('args', [])
|
data.setdefault('args', {})
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = proxy.dispatch(
|
result = proxy.dispatch(
|
||||||
@@ -300,7 +323,7 @@ class ConsumerBase(object):
|
|||||||
return
|
return
|
||||||
|
|
||||||
data.setdefault('version', None)
|
data.setdefault('version', None)
|
||||||
data.setdefault('args', [])
|
data.setdefault('args', {})
|
||||||
proxy.dispatch(ctx, data['version'],
|
proxy.dispatch(ctx, data['version'],
|
||||||
data['method'], **data['args'])
|
data['method'], **data['args'])
|
||||||
|
|
||||||
@@ -412,11 +435,11 @@ class ZmqProxy(ZmqBaseReactor):
|
|||||||
sock_type = zmq.PUB
|
sock_type = zmq.PUB
|
||||||
elif topic.startswith('zmq_replies'):
|
elif topic.startswith('zmq_replies'):
|
||||||
sock_type = zmq.PUB
|
sock_type = zmq.PUB
|
||||||
inside = rpc_common.deserialize_msg(in_msg)
|
inside = rpc_common.deserialize_msg(_deserialize(in_msg))
|
||||||
msg_id = inside[-1]['args']['msg_id']
|
msg_id = inside[-1]['args']['msg_id']
|
||||||
response = inside[-1]['args']['response']
|
response = inside[-1]['args']['response']
|
||||||
LOG.debug(_("->response->%s"), response)
|
LOG.debug(_("->response->%s"), response)
|
||||||
data = [str(msg_id), rpc_common.serialize_msg(response)]
|
data = [str(msg_id), _serialize(response)]
|
||||||
else:
|
else:
|
||||||
sock_type = zmq.PUSH
|
sock_type = zmq.PUSH
|
||||||
|
|
||||||
@@ -459,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor):
|
|||||||
|
|
||||||
msg_id, topic, style, in_msg = data
|
msg_id, topic, style, in_msg = data
|
||||||
|
|
||||||
ctx, request = rpc_common.deserialize_msg(in_msg)
|
ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
|
||||||
ctx = RpcContext.unmarshal(ctx)
|
ctx = RpcContext.unmarshal(ctx)
|
||||||
|
|
||||||
proxy = self.proxies[sock]
|
proxy = self.proxies[sock]
|
||||||
@@ -570,7 +593,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
|
|||||||
msg = msg_waiter.recv()
|
msg = msg_waiter.recv()
|
||||||
LOG.debug(_("Received message: %s"), msg)
|
LOG.debug(_("Received message: %s"), msg)
|
||||||
LOG.debug(_("Unpacking response"))
|
LOG.debug(_("Unpacking response"))
|
||||||
responses = rpc_common.deserialize_msg(msg[-1])
|
responses = _deserialize(msg[-1])
|
||||||
# ZMQError trumps the Timeout error.
|
# ZMQError trumps the Timeout error.
|
||||||
except zmq.ZMQError:
|
except zmq.ZMQError:
|
||||||
raise RPCException("ZMQ Socket Error")
|
raise RPCException("ZMQ Socket Error")
|
||||||
|
|||||||
Reference in New Issue
Block a user