Merge "[zmq] Multithreading access to zmq sockets"
This commit is contained in:
commit
2a226ab270
@ -43,7 +43,20 @@ class DealerCallPublisher(object):
|
|||||||
self.conf = conf
|
self.conf = conf
|
||||||
self.matchmaker = matchmaker
|
self.matchmaker = matchmaker
|
||||||
self.reply_waiter = ReplyWaiter(conf)
|
self.reply_waiter = ReplyWaiter(conf)
|
||||||
self.sender = RequestSender(conf, matchmaker, self.reply_waiter)
|
sockets_manager = zmq_publisher_base.SocketsManager(
|
||||||
|
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||||
|
|
||||||
|
def _do_send_request(socket, request):
|
||||||
|
# DEALER socket specific envelope empty delimiter
|
||||||
|
socket.send(b'', zmq.SNDMORE)
|
||||||
|
socket.send_pyobj(request)
|
||||||
|
|
||||||
|
LOG.debug("Sent message_id %(message)s to a target %(target)s",
|
||||||
|
{"message": request.message_id,
|
||||||
|
"target": request.target})
|
||||||
|
|
||||||
|
self.sender = CallSender(sockets_manager, _do_send_request,
|
||||||
|
self.reply_waiter)
|
||||||
|
|
||||||
def send_request(self, request):
|
def send_request(self, request):
|
||||||
reply_future = self.sender.send_request(request)
|
reply_future = self.sender.send_request(request)
|
||||||
@ -68,16 +81,12 @@ class DealerCallPublisher(object):
|
|||||||
self.sender.cleanup()
|
self.sender.cleanup()
|
||||||
|
|
||||||
|
|
||||||
class RequestSender(zmq_publisher_base.PublisherBase):
|
class CallSender(zmq_publisher_base.QueuedSender):
|
||||||
|
|
||||||
def __init__(self, conf, matchmaker, reply_waiter):
|
def __init__(self, sockets_manager, _do_send_request, reply_waiter):
|
||||||
sockets_manager = zmq_publisher_base.SocketsManager(
|
super(CallSender, self).__init__(sockets_manager, _do_send_request)
|
||||||
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
assert reply_waiter, "Valid ReplyWaiter expected!"
|
||||||
super(RequestSender, self).__init__(sockets_manager)
|
|
||||||
self.reply_waiter = reply_waiter
|
self.reply_waiter = reply_waiter
|
||||||
self.queue, self.empty_except = zmq_async.get_queue()
|
|
||||||
self.executor = zmq_async.get_executor(self.run_loop)
|
|
||||||
self.executor.execute()
|
|
||||||
|
|
||||||
def send_request(self, request):
|
def send_request(self, request):
|
||||||
reply_future = futurist.Future()
|
reply_future = futurist.Future()
|
||||||
@ -85,30 +94,10 @@ class RequestSender(zmq_publisher_base.PublisherBase):
|
|||||||
self.queue.put(request)
|
self.queue.put(request)
|
||||||
return reply_future
|
return reply_future
|
||||||
|
|
||||||
def _do_send_request(self, socket, request):
|
|
||||||
socket.send(b'', zmq.SNDMORE)
|
|
||||||
socket.send_pyobj(request)
|
|
||||||
|
|
||||||
LOG.debug("Sending message_id %(message)s to a target %(target)s",
|
|
||||||
{"message": request.message_id, "target": request.target})
|
|
||||||
|
|
||||||
def _connect_socket(self, target):
|
def _connect_socket(self, target):
|
||||||
return self.outbound_sockets.get_socket(target)
|
socket = self.outbound_sockets.get_socket(target)
|
||||||
|
|
||||||
def run_loop(self):
|
|
||||||
try:
|
|
||||||
request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
|
|
||||||
except self.empty_except:
|
|
||||||
return
|
|
||||||
|
|
||||||
socket = self._connect_socket(request.target)
|
|
||||||
|
|
||||||
self._do_send_request(socket, request)
|
|
||||||
self.reply_waiter.poll_socket(socket)
|
self.reply_waiter.poll_socket(socket)
|
||||||
|
return socket
|
||||||
def cleanup(self):
|
|
||||||
self.executor.stop()
|
|
||||||
super(RequestSender, self).cleanup()
|
|
||||||
|
|
||||||
|
|
||||||
class ReplyWaiter(object):
|
class ReplyWaiter(object):
|
||||||
|
@ -16,111 +16,78 @@ import logging
|
|||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
from oslo_messaging._drivers.zmq_driver.client.publishers\
|
||||||
import zmq_publisher_base
|
import zmq_publisher_base
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
from oslo_messaging._i18n import _LW
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
zmq = zmq_async.import_zmq()
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
class DealerPublisher(zmq_publisher_base.PublisherBase):
|
class DealerPublisher(zmq_publisher_base.QueuedSender):
|
||||||
|
|
||||||
def __init__(self, conf, matchmaker):
|
def __init__(self, conf, matchmaker):
|
||||||
|
|
||||||
|
def _send_message_data(socket, request):
|
||||||
|
socket.send(b'', zmq.SNDMORE)
|
||||||
|
socket.send_pyobj(request)
|
||||||
|
|
||||||
|
LOG.debug("Sent message_id %(message)s to a target %(target)s",
|
||||||
|
{"message": request.message_id,
|
||||||
|
"target": request.target})
|
||||||
|
|
||||||
|
def _do_send_request(socket, request):
|
||||||
|
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||||
|
for _ in range(socket.connections_count()):
|
||||||
|
_send_message_data(socket, request)
|
||||||
|
else:
|
||||||
|
_send_message_data(socket, request)
|
||||||
|
|
||||||
sockets_manager = zmq_publisher_base.SocketsManager(
|
sockets_manager = zmq_publisher_base.SocketsManager(
|
||||||
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||||
super(DealerPublisher, self).__init__(sockets_manager)
|
super(DealerPublisher, self).__init__(sockets_manager,
|
||||||
|
_do_send_request)
|
||||||
|
|
||||||
def send_request(self, request):
|
def send_request(self, request):
|
||||||
|
|
||||||
self._check_request_pattern(request)
|
|
||||||
|
|
||||||
dealer_socket = self.outbound_sockets.get_socket(request.target)
|
|
||||||
|
|
||||||
if not dealer_socket.connections:
|
|
||||||
# NOTE(ozamiatin): Here we can provide
|
|
||||||
# a queue for keeping messages to send them later
|
|
||||||
# when some listener appears. However such approach
|
|
||||||
# being more reliable will consume additional memory.
|
|
||||||
LOG.warning(_LW("Request %s was dropped because no connection"),
|
|
||||||
request.msg_type)
|
|
||||||
return
|
|
||||||
|
|
||||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
|
||||||
for _ in range(dealer_socket.connections_count()):
|
|
||||||
self._send_request(dealer_socket, request)
|
|
||||||
else:
|
|
||||||
self._send_request(dealer_socket, request)
|
|
||||||
|
|
||||||
def _check_request_pattern(self, request):
|
|
||||||
if request.msg_type == zmq_names.CALL_TYPE:
|
if request.msg_type == zmq_names.CALL_TYPE:
|
||||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||||
|
super(DealerPublisher, self).send_request(request)
|
||||||
def _send_request(self, socket, request):
|
|
||||||
|
|
||||||
socket.send(b'', zmq.SNDMORE)
|
|
||||||
socket.send_pyobj(request)
|
|
||||||
|
|
||||||
LOG.debug("Sending message_id %(message)s to a target %(target)s",
|
|
||||||
{"message": request.message_id, "target": request.target})
|
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
super(DealerPublisher, self).cleanup()
|
super(DealerPublisher, self).cleanup()
|
||||||
|
|
||||||
|
|
||||||
class DealerPublisherLight(object):
|
class DealerPublisherLight(zmq_publisher_base.QueuedSender):
|
||||||
"""Used when publishing to proxy. """
|
"""Used when publishing to proxy. """
|
||||||
|
|
||||||
def __init__(self, conf, address):
|
def __init__(self, conf, matchmaker):
|
||||||
super(DealerPublisherLight, self).__init__()
|
|
||||||
self.conf = conf
|
|
||||||
self.zmq_context = zmq.Context()
|
|
||||||
self.socket = self.zmq_context.socket(zmq.DEALER)
|
|
||||||
self.address = address
|
|
||||||
self.socket.connect(address)
|
|
||||||
|
|
||||||
def send_request(self, request):
|
def _do_send_request(socket, request):
|
||||||
|
if request.msg_type == zmq_names.CALL_TYPE:
|
||||||
|
raise zmq_publisher_base.UnsupportedSendPattern(
|
||||||
|
request.msg_type)
|
||||||
|
|
||||||
if request.msg_type == zmq_names.CALL_TYPE:
|
envelope = request.create_envelope()
|
||||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
|
||||||
|
|
||||||
envelope = request.create_envelope()
|
socket.send(b'', zmq.SNDMORE)
|
||||||
|
socket.send_pyobj(envelope, zmq.SNDMORE)
|
||||||
|
socket.send_pyobj(request)
|
||||||
|
|
||||||
self.socket.send(b'', zmq.SNDMORE)
|
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
|
||||||
self.socket.send_pyobj(envelope, zmq.SNDMORE)
|
"a target %(target)s",
|
||||||
self.socket.send_pyobj(request)
|
{"message": request.message_id,
|
||||||
|
"target": request.target,
|
||||||
|
"addr": zmq_address.get_broker_address(conf)})
|
||||||
|
|
||||||
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
|
sockets_manager = zmq_publisher_base.SocketsManager(
|
||||||
"a target %(target)s",
|
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||||
{"message": request.message_id,
|
super(DealerPublisherLight, self).__init__(
|
||||||
"target": request.target,
|
sockets_manager, _do_send_request)
|
||||||
"addr": self.address})
|
self.socket = self.outbound_sockets.get_socket_to_broker()
|
||||||
|
|
||||||
|
def _connect_socket(self, target):
|
||||||
|
return self.socket
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self.socket.close()
|
self.socket.close()
|
||||||
|
|
||||||
|
|
||||||
class AcknowledgementReceiver(object):
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.poller = zmq_async.get_poller()
|
|
||||||
self.thread = zmq_async.get_executor(self.poll_for_acknowledgements)
|
|
||||||
self.thread.execute()
|
|
||||||
|
|
||||||
def _receive_acknowledgement(self, socket):
|
|
||||||
empty = socket.recv()
|
|
||||||
assert empty == b"", "Empty delimiter expected"
|
|
||||||
ack_message = socket.recv_pyobj()
|
|
||||||
return ack_message
|
|
||||||
|
|
||||||
def track_socket(self, socket):
|
|
||||||
self.poller.register(socket, self._receive_acknowledgement)
|
|
||||||
|
|
||||||
def poll_for_acknowledgements(self):
|
|
||||||
ack_message, socket = self.poller.poll()
|
|
||||||
LOG.debug("Message %s acknowledged", ack_message[zmq_names.FIELD_ID])
|
|
||||||
|
|
||||||
def cleanup(self):
|
|
||||||
self.thread.stop()
|
|
||||||
self.poller.close()
|
|
||||||
|
@ -136,10 +136,9 @@ class SocketsManager(object):
|
|||||||
self._get_hosts_and_connect(socket, target)
|
self._get_hosts_and_connect(socket, target)
|
||||||
return socket
|
return socket
|
||||||
|
|
||||||
def get_socket_to_broker(self, target):
|
def get_socket_to_broker(self):
|
||||||
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
||||||
self.socket_type)
|
self.socket_type)
|
||||||
self._track_socket(socket, target)
|
|
||||||
address = zmq_address.get_broker_address(self.conf)
|
address = zmq_address.get_broker_address(self.conf)
|
||||||
socket.connect_to_address(address)
|
socket.connect_to_address(address)
|
||||||
return socket
|
return socket
|
||||||
@ -147,3 +146,32 @@ class SocketsManager(object):
|
|||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
for socket, tm in self.outbound_sockets.values():
|
for socket, tm in self.outbound_sockets.values():
|
||||||
socket.close()
|
socket.close()
|
||||||
|
|
||||||
|
|
||||||
|
class QueuedSender(PublisherBase):
|
||||||
|
|
||||||
|
def __init__(self, sockets_manager, _do_send_request):
|
||||||
|
super(QueuedSender, self).__init__(sockets_manager)
|
||||||
|
self._do_send_request = _do_send_request
|
||||||
|
self.queue, self.empty_except = zmq_async.get_queue()
|
||||||
|
self.executor = zmq_async.get_executor(self.run_loop)
|
||||||
|
self.executor.execute()
|
||||||
|
|
||||||
|
def send_request(self, request):
|
||||||
|
self.queue.put(request)
|
||||||
|
|
||||||
|
def _connect_socket(self, target):
|
||||||
|
return self.outbound_sockets.get_socket(target)
|
||||||
|
|
||||||
|
def run_loop(self):
|
||||||
|
try:
|
||||||
|
request = self.queue.get(timeout=self.conf.rpc_poll_timeout)
|
||||||
|
except self.empty_except:
|
||||||
|
return
|
||||||
|
|
||||||
|
socket = self._connect_socket(request.target)
|
||||||
|
self._do_send_request(socket, request)
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
self.executor.stop()
|
||||||
|
super(QueuedSender, self).cleanup()
|
||||||
|
@ -18,34 +18,35 @@ from oslo_messaging._drivers.zmq_driver.client.publishers\
|
|||||||
import zmq_publisher_base
|
import zmq_publisher_base
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
from oslo_messaging._i18n import _LW
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
zmq = zmq_async.import_zmq()
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
class PushPublisher(zmq_publisher_base.PublisherBase):
|
class PushPublisher(object):
|
||||||
|
|
||||||
def __init__(self, conf, matchmaker):
|
def __init__(self, conf, matchmaker):
|
||||||
|
super(PushPublisher, self).__init__()
|
||||||
sockets_manager = zmq_publisher_base.SocketsManager(
|
sockets_manager = zmq_publisher_base.SocketsManager(
|
||||||
conf, matchmaker, zmq.PULL, zmq.PUSH)
|
conf, matchmaker, zmq.PULL, zmq.PUSH)
|
||||||
super(PushPublisher, self).__init__(sockets_manager)
|
|
||||||
|
def _do_send_request(push_socket, request):
|
||||||
|
push_socket.send_pyobj(request)
|
||||||
|
|
||||||
|
LOG.debug("Sending message_id %(message)s to a target %(target)s",
|
||||||
|
{"message": request.message_id,
|
||||||
|
"target": request.target})
|
||||||
|
|
||||||
|
self.sender = zmq_publisher_base.QueuedSender(
|
||||||
|
sockets_manager, _do_send_request)
|
||||||
|
|
||||||
def send_request(self, request):
|
def send_request(self, request):
|
||||||
|
|
||||||
if request.msg_type == zmq_names.CALL_TYPE:
|
if request.msg_type != zmq_names.CAST_TYPE:
|
||||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||||
|
|
||||||
push_socket = self.outbound_sockets.get_socket(request.target)
|
self.sender.send_request(request)
|
||||||
|
|
||||||
if not push_socket.connections:
|
def cleanup(self):
|
||||||
LOG.warning(_LW("Request %s was dropped because no connection"),
|
self.sender.cleanup()
|
||||||
request.msg_type)
|
|
||||||
return
|
|
||||||
|
|
||||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
|
||||||
for _ in range(push_socket.connections_count()):
|
|
||||||
self._send_request(push_socket, request)
|
|
||||||
else:
|
|
||||||
self._send_request(push_socket, request)
|
|
||||||
|
@ -20,7 +20,6 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
|||||||
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
||||||
import zmq_push_publisher
|
import zmq_push_publisher
|
||||||
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
|
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
|
|
||||||
@ -35,8 +34,7 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
|
|||||||
conf, matchmaker)
|
conf, matchmaker)
|
||||||
|
|
||||||
fanout_publisher = zmq_dealer_publisher.DealerPublisherLight(
|
fanout_publisher = zmq_dealer_publisher.DealerPublisherLight(
|
||||||
conf, zmq_address.get_broker_address(conf)) \
|
conf, matchmaker) if conf.use_pub_sub else default_publisher
|
||||||
if conf.use_pub_sub else default_publisher
|
|
||||||
|
|
||||||
super(ZmqClient, self).__init__(
|
super(ZmqClient, self).__init__(
|
||||||
conf, matchmaker, allowed_remote_exmods,
|
conf, matchmaker, allowed_remote_exmods,
|
||||||
|
Loading…
Reference in New Issue
Block a user