diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py index 6b8a70bd9..80044dcff 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_base.py @@ -34,15 +34,18 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase): def __init__(self, conf, matchmaker, sender, receiver): sockets_manager = zmq_sockets_manager.SocketsManager( - conf, matchmaker, zmq.ROUTER, zmq.DEALER - ) - super(DealerPublisherBase, self).__init__(sockets_manager, sender, - receiver) + conf, matchmaker, zmq.ROUTER, zmq.DEALER) + self.socket_type = zmq.DEALER + super(DealerPublisherBase, self).__init__( + sockets_manager, sender, receiver) def _check_received_data(self, reply_id, reply, request): assert isinstance(reply, zmq_response.Reply), "Reply expected!" - def _recv_reply(self, request, socket): + def _finally_unregister(self, socket, request): + self.receiver.untrack_request(request) + + def receive_reply(self, socket, request): self.receiver.register_socket(socket) reply_future = \ self.receiver.track_request(request)[zmq_names.REPLY_TYPE] @@ -57,11 +60,10 @@ class DealerPublisherBase(zmq_publisher_base.PublisherBase): except futures.TimeoutError: self._raise_timeout(request) finally: - self.receiver.untrack_request(request) + self._finally_unregister(socket, request) if reply.failure: raise rpc_common.deserialize_remote_exception( - reply.failure, request.allowed_remote_exmods - ) + reply.failure, request.allowed_remote_exmods) else: return reply.reply_body diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py index 9356ac623..e7fdaa325 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_direct.py @@ -12,16 +12,18 @@ # License for the specific language governing permissions and limitations # under the License. -import logging -import retrying +import logging from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ import zmq_dealer_publisher_base from oslo_messaging._drivers.zmq_driver.client import zmq_receivers +from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table from oslo_messaging._drivers.zmq_driver.client import zmq_senders from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._drivers.zmq_driver import zmq_socket + LOG = logging.getLogger(__name__) @@ -29,9 +31,32 @@ zmq = zmq_async.import_zmq() class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase): - """DEALER-publisher using direct connections.""" + """DEALER-publisher using direct connections. + + Publishing directly to remote services assumes the following: + - All direct connections are dynamic - so they live per message, + thus each message send executes the following: + * Open a new socket + * Connect to some host got from the RoutingTable + * Send message(s) + * Close connection, destroy socket + - RoutingTable/RoutingTableUpdater implements local cache of + matchmaker (e.g. Redis) for target resolution to the list of + available hosts. Cache updates in a background thread. + - Caching of connections is not appropriate for directly connected + OS services, because finally it results in a full-mesh of + connections between services. + - Yes we lose on performance opening and closing connections + for each message, but that is done intentionally to implement + the dynamic connections concept. The key thought here is to + have minimum number of connected services at the moment. + - Using the local RoutingTable cache is done to optimise access + to the matchmaker so we don't call the matchmaker per each message + """ def __init__(self, conf, matchmaker): + self.routing_table = zmq_routing_table.RoutingTableAdaptor( + conf, matchmaker, zmq.ROUTER) sender = zmq_senders.RequestSenderDirect(conf) if conf.oslo_messaging_zmq.rpc_use_acks: receiver = zmq_receivers.AckAndReplyReceiverDirect(conf) @@ -40,19 +65,34 @@ class DealerPublisherDirect(zmq_dealer_publisher_base.DealerPublisherBase): super(DealerPublisherDirect, self).__init__(conf, matchmaker, sender, receiver) - def _connect_socket(self, request): - try: - return self.sockets_manager.get_socket(request.target) - except retrying.RetryError: - return None + def _get_round_robin_host_connection(self, target, socket): + host = self.routing_table.get_round_robin_host(target) + socket.connect_to_host(host) - def _send_request(self, request): - socket = self._connect_socket(request) - if not socket: - return None + def _get_fanout_connection(self, target, socket): + for host in self.routing_table.get_fanout_hosts(target): + socket.connect_to_host(host) + + def acquire_connection(self, request): + socket = zmq_socket.ZmqSocket(self.conf, self.context, + self.socket_type, immediate=False) + if request.msg_type in zmq_names.DIRECT_TYPES: + self._get_round_robin_host_connection(request.target, socket) + elif request.msg_type in zmq_names.MULTISEND_TYPES: + self._get_fanout_connection(request.target, socket) + return socket + + def _finally_unregister(self, socket, request): + super(DealerPublisherDirect, self)._finally_unregister(socket, request) + self.receiver.unregister_socket(socket) + + def send_request(self, socket, request): if request.msg_type in zmq_names.MULTISEND_TYPES: for _ in range(socket.connections_count()): self.sender.send(socket, request) else: self.sender.send(socket, request) - return socket + + def cleanup(self): + self.routing_table.cleanup() + super(DealerPublisherDirect, self).cleanup() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index dce8c2ff0..d67d1b472 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -13,6 +13,7 @@ # under the License. import logging +import random import uuid import six @@ -22,11 +23,13 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ from oslo_messaging._drivers.zmq_driver.client import zmq_receivers from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table from oslo_messaging._drivers.zmq_driver.client import zmq_senders +from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._drivers.zmq_driver import zmq_socket from oslo_messaging._drivers.zmq_driver import zmq_updater -from oslo_messaging._i18n import _LW + LOG = logging.getLogger(__name__) @@ -46,8 +49,10 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase): receiver) self.socket = self.sockets_manager.get_socket_to_publishers( self._generate_identity()) - self.routing_table = zmq_routing_table.RoutingTable(self.conf, - self.matchmaker) + + self.routing_table = zmq_routing_table.RoutingTableAdaptor( + conf, matchmaker, zmq.DEALER) + self.connection_updater = \ PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket) @@ -63,30 +68,24 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase): def _get_routing_keys(self, request): if request.msg_type in zmq_names.DIRECT_TYPES: - return [self.routing_table.get_routable_host(request.target)] + return [self.routing_table.get_round_robin_host(request.target)] else: return \ [zmq_address.target_to_subscribe_filter(request.target)] \ if self.conf.oslo_messaging_zmq.use_pub_sub else \ - self.routing_table.get_all_hosts(request.target) + self.routing_table.get_fanout_hosts(request.target) - def _send_request(self, request): - routing_keys = [routing_key - for routing_key in self._get_routing_keys(request) - if routing_key is not None] - if not routing_keys: - LOG.warning(_LW("Matchmaker contains no records for specified " - "target %(target)s. Dropping message %(msg_id)s.") - % {"target": request.target, - "msg_id": request.message_id}) - return None - for routing_key in routing_keys: - request.routing_key = routing_key - self.sender.send(self.socket, request) + def acquire_connection(self, request): return self.socket + def send_request(self, socket, request): + for routing_key in self._get_routing_keys(request): + request.routing_key = routing_key + self.sender.send(socket, request) + def cleanup(self): super(DealerPublisherProxy, self).cleanup() + self.routing_table.cleanup() self.connection_updater.stop() self.socket.close() @@ -97,3 +96,49 @@ class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater): publishers = self.matchmaker.get_publishers() for pub_address, router_address in publishers: self.socket.connect_to_host(router_address) + + +class DealerPublisherProxyDynamic( + zmq_dealer_publisher_base.DealerPublisherBase): + + def __init__(self, conf, matchmaker): + self.publishers = set() + self.updater = DynamicPublishersUpdater(conf, matchmaker, + self.publishers) + self.updater.update_publishers() + sender = zmq_senders.RequestSenderProxy(conf) + receiver = zmq_receivers.ReplyReceiverDirect(conf) + super(DealerPublisherProxyDynamic, self).__init__( + conf, matchmaker, sender, receiver) + + def acquire_connection(self, request): + socket = zmq_socket.ZmqSocket(self.conf, self.context, + self.socket_type, immediate=False) + if not self.publishers: + raise zmq_matchmaker_base.MatchmakerUnavailable() + socket.connect_to_host(random.choice(tuple(self.publishers))) + return socket + + def send_request(self, socket, request): + assert request.msg_type in zmq_names.MULTISEND_TYPES + request.routing_key = zmq_address.target_to_subscribe_filter( + request.target) + self.sender.send(socket, request) + + def cleanup(self): + super(DealerPublisherProxyDynamic, self).cleanup() + self.updater.cleanup() + + +class DynamicPublishersUpdater(zmq_updater.UpdaterBase): + + def __init__(self, conf, matchmaker, publishers): + super(DynamicPublishersUpdater, self).__init__( + conf, matchmaker, self.update_publishers, + sleep_for=conf.oslo_messaging_zmq.zmq_target_update + ) + self.publishers = publishers + + def update_publishers(self): + for _, pub_frontend in self.matchmaker.get_publishers(): + self.publishers.add(pub_frontend) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index c7c4cc8d4..838d11e2c 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -18,32 +18,13 @@ import logging import six import oslo_messaging -from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver import zmq_async -from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LE + LOG = logging.getLogger(__name__) - zmq = zmq_async.import_zmq() -class UnsupportedSendPattern(rpc_common.RPCException): - - """Exception to raise from publishers in case of unsupported - sending pattern called. - """ - - def __init__(self, pattern_name): - """Construct exception object - - :param pattern_name: Message type name from zmq_names - :type pattern_name: str - """ - errmsg = _LE("Sending pattern %s is unsupported.") % pattern_name - super(UnsupportedSendPattern, self).__init__(errmsg) - - @six.add_metaclass(abc.ABCMeta) class PublisherBase(object): @@ -68,16 +49,40 @@ class PublisherBase(object): :param receiver: reply receiver object :type receiver: zmq_receivers.ReplyReceiver """ + self.context = zmq.Context() self.sockets_manager = sockets_manager self.conf = sockets_manager.conf self.matchmaker = sockets_manager.matchmaker self.sender = sender self.receiver = receiver - @staticmethod - def _check_message_pattern(expected, actual): - if expected != actual: - raise UnsupportedSendPattern(zmq_names.message_type_str(actual)) + @abc.abstractmethod + def acquire_connection(self, request): + """Get socket to publish request on it + + :param request: request object + :type senders: zmq_request.Request + """ + + @abc.abstractmethod + def send_request(self, socket, request): + """Publish request on a socket + + :param socket: socket object to publish request on + :type socket: zmq_socket.ZmqSocket + :param request: request object + :type senders: zmq_request.Request + """ + + @abc.abstractmethod + def receive_reply(self, socket, request): + """Wait for a reply via the socket used for sending the request. + + :param socket: socket object to receive reply from + :type socket: zmq_socket.ZmqSocket + :param request: request object + :type senders: zmq_request.Request + """ @staticmethod def _raise_timeout(request): @@ -86,37 +91,6 @@ class PublisherBase(object): {"tout": request.timeout, "msg_id": request.message_id} ) - @abc.abstractmethod - def _send_request(self, request): - """Send the request and return a socket used for that. - Return value of None means some failure (e.g. connection - can't be established, etc). - """ - - @abc.abstractmethod - def _recv_reply(self, request, socket): - """Wait for a reply via the socket used for sending the request.""" - - def send_call(self, request): - self._check_message_pattern(zmq_names.CALL_TYPE, request.msg_type) - socket = self._send_request(request) - if not socket: - raise self._raise_timeout(request) - return self._recv_reply(request, socket) - - def send_cast(self, request): - self._check_message_pattern(zmq_names.CAST_TYPE, request.msg_type) - self._send_request(request) - - def send_fanout(self, request): - self._check_message_pattern(zmq_names.CAST_FANOUT_TYPE, - request.msg_type) - self._send_request(request) - - def send_notify(self, request): - self._check_message_pattern(zmq_names.NOTIFY_TYPE, request.msg_type) - self._send_request(request) - def cleanup(self): """Cleanup publisher. Close allocated connections.""" self.receiver.stop() diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py index e1d9e8897..d5b408232 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_ack_manager.py @@ -15,6 +15,7 @@ from concurrent import futures import logging +from oslo_messaging._drivers.zmq_driver.client import zmq_publisher_manager from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._i18n import _LE, _LW @@ -24,38 +25,10 @@ LOG = logging.getLogger(__name__) zmq = zmq_async.import_zmq() -class AckManagerBase(object): +class AckManager(zmq_publisher_manager.PublisherManagerBase): def __init__(self, publisher): - self.publisher = publisher - self.conf = publisher.conf - self.sender = publisher.sender - self.receiver = publisher.receiver - - def send_call(self, request): - return self.publisher.send_call(request) - - def send_cast(self, request): - self.publisher.send_cast(request) - - def send_fanout(self, request): - self.publisher.send_fanout(request) - - def send_notify(self, request): - self.publisher.send_notify(request) - - def cleanup(self): - self.publisher.cleanup() - - -class AckManagerDirect(AckManagerBase): - pass - - -class AckManagerProxy(AckManagerBase): - - def __init__(self, publisher): - super(AckManagerProxy, self).__init__(publisher) + super(AckManager, self).__init__(publisher) self._pool = zmq_async.get_pool( size=self.conf.oslo_messaging_zmq.rpc_thread_pool_size ) @@ -99,33 +72,39 @@ class AckManagerProxy(AckManagerBase): if request.msg_type != zmq_names.CALL_TYPE: self.receiver.untrack_request(request) - def _send_request_and_get_ack_future(self, request): - socket = self.publisher._send_request(request) - if not socket: - return None + def _schedule_request_for_ack(self, request): + socket = self.publisher.acquire_connection(request) + self.publisher.send_request(socket, request) self.receiver.register_socket(socket) - ack_future = self.receiver.track_request(request)[zmq_names.ACK_TYPE] + futures_by_type = self.receiver.track_request(request) + ack_future = futures_by_type[zmq_names.ACK_TYPE] ack_future.request = request ack_future.socket = socket return ack_future + @zmq_publisher_manager.target_not_found_timeout def send_call(self, request): - ack_future = self._send_request_and_get_ack_future(request) - if not ack_future: - self.publisher._raise_timeout(request) - self._pool.submit(self._wait_for_ack, ack_future) try: - return self.publisher._recv_reply(request, ack_future.socket) + ack_future = self._schedule_request_for_ack(request) + self._pool.submit(self._wait_for_ack, ack_future) + return self.publisher.receive_reply(ack_future.socket, request) finally: if not ack_future.done(): ack_future.set_result((None, None)) + @zmq_publisher_manager.target_not_found_warn def send_cast(self, request): - ack_future = self._send_request_and_get_ack_future(request) - if not ack_future: - return + ack_future = self._schedule_request_for_ack(request) self._pool.submit(self._wait_for_ack, ack_future) + @zmq_publisher_manager.target_not_found_warn + def _send_request(self, request): + socket = self.publisher.acquire_connection(request) + self.publisher.send_request(socket, request) + def cleanup(self): self._pool.shutdown(wait=True) - super(AckManagerProxy, self).cleanup() + super(AckManager, self).cleanup() + + send_fanout = _send_request + send_notify = _send_request diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py index 24f08f002..9175ad120 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client.py @@ -14,11 +14,6 @@ from oslo_messaging._drivers import common -from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_publisher_direct -from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ - import zmq_dealer_publisher_proxy -from oslo_messaging._drivers.zmq_driver.client import zmq_ack_manager from oslo_messaging._drivers.zmq_driver.client import zmq_client_base from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -44,17 +39,9 @@ class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase): conf.oslo_messaging_zmq.use_pub_sub: raise WrongClientException() - publisher_direct = self.create_publisher( - conf, matchmaker, - zmq_dealer_publisher_direct.DealerPublisherDirect, - zmq_ack_manager.AckManagerDirect - ) - - publisher_proxy = self.create_publisher( - conf, matchmaker, - zmq_dealer_publisher_proxy.DealerPublisherProxy, - zmq_ack_manager.AckManagerProxy - ) + publisher_direct = self._create_publisher_direct(conf, matchmaker) + publisher_proxy = self._create_publisher_proxy_dynamic(conf, + matchmaker) super(ZmqClientMixDirectPubSub, self).__init__( conf, matchmaker, allowed_remote_exmods, @@ -80,15 +67,10 @@ class ZmqClientDirect(zmq_client_base.ZmqClientBase): conf.oslo_messaging_zmq.use_router_proxy: raise WrongClientException() - publisher = self.create_publisher( - conf, matchmaker, - zmq_dealer_publisher_direct.DealerPublisherDirect, - zmq_ack_manager.AckManagerDirect - ) - super(ZmqClientDirect, self).__init__( conf, matchmaker, allowed_remote_exmods, - publishers={"default": publisher} + publishers={"default": self._create_publisher_direct( + conf, matchmaker)} ) @@ -107,13 +89,8 @@ class ZmqClientProxy(zmq_client_base.ZmqClientBase): if not conf.oslo_messaging_zmq.use_router_proxy: raise WrongClientException() - publisher = self.create_publisher( - conf, matchmaker, - zmq_dealer_publisher_proxy.DealerPublisherProxy, - zmq_ack_manager.AckManagerProxy - ) - super(ZmqClientProxy, self).__init__( conf, matchmaker, allowed_remote_exmods, - publishers={"default": publisher} + publishers={"default": self._create_publisher_proxy( + conf, matchmaker)} ) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py index 5d6ee5a6d..42d8568e0 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_client_base.py @@ -12,6 +12,12 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_dealer_publisher_direct +from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \ + import zmq_dealer_publisher_proxy +from oslo_messaging._drivers.zmq_driver.client import zmq_ack_manager +from oslo_messaging._drivers.zmq_driver.client import zmq_publisher_manager from oslo_messaging._drivers.zmq_driver.client import zmq_request from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names @@ -37,13 +43,6 @@ class ZmqClientBase(object): self.notify_publisher = publishers.get(zmq_names.NOTIFY_TYPE, publishers["default"]) - @staticmethod - def create_publisher(conf, matchmaker, publisher_cls, ack_manager_cls): - publisher = publisher_cls(conf, matchmaker) - if conf.oslo_messaging_zmq.rpc_use_acks: - publisher = ack_manager_cls(publisher) - return publisher - def send_call(self, target, context, message, timeout=None, retry=None): request = zmq_request.CallRequest( target, context=context, message=message, retry=retry, @@ -70,6 +69,27 @@ class ZmqClientBase(object): ) self.notify_publisher.send_notify(request) + @staticmethod + def _create_publisher_direct(conf, matchmaker): + publisher_direct = zmq_dealer_publisher_direct.DealerPublisherDirect( + conf, matchmaker) + return zmq_publisher_manager.PublisherManagerDynamic( + publisher_direct) + + @staticmethod + def _create_publisher_proxy(conf, matchmaker): + publisher_proxy = zmq_dealer_publisher_proxy.DealerPublisherProxy( + conf, matchmaker) + return zmq_ack_manager.AckManager(publisher_proxy) \ + if conf.oslo_messaging_zmq.rpc_use_acks else \ + zmq_publisher_manager.PublisherManagerStatic(publisher_proxy) + + @staticmethod + def _create_publisher_proxy_dynamic(conf, matchmaker): + return zmq_publisher_manager.PublisherManagerDynamic( + zmq_dealer_publisher_proxy.DealerPublisherProxyDynamic( + conf, matchmaker)) + def cleanup(self): cleaned = set() for publisher in self.publishers.values(): diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py new file mode 100644 index 000000000..c790e138c --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_publisher_manager.py @@ -0,0 +1,143 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import contextlib +import logging + +import retrying +import six + +from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging._i18n import _LW + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +def target_not_found_warn(func): + def _target_not_found_warn(self, request, *args, **kwargs): + try: + return func(self, request, *args, **kwargs) + except (zmq_matchmaker_base.MatchmakerUnavailable, + retrying.RetryError): + LOG.warning(_LW("Matchmaker contains no records for specified " + "target %(target)s. Dropping message %(msg_id)s.") + % {"target": request.target, + "msg_id": request.message_id}) + return _target_not_found_warn + + +def target_not_found_timeout(func): + def _target_not_found_timeout(self, request, *args, **kwargs): + try: + return func(self, request, *args, **kwargs) + except (zmq_matchmaker_base.MatchmakerUnavailable, + retrying.RetryError): + self.publisher._raise_timeout(request) + return _target_not_found_timeout + + +@six.add_metaclass(abc.ABCMeta) +class PublisherManagerBase(object): + + """Abstract publisher manager class + + Publisher knows how to establish connection, how to send message, + and how to receive reply. PublisherManager coordinates all these steps + regarding retrying logic in AckManager implementations + """ + + def __init__(self, publisher): + self.publisher = publisher + self.conf = publisher.conf + self.sender = publisher.sender + self.receiver = publisher.receiver + + @abc.abstractmethod + def send_call(self, request): + """Send call request + + :param request: request object + :type senders: zmq_request.Request + """ + + @abc.abstractmethod + def send_cast(self, request): + """Send call request + + :param request: request object + :type senders: zmq_request.Request + """ + + @abc.abstractmethod + def send_fanout(self, request): + """Send call request + + :param request: request object + :type senders: zmq_request.Request + """ + + @abc.abstractmethod + def send_notify(self, request): + """Send call request + + :param request: request object + :type senders: zmq_request.Request + """ + + def cleanup(self): + self.publisher.cleanup() + + +class PublisherManagerDynamic(PublisherManagerBase): + + @target_not_found_timeout + def send_call(self, request): + with contextlib.closing( + self.publisher.acquire_connection(request)) as socket: + self.publisher.send_request(socket, request) + reply = self.publisher.receive_reply(socket, request) + return reply + + @target_not_found_warn + def _send(self, request): + with contextlib.closing(self.publisher.acquire_connection(request)) \ + as socket: + self.publisher.send_request(socket, request) + + send_cast = _send + send_fanout = _send + send_notify = _send + + +class PublisherManagerStatic(PublisherManagerBase): + + @target_not_found_timeout + def send_call(self, request): + socket = self.publisher.acquire_connection(request) + self.publisher.send_request(socket, request) + reply = self.publisher.receive_reply(socket, request) + return reply + + @target_not_found_warn + def _send(self, request): + socket = self.publisher.acquire_connection(request) + self.publisher.send_request(socket, request) + + send_cast = _send + send_fanout = _send + send_notify = _send diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py index 40c824e53..6bf4665a9 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_receivers.py @@ -95,8 +95,7 @@ class ReceiverBase(object): def _run_loop(self): data, socket = self._poller.poll( - timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout - ) + timeout=self.conf.oslo_messaging_zmq.rpc_poll_timeout) if data is None: return reply_id, message_type, message_id, response = data diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py index 569826dd1..d032e4f79 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_routing_table.py @@ -13,12 +13,16 @@ # under the License. import logging -import retrying +import threading import time +import itertools + from oslo_messaging._drivers.zmq_driver.matchmaker import zmq_matchmaker_base +from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._drivers.zmq_driver import zmq_updater from oslo_messaging._i18n import _LW zmq = zmq_async.import_zmq() @@ -26,75 +30,162 @@ zmq = zmq_async.import_zmq() LOG = logging.getLogger(__name__) -class RoutingTable(object): - """This class implements local routing-table cache - taken from matchmaker. Its purpose is to give the next routable - host id (remote DEALER's id) by request for specific target in - round-robin fashion. - """ +class RoutingTableAdaptor(object): - def __init__(self, conf, matchmaker): + def __init__(self, conf, matchmaker, listener_type): self.conf = conf self.matchmaker = matchmaker - self.routing_table = {} - self.routable_hosts = {} + self.listener_type = listener_type + self.routing_table = RoutingTable(conf) + self.routing_table_updater = RoutingTableUpdater( + conf, matchmaker, self.routing_table) + self.round_robin_targets = {} - def get_all_hosts(self, target): - self._update_routing_table( - target, - get_hosts=self.matchmaker.get_hosts_fanout, - get_hosts_retry=self.matchmaker.get_hosts_fanout_retry) - return self.routable_hosts.get(str(target), []) + def get_round_robin_host(self, target): + target_key = zmq_address.target_to_key( + target, zmq_names.socket_type_str(self.listener_type)) - def get_routable_host(self, target): - self._update_routing_table( - target, - get_hosts=self.matchmaker.get_hosts, - get_hosts_retry=self.matchmaker.get_hosts_retry) - hosts_for_target = self.routable_hosts.get(str(target)) - if not hosts_for_target: - # Matchmaker doesn't contain any target - return None - host = hosts_for_target.pop(0) - if not hosts_for_target: - self._renew_routable_hosts(target) + LOG.debug("Processing target %s for round-robin." % target_key) + + if target_key not in self.round_robin_targets: + LOG.debug("Target %s is not in cache. Check matchmaker server." + % target_key) + hosts = self.matchmaker.get_hosts_retry( + target, zmq_names.socket_type_str(self.listener_type)) + LOG.debug("Received hosts %s" % hosts) + self.routing_table.update_hosts(target_key, hosts) + self.round_robin_targets[target_key] = \ + self.routing_table.get_hosts_round_robin(target_key) + + rr_gen = self.round_robin_targets[target_key] + host = next(rr_gen) + LOG.debug("Host resolved for the current connection is %s" % host) return host - def _is_tm_expired(self, tm): - return 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \ - <= time.time() - tm + def get_fanout_hosts(self, target): + target_key = zmq_address.target_to_key( + target, zmq_names.socket_type_str(self.listener_type)) - def _update_routing_table(self, target, get_hosts, get_hosts_retry): - routing_record = self.routing_table.get(str(target)) - if routing_record is None: - self._fetch_hosts(target, get_hosts, get_hosts_retry) - self._renew_routable_hosts(target) - elif self._is_tm_expired(routing_record[1]): - self._fetch_hosts(target, get_hosts, get_hosts_retry) + LOG.debug("Processing target %s for fanout." % target_key) - def _fetch_hosts(self, target, get_hosts, get_hosts_retry): - key = str(target) - if key not in self.routing_table: - try: - hosts = get_hosts_retry( - target, zmq_names.socket_type_str(zmq.DEALER)) - self.routing_table[key] = (hosts, time.time()) - except retrying.RetryError: - LOG.warning(_LW("Matchmaker contains no hosts for target %s") - % key) + if not self.routing_table.contains(target_key): + LOG.debug("Target %s is not in cache. Check matchmaker server." + % target_key) + hosts = self.matchmaker.get_hosts_fanout_retry( + target, zmq_names.socket_type_str(self.listener_type)) + LOG.debug("Received hosts %s" % hosts) + self.routing_table.update_hosts(target_key, hosts) else: - try: - hosts = get_hosts( - target, zmq_names.socket_type_str(zmq.DEALER)) - self.routing_table[key] = (hosts, time.time()) - except zmq_matchmaker_base.MatchmakerUnavailable: - LOG.warning(_LW("Matchmaker contains no hosts for target %s") - % key) + LOG.debug("Target %s has been found in cache." % target_key) + return self.routing_table.get_hosts_fanout(target_key) + + def cleanup(self): + self.routing_table_updater.cleanup() + + +class RoutingTable(object): + + def __init__(self, conf): + self.conf = conf + self.targets = {} + self._lock = threading.Lock() + + def register(self, target_key, host): + with self._lock: + if target_key in self.targets: + hosts, tm = self.targets[target_key] + if host not in hosts: + hosts.add(host) + self.targets[target_key] = (hosts, self._create_tm()) + else: + self.targets[target_key] = ({host}, self._create_tm()) + + def get_targets(self): + with self._lock: + return list(self.targets.keys()) + + def unregister(self, target_key, host): + with self._lock: + hosts, tm = self.targets.get(target_key) + if hosts and host in hosts: + hosts.discard(host) + self.targets[target_key] = (hosts, self._create_tm()) + + def update_hosts(self, target_key, hosts_updated): + with self._lock: + if target_key in self.targets and not hosts_updated: + self.targets.pop(target_key) + return + hosts_current, _ = self.targets.get(target_key, (set(), None)) + hosts_updated = set(hosts_updated) + has_differences = hosts_updated ^ hosts_current + if has_differences: + self.targets[target_key] = (hosts_updated, self._create_tm()) + + def get_hosts_round_robin(self, target_key): + while self._contains_hosts(target_key): + for host in self._get_hosts_rr(target_key): + yield host + + def get_hosts_fanout(self, target_key): + hosts, _ = self._get_hosts(target_key) + for host in hosts: + yield host + + def contains(self, target_key): + with self._lock: + return target_key in self.targets + + def _get_hosts(self, target_key): + with self._lock: + hosts, tm = self.targets.get(target_key, ([], None)) + hosts = list(hosts) + return hosts, tm + + def _get_tm(self, target_key): + with self._lock: + _, tm = self.targets.get(target_key) + return tm + + def _contains_hosts(self, target_key): + with self._lock: + return target_key in self.targets + + def _is_target_changed(self, target_key, tm_orig): + return self._get_tm(target_key) != tm_orig + + @staticmethod + def _create_tm(): + return time.time() + + def _get_hosts_rr(self, target_key): + hosts, tm_original = self._get_hosts(target_key) + for host in itertools.cycle(hosts): + if self._is_target_changed(target_key, tm_original): + raise StopIteration() + yield host + + +class RoutingTableUpdater(zmq_updater.UpdaterBase): + + def __init__(self, conf, matchmaker, routing_table): + self.routing_table = routing_table + super(RoutingTableUpdater, self).__init__( + conf, matchmaker, self._update_routing_table, + conf.oslo_messaging_zmq.zmq_target_update) + + def _update_routing_table(self): + target_keys = self.routing_table.get_targets() - def _renew_routable_hosts(self, target): - key = str(target) try: - hosts, _ = self.routing_table[key] - self.routable_hosts[key] = list(hosts) - except KeyError: - self.routable_hosts[key] = [] + for target_key in target_keys: + hosts = self.matchmaker.get_hosts_by_key(target_key) + if not hosts: + LOG.warning(_LW("Target %s has been removed") % target_key) + else: + self.routing_table.update_hosts(target_key, hosts) + LOG.debug("Updating routing table from the matchmaker. " + "%d target(s) updated %s." % (len(target_keys), + target_keys)) + except zmq_matchmaker_base.MatchmakerUnavailable: + LOG.warning(_LW("Not updated. Matchmaker was not available.")) diff --git a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py index 6ad89fdf1..7223442ce 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py +++ b/oslo_messaging/_drivers/zmq_driver/client/zmq_sockets_manager.py @@ -45,26 +45,29 @@ class SocketsManager(object): def _key_from_target(target): return target.topic if target.fanout else str(target) + def _get_hosts_and_track(self, socket, target): + self._get_hosts_and_connect(socket, target) + self._track_socket(socket, target) + def _get_hosts_and_connect(self, socket, target): get_hosts = self.get_hosts_fanout if target.fanout else self.get_hosts hosts = get_hosts(target) - self._connect_to_hosts(socket, target, hosts) + self._connect_to_hosts(socket, hosts) def _track_socket(self, socket, target): key = self._key_from_target(target) self.outbound_sockets[key] = (socket, time.time()) - def _connect_to_hosts(self, socket, target, hosts): + def _connect_to_hosts(self, socket, hosts): for host in hosts: socket.connect_to_host(host) - self._track_socket(socket, target) def _check_for_new_hosts(self, target): key = self._key_from_target(target) socket, tm = self.outbound_sockets[key] if 0 <= self.conf.oslo_messaging_zmq.zmq_target_expire \ <= time.time() - tm: - self._get_hosts_and_connect(socket, target) + self._get_hosts_and_track(socket, target) return socket def get_socket(self, target): @@ -74,7 +77,7 @@ class SocketsManager(object): else: socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, self.socket_type, immediate=False) - self._get_hosts_and_connect(socket, target) + self._get_hosts_and_track(socket, target) return socket def get_socket_to_publishers(self, identity=None): diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py index 1681d5491..a1aedf296 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/zmq_matchmaker_redis.py @@ -195,6 +195,10 @@ class MatchmakerRedis(zmq_matchmaker_base.MatchmakerBase): def get_routers(self): return self._get_hosts_by_key(_ROUTERS_KEY) + @redis_connection_warn + def get_hosts_by_key(self, key): + return self._get_hosts_by_key(key) + def _get_hosts_by_key(self, key): return self._redis.smembers(key) diff --git a/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py index cf86e5c73..3eecca4df 100644 --- a/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/proxy/central/zmq_central_proxy.py @@ -156,8 +156,9 @@ class RouterUpdater(zmq_updater.UpdaterBase): self.publisher_address = publisher_address self.fe_router_address = fe_router_address self.be_router_address = be_router_address - super(RouterUpdater, self).__init__(conf, matchmaker, - self._update_records) + super(RouterUpdater, self).__init__( + conf, matchmaker, self._update_records, + conf.oslo_messaging_zmq.zmq_target_update) def _update_records(self): self.matchmaker.register_publisher( diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index 5e6381dd5..0b9fe6200 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -112,8 +112,17 @@ class TargetUpdater(zmq_updater.UpdaterBase): self.target = target self.host = host self.socket_type = socket_type - super(TargetUpdater, self).__init__(conf, matchmaker, - self._update_target) + self.conf = conf + self.matchmaker = matchmaker + self._sleep_for = conf.oslo_messaging_zmq.zmq_target_update + + # NOTE(ozamiatin): Update target immediately not waiting + # for background executor to initialize. + self._update_target() + + super(TargetUpdater, self).__init__( + conf, matchmaker, self._update_target, + conf.oslo_messaging_zmq.zmq_target_update) def _update_target(self): try: diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_options.py b/oslo_messaging/_drivers/zmq_driver/zmq_options.py index 8ab87de46..5bbe02ec6 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_options.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_options.py @@ -87,7 +87,7 @@ zmq_opts = [ help='Use PUB/SUB pattern for fanout methods. ' 'PUB/SUB always uses proxy.'), - cfg.BoolOpt('use_router_proxy', default=True, + cfg.BoolOpt('use_router_proxy', default=False, deprecated_group='DEFAULT', help='Use ROUTER remote proxy.'), diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 7f0fcc21f..6a6a4c38b 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -21,7 +21,7 @@ from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names -from oslo_messaging._i18n import _LE, _LI +from oslo_messaging._i18n import _LE from oslo_messaging import exceptions from oslo_serialization.serializer import json_serializer from oslo_serialization.serializer import msgpack_serializer @@ -174,7 +174,9 @@ class ZmqSocket(object): return obj def close(self, *args, **kwargs): + identity = self.handle.identity self.handle.close(*args, **kwargs) + LOG.debug("Socket %s closed" % identity) def connect_to_address(self, address): if address in self.connections: @@ -182,8 +184,8 @@ class ZmqSocket(object): stype = zmq_names.socket_type_str(self.socket_type) sid = self.handle.identity try: - LOG.info(_LI("Connecting %(stype)s socket %(sid)s to %(address)s"), - {"stype": stype, "sid": sid, "address": address}) + LOG.debug("Connecting %(stype)s socket %(sid)s to %(address)s", + {"stype": stype, "sid": sid, "address": address}) self.connect(address) except zmq.ZMQError as e: LOG.error(_LE("Failed connecting %(stype)s-%(sid)s to " diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py index 8ce53c73a..886952d23 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py @@ -27,12 +27,11 @@ zmq = zmq_async.import_zmq() class UpdaterBase(object): - def __init__(self, conf, matchmaker, update_method): + def __init__(self, conf, matchmaker, update_method, sleep_for): self.conf = conf self.matchmaker = matchmaker self.update_method = update_method - self._sleep_for = self.conf.oslo_messaging_zmq.zmq_target_update - self.update_method() + self._sleep_for = sleep_for self.executor = zmq_async.get_executor(method=self._update_loop) self.executor.execute() @@ -53,7 +52,8 @@ class ConnectionUpdater(UpdaterBase): def __init__(self, conf, matchmaker, socket): self.socket = socket super(ConnectionUpdater, self).__init__( - conf, matchmaker, self._update_connection) + conf, matchmaker, self._update_connection, + conf.oslo_messaging_zmq.zmq_target_update) @abc.abstractmethod def _update_connection(self): diff --git a/oslo_messaging/tests/drivers/zmq/test_routing_table.py b/oslo_messaging/tests/drivers/zmq/test_routing_table.py new file mode 100644 index 000000000..508a161e4 --- /dev/null +++ b/oslo_messaging/tests/drivers/zmq/test_routing_table.py @@ -0,0 +1,80 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_messaging._drivers.zmq_driver.client import zmq_routing_table +from oslo_messaging._drivers.zmq_driver import zmq_async +from oslo_messaging.tests import utils as test_utils + + +zmq = zmq_async.import_zmq() + + +class TestRoutingTable(test_utils.BaseTestCase): + + def setUp(self): + super(TestRoutingTable, self).setUp() + + def test_get_next_while_origin_changed(self): + table = zmq_routing_table.RoutingTable(self.conf) + table.register("topic1.server1", "1") + table.register("topic1.server1", "2") + table.register("topic1.server1", "3") + + rr_gen = table.get_hosts_round_robin("topic1.server1") + + result = [] + for i in range(3): + result.append(next(rr_gen)) + + self.assertEqual(3, len(result)) + self.assertIn("1", result) + self.assertIn("2", result) + self.assertIn("3", result) + + table.register("topic1.server1", "4") + table.register("topic1.server1", "5") + table.register("topic1.server1", "6") + + result = [] + for i in range(6): + result.append(next(rr_gen)) + + self.assertEqual(6, len(result)) + self.assertIn("1", result) + self.assertIn("2", result) + self.assertIn("3", result) + self.assertIn("4", result) + self.assertIn("5", result) + self.assertIn("6", result) + + def test_no_targets(self): + table = zmq_routing_table.RoutingTable(self.conf) + rr_gen = table.get_hosts_round_robin("topic1.server1") + + result = [] + for t in rr_gen: + result.append(t) + self.assertEqual(0, len(result)) + + def test_target_unchanged(self): + table = zmq_routing_table.RoutingTable(self.conf) + table.register("topic1.server1", "1") + + rr_gen = table.get_hosts_round_robin("topic1.server1") + + result = [] + for i in range(3): + result.append(next(rr_gen)) + + self.assertEqual(["1", "1", "1"], result) diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index e1e90205d..9851a4c7c 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -165,6 +165,7 @@ class CastTestCase(utils.SkipIfNoTransportURL): client.append(text='stack') client.add(increment=2) client.add(increment=10) + time.sleep(0.3) client.sync() group.sync(1) @@ -205,6 +206,7 @@ class CastTestCase(utils.SkipIfNoTransportURL): client.append(text='stack') client.add(increment=2) client.add(increment=10) + time.sleep(0.3) client.sync() group.sync(server='all') for s in group.servers: diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 21e4582b4..7df5bb849 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -177,6 +177,7 @@ class RpcServerGroupFixture(fixtures.Fixture): if server is None: for i in range(len(self.servers)): self.client(i).ping() + time.sleep(0.3) else: if server == 'all': for s in self.servers: