diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 21118d790..3829fa5e4 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -72,10 +72,14 @@ zmq_opts = [ help='The default number of seconds that poll should wait. ' 'Poll raises timeout exception when timeout expired.'), - cfg.IntOpt('zmq_target_expire', default=120, + cfg.IntOpt('zmq_target_expire', default=300, help='Expiration timeout in seconds of a name service record ' 'about existing target ( < 0 means no timeout).'), + cfg.IntOpt('zmq_target_update', default=180, + help='Update period in seconds of a name service record ' + 'about existing target.'), + cfg.BoolOpt('use_pub_sub', default=True, help='Use PUB/SUB pattern for fanout methods. ' 'PUB/SUB always uses proxy.'), diff --git a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py index 9ed07370a..c75ff4e1f 100644 --- a/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/broker/zmq_queue_proxy.py @@ -22,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket +from oslo_messaging._drivers.zmq_driver import zmq_updater from oslo_messaging._i18n import _LI zmq = zmq_async.import_zmq() @@ -55,14 +56,9 @@ class UniversalQueueProxy(object): self.pub_publisher = zmq_pub_publisher.PubPublisherProxy( conf, matchmaker) - self.matchmaker.register_publisher( - (self.pub_publisher.host, self.fe_router_address)) - LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Run PUB publisher"), - {"pub": self.pub_publisher.host, - "router": self.fe_router_address}) - self.matchmaker.register_router(self.be_router_address) - LOG.info(_LI("[Backend ROUTER:%(router)s] Run ROUTER"), - {"router": self.be_router_address}) + self._router_updater = RouterUpdater( + conf, matchmaker, self.pub_publisher.host, self.fe_router_address, + self.be_router_address) def run(self): message, socket = self.poller.poll() @@ -106,7 +102,7 @@ class UniversalQueueProxy(object): socket.send(b'', zmq.SNDMORE) socket.send(reply_id, zmq.SNDMORE) socket.send(six.b(str(message_type)), zmq.SNDMORE) - LOG.debug("Redirecting message %s" % message_id) + LOG.debug("Dispatching message %s" % message_id) socket.send_multipart(multipart_message) def cleanup(self): @@ -116,3 +112,29 @@ class UniversalQueueProxy(object): self.matchmaker.unregister_publisher( (self.pub_publisher.host, self.fe_router_address)) self.matchmaker.unregister_router(self.be_router_address) + + +class RouterUpdater(zmq_updater.UpdaterBase): + """This entity performs periodic async updates + from router proxy to the matchmaker. + """ + + def __init__(self, conf, matchmaker, publisher_address, fe_router_address, + be_router_address): + self.publisher_address = publisher_address + self.fe_router_address = fe_router_address + self.be_router_address = be_router_address + super(RouterUpdater, self).__init__(conf, matchmaker, + self._update_records) + + def _update_records(self): + self.matchmaker.register_publisher( + (self.publisher_address, self.fe_router_address), + expire=self.conf.zmq_target_expire) + LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Update PUB publisher"), + {"pub": self.publisher_address, + "router": self.fe_router_address}) + self.matchmaker.register_router(self.be_router_address, + expire=self.conf.zmq_target_expire) + LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"), + {"router": self.be_router_address}) diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py index 5cba7820a..e446cde21 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher_proxy.py @@ -25,6 +25,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers \ from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._drivers.zmq_driver import zmq_updater zmq = zmq_async.import_zmq() @@ -40,6 +41,8 @@ class DealerPublisherProxy(object): conf, matchmaker, zmq.ROUTER, zmq.DEALER) self.socket = socket_to_proxy self.routing_table = RoutingTable(conf, matchmaker) + self.connection_updater = PublisherConnectionUpdater( + conf, matchmaker, self.socket) def send_request(self, request): if request.msg_type == zmq_names.CALL_TYPE: @@ -92,6 +95,8 @@ class CallSenderProxy(zmq_dealer_call_publisher.CallSender): self.socket = self.outbound_sockets.get_socket_to_publishers() self.reply_waiter.poll_socket(self.socket) self.routing_table = RoutingTable(conf, matchmaker) + self.connection_updater = PublisherConnectionUpdater( + conf, matchmaker, self.socket) def _connect_socket(self, target): return self.socket @@ -170,3 +175,11 @@ class RoutingTable(object): def _renew_routable_hosts(self, target): hosts, _ = self.routing_table[str(target)] self.routable_hosts[str(target)] = list(hosts) + + +class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater): + + def _update_connection(self): + publishers = self.matchmaker.get_publishers() + for pub_address, router_address in publishers: + self.socket.connect_to_host(router_address) diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py index c73cb872a..5f36055f9 100644 --- a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py @@ -138,9 +138,14 @@ class RedisMatchMaker(base.MatchMakerBase): "port": self.conf.matchmaker_redis.port, "password": self.conf.matchmaker_redis.password} - def register_publisher(self, hostname): + def _add_key_with_expire(self, key, value, expire): + self._redis.sadd(key, value) + if expire > 0: + self._redis.expire(key, expire) + + def register_publisher(self, hostname, expire=-1): host_str = ",".join(hostname) - self._redis.sadd(_PUBLISHERS_KEY, host_str) + self._add_key_with_expire(_PUBLISHERS_KEY, host_str, expire) def unregister_publisher(self, hostname): host_str = ",".join(hostname) @@ -153,8 +158,8 @@ class RedisMatchMaker(base.MatchMakerBase): self._get_hosts_by_key(_PUBLISHERS_KEY)]) return hosts - def register_router(self, hostname): - self._redis.sadd(_ROUTERS_KEY, hostname) + def register_router(self, hostname, expire=-1): + self._add_key_with_expire(_ROUTERS_KEY, hostname, expire) def unregister_router(self, hostname): self._redis.srem(_ROUTERS_KEY, hostname) @@ -167,22 +172,22 @@ class RedisMatchMaker(base.MatchMakerBase): def register(self, target, hostname, listener_type, expire=-1): - def register_key(key): - self._redis.sadd(key, hostname) - if expire > 0: - self._redis.expire(key, expire) - if target.topic and target.server: key = zmq_address.target_to_key(target, listener_type) - register_key(key) + self._add_key_with_expire(key, hostname, expire) if target.topic: key = zmq_address.prefix_str(target.topic, listener_type) - register_key(key) + self._add_key_with_expire(key, hostname, expire) def unregister(self, target, hostname, listener_type): - key = zmq_address.target_to_key(target, listener_type) - self._redis.srem(key, hostname) + if target.topic and target.server: + key = zmq_address.target_to_key(target, listener_type) + self._redis.srem(key, hostname) + + if target.topic: + key = zmq_address.prefix_str(target.topic, listener_type) + self._redis.srem(key, hostname) def get_hosts(self, target, listener_type): LOG.debug("[Redis] get_hosts for target %s", target) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index 86dddee61..69c6958db 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -14,7 +14,6 @@ import abc import logging -import time import six @@ -23,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names from oslo_messaging._drivers.zmq_driver import zmq_socket +from oslo_messaging._drivers.zmq_driver import zmq_updater from oslo_messaging._i18n import _LE LOG = logging.getLogger(__name__) @@ -60,8 +60,8 @@ class SingleSocketConsumer(ConsumerBase): self.socket_type = socket_type self.host = None self.socket = self.subscribe_socket(socket_type) - self.target_updater = TargetUpdater(conf, self.matchmaker, self.target, - self.host, socket_type) + self.target_updater = TargetUpdater( + conf, self.matchmaker, self.target, self.host, socket_type) def subscribe_socket(self, socket_type): try: @@ -96,25 +96,20 @@ class SingleSocketConsumer(ConsumerBase): super(SingleSocketConsumer, self).cleanup() -class TargetUpdater(object): +class TargetUpdater(zmq_updater.UpdaterBase): """This entity performs periodic async updates to the matchmaker. """ def __init__(self, conf, matchmaker, target, host, socket_type): - self.conf = conf - self.matchmaker = matchmaker self.target = target self.host = host self.socket_type = socket_type - self.executor = zmq_async.get_executor(method=self._update_target) - self.executor.execute() + super(TargetUpdater, self).__init__(conf, matchmaker, + self._update_target) def _update_target(self): self.matchmaker.register( self.target, self.host, - zmq_names.socket_type_str(self.socket_type)) - time.sleep(self.conf.zmq_target_expire / 2) - - def cleanup(self): - self.executor.stop() + zmq_names.socket_type_str(self.socket_type), + expire=self.conf.zmq_target_expire) diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py index f1f5d6018..f0fd11177 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_dealer_consumer.py @@ -25,6 +25,7 @@ from oslo_messaging._drivers.zmq_driver.server.consumers\ import zmq_consumer_base from oslo_messaging._drivers.zmq_driver import zmq_async from oslo_messaging._drivers.zmq_driver import zmq_names +from oslo_messaging._drivers.zmq_driver import zmq_updater from oslo_messaging._i18n import _LE, _LI LOG = logging.getLogger(__name__) @@ -90,6 +91,8 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase): self.target_updater = zmq_consumer_base.TargetUpdater( conf, self.matchmaker, self.target, self.host, zmq.DEALER) + self.connection_updater = ConsumerConnectionUpdater( + conf, self.matchmaker, self.socket) LOG.info(_LI("[%s] Run DEALER consumer"), self.host) def receive_message(self, socket): @@ -111,6 +114,19 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase): else: LOG.error(_LE("Unknown message type: %s"), zmq_names.message_type_str(message_type)) - except (zmq.ZMQError, AssertionError) as e: LOG.error(_LE("Receiving message failure: %s"), str(e)) + + def cleanup(self): + LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host) + super(DealerConsumer, self).cleanup() + self.matchmaker.unregister(self.target, self.host, + zmq_names.socket_type_str(zmq.DEALER)) + + +class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater): + + def _update_connection(self): + routers = self.matchmaker.get_routers() + for router_address in routers: + self.socket.connect_to_host(router_address) diff --git a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py index 254c6e5ed..c963c452f 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/server/zmq_server.py @@ -48,11 +48,11 @@ class ZmqServer(base.PollStyleListener): conf, self.poller, self) if conf.use_pub_sub else None self.consumers = [] - if self.router_consumer: + if self.router_consumer is not None: self.consumers.append(self.router_consumer) - if self.dealer_consumer: + if self.dealer_consumer is not None: self.consumers.append(self.dealer_consumer) - if self.sub_consumer: + if self.sub_consumer is not None: self.consumers.append(self.sub_consumer) @base.batch_poll_helper diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index 2ca816b41..a97343e7f 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -96,6 +96,8 @@ class ZmqSocket(object): self.handle.close(*args, **kwargs) def connect_to_address(self, address): + if address in self.connections: + return stype = zmq_names.socket_type_str(self.socket_type) try: LOG.info(_LI("Connecting %(stype)s id %(id)s to %(address)s"), diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_updater.py b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py new file mode 100644 index 000000000..0b4594a33 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/zmq_updater.py @@ -0,0 +1,55 @@ +# Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import logging +import time + +import six + +from oslo_messaging._drivers.zmq_driver import zmq_async + +LOG = logging.getLogger(__name__) + +zmq = zmq_async.import_zmq() + + +class UpdaterBase(object): + + def __init__(self, conf, matchmaker, update_method): + self.conf = conf + self.matchmaker = matchmaker + self.update_method = update_method + self.executor = zmq_async.get_executor(method=self._update_loop) + self.executor.execute() + + def _update_loop(self): + self.update_method() + time.sleep(self.conf.zmq_target_update) + + def cleanup(self): + self.executor.stop() + + +@six.add_metaclass(abc.ABCMeta) +class ConnectionUpdater(UpdaterBase): + + def __init__(self, conf, matchmaker, socket): + self.socket = socket + super(ConnectionUpdater, self).__init__( + conf, matchmaker, self._update_connection) + + @abc.abstractmethod + def _update_connection(self): + """Update connection info"""