From 2ae4f8faf3af845f4fac4fe768c9911f728fd9ba Mon Sep 17 00:00:00 2001 From: Oleksii Zamiatin Date: Thu, 4 Feb 2016 15:57:53 +0200 Subject: [PATCH] [zmq] Use PUSH/PULL for direct CAST In order to complete implementation of the spec [2] and for optimal zmq-way patterns usage. By zmq-way meaning some relaxed guarantees on reliability, but high performance and scalability. PUSH/PULL for CASTs here gives us benefit of not having redundant backward tcp-connections which DEALER/ROUTER opens. Opposite to that there is another approach provided by spec [1]. This approach has strong gurantees on messages delivery based on acknowledgements and retries. Such approach has lower performance though. The general idea is to provide both approaches in the driver and switch between them in configuration. 1. https://review.openstack.org/#/c/171131/ 2. https://review.openstack.org/#/c/187338/ Change-Id: I32712f73e2ec4114406641de5aec3b12152ad58f --- .../client/publishers/zmq_push_publisher.py | 14 ++----- .../_drivers/zmq_driver/client/zmq_client.py | 5 +++ .../server/consumers/zmq_consumer_base.py | 42 +++++++++++++++++++ .../server/consumers/zmq_pull_consumer.py | 27 +++++++++--- .../server/consumers/zmq_router_consumer.py | 41 +----------------- .../_drivers/zmq_driver/server/zmq_server.py | 11 +++-- .../tests/functional/zmq/multiproc_utils.py | 3 +- 7 files changed, 82 insertions(+), 61 deletions(-) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py index 549d3dce..4603c915 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py @@ -28,15 +28,16 @@ zmq = zmq_async.import_zmq() class PushPublisher(zmq_publisher_base.PublisherBase): def __init__(self, conf, matchmaker): - super(PushPublisher, self).__init__(conf, matchmaker, zmq.PUSH) + sockets_manager = zmq_publisher_base.SocketsManager( + conf, matchmaker, zmq.PULL, zmq.PUSH) + super(PushPublisher, self).__init__(sockets_manager) def send_request(self, request): if request.msg_type == zmq_names.CALL_TYPE: raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - push_socket = self._check_hosts_connections( - request.target, zmq_names.socket_type_str(zmq.PULL)) + push_socket = self.outbound_sockets.get_socket(request.target) if not push_socket.connections: LOG.warning(_LW("Request %s was dropped because no connection"), @@ -48,10 +49,3 @@ class PushPublisher(zmq_publisher_base.PublisherBase): self._send_request(push_socket, request) else: self._send_request(push_socket, request) - - def _send_request(self, socket, request): - - super(PushPublisher, self)._send_request(socket, request) - - LOG.debug("Publishing message %(message)s to a target %(target)s", - {"message": request.message, "target": request.target}) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index c6bc6799..2fc1d488 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -17,6 +17,8 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_call_publisher from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher +from oslo_messaging._drivers.zmq_driver.client.publishers \ + import zmq_push_publisher from oslo_messaging._drivers.zmq_driver.client import zmq_client_base from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async @@ -43,6 +45,9 @@ class ZmqClient(zmq_client_base.ZmqClientBase): zmq_dealer_call_publisher.DealerCallPublisher( conf, matchmaker), + zmq_names.CAST_TYPE: + zmq_push_publisher.PushPublisher(conf, matchmaker), + # Here use DealerPublisherLight for sending request to proxy # which finally uses PubPublisher to send fanout in case of # 'use_pub_sub' option configured. diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index 88a7dd40..c6ac324b 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -14,6 +14,8 @@ import abc import logging +import threading +import time import six @@ -84,3 +86,43 @@ class SingleSocketConsumer(ConsumerBase): @property def port(self): return self.socket.port + + +class TargetsManager(object): + + def __init__(self, conf, matchmaker, host, socket_type): + self.targets = [] + self.conf = conf + self.matchmaker = matchmaker + self.host = host + self.socket_type = socket_type + self.targets_lock = threading.Lock() + self.updater = zmq_async.get_executor(method=self._update_targets) \ + if conf.zmq_target_expire > 0 else None + if self.updater: + self.updater.execute() + + def _update_targets(self): + with self.targets_lock: + for target in self.targets: + self.matchmaker.register( + target, self.host, + zmq_names.socket_type_str(self.socket_type)) + + # Update target-records once per half expiration time + time.sleep(self.conf.zmq_target_expire / 2) + + def listen(self, target): + with self.targets_lock: + self.targets.append(target) + self.matchmaker.register( + target, self.host, + zmq_names.socket_type_str(self.socket_type)) + + def cleanup(self): + if self.updater: + self.updater.stop() + for target in self.targets: + self.matchmaker.unregister( + target, self.host, + zmq_names.socket_type_str(self.socket_type)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py index 4a3efeec..8dc58e50 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py @@ -17,6 +17,7 @@ import logging from oslo_messaging._drivers import base from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_consumer_base +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE, _LI @@ -45,19 +46,33 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer): def __init__(self, conf, poller, server): super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL) + self.matchmaker = server.matchmaker + self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, + self.port) + self.targets = zmq_consumer_base.TargetsManager( + conf, self.matchmaker, self.host, zmq.PULL) + LOG.info(_LI("[%s] Run PULL consumer"), self.host) def listen(self, target): LOG.info(_LI("Listen to target %s"), str(target)) - # Do nothing here because we have a single socket + self.targets.listen(target) + + def cleanup(self): + super(PullConsumer, self).cleanup() + self.targets.cleanup() def receive_message(self, socket): try: - msg_type = socket.recv_string() + request = socket.recv_pyobj() + msg_type = request.msg_type assert msg_type is not None, 'Bad format: msg type expected' - context = socket.recv_pyobj() - message = socket.recv_pyobj() - LOG.debug("Received %(msg_type)s message %(msg)s", - {"msg_type": msg_type, "msg": str(message)}) + context = request.context + message = request.message + LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s", + {"host": self.host, + "type": request.msg_type, + "id": request.message_id, + "target": request.target}) if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES): return PullIncomingMessage(self.server, context, message) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 94617b3d..27957d7e 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -13,8 +13,6 @@ # under the License. import logging -import threading -import time from oslo_messaging._drivers import base from oslo_messaging._drivers.zmq_driver.server.consumers\ @@ -58,7 +56,8 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): self.matchmaker = server.matchmaker self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, self.port) - self.targets = TargetsManager(conf, self.matchmaker, self.host) + self.targets = zmq_consumer_base.TargetsManager( + conf, self.matchmaker, self.host, zmq.ROUTER) LOG.info(_LI("[%s] Run ROUTER consumer"), self.host) def listen(self, target): @@ -98,39 +97,3 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): except zmq.ZMQError as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) - - -class TargetsManager(object): - - def __init__(self, conf, matchmaker, host): - self.targets = [] - self.conf = conf - self.matchmaker = matchmaker - self.host = host - self.targets_lock = threading.Lock() - self.updater = zmq_async.get_executor(method=self._update_targets) \ - if conf.zmq_target_expire > 0 else None - if self.updater: - self.updater.execute() - - def _update_targets(self): - with self.targets_lock: - for target in self.targets: - self.matchmaker.register( - target, self.host, zmq_names.socket_type_str(zmq.ROUTER)) - - # Update target-records once per half expiration time - time.sleep(self.conf.zmq_target_expire / 2) - - def listen(self, target): - with self.targets_lock: - self.targets.append(target) - self.matchmaker.register(target, self.host, - zmq_names.socket_type_str(zmq.ROUTER)) - - def cleanup(self): - if self.updater: - self.updater.stop() - for target in self.targets: - self.matchmaker.unregister(target, self.host, - zmq_names.socket_type_str(zmq.ROUTER)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index 8a95a1f6..944bd8ef 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -16,6 +16,8 @@ import copy import logging from oslo_messaging._drivers import base +from oslo_messaging._drivers.zmq_driver.server.consumers\ + import zmq_pull_consumer from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_router_consumer from oslo_messaging._drivers.zmq_driver.server.consumers\ @@ -36,12 +38,14 @@ class ZmqServer(base.Listener): self.poller = zmq_async.get_poller() self.router_consumer = zmq_router_consumer.RouterConsumer( conf, self.poller, self) + self.pull_consumer = zmq_pull_consumer.PullConsumer( + conf, self.poller, self) self.sub_consumer = zmq_sub_consumer.SubConsumer( conf, self.poller, self) if conf.use_pub_sub else None self.notify_consumer = self.sub_consumer if conf.use_pub_sub \ else self.router_consumer - self.consumers = [self.router_consumer] + self.consumers = [self.router_consumer, self.pull_consumer] if self.sub_consumer: self.consumers.append(self.sub_consumer) @@ -62,9 +66,8 @@ class ZmqServer(base.Listener): consumer.cleanup() def listen(self, target): - consumer = self.router_consumer - consumer.listen(target) - + self.router_consumer.listen(target) + self.pull_consumer.listen(target) if self.sub_consumer: self.sub_consumer.listen(target) diff --git a/oslo_messaging/tests/functional/zmq/multiproc_utils.py b/oslo_messaging/tests/functional/zmq/multiproc_utils.py index e07f1a23..218aad49 100644 --- a/oslo_messaging/tests/functional/zmq/multiproc_utils.py +++ b/oslo_messaging/tests/functional/zmq/multiproc_utils.py @@ -161,8 +161,7 @@ class Server(object): LOG.debug("Waiting for the stop signal ...") time.sleep(1) self.rpc_server.stop() - LOG.debug("Leaving process T:%s Pid:%d", (str(target), - os.getpid())) + LOG.debug("Leaving process T:%s Pid:%d", str(target), os.getpid()) def cleanup(self): LOG.debug("Stopping server")