From 53135341dea9e7cf345ffefaf5f8b5ef28ca3d72 Mon Sep 17 00:00:00 2001 From: Russell Bryant Date: Tue, 8 Jan 2013 14:31:46 -0500 Subject: [PATCH] Fix serialization in impl_zmq. Sync rpc from oslo-incubator to include some fixes to impl_zmq: https://review.openstack.org/#/c/18913/ Change-Id: I5f5c157e81026c108df04bd385776a9985d4a497 --- nova/openstack/common/rpc/impl_zmq.py | 41 +++++++++++++++++++++------ 1 file changed, 32 insertions(+), 9 deletions(-) diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py index ef4aa21db..d99d390f2 100644 --- a/nova/openstack/common/rpc/impl_zmq.py +++ b/nova/openstack/common/rpc/impl_zmq.py @@ -28,6 +28,7 @@ import greenlet from nova.openstack.common import cfg from nova.openstack.common.gettextutils import _ from nova.openstack.common import importutils +from nova.openstack.common import jsonutils from nova.openstack.common.rpc import common as rpc_common @@ -76,6 +77,27 @@ ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object +def _serialize(data): + """ + Serialization wrapper + We prefer using JSON, but it cannot encode all types. + Error if a developer passes us bad data. + """ + try: + return str(jsonutils.dumps(data, ensure_ascii=True)) + except TypeError: + LOG.error(_("JSON serialization failed.")) + raise + + +def _deserialize(data): + """ + Deserialization wrapper + """ + LOG.debug(_("Deserializing: %s"), data) + return jsonutils.loads(data) + + class ZmqSocket(object): """ A tiny wrapper around ZeroMQ to simplify the send/recv protocol @@ -186,7 +208,8 @@ class ZmqClient(object): def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): if serialize: data = rpc_common.serialize_msg(data, force_envelope) - self.outq.send([str(msg_id), str(topic), str('cast'), data]) + self.outq.send([str(msg_id), str(topic), str('cast'), + _serialize(data)]) def close(self): self.outq.close() @@ -211,11 +234,11 @@ class RpcContext(rpc_common.CommonRpcContext): @classmethod def marshal(self, ctx): ctx_data = ctx.to_dict() - return rpc_common.serialize_msg(ctx_data) + return _serialize(ctx_data) @classmethod def unmarshal(self, data): - return RpcContext.from_dict(rpc_common.deserialize_msg(data)) + return RpcContext.from_dict(_deserialize(data)) class InternalContext(object): @@ -229,7 +252,7 @@ class InternalContext(object): """Process a curried message and cast the result to topic.""" LOG.debug(_("Running func with context: %s"), ctx.to_dict()) data.setdefault('version', None) - data.setdefault('args', []) + data.setdefault('args', {}) try: result = proxy.dispatch( @@ -300,7 +323,7 @@ class ConsumerBase(object): return data.setdefault('version', None) - data.setdefault('args', []) + data.setdefault('args', {}) proxy.dispatch(ctx, data['version'], data['method'], **data['args']) @@ -412,11 +435,11 @@ class ZmqProxy(ZmqBaseReactor): sock_type = zmq.PUB elif topic.startswith('zmq_replies'): sock_type = zmq.PUB - inside = rpc_common.deserialize_msg(in_msg) + inside = rpc_common.deserialize_msg(_deserialize(in_msg)) msg_id = inside[-1]['args']['msg_id'] response = inside[-1]['args']['response'] LOG.debug(_("->response->%s"), response) - data = [str(msg_id), rpc_common.serialize_msg(response)] + data = [str(msg_id), _serialize(response)] else: sock_type = zmq.PUSH @@ -459,7 +482,7 @@ class ZmqReactor(ZmqBaseReactor): msg_id, topic, style, in_msg = data - ctx, request = rpc_common.deserialize_msg(in_msg) + ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg)) ctx = RpcContext.unmarshal(ctx) proxy = self.proxies[sock] @@ -570,7 +593,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = rpc_common.deserialize_msg(msg[-1]) + responses = _deserialize(msg[-1]) # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error")