[zmq] Multithreading access to zmq sockets
Make all publishers thread-safe using Queue object in front of the bare zmq sockets which are not thread safe by specification. Change-Id: I41b5c3ae0c07cc84cba1b9db2ea85607c684c072
This commit is contained in:
parent
87ff93e476
commit
fbe10f0d05
@ -43,7 +43,20 @@ class DealerCallPublisher(object):
|
||||
self.conf = conf
|
||||
self.matchmaker = matchmaker
|
||||
self.reply_waiter = ReplyWaiter(conf)
|
||||
self.sender = RequestSender(conf, matchmaker, self.reply_waiter)
|
||||
sockets_manager = zmq_publisher_base.SocketsManager(
|
||||
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||
|
||||
def _do_send_request(socket, request):
|
||||
# DEALER socket specific envelope empty delimiter
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(request)
|
||||
|
||||
LOG.debug("Sent message_id %(message)s to a target %(target)s",
|
||||
{"message": request.message_id,
|
||||
"target": request.target})
|
||||
|
||||
self.sender = CallSender(sockets_manager, _do_send_request,
|
||||
self.reply_waiter)
|
||||
|
||||
def send_request(self, request):
|
||||
reply_future = self.sender.send_request(request)
|
||||
@ -68,16 +81,12 @@ class DealerCallPublisher(object):
|
||||
self.sender.cleanup()
|
||||
|
||||
|
||||
class RequestSender(zmq_publisher_base.PublisherBase):
|
||||
class CallSender(zmq_publisher_base.QueuedSender):
|
||||
|
||||
def __init__(self, conf, matchmaker, reply_waiter):
|
||||
sockets_manager = zmq_publisher_base.SocketsManager(
|
||||
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||
super(RequestSender, self).__init__(sockets_manager)
|
||||
def __init__(self, sockets_manager, _do_send_request, reply_waiter):
|
||||
super(CallSender, self).__init__(sockets_manager, _do_send_request)
|
||||
assert reply_waiter, "Valid ReplyWaiter expected!"
|
||||
self.reply_waiter = reply_waiter
|
||||
self.queue, self.empty_except = zmq_async.get_queue()
|
||||
self.executor = zmq_async.get_executor(self.run_loop)
|
||||
self.executor.execute()
|
||||
|
||||
def send_request(self, request):
|
||||
reply_future = futurist.Future()
|
||||
@ -85,30 +94,10 @@ class RequestSender(zmq_publisher_base.PublisherBase):
|
||||
self.queue.put(request)
|
||||
return reply_future
|
||||
|
||||
def _do_send_request(self, socket, request):
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(request)
|
||||
|
||||
LOG.debug("Sending message_id %(message)s to a target %(target)s",
|
||||
{"message": request.message_id, "target": request.target})
|
||||
|
||||
def _connect_socket(self, target):
|
||||
return self.outbound_sockets.get_socket(target)
|
||||
|
||||
def run_loop(self):
|
||||
try:
|
||||
request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
|
||||
except self.empty_except:
|
||||
return
|
||||
|
||||
socket = self._connect_socket(request.target)
|
||||
|
||||
self._do_send_request(socket, request)
|
||||
socket = self.outbound_sockets.get_socket(target)
|
||||
self.reply_waiter.poll_socket(socket)
|
||||
|
||||
def cleanup(self):
|
||||
self.executor.stop()
|
||||
super(RequestSender, self).cleanup()
|
||||
return socket
|
||||
|
||||
|
||||
class ReplyWaiter(object):
|
||||
|
@ -16,111 +16,78 @@ import logging
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||
import zmq_publisher_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LW
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class DealerPublisher(zmq_publisher_base.PublisherBase):
|
||||
class DealerPublisher(zmq_publisher_base.QueuedSender):
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
|
||||
def _send_message_data(socket, request):
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(request)
|
||||
|
||||
LOG.debug("Sent message_id %(message)s to a target %(target)s",
|
||||
{"message": request.message_id,
|
||||
"target": request.target})
|
||||
|
||||
def _do_send_request(socket, request):
|
||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||
for _ in range(socket.connections_count()):
|
||||
_send_message_data(socket, request)
|
||||
else:
|
||||
_send_message_data(socket, request)
|
||||
|
||||
sockets_manager = zmq_publisher_base.SocketsManager(
|
||||
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||
super(DealerPublisher, self).__init__(sockets_manager)
|
||||
super(DealerPublisher, self).__init__(sockets_manager,
|
||||
_do_send_request)
|
||||
|
||||
def send_request(self, request):
|
||||
|
||||
self._check_request_pattern(request)
|
||||
|
||||
dealer_socket = self.outbound_sockets.get_socket(request.target)
|
||||
|
||||
if not dealer_socket.connections:
|
||||
# NOTE(ozamiatin): Here we can provide
|
||||
# a queue for keeping messages to send them later
|
||||
# when some listener appears. However such approach
|
||||
# being more reliable will consume additional memory.
|
||||
LOG.warning(_LW("Request %s was dropped because no connection"),
|
||||
request.msg_type)
|
||||
return
|
||||
|
||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||
for _ in range(dealer_socket.connections_count()):
|
||||
self._send_request(dealer_socket, request)
|
||||
else:
|
||||
self._send_request(dealer_socket, request)
|
||||
|
||||
def _check_request_pattern(self, request):
|
||||
if request.msg_type == zmq_names.CALL_TYPE:
|
||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||
|
||||
def _send_request(self, socket, request):
|
||||
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(request)
|
||||
|
||||
LOG.debug("Sending message_id %(message)s to a target %(target)s",
|
||||
{"message": request.message_id, "target": request.target})
|
||||
super(DealerPublisher, self).send_request(request)
|
||||
|
||||
def cleanup(self):
|
||||
super(DealerPublisher, self).cleanup()
|
||||
|
||||
|
||||
class DealerPublisherLight(object):
|
||||
class DealerPublisherLight(zmq_publisher_base.QueuedSender):
|
||||
"""Used when publishing to proxy. """
|
||||
|
||||
def __init__(self, conf, address):
|
||||
super(DealerPublisherLight, self).__init__()
|
||||
self.conf = conf
|
||||
self.zmq_context = zmq.Context()
|
||||
self.socket = self.zmq_context.socket(zmq.DEALER)
|
||||
self.address = address
|
||||
self.socket.connect(address)
|
||||
def __init__(self, conf, matchmaker):
|
||||
|
||||
def send_request(self, request):
|
||||
def _do_send_request(socket, request):
|
||||
if request.msg_type == zmq_names.CALL_TYPE:
|
||||
raise zmq_publisher_base.UnsupportedSendPattern(
|
||||
request.msg_type)
|
||||
|
||||
if request.msg_type == zmq_names.CALL_TYPE:
|
||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||
envelope = request.create_envelope()
|
||||
|
||||
envelope = request.create_envelope()
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send_pyobj(envelope, zmq.SNDMORE)
|
||||
socket.send_pyobj(request)
|
||||
|
||||
self.socket.send(b'', zmq.SNDMORE)
|
||||
self.socket.send_pyobj(envelope, zmq.SNDMORE)
|
||||
self.socket.send_pyobj(request)
|
||||
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
|
||||
"a target %(target)s",
|
||||
{"message": request.message_id,
|
||||
"target": request.target,
|
||||
"addr": zmq_address.get_broker_address(conf)})
|
||||
|
||||
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
|
||||
"a target %(target)s",
|
||||
{"message": request.message_id,
|
||||
"target": request.target,
|
||||
"addr": self.address})
|
||||
sockets_manager = zmq_publisher_base.SocketsManager(
|
||||
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||
super(DealerPublisherLight, self).__init__(
|
||||
sockets_manager, _do_send_request)
|
||||
self.socket = self.outbound_sockets.get_socket_to_broker()
|
||||
|
||||
def _connect_socket(self, target):
|
||||
return self.socket
|
||||
|
||||
def cleanup(self):
|
||||
self.socket.close()
|
||||
|
||||
|
||||
class AcknowledgementReceiver(object):
|
||||
|
||||
def __init__(self):
|
||||
self.poller = zmq_async.get_poller()
|
||||
self.thread = zmq_async.get_executor(self.poll_for_acknowledgements)
|
||||
self.thread.execute()
|
||||
|
||||
def _receive_acknowledgement(self, socket):
|
||||
empty = socket.recv()
|
||||
assert empty == b"", "Empty delimiter expected"
|
||||
ack_message = socket.recv_pyobj()
|
||||
return ack_message
|
||||
|
||||
def track_socket(self, socket):
|
||||
self.poller.register(socket, self._receive_acknowledgement)
|
||||
|
||||
def poll_for_acknowledgements(self):
|
||||
ack_message, socket = self.poller.poll()
|
||||
LOG.debug("Message %s acknowledged", ack_message[zmq_names.FIELD_ID])
|
||||
|
||||
def cleanup(self):
|
||||
self.thread.stop()
|
||||
self.poller.close()
|
||||
|
@ -136,10 +136,9 @@ class SocketsManager(object):
|
||||
self._get_hosts_and_connect(socket, target)
|
||||
return socket
|
||||
|
||||
def get_socket_to_broker(self, target):
|
||||
def get_socket_to_broker(self):
|
||||
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
||||
self.socket_type)
|
||||
self._track_socket(socket, target)
|
||||
address = zmq_address.get_broker_address(self.conf)
|
||||
socket.connect_to_address(address)
|
||||
return socket
|
||||
@ -147,3 +146,32 @@ class SocketsManager(object):
|
||||
def cleanup(self):
|
||||
for socket, tm in self.outbound_sockets.values():
|
||||
socket.close()
|
||||
|
||||
|
||||
class QueuedSender(PublisherBase):
|
||||
|
||||
def __init__(self, sockets_manager, _do_send_request):
|
||||
super(QueuedSender, self).__init__(sockets_manager)
|
||||
self._do_send_request = _do_send_request
|
||||
self.queue, self.empty_except = zmq_async.get_queue()
|
||||
self.executor = zmq_async.get_executor(self.run_loop)
|
||||
self.executor.execute()
|
||||
|
||||
def send_request(self, request):
|
||||
self.queue.put(request)
|
||||
|
||||
def _connect_socket(self, target):
|
||||
return self.outbound_sockets.get_socket(target)
|
||||
|
||||
def run_loop(self):
|
||||
try:
|
||||
request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
|
||||
except self.empty_except:
|
||||
return
|
||||
|
||||
socket = self._connect_socket(request.target)
|
||||
self._do_send_request(socket, request)
|
||||
|
||||
def cleanup(self):
|
||||
self.executor.stop()
|
||||
super(QueuedSender, self).cleanup()
|
||||
|
@ -18,34 +18,35 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||
import zmq_publisher_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LW
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class PushPublisher(zmq_publisher_base.PublisherBase):
|
||||
class PushPublisher(object):
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
super(PushPublisher, self).__init__()
|
||||
sockets_manager = zmq_publisher_base.SocketsManager(
|
||||
conf, matchmaker, zmq.PULL, zmq.PUSH)
|
||||
super(PushPublisher, self).__init__(sockets_manager)
|
||||
|
||||
def _do_send_request(push_socket, request):
|
||||
push_socket.send_pyobj(request)
|
||||
|
||||
LOG.debug("Sending message_id %(message)s to a target %(target)s",
|
||||
{"message": request.message_id,
|
||||
"target": request.target})
|
||||
|
||||
self.sender = zmq_publisher_base.QueuedSender(
|
||||
sockets_manager, _do_send_request)
|
||||
|
||||
def send_request(self, request):
|
||||
|
||||
if request.msg_type == zmq_names.CALL_TYPE:
|
||||
if request.msg_type != zmq_names.CAST_TYPE:
|
||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||
|
||||
push_socket = self.outbound_sockets.get_socket(request.target)
|
||||
self.sender.send_request(request)
|
||||
|
||||
if not push_socket.connections:
|
||||
LOG.warning(_LW("Request %s was dropped because no connection"),
|
||||
request.msg_type)
|
||||
return
|
||||
|
||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||
for _ in range(push_socket.connections_count()):
|
||||
self._send_request(push_socket, request)
|
||||
else:
|
||||
self._send_request(push_socket, request)
|
||||
def cleanup(self):
|
||||
self.sender.cleanup()
|
||||
|
@ -20,7 +20,6 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
||||
import zmq_push_publisher
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
|
||||
@ -35,8 +34,7 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
|
||||
conf, matchmaker)
|
||||
|
||||
fanout_publisher = zmq_dealer_publisher.DealerPublisherLight(
|
||||
conf, zmq_address.get_broker_address(conf)) \
|
||||
if conf.use_pub_sub else default_publisher
|
||||
conf, matchmaker) if conf.use_pub_sub else default_publisher
|
||||
|
||||
super(ZmqClient, self).__init__(
|
||||
conf, matchmaker, allowed_remote_exmods,
|
||||
|
Loading…
Reference in New Issue
Block a user