From ac484f6b26c6509549edc1150673915b48482ac2 Mon Sep 17 00:00:00 2001 From: Gevorg Davoian Date: Wed, 22 Jun 2016 19:09:32 +0300 Subject: [PATCH] [zmq] Refactor publishers This patch refactors publishers by separating responsibilities and introducing senders and waiters within publishers. Change-Id: I90df59d61af2b40b516a5151c67c184fcc91e366 Co-Authored-By: Oleksii Zamiatin --- .../dealer/zmq_dealer_call_publisher.py | 106 ---------- .../publishers/dealer/zmq_dealer_publisher.py | 128 +++++++----- .../dealer/zmq_dealer_publisher_proxy.py | 191 +++++------------- .../publishers/dealer/zmq_reply_waiter.py | 66 ------ .../client/publishers/zmq_publisher_base.py | 131 +----------- .../client/publishers/zmq_push_publisher.py | 52 ----- .../_drivers/zmq_driver/client/zmq_client.py | 68 ++++--- .../zmq_driver/client/zmq_receivers.py | 140 +++++++++++++ .../zmq_driver/client/zmq_routing_table.py | 65 ++++++ .../_drivers/zmq_driver/client/zmq_senders.py | 94 +++++++++ .../zmq_driver/client/zmq_sockets_manager.py | 96 +++++++++ .../server/consumers/zmq_dealer_consumer.py | 68 ++----- .../server/consumers/zmq_pull_consumer.py | 69 ------- .../server/consumers/zmq_router_consumer.py | 42 ++-- .../server/consumers/zmq_sub_consumer.py | 22 +- .../zmq_driver/server/zmq_incoming_message.py | 51 ++--- .../_drivers/zmq_driver/zmq_names.py | 8 +- .../_drivers/zmq_driver/zmq_updater.py | 2 + oslo_messaging/tests/functional/utils.py | 2 + 19 files changed, 638 insertions(+), 763 deletions(-) delete mode 100644 oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py create mode 100644 oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py create mode 100644 oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py create mode 100644 oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py create mode 100644 oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py delete mode 100644 oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py deleted file mode 100644 index 7d1cdf13a..000000000 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_call_publisher.py +++ /dev/null @@ -1,106 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging - -from concurrent import futures -import futurist - -import oslo_messaging -from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_reply_waiter -from oslo_messaging._drivers.zmq_driver.client.publishers \ - import zmq_publisher_base -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._i18n import _LE - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class DealerCallPublisher(object): - """Thread-safe CALL publisher - - Used as faster and thread-safe publisher for CALL - instead of ReqPublisher. - """ - - def __init__(self, conf, matchmaker, sockets_manager, sender=None, - reply_waiter=None): - super(DealerCallPublisher, self).__init__() - self.conf = conf - self.matchmaker = matchmaker - self.reply_waiter = reply_waiter or zmq_reply_waiter.ReplyWaiter(conf) - self.sockets_manager = sockets_manager - self.sender = sender or CallSender(self.sockets_manager, - self.reply_waiter) - - def send_request(self, request): - reply_future = self.sender.send_request(request) - try: - reply = reply_future.result(timeout=request.timeout) - LOG.debug("Received reply %s", request.message_id) - except AssertionError: - LOG.error(_LE("Message format error in reply %s"), - request.message_id) - return None - except futures.TimeoutError: - raise oslo_messaging.MessagingTimeout( - "Timeout %(tout)s seconds was reached for message %(id)s" % - {"tout": request.timeout, - "id": request.message_id}) - finally: - self.reply_waiter.untrack_id(request.message_id) - - if reply.failure: - raise rpc_common.deserialize_remote_exception( - reply.failure, - request.allowed_remote_exmods) - else: - return reply.reply_body - - def cleanup(self): - self.reply_waiter.cleanup() - self.sender.cleanup() - - -class CallSender(zmq_publisher_base.QueuedSender): - - def __init__(self, sockets_manager, reply_waiter): - super(CallSender, self).__init__(sockets_manager, - self._do_send_request) - assert reply_waiter, "Valid ReplyWaiter expected!" - self.reply_waiter = reply_waiter - - def _do_send_request(self, socket, request): - # DEALER socket specific envelope empty delimiter - socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(request) - - LOG.debug("Sent message_id %(message)s to a target %(target)s", - {"message": request.message_id, - "target": request.target}) - - def send_request(self, request): - reply_future = futurist.Future() - self.reply_waiter.track_reply(reply_future, request.message_id) - self.queue.put(request) - return reply_future - - def _connect_socket(self, target): - socket = self.outbound_sockets.get_socket(target) - self.reply_waiter.poll_socket(socket) - return socket diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py index 593451967..89031ecc4 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py @@ -12,78 +12,98 @@ # License for the specific language governing permissions and limitations # under the License. +from concurrent import futures import logging -from oslo_messaging._drivers.zmq_driver.client.publishers\ +import retrying + +import oslo_messaging +from oslo_messaging._drivers import common as rpc_common +from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_publisher_base from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._i18n import _LE LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class DealerPublisher(zmq_publisher_base.QueuedSender): - - def __init__(self, conf, matchmaker): - - def _send_message_data(socket, request): - socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(request) - - LOG.debug("Sent message_id %(message)s to a target %(target)s", - {"message": request.message_id, - "target": request.target}) - - def _do_send_request(socket, request): - if request.msg_type in zmq_names.MULTISEND_TYPES: - for _ in range(socket.connections_count()): - _send_message_data(socket, request) - else: - _send_message_data(socket, request) - - sockets_manager = zmq_publisher_base.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER) - super(DealerPublisher, self).__init__(sockets_manager, - _do_send_request) +class DealerPublisher(zmq_publisher_base.PublisherBase): + """Non-CALL publisher using direct connections.""" def send_request(self, request): if request.msg_type == zmq_names.CALL_TYPE: - raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - super(DealerPublisher, self).send_request(request) + raise zmq_publisher_base.UnsupportedSendPattern( + zmq_names.message_type_str(request.msg_type) + ) - -class DealerPublisherAsync(object): - """This simplified publisher is to be used with eventlet only. - Eventlet takes care about zmq sockets sharing between green threads - using queued lock. - Use DealerPublisher for other concurrency models. - """ - - def __init__(self, conf, matchmaker): - self.sockets_manager = zmq_publisher_base.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER) - - @staticmethod - def _send_message_data(socket, request): - socket.send(b'', zmq.SNDMORE) - socket.send_pyobj(request) - - LOG.debug("Sent message_id %(message)s to a target %(target)s", - {"message": request.message_id, - "target": request.target}) - - def send_request(self, request): - if request.msg_type == zmq_names.CALL_TYPE: - raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - socket = self.sockets_manager.get_socket(request.target) + try: + socket = self.sockets_manager.get_socket(request.target) + except retrying.RetryError: + return if request.msg_type in zmq_names.MULTISEND_TYPES: for _ in range(socket.connections_count()): - self._send_message_data(socket, request) + self.sender.send(socket, request) else: - self._send_message_data(socket, request) + self.sender.send(socket, request) + + +class DealerCallPublisher(zmq_publisher_base.PublisherBase): + """CALL publisher using direct connections.""" + + def __init__(self, sockets_manager, sender, reply_receiver): + super(DealerCallPublisher, self).__init__(sockets_manager, sender) + self.reply_receiver = reply_receiver + + @staticmethod + def _raise_timeout(request): + raise oslo_messaging.MessagingTimeout( + "Timeout %(tout)s seconds was reached for message %(msg_id)s" % + {"tout": request.timeout, "msg_id": request.message_id} + ) + + def send_request(self, request): + if request.msg_type != zmq_names.CALL_TYPE: + raise zmq_publisher_base.UnsupportedSendPattern( + zmq_names.message_type_str(request.msg_type) + ) + + try: + socket = self._connect_socket(request.target) + except retrying.RetryError: + self._raise_timeout(request) + + self.sender.send(socket, request) + self.reply_receiver.register_socket(socket) + return self._recv_reply(request) + + def _connect_socket(self, target): + return self.sockets_manager.get_socket(target) + + def _recv_reply(self, request): + reply_future, = self.reply_receiver.track_request(request) + + try: + _, reply = reply_future.result(timeout=request.timeout) + except AssertionError: + LOG.error(_LE("Message format error in reply for %s"), + request.message_id) + return None + except futures.TimeoutError: + self._raise_timeout(request) + finally: + self.reply_receiver.untrack_request(request) + + if reply.failure: + raise rpc_common.deserialize_remote_exception( + reply.failure, request.allowed_remote_exmods + ) + else: + return reply.reply_body def cleanup(self): - self.sockets_manager.cleanup() + self.reply_receiver.stop() + super(DealerCallPublisher, self).cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index d3e120dfa..9f53bed15 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -13,171 +13,90 @@ # under the License. import logging -import six -import time + +import retrying from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_call_publisher -from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_reply_waiter + import zmq_dealer_publisher from oslo_messaging._drivers.zmq_driver.client.publishers \ import zmq_publisher_base +from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_updater -zmq = zmq_async.import_zmq() - LOG = logging.getLogger(__name__) +zmq = zmq_async.import_zmq() -class DealerPublisherProxy(object): - """Used when publishing to a proxy. """ - def __init__(self, conf, matchmaker, socket_to_proxy): - self.conf = conf - self.sockets_manager = zmq_publisher_base.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER) - self.socket = socket_to_proxy - self.routing_table = RoutingTable(conf, matchmaker) - self.connection_updater = PublisherConnectionUpdater( - conf, matchmaker, self.socket) +class DealerPublisherProxy(zmq_publisher_base.PublisherBase): + """Non-CALL publisher via proxy.""" + + def __init__(self, sockets_manager, sender): + super(DealerPublisherProxy, self).__init__(sockets_manager, sender) + self.socket = sockets_manager.get_socket_to_publishers() + self.routing_table = zmq_routing_table.RoutingTable(self.conf, + self.matchmaker) + self.connection_updater = \ + PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket) + + def _get_routing_keys(self, request): + try: + if request.msg_type in zmq_names.DIRECT_TYPES: + return [self.routing_table.get_routable_host(request.target)] + else: + return \ + [zmq_address.target_to_subscribe_filter(request.target)] \ + if self.conf.use_pub_sub else \ + self.routing_table.get_all_hosts(request.target) + except retrying.RetryError: + return [] def send_request(self, request): if request.msg_type == zmq_names.CALL_TYPE: raise zmq_publisher_base.UnsupportedSendPattern( - request.msg_type) - - if self.conf.use_pub_sub: - routing_key = self.routing_table.get_routable_host(request.target) \ - if request.msg_type in zmq_names.DIRECT_TYPES else \ - zmq_address.target_to_subscribe_filter(request.target) - self._do_send_request(request, routing_key) - else: - routing_keys = \ - [self.routing_table.get_routable_host(request.target)] \ - if request.msg_type in zmq_names.DIRECT_TYPES else \ - self.routing_table.get_all_hosts(request.target) - for routing_key in routing_keys: - self._do_send_request(request, routing_key) - - def _do_send_request(self, request, routing_key): - self.socket.send(b'', zmq.SNDMORE) - self.socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) - self.socket.send(six.b(routing_key), zmq.SNDMORE) - self.socket.send(six.b(request.message_id), zmq.SNDMORE) - self.socket.send_pyobj(request.context, zmq.SNDMORE) - self.socket.send_pyobj(request.message) - - LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to " - "a target %(target)s", - {"message": request.message_id, - "target": request.target, - "addr": list(self.socket.connections)}) + zmq_names.message_type_str(request.msg_type) + ) + for routing_key in self._get_routing_keys(request): + request.routing_key = routing_key + self.sender.send(self.socket, request) def cleanup(self): + self.connection_updater.stop() self.socket.close() + super(DealerPublisherProxy, self).cleanup() -class DealerCallPublisherProxy(zmq_dealer_call_publisher.DealerCallPublisher): +class DealerCallPublisherProxy(zmq_dealer_publisher.DealerCallPublisher): + """CALL publisher via proxy.""" - def __init__(self, conf, matchmaker, sockets_manager): - reply_waiter = ReplyWaiterProxy(conf) - sender = CallSenderProxy(conf, matchmaker, sockets_manager, - reply_waiter) + def __init__(self, sockets_manager, sender, reply_waiter): super(DealerCallPublisherProxy, self).__init__( - conf, matchmaker, sockets_manager, sender, reply_waiter) + sockets_manager, sender, reply_waiter + ) + self.socket = self.sockets_manager.get_socket_to_publishers() + self.routing_table = zmq_routing_table.RoutingTable(self.conf, + self.matchmaker) + self.connection_updater = \ + PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket) - -class CallSenderProxy(zmq_dealer_call_publisher.CallSender): - - def __init__(self, conf, matchmaker, sockets_manager, reply_waiter): - super(CallSenderProxy, self).__init__( - sockets_manager, reply_waiter) - self.socket = self.outbound_sockets.get_socket_to_publishers() - self.reply_waiter.poll_socket(self.socket) - self.routing_table = RoutingTable(conf, matchmaker) - self.connection_updater = PublisherConnectionUpdater( - conf, matchmaker, self.socket) + def send_request(self, request): + try: + request.routing_key = \ + self.routing_table.get_routable_host(request.target) + except retrying.RetryError: + self._raise_timeout(request) + return super(DealerCallPublisherProxy, self).send_request(request) def _connect_socket(self, target): return self.socket - def _do_send_request(self, socket, request): - routing_key = self.routing_table.get_routable_host(request.target) - - # DEALER socket specific envelope empty delimiter - socket.send(b'', zmq.SNDMORE) - socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) - socket.send(six.b(routing_key), zmq.SNDMORE) - socket.send(six.b(request.message_id), zmq.SNDMORE) - socket.send_pyobj(request.context, zmq.SNDMORE) - socket.send_pyobj(request.message) - - LOG.debug("Sent message_id %(message)s to a target %(target)s", - {"message": request.message_id, - "target": request.target}) - - -class ReplyWaiterProxy(zmq_reply_waiter.ReplyWaiter): - - def receive_method(self, socket): - empty = socket.recv() - assert empty == b'', "Empty expected!" - reply_id = socket.recv() - assert reply_id is not None, "Reply ID expected!" - message_type = int(socket.recv()) - assert message_type == zmq_names.REPLY_TYPE, "Reply is expected!" - message_id = socket.recv() - reply = socket.recv_pyobj() - LOG.debug("Received reply %s", message_id) - return reply - - -class RoutingTable(object): - """This class implements local routing-table cache - taken from matchmaker. Its purpose is to give the next routable - host id (remote DEALER's id) by request for specific target in - round-robin fashion. - """ - - def __init__(self, conf, matchmaker): - self.conf = conf - self.matchmaker = matchmaker - self.routing_table = {} - self.routable_hosts = {} - - def get_all_hosts(self, target): - self._update_routing_table(target) - return list(self.routable_hosts.get(str(target)) or []) - - def get_routable_host(self, target): - self._update_routing_table(target) - hosts_for_target = self.routable_hosts[str(target)] - host = hosts_for_target.pop(0) - if not hosts_for_target: - self._renew_routable_hosts(target) - return host - - def _is_tm_expired(self, tm): - return 0 <= self.conf.zmq_target_expire <= time.time() - tm - - def _update_routing_table(self, target): - routing_record = self.routing_table.get(str(target)) - if routing_record is None: - self._fetch_hosts(target) - self._renew_routable_hosts(target) - elif self._is_tm_expired(routing_record[1]): - self._fetch_hosts(target) - - def _fetch_hosts(self, target): - self.routing_table[str(target)] = (self.matchmaker.get_hosts( - target, zmq_names.socket_type_str(zmq.DEALER)), time.time()) - - def _renew_routable_hosts(self, target): - hosts, _ = self.routing_table[str(target)] - self.routable_hosts[str(target)] = list(hosts) + def cleanup(self): + self.connection_updater.stop() + super(DealerCallPublisherProxy, self).cleanup() + self.socket.close() class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater): diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py deleted file mode 100644 index bb15fb2d9..000000000 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_reply_waiter.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2016 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging -import threading - -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._i18n import _LW - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class ReplyWaiter(object): - - def __init__(self, conf): - self.conf = conf - self.replies = {} - self.poller = zmq_async.get_poller() - self.executor = zmq_async.get_executor(self.run_loop) - self.executor.execute() - self._lock = threading.Lock() - - def track_reply(self, reply_future, message_id): - with self._lock: - self.replies[message_id] = reply_future - - def untrack_id(self, message_id): - with self._lock: - self.replies.pop(message_id) - - def poll_socket(self, socket): - self.poller.register(socket, recv_method=self.receive_method) - - def receive_method(self, socket): - empty = socket.recv() - assert empty == b'', "Empty expected!" - reply = socket.recv_pyobj() - LOG.debug("Received reply %s", reply.message_id) - return reply - - def run_loop(self): - reply, socket = self.poller.poll( - timeout=self.conf.rpc_poll_timeout) - if reply is not None: - call_future = self.replies.get(reply.message_id) - if call_future: - call_future.set_result(reply) - else: - LOG.warning(_LW("Received timed out reply: %s"), - reply.message_id) - - def cleanup(self): - self.poller.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index bfaff0d4d..bb5f29484 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -14,14 +14,11 @@ import abc import logging -import time import six from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._i18n import _LE LOG = logging.getLogger(__name__) @@ -56,7 +53,7 @@ class PublisherBase(object): Publisher can send request objects from zmq_request. """ - def __init__(self, sockets_manager): + def __init__(self, sockets_manager, sender): """Construct publisher @@ -66,10 +63,10 @@ class PublisherBase(object): :param conf: configuration object :type conf: oslo_config.CONF """ - self.outbound_sockets = sockets_manager + self.sockets_manager = sockets_manager self.conf = sockets_manager.conf self.matchmaker = sockets_manager.matchmaker - super(PublisherBase, self).__init__() + self.sender = sender @abc.abstractmethod def send_request(self, request): @@ -79,126 +76,6 @@ class PublisherBase(object): :type request: zmq_request.Request """ - def _send_request(self, socket, request): - """Send request to consumer. - Helper private method which defines basic sending behavior. - - :param socket: Socket to publish message on - :type socket: zmq.Socket - :param request: Message data and destination container object - :type request: zmq_request.Request - """ - LOG.debug("Sending %(type)s message_id %(message)s to a target " - "%(target)s", - {"type": request.msg_type, - "message": request.message_id, - "target": request.target}) - socket.send_pyobj(request) - def cleanup(self): """Cleanup publisher. Close allocated connections.""" - self.outbound_sockets.cleanup() - - -class SocketsManager(object): - - def __init__(self, conf, matchmaker, listener_type, socket_type): - self.conf = conf - self.matchmaker = matchmaker - self.listener_type = listener_type - self.socket_type = socket_type - self.zmq_context = zmq.Context() - self.outbound_sockets = {} - self.socket_to_publishers = None - self.socket_to_routers = None - - def get_hosts(self, target): - return self.matchmaker.get_hosts( - target, zmq_names.socket_type_str(self.listener_type)) - - @staticmethod - def _key_from_target(target): - return target.topic if target.fanout else str(target) - - def _get_hosts_and_connect(self, socket, target): - hosts = self.get_hosts(target) - self._connect_to_hosts(socket, target, hosts) - - def _track_socket(self, socket, target): - key = self._key_from_target(target) - self.outbound_sockets[key] = (socket, time.time()) - - def _connect_to_hosts(self, socket, target, hosts): - for host in hosts: - socket.connect_to_host(host) - self._track_socket(socket, target) - - def _check_for_new_hosts(self, target): - key = self._key_from_target(target) - socket, tm = self.outbound_sockets[key] - if 0 <= self.conf.zmq_target_expire <= time.time() - tm: - self._get_hosts_and_connect(socket, target) - return socket - - def get_socket(self, target): - key = self._key_from_target(target) - if key in self.outbound_sockets: - socket = self._check_for_new_hosts(target) - else: - socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, - self.socket_type) - self._get_hosts_and_connect(socket, target) - return socket - - def get_socket_to_publishers(self): - if self.socket_to_publishers is not None: - return self.socket_to_publishers - self.socket_to_publishers = zmq_socket.ZmqSocket( - self.conf, self.zmq_context, self.socket_type) - publishers = self.matchmaker.get_publishers() - for pub_address, router_address in publishers: - self.socket_to_publishers.connect_to_host(router_address) - return self.socket_to_publishers - - def get_socket_to_routers(self): - if self.socket_to_routers is not None: - return self.socket_to_routers - self.socket_to_routers = zmq_socket.ZmqSocket( - self.conf, self.zmq_context, self.socket_type) - routers = self.matchmaker.get_routers() - for router_address in routers: - self.socket_to_routers.connect_to_host(router_address) - return self.socket_to_routers - - def cleanup(self): - for socket, tm in self.outbound_sockets.values(): - socket.close() - - -class QueuedSender(PublisherBase): - - def __init__(self, sockets_manager, _do_send_request): - super(QueuedSender, self).__init__(sockets_manager) - self._do_send_request = _do_send_request - self.queue, self.empty_except = zmq_async.get_queue() - self.executor = zmq_async.get_executor(self.run_loop) - self.executor.execute() - - def send_request(self, request): - self.queue.put(request) - - def _connect_socket(self, target): - return self.outbound_sockets.get_socket(target) - - def run_loop(self): - try: - request = self.queue.get(timeout=self.conf.rpc_poll_timeout) - except self.empty_except: - return - - socket = self._connect_socket(request.target) - self._do_send_request(socket, request) - - def cleanup(self): - self.executor.stop() - super(QueuedSender, self).cleanup() + self.sockets_manager.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py deleted file mode 100644 index 4960979a9..000000000 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_push_publisher.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging - -from oslo_messaging._drivers.zmq_driver.client.publishers\ - import zmq_publisher_base -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class PushPublisher(object): - - def __init__(self, conf, matchmaker): - super(PushPublisher, self).__init__() - sockets_manager = zmq_publisher_base.SocketsManager( - conf, matchmaker, zmq.PULL, zmq.PUSH) - - def _do_send_request(push_socket, request): - push_socket.send_pyobj(request) - - LOG.debug("Sending message_id %(message)s to a target %(target)s", - {"message": request.message_id, - "target": request.target}) - - self.sender = zmq_publisher_base.QueuedSender( - sockets_manager, _do_send_request) - - def send_request(self, request): - - if request.msg_type != zmq_names.CAST_TYPE: - raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type) - - self.sender.send_request(request) - - def cleanup(self): - self.sender.cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index e5951cb9d..a8cfe934a 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -14,15 +14,14 @@ from oslo_messaging._drivers import common -from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_call_publisher from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher_proxy -from oslo_messaging._drivers.zmq_driver.client.publishers \ - import zmq_publisher_base from oslo_messaging._drivers.zmq_driver.client import zmq_client_base +from oslo_messaging._drivers.zmq_driver.client import zmq_receivers +from oslo_messaging._drivers.zmq_driver.client import zmq_senders +from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -46,25 +45,34 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase): if conf.use_router_proxy or not conf.use_pub_sub: raise WrongClientException() - self.sockets_manager = zmq_publisher_base.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER) + self.sockets_manager = zmq_sockets_manager.SocketsManager( + conf, matchmaker, zmq.ROUTER, zmq.DEALER + ) + + sender_proxy = zmq_senders.RequestSenderProxy(conf) + sender_direct = zmq_senders.RequestSenderDirect(conf) + + receiver_direct = zmq_receivers.ReplyReceiverDirect(conf) fanout_publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy( - conf, matchmaker, self.sockets_manager.get_socket_to_publishers()) + self.sockets_manager, sender_proxy + ) super(ZmqClientMixDirectPubSub, self).__init__( conf, matchmaker, allowed_remote_exmods, publishers={ zmq_names.CALL_TYPE: - zmq_dealer_call_publisher.DealerCallPublisher( - conf, matchmaker, self.sockets_manager), + zmq_dealer_publisher.DealerCallPublisher( + self.sockets_manager, sender_direct, receiver_direct + ), zmq_names.CAST_FANOUT_TYPE: fanout_publisher, zmq_names.NOTIFY_TYPE: fanout_publisher, - "default": zmq_dealer_publisher.DealerPublisherAsync( - conf, matchmaker) + "default": + zmq_dealer_publisher.DealerPublisher(self.sockets_manager, + sender_direct) } ) @@ -82,18 +90,25 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase): if conf.use_pub_sub or conf.use_router_proxy: raise WrongClientException() - self.sockets_manager = zmq_publisher_base.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER) + self.sockets_manager = zmq_sockets_manager.SocketsManager( + conf, matchmaker, zmq.ROUTER, zmq.DEALER + ) + + sender = zmq_senders.RequestSenderDirect(conf) + + receiver = zmq_receivers.ReplyReceiverDirect(conf) super(ZmqClientDirect, self).__init__( conf, matchmaker, allowed_remote_exmods, publishers={ zmq_names.CALL_TYPE: - zmq_dealer_call_publisher.DealerCallPublisher( - conf, matchmaker, self.sockets_manager), + zmq_dealer_publisher.DealerCallPublisher( + self.sockets_manager, sender, receiver + ), - "default": zmq_dealer_publisher.DealerPublisher( - conf, matchmaker) + "default": + zmq_dealer_publisher.DealerPublisher(self.sockets_manager, + sender) } ) @@ -113,18 +128,25 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase): if not conf.use_router_proxy: raise WrongClientException() - self.sockets_manager = zmq_publisher_base.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER) + self.sockets_manager = zmq_sockets_manager.SocketsManager( + conf, matchmaker, zmq.ROUTER, zmq.DEALER + ) + + sender = zmq_senders.RequestSenderProxy(conf) + + receiver = zmq_receivers.ReplyReceiverProxy(conf) super(ZmqClientProxy, self).__init__( conf, matchmaker, allowed_remote_exmods, publishers={ zmq_names.CALL_TYPE: zmq_dealer_publisher_proxy.DealerCallPublisherProxy( - conf, matchmaker, self.sockets_manager), + self.sockets_manager, sender, receiver + ), - "default": zmq_dealer_publisher_proxy.DealerPublisherProxy( - conf, matchmaker, - self.sockets_manager.get_socket_to_publishers()) + "default": + zmq_dealer_publisher_proxy.DealerPublisherProxy( + self.sockets_manager, sender + ) } ) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py new file mode 100644 index 000000000..e8be2a364 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py @@ -0,0 +1,140 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import logging +import threading + +import futurist +import six + +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +@six.add_metaclass(abc.ABCMeta) +class ReceiverBase(object): + """Base response receiving interface.""" + + def __init__(self, conf): + self.conf = conf + self._lock = threading.Lock() + self._requests = {} + self._poller = zmq_async.get_poller() + self._executor = zmq_async.get_executor(method=self._run_loop) + self._executor.execute() + + @abc.abstractproperty + def message_types(self): + """A list of supported incoming response types.""" + + def register_socket(self, socket): + """Register a socket for receiving data.""" + self._poller.register(socket, recv_method=self.recv_response) + + @abc.abstractmethod + def recv_response(self, socket): + """Receive a response and return a tuple of the form + (reply_id, message_type, message_id, response). + """ + + def track_request(self, request): + """Track a request via already registered sockets and return + a list of futures for monitoring all types of responses. + """ + futures = [] + for message_type in self.message_types: + future = futurist.Future() + self._set_future(request.message_id, message_type, future) + futures.append(future) + return futures + + def untrack_request(self, request): + """Untrack a request and stop monitoring any responses.""" + for message_type in self.message_types: + self._pop_future(request.message_id, message_type) + + def stop(self): + self._poller.close() + self._executor.stop() + + def _get_future(self, message_id, message_type): + with self._lock: + return self._requests.get((message_id, message_type)) + + def _set_future(self, message_id, message_type, future): + with self._lock: + self._requests[(message_id, message_type)] = future + + def _pop_future(self, message_id, message_type): + with self._lock: + return self._requests.pop((message_id, message_type), None) + + def _run_loop(self): + data, socket = self._poller.poll(timeout=self.conf.rpc_poll_timeout) + if data is None: + return + reply_id, message_type, message_id, response = data + assert message_type in self.message_types, \ + "%s is not supported!" % zmq_names.message_type_str(message_type) + future = self._get_future(message_id, message_type) + if future is not None: + LOG.debug("Received %(msg_type)s for %(msg_id)s", + {"msg_type": zmq_names.message_type_str(message_type), + "msg_id": message_id}) + future.set_result((reply_id, response)) + + +class AckReceiver(ReceiverBase): + + message_types = (zmq_names.ACK_TYPE,) + + +class ReplyReceiver(ReceiverBase): + + message_types = (zmq_names.REPLY_TYPE,) + + +class ReplyReceiverProxy(ReplyReceiver): + + def recv_response(self, socket): + empty = socket.recv() + assert empty == b'', "Empty expected!" + reply_id = socket.recv() + assert reply_id is not None, "Reply ID expected!" + message_type = int(socket.recv()) + assert message_type == zmq_names.REPLY_TYPE, "Reply is expected!" + message_id = socket.recv() + reply = socket.recv_pyobj() + LOG.debug("Received reply for %s", message_id) + return reply_id, message_type, message_id, reply + + +class ReplyReceiverDirect(ReplyReceiver): + + def recv_response(self, socket): + empty = socket.recv() + assert empty == b'', "Empty expected!" + reply = socket.recv_pyobj() + LOG.debug("Received reply for %s", reply.message_id) + return reply.reply_id, reply.type_, reply.message_id, reply + + +class AckAndReplyReceiver(ReceiverBase): + + message_types = (zmq_names.ACK_TYPE, zmq_names.REPLY_TYPE) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py new file mode 100644 index 000000000..2abb21b7e --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py @@ -0,0 +1,65 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names + +zmq = zmq_async.import_zmq() + + +class RoutingTable(object): + """This class implements local routing-table cache + taken from matchmaker. Its purpose is to give the next routable + host id (remote DEALER's id) by request for specific target in + round-robin fashion. + """ + + def __init__(self, conf, matchmaker): + self.conf = conf + self.matchmaker = matchmaker + self.routing_table = {} + self.routable_hosts = {} + + def get_all_hosts(self, target): + self._update_routing_table(target) + return list(self.routable_hosts.get(str(target)) or []) + + def get_routable_host(self, target): + self._update_routing_table(target) + hosts_for_target = self.routable_hosts[str(target)] + host = hosts_for_target.pop(0) + if not hosts_for_target: + self._renew_routable_hosts(target) + return host + + def _is_tm_expired(self, tm): + return 0 <= self.conf.zmq_target_expire <= time.time() - tm + + def _update_routing_table(self, target): + routing_record = self.routing_table.get(str(target)) + if routing_record is None: + self._fetch_hosts(target) + self._renew_routable_hosts(target) + elif self._is_tm_expired(routing_record[1]): + self._fetch_hosts(target) + + def _fetch_hosts(self, target): + self.routing_table[str(target)] = (self.matchmaker.get_hosts( + target, zmq_names.socket_type_str(zmq.DEALER)), time.time()) + + def _renew_routable_hosts(self, target): + hosts, _ = self.routing_table[str(target)] + self.routable_hosts[str(target)] = list(hosts) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py new file mode 100644 index 000000000..f7cde0d04 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_senders.py @@ -0,0 +1,94 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import logging + +import six + +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +@six.add_metaclass(abc.ABCMeta) +class SenderBase(object): + """Base request/ack/reply sending interface.""" + + def __init__(self, conf): + self.conf = conf + + @abc.abstractmethod + def send(self, socket, message): + pass + + +class RequestSenderProxy(SenderBase): + + def send(self, socket, request): + socket.send(b'', zmq.SNDMORE) + socket.send(six.b(str(request.msg_type)), zmq.SNDMORE) + socket.send(six.b(request.routing_key), zmq.SNDMORE) + socket.send(six.b(request.message_id), zmq.SNDMORE) + socket.send_pyobj(request.context, zmq.SNDMORE) + socket.send_pyobj(request.message) + + LOG.debug("->[proxy:%(addr)s] Sending %(msg_type)s message " + "%(msg_id)s to target %(target)s", + {"addr": list(socket.connections), + "msg_type": zmq_names.message_type_str(request.msg_type), + "msg_id": request.message_id, + "target": request.target}) + + +class ReplySenderProxy(SenderBase): + + def send(self, socket, reply): + LOG.debug("Replying to %s", reply.message_id) + + assert reply.type_ == zmq_names.REPLY_TYPE, "Reply expected!" + + socket.send(b'', zmq.SNDMORE) + socket.send(six.b(str(reply.type_)), zmq.SNDMORE) + socket.send(reply.reply_id, zmq.SNDMORE) + socket.send(reply.message_id, zmq.SNDMORE) + socket.send_pyobj(reply) + + +class RequestSenderDirect(SenderBase): + + def send(self, socket, request): + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(request) + + LOG.debug("Sending %(msg_type)s message %(msg_id)s to " + "target %(target)s", + {"msg_type": zmq_names.message_type_str(request.msg_type), + "msg_id": request.message_id, + "target": request.target}) + + +class ReplySenderDirect(SenderBase): + + def send(self, socket, reply): + LOG.debug("Replying to %s", reply.message_id) + + assert reply.type_ == zmq_names.REPLY_TYPE, "Reply expected!" + + socket.send(reply.reply_id, zmq.SNDMORE) + socket.send(b'', zmq.SNDMORE) + socket.send_pyobj(reply) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py new file mode 100644 index 000000000..27e5f6e71 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py @@ -0,0 +1,96 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._drivers.zmq_driver import zmq_socket + +zmq = zmq_async.import_zmq() + + +class SocketsManager(object): + + def __init__(self, conf, matchmaker, listener_type, socket_type): + self.conf = conf + self.matchmaker = matchmaker + self.listener_type = listener_type + self.socket_type = socket_type + self.zmq_context = zmq.Context() + self.outbound_sockets = {} + self.socket_to_publishers = None + self.socket_to_routers = None + + def get_hosts(self, target): + return self.matchmaker.get_hosts( + target, zmq_names.socket_type_str(self.listener_type)) + + @staticmethod + def _key_from_target(target): + return target.topic if target.fanout else str(target) + + def _get_hosts_and_connect(self, socket, target): + hosts = self.get_hosts(target) + self._connect_to_hosts(socket, target, hosts) + + def _track_socket(self, socket, target): + key = self._key_from_target(target) + self.outbound_sockets[key] = (socket, time.time()) + + def _connect_to_hosts(self, socket, target, hosts): + for host in hosts: + socket.connect_to_host(host) + self._track_socket(socket, target) + + def _check_for_new_hosts(self, target): + key = self._key_from_target(target) + socket, tm = self.outbound_sockets[key] + if 0 <= self.conf.zmq_target_expire <= time.time() - tm: + self._get_hosts_and_connect(socket, target) + return socket + + def get_socket(self, target): + key = self._key_from_target(target) + if key in self.outbound_sockets: + socket = self._check_for_new_hosts(target) + else: + socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, + self.socket_type) + self._get_hosts_and_connect(socket, target) + return socket + + def get_socket_to_publishers(self): + if self.socket_to_publishers is not None: + return self.socket_to_publishers + self.socket_to_publishers = zmq_socket.ZmqSocket( + self.conf, self.zmq_context, self.socket_type) + publishers = self.matchmaker.get_publishers() + for pub_address, router_address in publishers: + self.socket_to_publishers.connect_to_host(router_address) + return self.socket_to_publishers + + def get_socket_to_routers(self): + if self.socket_to_routers is not None: + return self.socket_to_routers + self.socket_to_routers = zmq_socket.ZmqSocket( + self.conf, self.zmq_context, self.socket_type) + routers = self.matchmaker.get_routers() + for router_address in routers: + self.socket_to_routers.connect_to_host(router_address) + return self.socket_to_routers + + def cleanup(self): + for socket, tm in self.outbound_sockets.values(): + socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index c7792df82..cfde3adb8 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -14,15 +14,12 @@ import logging -import six - -from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common -from oslo_messaging._drivers.zmq_driver.client.publishers\ - import zmq_publisher_base -from oslo_messaging._drivers.zmq_driver.client import zmq_response -from oslo_messaging._drivers.zmq_driver.server.consumers\ +from oslo_messaging._drivers.zmq_driver.client import zmq_senders +from oslo_messaging._drivers.zmq_driver.client import zmq_sockets_manager +from oslo_messaging._drivers.zmq_driver.server.consumers \ import zmq_consumer_base +from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_updater @@ -33,54 +30,11 @@ LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class DealerIncomingMessage(base.RpcIncomingMessage): - - def __init__(self, context, message): - super(DealerIncomingMessage, self).__init__(context, message) - - def reply(self, reply=None, failure=None): - """Reply is not needed for non-call messages""" - - def acknowledge(self): - """Not sending acknowledge""" - - def requeue(self): - """Requeue is not supported""" - - -class DealerIncomingRequest(base.RpcIncomingMessage): - - def __init__(self, socket, reply_id, message_id, context, message): - super(DealerIncomingRequest, self).__init__(context, message) - self.reply_socket = socket - self.reply_id = reply_id - self.message_id = message_id - - def reply(self, reply=None, failure=None): - if failure is not None: - failure = rpc_common.serialize_remote_exception(failure) - response = zmq_response.Response(type=zmq_names.REPLY_TYPE, - message_id=self.message_id, - reply_id=self.reply_id, - reply_body=reply, - failure=failure) - - LOG.debug("Replying %s", self.message_id) - - self.reply_socket.send(b'', zmq.SNDMORE) - self.reply_socket.send(six.b(str(zmq_names.REPLY_TYPE)), zmq.SNDMORE) - self.reply_socket.send(self.reply_id, zmq.SNDMORE) - self.reply_socket.send(self.message_id, zmq.SNDMORE) - self.reply_socket.send_pyobj(response) - - def requeue(self): - """Requeue is not supported""" - - class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): def __init__(self, conf, poller, server): - self.sockets_manager = zmq_publisher_base.SocketsManager( + self.sender = zmq_senders.ReplySenderProxy(conf) + self.sockets_manager = zmq_sockets_manager.SocketsManager( conf, server.matchmaker, zmq.ROUTER, zmq.DEALER) self.host = None super(DealerConsumer, self).__init__(conf, poller, server, zmq.DEALER) @@ -91,6 +45,7 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): def subscribe_socket(self, socket_type): try: socket = self.sockets_manager.get_socket_to_routers() + self.sockets.append(socket) self.host = socket.handle.identity self.poller.register(socket, self.receive_message) return socket @@ -110,10 +65,12 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): LOG.debug("[%(host)s] Received message %(id)s", {"host": self.host, "id": message_id}) if message_type == zmq_names.CALL_TYPE: - return DealerIncomingRequest( - socket, reply_id, message_id, context, message) + return zmq_incoming_message.ZmqIncomingMessage( + context, message, reply_id, message_id, socket, self.sender + ) elif message_type in zmq_names.NON_BLOCKING_TYPES: - return DealerIncomingMessage(context, message) + return zmq_incoming_message.ZmqIncomingMessage(context, + message) else: LOG.error(_LE("Unknown message type: %s"), zmq_names.message_type_str(message_type)) @@ -122,6 +79,7 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer): def cleanup(self): LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host) + self.connection_updater.cleanup() super(DealerConsumer, self).cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py deleted file mode 100644 index 719c24e4e..000000000 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_pull_consumer.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright 2015 Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging - -from oslo_messaging._drivers import base -from oslo_messaging._drivers.zmq_driver.server.consumers\ - import zmq_consumer_base -from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LE, _LI - -LOG = logging.getLogger(__name__) - -zmq = zmq_async.import_zmq() - - -class PullIncomingMessage(base.RpcIncomingMessage): - - def __init__(self, context, message): - super(PullIncomingMessage, self).__init__(context, message) - - def reply(self, reply=None, failure=None): - """Reply is not needed for non-call messages.""" - - def acknowledge(self): - """Acknowledgments are not supported by this type of consumer.""" - - def requeue(self): - """Requeueing is not supported.""" - - -class PullConsumer(zmq_consumer_base.SingleSocketConsumer): - - def __init__(self, conf, poller, server): - super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL) - LOG.info(_LI("[%s] Run PULL consumer"), self.host) - - def receive_message(self, socket): - try: - request = socket.recv_pyobj() - msg_type = request.msg_type - assert msg_type is not None, 'Bad format: msg type expected' - context = request.context - message = request.message - LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s", - {"host": self.host, - "type": request.msg_type, - "id": request.message_id, - "target": request.target}) - - if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES): - return PullIncomingMessage(context, message) - else: - LOG.error(_LE("Unknown message type: %s"), msg_type) - - except (zmq.ZMQError, AssertionError) as e: - LOG.error(_LE("Receiving message failed: %s"), str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py index 0e40d5cae..64cbcfd30 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_router_consumer.py @@ -14,7 +14,7 @@ import logging -from oslo_messaging._drivers import base +from oslo_messaging._drivers.zmq_driver.client import zmq_senders from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_consumer_base from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message @@ -27,29 +27,10 @@ LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class RouterIncomingMessage(base.RpcIncomingMessage): - - def __init__(self, context, message, socket, reply_id, msg_id, - poller): - super(RouterIncomingMessage, self).__init__(context, message) - self.socket = socket - self.reply_id = reply_id - self.msg_id = msg_id - self.message = message - - def reply(self, reply=None, failure=None): - """Reply is not needed for non-call messages""" - - def acknowledge(self): - LOG.debug("Not sending acknowledge for %s", self.msg_id) - - def requeue(self): - """Requeue is not supported""" - - class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): def __init__(self, conf, poller, server): + self.sender = zmq_senders.ReplySenderDirect(conf) super(RouterConsumer, self).__init__(conf, poller, server, zmq.ROUTER) LOG.info(_LI("[%s] Run ROUTER consumer"), self.host) @@ -70,14 +51,19 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer): "target": request.target}) if request.msg_type == zmq_names.CALL_TYPE: - return zmq_incoming_message.ZmqIncomingRequest( - socket, reply_id, request, self.poller) + return zmq_incoming_message.ZmqIncomingMessage( + request.context, request.message, reply_id, + request.message_id, socket, self.sender + ) elif request.msg_type in zmq_names.NON_BLOCKING_TYPES: - return RouterIncomingMessage( - request.context, request.message, socket, reply_id, - request.message_id, self.poller) + return zmq_incoming_message.ZmqIncomingMessage(request.context, + request.message) else: - LOG.error(_LE("Unknown message type: %s"), request.msg_type) - + LOG.error(_LE("Unknown message type: %s"), + zmq_names.message_type_str(request.msg_type)) except (zmq.ZMQError, AssertionError) as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) + + def cleanup(self): + LOG.info(_LI("[%s] Destroy ROUTER consumer"), self.host) + super(RouterConsumer, self).cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py index 6aa8ec4eb..a6e32aa53 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py @@ -16,9 +16,9 @@ import logging import six -from oslo_messaging._drivers import base -from oslo_messaging._drivers.zmq_driver.server.consumers\ +from oslo_messaging._drivers.zmq_driver.server.consumers \ import zmq_consumer_base +from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_socket @@ -29,21 +29,6 @@ LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class SubIncomingMessage(base.RpcIncomingMessage): - - def __init__(self, context, message): - super(SubIncomingMessage, self).__init__(context, message) - - def reply(self, reply=None, failure=None): - """Reply is not needed for non-call messages.""" - - def acknowledge(self): - """Requeue is not supported""" - - def requeue(self): - """Requeue is not supported""" - - class SubConsumer(zmq_consumer_base.ConsumerBase): def __init__(self, conf, poller, server): @@ -89,8 +74,7 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): context, message = self._receive_request(socket) if not message: return None - - return SubIncomingMessage(context, message) + return zmq_incoming_message.ZmqIncomingMessage(context, message) except (zmq.ZMQError, AssertionError) as e: LOG.error(_LE("Receiving message failed: %s"), str(e)) diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py index 51c83e2c2..2c7622713 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_incoming_message.py @@ -21,38 +21,41 @@ from oslo_messaging._drivers.zmq_driver.client import zmq_response from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names - LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class ZmqIncomingRequest(base.RpcIncomingMessage): +class ZmqIncomingMessage(base.RpcIncomingMessage): - def __init__(self, socket, rep_id, request, poller): - super(ZmqIncomingRequest, self).__init__(request.context, - request.message) - self.reply_socket = socket - self.reply_id = rep_id - self.request = request - self.received = None - self.poller = poller + def __init__(self, context, message, reply_id=None, message_id=None, + socket=None, sender=None): + + if sender is not None: + assert socket is not None, "Valid socket expected!" + assert message_id is not None, "Valid message ID expected!" + assert reply_id is not None, "Valid reply ID expected!" + + super(ZmqIncomingMessage, self).__init__(context, message) + + self.reply_id = reply_id + self.message_id = message_id + self.socket = socket + self.sender = sender + + def acknowledge(self): + """Not sending acknowledge""" def reply(self, reply=None, failure=None): - if failure is not None: - failure = rpc_common.serialize_remote_exception(failure) - response = zmq_response.Response(type=zmq_names.REPLY_TYPE, - message_id=self.request.message_id, - reply_id=self.reply_id, - reply_body=reply, - failure=failure) - - LOG.debug("Replying %s", (str(self.request.message_id))) - - self.received = True - self.reply_socket.send(self.reply_id, zmq.SNDMORE) - self.reply_socket.send(b'', zmq.SNDMORE) - self.reply_socket.send_pyobj(response) + if self.sender is not None: + if failure is not None: + failure = rpc_common.serialize_remote_exception(failure) + reply = zmq_response.Response(type=zmq_names.REPLY_TYPE, + message_id=self.message_id, + reply_id=self.reply_id, + reply_body=reply, + failure=failure) + self.sender.send(self.socket, reply) def requeue(self): """Requeue is not supported""" diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_names.py b/oslo_messaging/_drivers/zmq_driver/zmq_names.py index 51f68c6e8..8b63e0ef0 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_names.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_names.py @@ -69,8 +69,8 @@ def socket_type_str(socket_type): def message_type_str(message_type): msg_type_str = {CALL_TYPE: "CALL", CAST_TYPE: "CAST", - CAST_FANOUT_TYPE: "CAST_FANOUT_TYPE", - NOTIFY_TYPE: "NOTIFY_TYPE", - REPLY_TYPE: "REPLY_TYPE", - ACK_TYPE: "ACK_TYPE"} + CAST_FANOUT_TYPE: "CAST_FANOUT", + NOTIFY_TYPE: "NOTIFY", + REPLY_TYPE: "REPLY", + ACK_TYPE: "ACK"} return msg_type_str[message_type] diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py index a8ea82279..302915d1c 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py @@ -31,6 +31,8 @@ class UpdaterBase(object): self.conf = conf self.matchmaker = matchmaker self.update_method = update_method + # make first update immediately + self.update_method() self.executor = zmq_async.get_executor(method=self._update_loop) self.executor.execute() diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index e48952264..0dcc04778 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -300,6 +300,8 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT') if zmq_redis_port: self.config(port=zmq_redis_port, group="matchmaker_redis") + self.config(check_timeout=10000, group="matchmaker_redis") + self.config(wait_timeout=1000, group="matchmaker_redis") zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB') if zmq_use_pub_sub: self.config(use_pub_sub=zmq_use_pub_sub)