diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index 31143646b..61f3e37a0 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -14,6 +14,8 @@ import logging +import six + from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_pub_publisher from oslo_messaging._drivers.zmq_driver import zmq_address @@ -67,11 +69,11 @@ class UniversalQueueProxy(object): if message is None: return - envelope = message[zmq_names.MULTIPART_IDX_ENVELOPE] - if self.conf.use_pub_sub and envelope.is_mult_send: - LOG.debug("-> Redirecting request %s to TCP publisher", envelope) + msg_type = message[0] + if self.conf.use_pub_sub and msg_type in (zmq_names.CAST_FANOUT_TYPE, + zmq_names.NOTIFY_TYPE): self.pub_publisher.send_request(message) - elif not envelope.is_mult_send: + elif msg_type in zmq_names.DIRECT_TYPES: self._redirect_message(self.be_router_socket if socket is self.fe_router_socket else self.fe_router_socket, message) @@ -83,9 +85,12 @@ class UniversalQueueProxy(object): assert reply_id is not None, "Valid id expected" empty = socket.recv() assert empty == b'', "Empty delimiter expected" - envelope = socket.recv_pyobj() + msg_type = int(socket.recv()) + routing_key = socket.recv() payload = socket.recv_multipart() - payload.insert(zmq_names.MULTIPART_IDX_ENVELOPE, envelope) + payload.insert(0, reply_id) + payload.insert(0, routing_key) + payload.insert(0, msg_type) return payload except (AssertionError, zmq.ZMQError): LOG.error("Received message with wrong format") @@ -93,14 +98,16 @@ class UniversalQueueProxy(object): @staticmethod def _redirect_message(socket, multipart_message): - envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] - LOG.debug("<-> Dispatch message: %s", envelope) - response_binary = multipart_message[zmq_names.MULTIPART_IDX_BODY] - - socket.send(envelope.routing_key, zmq.SNDMORE) + message_type = multipart_message.pop(0) + routing_key = multipart_message.pop(0) + reply_id = multipart_message.pop(0) + message_id = multipart_message[0] + socket.send(routing_key, zmq.SNDMORE) socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(envelope, zmq.SNDMORE) - socket.send(response_binary) + socket.send(reply_id, zmq.SNDMORE) + socket.send(six.b(str(message_type)), zmq.SNDMORE) + LOG.debug("Redirecting message %s" % message_id) + socket.send_multipart(multipart_message) def cleanup(self): self.fe_router_socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index 9bd7118fe..1064e7279 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -13,6 +13,7 @@ # under the License. import logging +import six import time from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ @@ -21,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_reply_waiter from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -43,13 +45,16 @@ class DealerPublisherProxy(object): raise zmq_publisher_base.UnsupportedSendPattern( request.msg_type) - envelope = request.create_envelope( - routing_key=self.routing_table.get_routable_host(request.target) - if request.msg_type in zmq_names.DIRECT_TYPES else None) + routing_key = self.routing_table.get_routable_host(request.target) \ + if request.msg_type in zmq_names.DIRECT_TYPES else \ + zmq_address.target_to_subscribe_filter(request.target) self.socket.send(b'', zmq.SNDMORE) - self.socket.send_pyobj(envelope, zmq.SNDMORE) - self.socket.send_pyobj(request) + self.socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) + self.socket.send(six.b(routing_key), zmq.SNDMORE) + self.socket.send(six.b(request.message_id), zmq.SNDMORE) + self.socket.send_pyobj(request.context, zmq.SNDMORE) + self.socket.send_pyobj(request.message) LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to " "a target %(target)s", @@ -64,7 +69,7 @@ class DealerPublisherProxy(object): class DealerCallPublisherProxy(zmq_dealer_call_publisher.DealerCallPublisher): def __init__(self, conf, matchmaker, sockets_manager): - reply_waiter = zmq_reply_waiter.ReplyWaiter(conf) + reply_waiter = ReplyWaiterProxy(conf) sender = CallSenderProxy(conf, matchmaker, sockets_manager, reply_waiter) super(DealerCallPublisherProxy, self).__init__( @@ -84,19 +89,36 @@ class CallSenderProxy(zmq_dealer_call_publisher.CallSender): return self.socket def _do_send_request(self, socket, request): - envelope = request.create_envelope( - routing_key=self.routing_table.get_routable_host(request.target), - reply_id=self.socket.handle.identity) + routing_key = self.routing_table.get_routable_host(request.target) + # DEALER socket specific envelope empty delimiter socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(envelope, zmq.SNDMORE) - socket.send_pyobj(request) + socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) + socket.send(six.b(routing_key), zmq.SNDMORE) + socket.send(six.b(request.message_id), zmq.SNDMORE) + socket.send_pyobj(request.context, zmq.SNDMORE) + socket.send_pyobj(request.message) LOG.debug("Sent message_id %(message)s to a target %(target)s", {"message": request.message_id, "target": request.target}) +class ReplyWaiterProxy(zmq_reply_waiter.ReplyWaiter): + + def receive_method(self, socket): + empty = socket.recv() + assert empty == b'', "Empty expected!" + reply_id = socket.recv() + assert reply_id is not None, "Reply ID expected!" + message_type = int(socket.recv()) + assert message_type == zmq_names.REPLY_TYPE, "Reply is expected!" + message_id = socket.recv() + reply = socket.recv_pyobj() + LOG.debug("Received reply %s", message_id) + return reply + + class RoutingTable(object): """This class implements local routing-table cache taken from matchmaker. Its purpose is to give the next routable diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py index b7d8a7b76..027bc7baf 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py @@ -42,17 +42,16 @@ class ReplyWaiter(object): self.replies.pop(message_id) def poll_socket(self, socket): + self.poller.register(socket, recv_method=self.receive_method) - def _receive_method(socket): - empty = socket.recv() - assert empty == b'', "Empty expected!" - envelope = socket.recv_pyobj() - assert envelope is not None, "Invalid envelope!" - reply = socket.recv_pyobj() - LOG.debug("Received reply %s", envelope) - return reply - - self.poller.register(socket, recv_method=_receive_method) + def receive_method(self, socket): + empty = socket.recv() + assert empty == b'', "Empty expected!" + envelope = socket.recv_pyobj() + assert envelope is not None, "Invalid envelope!" + reply = socket.recv_pyobj() + LOG.debug("Received reply %s", envelope) + return reply def run_loop(self): reply, socket = self.poller.poll( diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py index 4054194c5..dbe995b24 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py @@ -14,8 +14,6 @@ import logging -from oslo_messaging._drivers.zmq_driver.client.publishers\ - import zmq_publisher_base from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -53,21 +51,21 @@ class PubPublisherProxy(object): self.socket.port) def send_request(self, multipart_message): - - envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE] - if not envelope.is_mult_send: - raise zmq_publisher_base.UnsupportedSendPattern(envelope.msg_type) - - topic_filter = envelope.topic_filter + message_type = multipart_message.pop(0) + assert message_type in (zmq_names.CAST_FANOUT_TYPE, + zmq_names.NOTIFY_TYPE), "Fanout expected!" + topic_filter = multipart_message.pop(0) + message_id = multipart_message.pop(0) + reply_id = multipart_message.pop(0) + assert reply_id is not None, "Reply id expected!" self.socket.send(topic_filter, zmq.SNDMORE) - self.socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY]) + self.socket.send(message_id, zmq.SNDMORE) + self.socket.send_multipart(multipart_message) - LOG.debug("Publishing message [%(topic)s] %(message_id)s to " - "a target %(target)s ", - {"message_id": envelope.message_id, - "target": envelope.target, - "topic": topic_filter}) + LOG.debug("Publishing message %(message_id)s on [%(topic)s]", + {"topic": topic_filter, + "message_id": message_id}) def cleanup(self): self.socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index dc5419450..6fbebf63f 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -14,6 +14,8 @@ import logging +import six + from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver.client.publishers\ @@ -32,15 +34,14 @@ zmq = zmq_async.import_zmq() class DealerIncomingMessage(base.RpcIncomingMessage): - def __init__(self, context, message, msg_id): + def __init__(self, context, message): super(DealerIncomingMessage, self).__init__(context, message) - self.msg_id = msg_id def reply(self, reply=None, failure=None, log_failure=True): """Reply is not needed for non-call messages""" def acknowledge(self): - LOG.debug("Not sending acknowledge for %s", self.msg_id) + """Not sending acknowledge""" def requeue(self): """Requeue is not supported""" @@ -48,31 +49,29 @@ class DealerIncomingMessage(base.RpcIncomingMessage): class DealerIncomingRequest(base.RpcIncomingMessage): - def __init__(self, socket, request, envelope): - super(DealerIncomingRequest, self).__init__(request.context, - request.message) + def __init__(self, socket, reply_id, message_id, context, message): + super(DealerIncomingRequest, self).__init__(context, message) self.reply_socket = socket - self.request = request - self.envelope = envelope + self.reply_id = reply_id + self.message_id = message_id def reply(self, reply=None, failure=None, log_failure=True): if failure is not None: failure = rpc_common.serialize_remote_exception(failure, log_failure) response = zmq_response.Response(type=zmq_names.REPLY_TYPE, - message_id=self.request.message_id, - reply_id=self.envelope.reply_id, + message_id=self.message_id, + reply_id=self.reply_id, reply_body=reply, failure=failure, log_failure=log_failure) - LOG.debug("Replying %s", (str(self.request.message_id))) - - self.envelope.routing_key = self.envelope.reply_id - self.envelope.msg_type = zmq_names.REPLY_TYPE + LOG.debug("Replying %s", self.message_id) self.reply_socket.send(b'', zmq.SNDMORE) - self.reply_socket.send_pyobj(self.envelope, zmq.SNDMORE) + self.reply_socket.send(six.b(str(zmq_names.REPLY_TYPE)), zmq.SNDMORE) + self.reply_socket.send(self.reply_id, zmq.SNDMORE) + self.reply_socket.send(self.message_id, zmq.SNDMORE) self.reply_socket.send_pyobj(response) def requeue(self): @@ -95,29 +94,25 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase): zmq.DEALER) LOG.info(_LI("[%s] Run DEALER consumer"), self.host) - def _receive_request(self, socket): - empty = socket.recv() - assert empty == b'', 'Bad format: empty delimiter expected' - envelope = socket.recv_pyobj() - request = socket.recv_pyobj() - return request, envelope - def receive_message(self, socket): try: - request, envelope = self._receive_request(socket) - LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s", - {"host": self.host, - "type": request.msg_type, - "id": request.message_id, - "target": request.target}) - - if request.msg_type == zmq_names.CALL_TYPE: - return DealerIncomingRequest(socket, request, envelope) - elif request.msg_type in zmq_names.NON_BLOCKING_TYPES: - return DealerIncomingMessage(request.context, request.message, - request.message_id) + empty = socket.recv() + assert empty == b'', 'Bad format: empty delimiter expected' + reply_id = socket.recv() + message_type = int(socket.recv()) + message_id = socket.recv() + context = socket.recv_pyobj() + message = socket.recv_pyobj() + LOG.debug("[%(host)s] Received message %(id)s", + {"host": self.host, "id": message_id}) + if message_type == zmq_names.CALL_TYPE: + return DealerIncomingRequest( + socket, reply_id, message_id, context, message) + elif message_type in zmq_names.NON_BLOCKING_TYPES: + return DealerIncomingMessage(context, message) else: - LOG.error(_LE("Unknown message type: %s"), request.msg_type) + LOG.error(_LE("Unknown message type: %s"), + zmq_names.message_type_str(message_type)) except (zmq.ZMQError, AssertionError) as e: LOG.error(_LE("Receiving message failure: %s"), str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py index 32c1ec5cd..0d1c5213e 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py @@ -13,7 +13,6 @@ # under the License. import logging -import uuid import six @@ -22,7 +21,6 @@ from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_consumer_base from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._i18n import _LE @@ -33,17 +31,14 @@ zmq = zmq_async.import_zmq() class SubIncomingMessage(base.RpcIncomingMessage): - def __init__(self, request, socket): - super(SubIncomingMessage, self).__init__( - request.context, request.message) - self.socket = socket - self.msg_id = request.message_id + def __init__(self, context, message): + super(SubIncomingMessage, self).__init__(context, message) def reply(self, reply=None, failure=None, log_failure=True): """Reply is not needed for non-call messages.""" def acknowledge(self): - LOG.debug("Not sending acknowledge for %s", self.msg_id) + """Requeue is not supported""" def requeue(self): """Requeue is not supported""" @@ -57,7 +52,6 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): self.target = server.target self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB) self.sockets.append(self.socket) - self.id = uuid.uuid4() self._subscribe_on_target(self.target) self.on_publishers(self.matchmaker.get_publishers()) self.poller.register(self.socket, self.receive_message) @@ -66,11 +60,9 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): for host, sync in publishers: self.socket.connect(zmq_address.get_tcp_direct_address(host)) LOG.debug("[%s] SUB consumer connected to publishers %s", - self.id, publishers) + self.socket.handle.identity, publishers) def _subscribe_on_target(self, target): - # NOTE(ozamiatin): No locks needed here, because this is called - # before the async updater loop started topic_filter = zmq_address.target_to_subscribe_filter(target) if target.topic: self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.topic)) @@ -78,31 +70,27 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): self.socket.setsockopt(zmq.SUBSCRIBE, six.b(target.server)) if target.topic and target.server: self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter) - LOG.debug("[%(host)s] Subscribing to topic %(filter)s", - {"host": self.id, "filter": topic_filter}) + {"host": self.socket.handle.identity, + "filter": topic_filter}) - def _receive_request(self, socket): + @staticmethod + def _receive_request(socket): topic_filter = socket.recv() - LOG.debug("[%(id)s] Received %(topic_filter)s topic", - {'id': self.id, 'topic_filter': topic_filter}) - request = socket.recv_pyobj() - return request + message_id = socket.recv() + context = socket.recv_pyobj() + message = socket.recv_pyobj() + LOG.debug("Received %(topic_filter)s topic message %(id)s", + {'id': message_id, 'topic_filter': topic_filter}) + return context, message def receive_message(self, socket): try: - request = self._receive_request(socket) - if not request: + context, message = self._receive_request(socket) + if not message: return None - LOG.debug("Received %(type)s, %(id)s, %(target)s", - {"type": request.msg_type, - "id": request.message_id, - "target": request.target}) - if request.msg_type not in zmq_names.MULTISEND_TYPES: - LOG.error(_LE("Unknown message type: %s"), request.msg_type) - else: - return SubIncomingMessage(request, socket) + return SubIncomingMessage(context, message) except (zmq.ZMQError, AssertionError) as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index 796e94d2c..ae477e6df 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -36,13 +36,12 @@ MULTIPART_IDX_ENVELOPE = 0 MULTIPART_IDX_BODY = 1 -CALL_TYPE = 'call' -CAST_TYPE = 'cast' -CAST_FANOUT_TYPE = 'cast-f' -NOTIFY_TYPE = 'notify' - -REPLY_TYPE = 'reply' -ACK_TYPE = 'ack' +CALL_TYPE = 1 +CAST_TYPE = 2 +CAST_FANOUT_TYPE = 3 +NOTIFY_TYPE = 4 +REPLY_TYPE = 5 +ACK_TYPE = 6 MESSAGE_TYPES = (CALL_TYPE, CAST_TYPE, @@ -50,7 +49,7 @@ MESSAGE_TYPES = (CALL_TYPE, NOTIFY_TYPE) MULTISEND_TYPES = (CAST_FANOUT_TYPE, NOTIFY_TYPE) -DIRECT_TYPES = (CALL_TYPE, CAST_TYPE) +DIRECT_TYPES = (CALL_TYPE, CAST_TYPE, REPLY_TYPE) CAST_TYPES = (CAST_TYPE, CAST_FANOUT_TYPE) NOTIFY_TYPES = (NOTIFY_TYPE,) NON_BLOCKING_TYPES = CAST_TYPES + NOTIFY_TYPES @@ -66,3 +65,13 @@ def socket_type_str(socket_type): zmq.PUB: "PUB", zmq.SUB: "SUB"} return zmq_socket_str[socket_type] + + +def message_type_str(message_type): + msg_type_str = {CALL_TYPE: "CALL", + CAST_TYPE: "CAST", + CAST_FANOUT_TYPE: "CAST_FANOUT_TYPE", + NOTIFY_TYPE: "NOTIFY_TYPE", + REPLY_TYPE: "REPLY_TYPE", + ACK_TYPE: "ACK_TYPE"} + return msg_type_str[message_type] diff --git a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py index 091397cf8..0287ccf24 100644 --- a/oslo_messaging/tests/drivers/zmq/test_pub_sub.py +++ b/oslo_messaging/tests/drivers/zmq/test_pub_sub.py @@ -15,13 +15,12 @@ import pickle import time -import contextlib - import oslo_messaging from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_pub_publisher -from oslo_messaging._drivers.zmq_driver.client import zmq_request +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging.tests.drivers.zmq import zmq_common @@ -51,11 +50,16 @@ class TestPubSub(zmq_common.ZmqBaseTestCase): # Needed only in test env to give listener a chance to connect # before request fires time.sleep(1) - with contextlib.closing(zmq_request.FanoutRequest( - target, context={}, message={'method': 'hello-world'}, - retry=None)) as request: - self.publisher.send_request([request.create_envelope(), - pickle.dumps(request)]) + context = {} + message = {'method': 'hello-world'} + + self.publisher.send_request( + [zmq_names.CAST_FANOUT_TYPE, + zmq_address.target_to_subscribe_filter(target), + b"message", + b"0000-0000", + pickle.dumps(context), + pickle.dumps(message)]) def _check_listener(self, listener): listener._received.wait(timeout=5) diff --git a/setup-test-env-zmq.sh b/setup-test-env-zmq.sh index df9629f30..a8ed0dd32 100755 --- a/setup-test-env-zmq.sh +++ b/setup-test-env-zmq.sh @@ -16,7 +16,7 @@ cat > ${DATADIR}/zmq.conf <