From fbe10f0d05331ecb138dceffd3873e018607d88f Mon Sep 17 00:00:00 2001 From: Oleksii Zamiatin Date: Mon, 8 Feb 2016 16:53:05 +0200 Subject: [PATCH] [zmq] Multithreading access to zmq sockets Make all publishers thread-safe using Queue object in front of the bare zmq sockets which are not thread safe by specification. Change-Id: I41b5c3ae0c07cc84cba1b9db2ea85607c684c072 --- .../dealer/zmq_dealer_call_publisher.py | 51 +++----- .../publishers/dealer/zmq_dealer_publisher.py | 121 +++++++----------- .../client/publishers/zmq_publisher_base.py | 32 ++++- .../client/publishers/zmq_push_publisher.py | 31 ++--- .../_drivers/zmq_driver/client/zmq_client.py | 4 +- 5 files changed, 111 insertions(+), 128 deletions(-) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py index dfccc70a0..2491311dd 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py @@ -43,7 +43,20 @@ class DealerCallPublisher(object): self.conf = conf self.matchmaker = matchmaker self.reply_waiter = ReplyWaiter(conf) - self.sender = RequestSender(conf, matchmaker, self.reply_waiter) + sockets_manager = zmq_publisher_base.SocketsManager( + conf, matchmaker, zmq.ROUTER, zmq.DEALER) + + def _do_send_request(socket, request): + # DEALER socket specific envelope empty delimiter + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(request) + + LOG.debug("Sent message_id %(message)s to a target %(target)s", + {"message": request.message_id, + "target": request.target}) + + self.sender = CallSender(sockets_manager, _do_send_request, + self.reply_waiter) def send_request(self, request): reply_future = self.sender.send_request(request) @@ -68,16 +81,12 @@ class DealerCallPublisher(object): self.sender.cleanup() -class RequestSender(zmq_publisher_base.PublisherBase): +class CallSender(zmq_publisher_base.QueuedSender): - def __init__(self, conf, matchmaker, reply_waiter): - sockets_manager = zmq_publisher_base.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER) - super(RequestSender, self).__init__(sockets_manager) + def __init__(self, sockets_manager, _do_send_request, reply_waiter): + super(CallSender, self).__init__(sockets_manager, _do_send_request) + assert reply_waiter, "Valid ReplyWaiter expected!" self.reply_waiter = reply_waiter - self.queue, self.empty_except = zmq_async.get_queue() - self.executor = zmq_async.get_executor(self.run_loop) - self.executor.execute() def send_request(self, request): reply_future = futurist.Future() @@ -85,30 +94,10 @@ class RequestSender(zmq_publisher_base.PublisherBase): self.queue.put(request) return reply_future - def _do_send_request(self, socket, request): - socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(request) - - LOG.debug("Sending message_id %(message)s to a target %(target)s", - {"message": request.message_id, "target": request.target}) - def _connect_socket(self, target): - return self.outbound_sockets.get_socket(target) - - def run_loop(self): - try: - request = self.queue.get(timeout=self.conf.rpc_poll_timeout) - except self.empty_except: - return - - socket = self._connect_socket(request.target) - - self._do_send_request(socket, request) + socket = self.outbound_sockets.get_socket(target) self.reply_waiter.poll_socket(socket) - - def cleanup(self): - self.executor.stop() - super(RequestSender, self).cleanup() + return socket class ReplyWaiter(object): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py index fb7a8f3d2..f8fa60552 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py @@ -16,111 +16,78 @@ import logging from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LW LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class DealerPublisher(zmq_publisher_base.PublisherBase): +class DealerPublisher(zmq_publisher_base.QueuedSender): def __init__(self, conf, matchmaker): + + def _send_message_data(socket, request): + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(request) + + LOG.debug("Sent message_id %(message)s to a target %(target)s", + {"message": request.message_id, + "target": request.target}) + + def _do_send_request(socket, request): + if request.msg_type in zmq_names.MULTISEND_TYPES: + for _ in range(socket.connections_count()): + _send_message_data(socket, request) + else: + _send_message_data(socket, request) + sockets_manager = zmq_publisher_base.SocketsManager( conf, matchmaker, zmq.ROUTER, zmq.DEALER) - super(DealerPublisher, self).__init__(sockets_manager) + super(DealerPublisher, self).__init__(sockets_manager, + _do_send_request) def send_request(self, request): - - self._check_request_pattern(request) - - dealer_socket = self.outbound_sockets.get_socket(request.target) - - if not dealer_socket.connections: - # NOTE(ozamiatin): Here we can provide - # a queue for keeping messages to send them later - # when some listener appears. However such approach - # being more reliable will consume additional memory. - LOG.warning(_LW("Request %s was dropped because no connection"), - request.msg_type) - return - - if request.msg_type in zmq_names.MULTISEND_TYPES: - for _ in range(dealer_socket.connections_count()): - self._send_request(dealer_socket, request) - else: - self._send_request(dealer_socket, request) - - def _check_request_pattern(self, request): if request.msg_type == zmq_names.CALL_TYPE: raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - - def _send_request(self, socket, request): - - socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(request) - - LOG.debug("Sending message_id %(message)s to a target %(target)s", - {"message": request.message_id, "target": request.target}) + super(DealerPublisher, self).send_request(request) def cleanup(self): super(DealerPublisher, self).cleanup() -class DealerPublisherLight(object): +class DealerPublisherLight(zmq_publisher_base.QueuedSender): """Used when publishing to proxy. """ - def __init__(self, conf, address): - super(DealerPublisherLight, self).__init__() - self.conf = conf - self.zmq_context = zmq.Context() - self.socket = self.zmq_context.socket(zmq.DEALER) - self.address = address - self.socket.connect(address) + def __init__(self, conf, matchmaker): - def send_request(self, request): + def _do_send_request(socket, request): + if request.msg_type == zmq_names.CALL_TYPE: + raise zmq_publisher_base.UnsupportedSendPattern( + request.msg_type) - if request.msg_type == zmq_names.CALL_TYPE: - raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) + envelope = request.create_envelope() - envelope = request.create_envelope() + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(envelope, zmq.SNDMORE) + socket.send_pyobj(request) - self.socket.send(b'', zmq.SNDMORE) - self.socket.send_pyobj(envelope, zmq.SNDMORE) - self.socket.send_pyobj(request) + LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to " + "a target %(target)s", + {"message": request.message_id, + "target": request.target, + "addr": zmq_address.get_broker_address(conf)}) - LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to " - "a target %(target)s", - {"message": request.message_id, - "target": request.target, - "addr": self.address}) + sockets_manager = zmq_publisher_base.SocketsManager( + conf, matchmaker, zmq.ROUTER, zmq.DEALER) + super(DealerPublisherLight, self).__init__( + sockets_manager, _do_send_request) + self.socket = self.outbound_sockets.get_socket_to_broker() + + def _connect_socket(self, target): + return self.socket def cleanup(self): self.socket.close() - - -class AcknowledgementReceiver(object): - - def __init__(self): - self.poller = zmq_async.get_poller() - self.thread = zmq_async.get_executor(self.poll_for_acknowledgements) - self.thread.execute() - - def _receive_acknowledgement(self, socket): - empty = socket.recv() - assert empty == b"", "Empty delimiter expected" - ack_message = socket.recv_pyobj() - return ack_message - - def track_socket(self, socket): - self.poller.register(socket, self._receive_acknowledgement) - - def poll_for_acknowledgements(self): - ack_message, socket = self.poller.poll() - LOG.debug("Message %s acknowledged", ack_message[zmq_names.FIELD_ID]) - - def cleanup(self): - self.thread.stop() - self.poller.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index a58bd5fc7..929d1a76d 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -136,10 +136,9 @@ class SocketsManager(object): self._get_hosts_and_connect(socket, target) return socket - def get_socket_to_broker(self, target): + def get_socket_to_broker(self): socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, self.socket_type) - self._track_socket(socket, target) address = zmq_address.get_broker_address(self.conf) socket.connect_to_address(address) return socket @@ -147,3 +146,32 @@ class SocketsManager(object): def cleanup(self): for socket, tm in self.outbound_sockets.values(): socket.close() + + +class QueuedSender(PublisherBase): + + def __init__(self, sockets_manager, _do_send_request): + super(QueuedSender, self).__init__(sockets_manager) + self._do_send_request = _do_send_request + self.queue, self.empty_except = zmq_async.get_queue() + self.executor = zmq_async.get_executor(self.run_loop) + self.executor.execute() + + def send_request(self, request): + self.queue.put(request) + + def _connect_socket(self, target): + return self.outbound_sockets.get_socket(target) + + def run_loop(self): + try: + request = self.queue.get(timeout=self.conf.rpc_poll_timeout) + except self.empty_except: + return + + socket = self._connect_socket(request.target) + self._do_send_request(socket, request) + + def cleanup(self): + self.executor.stop() + super(QueuedSender, self).cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py index 4603c915e..4960979a9 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py @@ -18,34 +18,35 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\ import zmq_publisher_base from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LW LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class PushPublisher(zmq_publisher_base.PublisherBase): +class PushPublisher(object): def __init__(self, conf, matchmaker): + super(PushPublisher, self).__init__() sockets_manager = zmq_publisher_base.SocketsManager( conf, matchmaker, zmq.PULL, zmq.PUSH) - super(PushPublisher, self).__init__(sockets_manager) + + def _do_send_request(push_socket, request): + push_socket.send_pyobj(request) + + LOG.debug("Sending message_id %(message)s to a target %(target)s", + {"message": request.message_id, + "target": request.target}) + + self.sender = zmq_publisher_base.QueuedSender( + sockets_manager, _do_send_request) def send_request(self, request): - if request.msg_type == zmq_names.CALL_TYPE: + if request.msg_type != zmq_names.CAST_TYPE: raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - push_socket = self.outbound_sockets.get_socket(request.target) + self.sender.send_request(request) - if not push_socket.connections: - LOG.warning(_LW("Request %s was dropped because no connection"), - request.msg_type) - return - - if request.msg_type in zmq_names.MULTISEND_TYPES: - for _ in range(push_socket.connections_count()): - self._send_request(push_socket, request) - else: - self._send_request(push_socket, request) + def cleanup(self): + self.sender.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index 2fc1d4882..d52667475 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -20,7 +20,6 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_push_publisher from oslo_messaging._drivers.zmq_driver.client import zmq_client_base -from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -35,8 +34,7 @@ class ZmqClient(zmq_client_base.ZmqClientBase): conf, matchmaker) fanout_publisher = zmq_dealer_publisher.DealerPublisherLight( - conf, zmq_address.get_broker_address(conf)) \ - if conf.use_pub_sub else default_publisher + conf, matchmaker) if conf.use_pub_sub else default_publisher super(ZmqClient, self).__init__( conf, matchmaker, allowed_remote_exmods,