diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py index 805f1e3de..a5c3f0fdf 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_dealer_publisher.py @@ -63,8 +63,8 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend): socket.send(b'', zmq.SNDMORE) socket.send_string(request.msg_type, zmq.SNDMORE) socket.send_string(message_id, zmq.SNDMORE) - socket.send_json(request.context, zmq.SNDMORE) - socket.send_json(request.message) + socket.send_pyobj(request.context, zmq.SNDMORE) + socket.send_pyobj(request.message) LOG.info(_LI("Sending message %(message)s to a target %(target)s") % {"message": request.message, @@ -85,7 +85,7 @@ class AcknowledgementReceiver(object): def _receive_acknowledgement(self, socket): empty = socket.recv() assert empty == b"", "Empty delimiter expected" - ack_message = socket.recv_json() + ack_message = socket.recv_pyobj() return ack_message def track_socket(self, socket): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index eff59dab9..fccd74bd1 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -93,8 +93,8 @@ class PublisherBase(object): :type request: zmq_request.Request """ socket.send_string(request.msg_type, zmq.SNDMORE) - socket.send_json(request.context, zmq.SNDMORE) - socket.send_json(request.message) + socket.send_pyobj(request.context, zmq.SNDMORE) + socket.send_pyobj(request.message) def cleanup(self): """Cleanup publisher. Close allocated connections.""" diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py index a3096959c..066be8bd5 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_req_publisher.py @@ -64,7 +64,7 @@ class ReqPublisher(zmq_publisher_base.PublisherBase): def _receive_reply(socket, request): def _receive_method(socket): - return socket.recv_json() + return socket.recv_pyobj() # NOTE(ozamiatin): Check for retry here (no retries now) with contextlib.closing(zmq_async.get_reply_poller()) as poller: diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py index a90f71b5a..98ef3a73c 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py @@ -54,8 +54,8 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer): try: msg_type = socket.recv_string() assert msg_type is not None, 'Bad format: msg type expected' - context = socket.recv_json() - message = socket.recv_json() + context = socket.recv_pyobj() + message = socket.recv_pyobj() LOG.info(_LI("Received %(msg_type)s message %(msg)s") % {"msg_type": msg_type, "msg": str(message)}) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 2219b0c27..bfbfe9fdc 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -46,7 +46,7 @@ class RouterIncomingMessage(base.IncomingMessage): ack_message = {zmq_names.FIELD_ID: self.msg_id} self.socket.send(self.reply_id, zmq.SNDMORE) self.socket.send(b'', zmq.SNDMORE) - self.socket.send_json(ack_message) + self.socket.send_pyobj(ack_message) def requeue(self): """Requeue is not supported""" @@ -73,8 +73,8 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): if msg_type != zmq_names.CALL_TYPE: msg_id = socket.recv_string() - context = socket.recv_json() - message = socket.recv_json() + context = socket.recv_pyobj() + message = socket.recv_pyobj() LOG.info(_LI("Received %(msg_type)s message %(msg)s") % {"msg_type": msg_type, "msg": str(message)}) diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index 9d1351225..f43ec2325 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -48,7 +48,7 @@ class ZmqIncomingRequest(base.IncomingMessage): self.received = True self.reply_socket.send(self.reply_id, zmq.SNDMORE) self.reply_socket.send(b'', zmq.SNDMORE) - self.reply_socket.send_json(message_reply) + self.reply_socket.send_pyobj(message_reply) self.poller.resume_polling(self.reply_socket) def requeue(self): diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 59dee614e..2a4144c5a 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -54,6 +54,9 @@ class ZmqSocket(object): def send_json(self, *args, **kwargs): self.handle.send_json(*args, **kwargs) + def send_pyobj(self, *args, **kwargs): + self.handle.send_pyobj(*args, **kwargs) + def recv(self, *args, **kwargs): return self.handle.recv(*args, **kwargs) @@ -63,6 +66,9 @@ class ZmqSocket(object): def recv_json(self, *args, **kwargs): return self.handle.recv_json(*args, **kwargs) + def recv_pyobj(self, *args, **kwargs): + return self.handle.recv_pyobj(*args, **kwargs) + def close(self, *args, **kwargs): self.handle.close(*args, **kwargs)