[zmq] Use json/msgpack instead of pickle

Change-Id: Ia4a08b6f2d932ad0642d64f55bcdadef814e4350
Closes-Bug: #1582207
Closes-Bug: #1584763
Depends-On: I90df59d61af2b40b516a5151c67c184fcc91e366
This commit is contained in:
Gevorg Davoian 2016-07-04 20:09:30 +03:00
parent ac484f6b26
commit 66ded1f914
11 changed files with 112 additions and 62 deletions

@ -100,7 +100,12 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_bind_port_retries',
default=100,
help='Number of retries to find free port number before '
'fail with ZMQBindError.')
'fail with ZMQBindError.'),
cfg.StrOpt('rpc_zmq_serialization', default='json',
choices=('json', 'msgpack'),
help='Default serialization mechanism for '
'serializing/deserializing outgoing/incoming messages')
]

@ -19,6 +19,7 @@ import threading
import futurist
import six
from oslo_messaging._drivers.zmq_driver.client import zmq_response
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
@ -118,9 +119,11 @@ class ReplyReceiverProxy(ReplyReceiver):
reply_id = socket.recv()
assert reply_id is not None, "Reply ID expected!"
message_type = int(socket.recv())
assert message_type == zmq_names.REPLY_TYPE, "Reply is expected!"
assert message_type == zmq_names.REPLY_TYPE, "Reply expected!"
message_id = socket.recv()
reply = socket.recv_pyobj()
raw_reply = socket.recv_loaded()
assert isinstance(raw_reply, dict), "Dict expected!"
reply = zmq_response.Response(**raw_reply)
LOG.debug("Received reply for %s", message_id)
return reply_id, message_type, message_id, reply
@ -130,9 +133,11 @@ class ReplyReceiverDirect(ReplyReceiver):
def recv_response(self, socket):
empty = socket.recv()
assert empty == b'', "Empty expected!"
reply = socket.recv_pyobj()
raw_reply = socket.recv_loaded()
assert isinstance(raw_reply, dict), "Dict expected!"
reply = zmq_response.Response(**raw_reply)
LOG.debug("Received reply for %s", reply.message_id)
return reply.reply_id, reply.type_, reply.message_id, reply
return reply.reply_id, reply.msg_type, reply.message_id, reply
class AckAndReplyReceiver(ReceiverBase):

@ -17,23 +17,18 @@ from oslo_messaging._drivers.zmq_driver import zmq_names
class Response(object):
def __init__(self, id=None, type=None, message_id=None,
def __init__(self, msg_type=None, message_id=None,
reply_id=None, reply_body=None, failure=None):
self._id = id
self._type = type
self._msg_type = msg_type
self._message_id = message_id
self._reply_id = reply_id
self._reply_body = reply_body
self._failure = failure
@property
def id_(self):
return self._id
@property
def type_(self):
return self._type
def msg_type(self):
return self._msg_type
@property
def message_id(self):
@ -52,11 +47,10 @@ class Response(object):
return self._failure
def to_dict(self):
return {zmq_names.FIELD_ID: self._id,
zmq_names.FIELD_TYPE: self._type,
return {zmq_names.FIELD_MSG_TYPE: self._msg_type,
zmq_names.FIELD_MSG_ID: self._message_id,
zmq_names.FIELD_REPLY_ID: self._reply_id,
zmq_names.FIELD_REPLY: self._reply_body,
zmq_names.FIELD_REPLY_BODY: self._reply_body,
zmq_names.FIELD_FAILURE: self._failure}
def __str__(self):

@ -44,8 +44,8 @@ class RequestSenderProxy(SenderBase):
socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
socket.send(six.b(request.routing_key), zmq.SNDMORE)
socket.send(six.b(request.message_id), zmq.SNDMORE)
socket.send_pyobj(request.context, zmq.SNDMORE)
socket.send_pyobj(request.message)
socket.send_dumped(request.context, zmq.SNDMORE)
socket.send_dumped(request.message)
LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s message "
"%(msg_id)s to target %(target)s",
@ -60,20 +60,23 @@ class ReplySenderProxy(SenderBase):
def send(self, socket, reply):
LOG.debug("Replying to %s", reply.message_id)
assert reply.type_ == zmq_names.REPLY_TYPE, "Reply expected!"
assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!"
socket.send(b'', zmq.SNDMORE)
socket.send(six.b(str(reply.type_)), zmq.SNDMORE)
socket.send(six.b(str(reply.msg_type)), zmq.SNDMORE)
socket.send(reply.reply_id, zmq.SNDMORE)
socket.send(reply.message_id, zmq.SNDMORE)
socket.send_pyobj(reply)
socket.send_dumped(reply.to_dict())
class RequestSenderDirect(SenderBase):
def send(self, socket, request):
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(request)
socket.send(six.b(str(request.msg_type)), zmq.SNDMORE)
socket.send_string(request.message_id, zmq.SNDMORE)
socket.send_dumped(request.context, zmq.SNDMORE)
socket.send_dumped(request.message)
LOG.debug("Sending %(msg_type)s message %(msg_id)s to "
"target %(target)s",
@ -87,8 +90,8 @@ class ReplySenderDirect(SenderBase):
def send(self, socket, reply):
LOG.debug("Replying to %s", reply.message_id)
assert reply.type_ == zmq_names.REPLY_TYPE, "Reply expected!"
assert reply.msg_type == zmq_names.REPLY_TYPE, "Reply expected!"
socket.send(reply.reply_id, zmq.SNDMORE)
socket.send(b'', zmq.SNDMORE)
socket.send_pyobj(reply)
socket.send_dumped(reply.to_dict())

@ -60,10 +60,12 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
reply_id = socket.recv()
message_type = int(socket.recv())
message_id = socket.recv()
context = socket.recv_pyobj()
message = socket.recv_pyobj()
LOG.debug("[%(host)s] Received message %(id)s",
{"host": self.host, "id": message_id})
context = socket.recv_loaded()
message = socket.recv_loaded()
LOG.debug("[%(host)s] Received %(msg_type)s message %(msg_id)s",
{"host": self.host,
"msg_type": zmq_names.message_type_str(message_type),
"msg_id": message_id})
if message_type == zmq_names.CALL_TYPE:
return zmq_incoming_message.ZmqIncomingMessage(
context, message, reply_id, message_id, socket, self.sender

@ -15,7 +15,7 @@
import logging
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
from oslo_messaging._drivers.zmq_driver.server.consumers\
from oslo_messaging._drivers.zmq_driver.server.consumers \
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
from oslo_messaging._drivers.zmq_driver import zmq_async
@ -38,29 +38,31 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
reply_id = socket.recv()
empty = socket.recv()
assert empty == b'', 'Bad format: empty delimiter expected'
request = socket.recv_pyobj()
return request, reply_id
msg_type = int(socket.recv())
message_id = socket.recv_string()
context = socket.recv_loaded()
message = socket.recv_loaded()
return reply_id, msg_type, message_id, context, message
def receive_message(self, socket):
try:
request, reply_id = self._receive_request(socket)
LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s",
reply_id, msg_type, message_id, context, message = \
self._receive_request(socket)
LOG.debug("[%(host)s] Received %(msg_type)s message %(msg_id)s",
{"host": self.host,
"type": request.msg_type,
"id": request.message_id,
"target": request.target})
"msg_type": zmq_names.message_type_str(msg_type),
"msg_id": message_id})
if request.msg_type == zmq_names.CALL_TYPE:
if msg_type == zmq_names.CALL_TYPE:
return zmq_incoming_message.ZmqIncomingMessage(
request.context, request.message, reply_id,
request.message_id, socket, self.sender
context, message, reply_id, message_id, socket, self.sender
)
elif request.msg_type in zmq_names.NON_BLOCKING_TYPES:
return zmq_incoming_message.ZmqIncomingMessage(request.context,
request.message)
elif msg_type in zmq_names.NON_BLOCKING_TYPES:
return zmq_incoming_message.ZmqIncomingMessage(context,
message)
else:
LOG.error(_LE("Unknown message type: %s"),
zmq_names.message_type_str(request.msg_type))
zmq_names.message_type_str(msg_type))
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failed: %s"), str(e))

@ -63,8 +63,8 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
def _receive_request(socket):
topic_filter = socket.recv()
message_id = socket.recv()
context = socket.recv_pyobj()
message = socket.recv_pyobj()
context = socket.recv_loaded()
message = socket.recv_loaded()
LOG.debug("Received %(topic_filter)s topic message %(id)s",
{'id': message_id, 'topic_filter': topic_filter})
return context, message

@ -50,7 +50,7 @@ class ZmqIncomingMessage(base.RpcIncomingMessage):
if self.sender is not None:
if failure is not None:
failure = rpc_common.serialize_remote_exception(failure)
reply = zmq_response.Response(type=zmq_names.REPLY_TYPE,
reply = zmq_response.Response(msg_type=zmq_names.REPLY_TYPE,
message_id=self.message_id,
reply_id=self.reply_id,
reply_body=reply,

@ -17,15 +17,11 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
zmq = zmq_async.import_zmq()
FIELD_TYPE = 'type'
FIELD_FAILURE = 'failure'
FIELD_REPLY = 'reply'
FIELD_ID = 'id'
FIELD_MSG_ID = 'message_id'
FIELD_MSG_TYPE = 'msg_type'
FIELD_MSG_ID = 'message_id'
FIELD_REPLY_ID = 'reply_id'
FIELD_TARGET = 'target'
FIELD_ROUTING_KEY = 'routing_key'
FIELD_REPLY_BODY = 'reply_body'
FIELD_FAILURE = 'failure'
IDX_REPLY_TYPE = 1

@ -23,6 +23,8 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._i18n import _LE, _LI
from oslo_messaging import exceptions
from oslo_serialization.serializer import json_serializer
from oslo_serialization.serializer import msgpack_serializer
LOG = logging.getLogger(__name__)
@ -31,6 +33,11 @@ zmq = zmq_async.import_zmq()
class ZmqSocket(object):
SERIALIZERS = {
'json': json_serializer.JSONSerializer(),
'msgpack': msgpack_serializer.MessagePackSerializer()
}
def __init__(self, conf, context, socket_type, high_watermark=0):
self.conf = conf
self.context = context
@ -45,6 +52,14 @@ class ZmqSocket(object):
self.handle.identity = six.b(str(uuid.uuid4()))
self.connections = set()
def _get_serializer(self, serialization):
serializer = self.SERIALIZERS.get(serialization, None)
if serializer is None:
raise NotImplementedError(
"Serialization '{}' is not supported".format(serialization)
)
return serializer
def type_name(self):
return zmq_names.socket_type_str(self.socket_type)
@ -77,6 +92,13 @@ class ZmqSocket(object):
def send_multipart(self, *args, **kwargs):
self.handle.send_multipart(*args, **kwargs)
def send_dumped(self, obj, *args, **kwargs):
serialization = kwargs.pop('serialization',
self.conf.rpc_zmq_serialization)
serializer = self._get_serializer(serialization)
s = serializer.dump_as_bytes(obj)
self.handle.send(s, *args, **kwargs)
def recv(self, *args, **kwargs):
return self.handle.recv(*args, **kwargs)
@ -92,6 +114,14 @@ class ZmqSocket(object):
def recv_multipart(self, *args, **kwargs):
return self.handle.recv_multipart(*args, **kwargs)
def recv_loaded(self, *args, **kwargs):
serialization = kwargs.pop('serialization',
self.conf.rpc_zmq_serialization)
serializer = self._get_serializer(serialization)
s = self.handle.recv(*args, **kwargs)
obj = serializer.load_from_bytes(s)
return obj
def close(self, *args, **kwargs):
self.handle.close(*args, **kwargs)
@ -106,10 +136,10 @@ class ZmqSocket(object):
"address": address})
self.connect(address)
except zmq.ZMQError as e:
errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
% (stype, address, e)
LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s"),
(stype, address, e))
errmsg = _LE("Failed connecting %(stype)s to %(address)s: %(e)s") \
% {"stype": stype, "address": address, "e": e}
LOG.error(_LE("Failed connecting %(stype)s to %(address)s: %(e)s"),
{"stype": stype, "address": address, "e": e})
raise rpc_common.RPCException(errmsg)
def connect_to_host(self, host):

@ -12,9 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import pickle
import json
import msgpack
import time
import six
import testscenarios
import oslo_messaging
from oslo_messaging._drivers.zmq_driver.client.publishers \
import zmq_pub_publisher
@ -23,6 +27,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging.tests.drivers.zmq import zmq_common
load_tests = testscenarios.load_tests_apply_scenarios
zmq = zmq_async.import_zmq()
@ -31,10 +36,18 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
LISTENERS_COUNT = 3
scenarios = [
('json', {'serialization': 'json',
'dumps': lambda obj: six.b(json.dumps(obj))}),
('msgpack', {'serialization': 'msgpack',
'dumps': msgpack.dumps})
]
def setUp(self):
super(TestPubSub, self).setUp()
kwargs = {'use_pub_sub': True}
kwargs = {'use_pub_sub': True,
'rpc_zmq_serialization': self.serialization}
self.config(**kwargs)
self.publisher = zmq_pub_publisher.PubPublisherProxy(
@ -58,8 +71,8 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
zmq_address.target_to_subscribe_filter(target),
b"message",
b"0000-0000",
pickle.dumps(context),
pickle.dumps(message)])
self.dumps(context),
self.dumps(message)])
def _check_listener(self, listener):
listener._received.wait(timeout=5)