diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py index 549d3dce..4603c915 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py @@ -28,15 +28,16 @@ zmq = zmq_async.import_zmq() class PushPublisher(zmq_publisher_base.PublisherBase): def __init__(self, conf, matchmaker): - super(PushPublisher, self).__init__(conf, matchmaker, zmq.PUSH) + sockets_manager = zmq_publisher_base.SocketsManager( + conf, matchmaker, zmq.PULL, zmq.PUSH) + super(PushPublisher, self).__init__(sockets_manager) def send_request(self, request): if request.msg_type == zmq_names.CALL_TYPE: raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - push_socket = self._check_hosts_connections( - request.target, zmq_names.socket_type_str(zmq.PULL)) + push_socket = self.outbound_sockets.get_socket(request.target) if not push_socket.connections: LOG.warning(_LW("Request %s was dropped because no connection"), @@ -48,10 +49,3 @@ class PushPublisher(zmq_publisher_base.PublisherBase): self._send_request(push_socket, request) else: self._send_request(push_socket, request) - - def _send_request(self, socket, request): - - super(PushPublisher, self)._send_request(socket, request) - - LOG.debug("Publishing message %(message)s to a target %(target)s", - {"message": request.message, "target": request.target}) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index c6bc6799..2fc1d488 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -17,6 +17,8 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_call_publisher from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher +from oslo_messaging._drivers.zmq_driver.client.publishers \ + import zmq_push_publisher from oslo_messaging._drivers.zmq_driver.client import zmq_client_base from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async @@ -43,6 +45,9 @@ class ZmqClient(zmq_client_base.ZmqClientBase): zmq_dealer_call_publisher.DealerCallPublisher( conf, matchmaker), + zmq_names.CAST_TYPE: + zmq_push_publisher.PushPublisher(conf, matchmaker), + # Here use DealerPublisherLight for sending request to proxy # which finally uses PubPublisher to send fanout in case of # 'use_pub_sub' option configured. diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index 88a7dd40..c6ac324b 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -14,6 +14,8 @@ import abc import logging +import threading +import time import six @@ -84,3 +86,43 @@ class SingleSocketConsumer(ConsumerBase): @property def port(self): return self.socket.port + + +class TargetsManager(object): + + def __init__(self, conf, matchmaker, host, socket_type): + self.targets = [] + self.conf = conf + self.matchmaker = matchmaker + self.host = host + self.socket_type = socket_type + self.targets_lock = threading.Lock() + self.updater = zmq_async.get_executor(method=self._update_targets) \ + if conf.zmq_target_expire > 0 else None + if self.updater: + self.updater.execute() + + def _update_targets(self): + with self.targets_lock: + for target in self.targets: + self.matchmaker.register( + target, self.host, + zmq_names.socket_type_str(self.socket_type)) + + # Update target-records once per half expiration time + time.sleep(self.conf.zmq_target_expire / 2) + + def listen(self, target): + with self.targets_lock: + self.targets.append(target) + self.matchmaker.register( + target, self.host, + zmq_names.socket_type_str(self.socket_type)) + + def cleanup(self): + if self.updater: + self.updater.stop() + for target in self.targets: + self.matchmaker.unregister( + target, self.host, + zmq_names.socket_type_str(self.socket_type)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py index 4a3efeec..8dc58e50 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py @@ -17,6 +17,7 @@ import logging from oslo_messaging._drivers import base from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_consumer_base +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE, _LI @@ -45,19 +46,33 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer): def __init__(self, conf, poller, server): super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL) + self.matchmaker = server.matchmaker + self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, + self.port) + self.targets = zmq_consumer_base.TargetsManager( + conf, self.matchmaker, self.host, zmq.PULL) + LOG.info(_LI("[%s] Run PULL consumer"), self.host) def listen(self, target): LOG.info(_LI("Listen to target %s"), str(target)) - # Do nothing here because we have a single socket + self.targets.listen(target) + + def cleanup(self): + super(PullConsumer, self).cleanup() + self.targets.cleanup() def receive_message(self, socket): try: - msg_type = socket.recv_string() + request = socket.recv_pyobj() + msg_type = request.msg_type assert msg_type is not None, 'Bad format: msg type expected' - context = socket.recv_pyobj() - message = socket.recv_pyobj() - LOG.debug("Received %(msg_type)s message %(msg)s", - {"msg_type": msg_type, "msg": str(message)}) + context = request.context + message = request.message + LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s", + {"host": self.host, + "type": request.msg_type, + "id": request.message_id, + "target": request.target}) if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES): return PullIncomingMessage(self.server, context, message) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 94617b3d..27957d7e 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -13,8 +13,6 @@ # under the License. import logging -import threading -import time from oslo_messaging._drivers import base from oslo_messaging._drivers.zmq_driver.server.consumers\ @@ -58,7 +56,8 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): self.matchmaker = server.matchmaker self.host = zmq_address.combine_address(self.conf.rpc_zmq_host, self.port) - self.targets = TargetsManager(conf, self.matchmaker, self.host) + self.targets = zmq_consumer_base.TargetsManager( + conf, self.matchmaker, self.host, zmq.ROUTER) LOG.info(_LI("[%s] Run ROUTER consumer"), self.host) def listen(self, target): @@ -98,39 +97,3 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): except zmq.ZMQError as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) - - -class TargetsManager(object): - - def __init__(self, conf, matchmaker, host): - self.targets = [] - self.conf = conf - self.matchmaker = matchmaker - self.host = host - self.targets_lock = threading.Lock() - self.updater = zmq_async.get_executor(method=self._update_targets) \ - if conf.zmq_target_expire > 0 else None - if self.updater: - self.updater.execute() - - def _update_targets(self): - with self.targets_lock: - for target in self.targets: - self.matchmaker.register( - target, self.host, zmq_names.socket_type_str(zmq.ROUTER)) - - # Update target-records once per half expiration time - time.sleep(self.conf.zmq_target_expire / 2) - - def listen(self, target): - with self.targets_lock: - self.targets.append(target) - self.matchmaker.register(target, self.host, - zmq_names.socket_type_str(zmq.ROUTER)) - - def cleanup(self): - if self.updater: - self.updater.stop() - for target in self.targets: - self.matchmaker.unregister(target, self.host, - zmq_names.socket_type_str(zmq.ROUTER)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index 8a95a1f6..944bd8ef 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -16,6 +16,8 @@ import copy import logging from oslo_messaging._drivers import base +from oslo_messaging._drivers.zmq_driver.server.consumers\ + import zmq_pull_consumer from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_router_consumer from oslo_messaging._drivers.zmq_driver.server.consumers\ @@ -36,12 +38,14 @@ class ZmqServer(base.Listener): self.poller = zmq_async.get_poller() self.router_consumer = zmq_router_consumer.RouterConsumer( conf, self.poller, self) + self.pull_consumer = zmq_pull_consumer.PullConsumer( + conf, self.poller, self) self.sub_consumer = zmq_sub_consumer.SubConsumer( conf, self.poller, self) if conf.use_pub_sub else None self.notify_consumer = self.sub_consumer if conf.use_pub_sub \ else self.router_consumer - self.consumers = [self.router_consumer] + self.consumers = [self.router_consumer, self.pull_consumer] if self.sub_consumer: self.consumers.append(self.sub_consumer) @@ -62,9 +66,8 @@ class ZmqServer(base.Listener): consumer.cleanup() def listen(self, target): - consumer = self.router_consumer - consumer.listen(target) - + self.router_consumer.listen(target) + self.pull_consumer.listen(target) if self.sub_consumer: self.sub_consumer.listen(target) diff --git a/oslo_messaging/tests/functional/zmq/multiproc_utils.py b/oslo_messaging/tests/functional/zmq/multiproc_utils.py index a1a78085..ee9f56e0 100644 --- a/oslo_messaging/tests/functional/zmq/multiproc_utils.py +++ b/oslo_messaging/tests/functional/zmq/multiproc_utils.py @@ -157,8 +157,7 @@ class Server(object): LOG.debug("Waiting for the stop signal ...") time.sleep(1) self.rpc_server.stop() - LOG.debug("Leaving process T:%s Pid:%d", (str(target), - os.getpid())) + LOG.debug("Leaving process T:%s Pid:%d", str(target), os.getpid()) def cleanup(self): LOG.debug("Stopping server")