Merge "[zmq] Periodic updates of endpoints connections"
This commit is contained in:
commit
01338c940a
@ -72,10 +72,14 @@ zmq_opts = [
|
||||
help='The default number of seconds that poll should wait. '
|
||||
'Poll raises timeout exception when timeout expired.'),
|
||||
|
||||
cfg.IntOpt('zmq_target_expire', default=120,
|
||||
cfg.IntOpt('zmq_target_expire', default=300,
|
||||
help='Expiration timeout in seconds of a name service record '
|
||||
'about existing target ( < 0 means no timeout).'),
|
||||
|
||||
cfg.IntOpt('zmq_target_update', default=180,
|
||||
help='Update period in seconds of a name service record '
|
||||
'about existing target.'),
|
||||
|
||||
cfg.BoolOpt('use_pub_sub', default=True,
|
||||
help='Use PUB/SUB pattern for fanout methods. '
|
||||
'PUB/SUB always uses proxy.'),
|
||||
|
@ -22,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||
from oslo_messaging._i18n import _LI
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
@ -55,14 +56,9 @@ class UniversalQueueProxy(object):
|
||||
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
|
||||
conf, matchmaker)
|
||||
|
||||
self.matchmaker.register_publisher(
|
||||
(self.pub_publisher.host, self.fe_router_address))
|
||||
LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Run PUB publisher"),
|
||||
{"pub": self.pub_publisher.host,
|
||||
"router": self.fe_router_address})
|
||||
self.matchmaker.register_router(self.be_router_address)
|
||||
LOG.info(_LI("[Backend ROUTER:%(router)s] Run ROUTER"),
|
||||
{"router": self.be_router_address})
|
||||
self._router_updater = RouterUpdater(
|
||||
conf, matchmaker, self.pub_publisher.host, self.fe_router_address,
|
||||
self.be_router_address)
|
||||
|
||||
def run(self):
|
||||
message, socket = self.poller.poll()
|
||||
@ -106,7 +102,7 @@ class UniversalQueueProxy(object):
|
||||
socket.send(b'', zmq.SNDMORE)
|
||||
socket.send(reply_id, zmq.SNDMORE)
|
||||
socket.send(six.b(str(message_type)), zmq.SNDMORE)
|
||||
LOG.debug("Redirecting message %s" % message_id)
|
||||
LOG.debug("Dispatching message %s" % message_id)
|
||||
socket.send_multipart(multipart_message)
|
||||
|
||||
def cleanup(self):
|
||||
@ -116,3 +112,29 @@ class UniversalQueueProxy(object):
|
||||
self.matchmaker.unregister_publisher(
|
||||
(self.pub_publisher.host, self.fe_router_address))
|
||||
self.matchmaker.unregister_router(self.be_router_address)
|
||||
|
||||
|
||||
class RouterUpdater(zmq_updater.UpdaterBase):
|
||||
"""This entity performs periodic async updates
|
||||
from router proxy to the matchmaker.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, matchmaker, publisher_address, fe_router_address,
|
||||
be_router_address):
|
||||
self.publisher_address = publisher_address
|
||||
self.fe_router_address = fe_router_address
|
||||
self.be_router_address = be_router_address
|
||||
super(RouterUpdater, self).__init__(conf, matchmaker,
|
||||
self._update_records)
|
||||
|
||||
def _update_records(self):
|
||||
self.matchmaker.register_publisher(
|
||||
(self.publisher_address, self.fe_router_address),
|
||||
expire=self.conf.zmq_target_expire)
|
||||
LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Update PUB publisher"),
|
||||
{"pub": self.publisher_address,
|
||||
"router": self.fe_router_address})
|
||||
self.matchmaker.register_router(self.be_router_address,
|
||||
expire=self.conf.zmq_target_expire)
|
||||
LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"),
|
||||
{"router": self.be_router_address})
|
||||
|
@ -25,6 +25,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers \
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
@ -40,6 +41,8 @@ class DealerPublisherProxy(object):
|
||||
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||
self.socket = socket_to_proxy
|
||||
self.routing_table = RoutingTable(conf, matchmaker)
|
||||
self.connection_updater = PublisherConnectionUpdater(
|
||||
conf, matchmaker, self.socket)
|
||||
|
||||
def send_request(self, request):
|
||||
if request.msg_type == zmq_names.CALL_TYPE:
|
||||
@ -92,6 +95,8 @@ class CallSenderProxy(zmq_dealer_call_publisher.CallSender):
|
||||
self.socket = self.outbound_sockets.get_socket_to_publishers()
|
||||
self.reply_waiter.poll_socket(self.socket)
|
||||
self.routing_table = RoutingTable(conf, matchmaker)
|
||||
self.connection_updater = PublisherConnectionUpdater(
|
||||
conf, matchmaker, self.socket)
|
||||
|
||||
def _connect_socket(self, target):
|
||||
return self.socket
|
||||
@ -170,3 +175,11 @@ class RoutingTable(object):
|
||||
def _renew_routable_hosts(self, target):
|
||||
hosts, _ = self.routing_table[str(target)]
|
||||
self.routable_hosts[str(target)] = list(hosts)
|
||||
|
||||
|
||||
class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater):
|
||||
|
||||
def _update_connection(self):
|
||||
publishers = self.matchmaker.get_publishers()
|
||||
for pub_address, router_address in publishers:
|
||||
self.socket.connect_to_host(router_address)
|
||||
|
@ -138,9 +138,14 @@ class RedisMatchMaker(base.MatchMakerBase):
|
||||
"port": self.conf.matchmaker_redis.port,
|
||||
"password": self.conf.matchmaker_redis.password}
|
||||
|
||||
def register_publisher(self, hostname):
|
||||
def _add_key_with_expire(self, key, value, expire):
|
||||
self._redis.sadd(key, value)
|
||||
if expire > 0:
|
||||
self._redis.expire(key, expire)
|
||||
|
||||
def register_publisher(self, hostname, expire=-1):
|
||||
host_str = ",".join(hostname)
|
||||
self._redis.sadd(_PUBLISHERS_KEY, host_str)
|
||||
self._add_key_with_expire(_PUBLISHERS_KEY, host_str, expire)
|
||||
|
||||
def unregister_publisher(self, hostname):
|
||||
host_str = ",".join(hostname)
|
||||
@ -153,8 +158,8 @@ class RedisMatchMaker(base.MatchMakerBase):
|
||||
self._get_hosts_by_key(_PUBLISHERS_KEY)])
|
||||
return hosts
|
||||
|
||||
def register_router(self, hostname):
|
||||
self._redis.sadd(_ROUTERS_KEY, hostname)
|
||||
def register_router(self, hostname, expire=-1):
|
||||
self._add_key_with_expire(_ROUTERS_KEY, hostname, expire)
|
||||
|
||||
def unregister_router(self, hostname):
|
||||
self._redis.srem(_ROUTERS_KEY, hostname)
|
||||
@ -167,22 +172,22 @@ class RedisMatchMaker(base.MatchMakerBase):
|
||||
|
||||
def register(self, target, hostname, listener_type, expire=-1):
|
||||
|
||||
def register_key(key):
|
||||
self._redis.sadd(key, hostname)
|
||||
if expire > 0:
|
||||
self._redis.expire(key, expire)
|
||||
|
||||
if target.topic and target.server:
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
register_key(key)
|
||||
self._add_key_with_expire(key, hostname, expire)
|
||||
|
||||
if target.topic:
|
||||
key = zmq_address.prefix_str(target.topic, listener_type)
|
||||
register_key(key)
|
||||
self._add_key_with_expire(key, hostname, expire)
|
||||
|
||||
def unregister(self, target, hostname, listener_type):
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
self._redis.srem(key, hostname)
|
||||
if target.topic and target.server:
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
self._redis.srem(key, hostname)
|
||||
|
||||
if target.topic:
|
||||
key = zmq_address.prefix_str(target.topic, listener_type)
|
||||
self._redis.srem(key, hostname)
|
||||
|
||||
def get_hosts(self, target, listener_type):
|
||||
LOG.debug("[Redis] get_hosts for target %s", target)
|
||||
|
@ -14,7 +14,6 @@
|
||||
|
||||
import abc
|
||||
import logging
|
||||
import time
|
||||
|
||||
import six
|
||||
|
||||
@ -23,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_socket
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||
from oslo_messaging._i18n import _LE
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -60,8 +60,8 @@ class SingleSocketConsumer(ConsumerBase):
|
||||
self.socket_type = socket_type
|
||||
self.host = None
|
||||
self.socket = self.subscribe_socket(socket_type)
|
||||
self.target_updater = TargetUpdater(conf, self.matchmaker, self.target,
|
||||
self.host, socket_type)
|
||||
self.target_updater = TargetUpdater(
|
||||
conf, self.matchmaker, self.target, self.host, socket_type)
|
||||
|
||||
def subscribe_socket(self, socket_type):
|
||||
try:
|
||||
@ -96,25 +96,20 @@ class SingleSocketConsumer(ConsumerBase):
|
||||
super(SingleSocketConsumer, self).cleanup()
|
||||
|
||||
|
||||
class TargetUpdater(object):
|
||||
class TargetUpdater(zmq_updater.UpdaterBase):
|
||||
"""This entity performs periodic async updates
|
||||
to the matchmaker.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, matchmaker, target, host, socket_type):
|
||||
self.conf = conf
|
||||
self.matchmaker = matchmaker
|
||||
self.target = target
|
||||
self.host = host
|
||||
self.socket_type = socket_type
|
||||
self.executor = zmq_async.get_executor(method=self._update_target)
|
||||
self.executor.execute()
|
||||
super(TargetUpdater, self).__init__(conf, matchmaker,
|
||||
self._update_target)
|
||||
|
||||
def _update_target(self):
|
||||
self.matchmaker.register(
|
||||
self.target, self.host,
|
||||
zmq_names.socket_type_str(self.socket_type))
|
||||
time.sleep(self.conf.zmq_target_expire / 2)
|
||||
|
||||
def cleanup(self):
|
||||
self.executor.stop()
|
||||
zmq_names.socket_type_str(self.socket_type),
|
||||
expire=self.conf.zmq_target_expire)
|
||||
|
@ -25,6 +25,7 @@ from oslo_messaging._drivers.zmq_driver.server.consumers\
|
||||
import zmq_consumer_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||
from oslo_messaging._i18n import _LE, _LI
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -90,6 +91,8 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
|
||||
self.target_updater = zmq_consumer_base.TargetUpdater(
|
||||
conf, self.matchmaker, self.target, self.host,
|
||||
zmq.DEALER)
|
||||
self.connection_updater = ConsumerConnectionUpdater(
|
||||
conf, self.matchmaker, self.socket)
|
||||
LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
|
||||
|
||||
def receive_message(self, socket):
|
||||
@ -111,6 +114,19 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
|
||||
else:
|
||||
LOG.error(_LE("Unknown message type: %s"),
|
||||
zmq_names.message_type_str(message_type))
|
||||
|
||||
except (zmq.ZMQError, AssertionError) as e:
|
||||
LOG.error(_LE("Receiving message failure: %s"), str(e))
|
||||
|
||||
def cleanup(self):
|
||||
LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)
|
||||
super(DealerConsumer, self).cleanup()
|
||||
self.matchmaker.unregister(self.target, self.host,
|
||||
zmq_names.socket_type_str(zmq.DEALER))
|
||||
|
||||
|
||||
class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater):
|
||||
|
||||
def _update_connection(self):
|
||||
routers = self.matchmaker.get_routers()
|
||||
for router_address in routers:
|
||||
self.socket.connect_to_host(router_address)
|
||||
|
@ -48,11 +48,11 @@ class ZmqServer(base.PollStyleListener):
|
||||
conf, self.poller, self) if conf.use_pub_sub else None
|
||||
|
||||
self.consumers = []
|
||||
if self.router_consumer:
|
||||
if self.router_consumer is not None:
|
||||
self.consumers.append(self.router_consumer)
|
||||
if self.dealer_consumer:
|
||||
if self.dealer_consumer is not None:
|
||||
self.consumers.append(self.dealer_consumer)
|
||||
if self.sub_consumer:
|
||||
if self.sub_consumer is not None:
|
||||
self.consumers.append(self.sub_consumer)
|
||||
|
||||
@base.batch_poll_helper
|
||||
|
@ -96,6 +96,8 @@ class ZmqSocket(object):
|
||||
self.handle.close(*args, **kwargs)
|
||||
|
||||
def connect_to_address(self, address):
|
||||
if address in self.connections:
|
||||
return
|
||||
stype = zmq_names.socket_type_str(self.socket_type)
|
||||
try:
|
||||
LOG.info(_LI("Connecting %(stype)s id %(id)s to %(address)s"),
|
||||
|
55
oslo_messaging/_drivers/zmq_driver/zmq_updater.py
Normal file
55
oslo_messaging/_drivers/zmq_driver/zmq_updater.py
Normal file
@ -0,0 +1,55 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import logging
|
||||
import time
|
||||
|
||||
import six
|
||||
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
zmq = zmq_async.import_zmq()
|
||||
|
||||
|
||||
class UpdaterBase(object):
|
||||
|
||||
def __init__(self, conf, matchmaker, update_method):
|
||||
self.conf = conf
|
||||
self.matchmaker = matchmaker
|
||||
self.update_method = update_method
|
||||
self.executor = zmq_async.get_executor(method=self._update_loop)
|
||||
self.executor.execute()
|
||||
|
||||
def _update_loop(self):
|
||||
self.update_method()
|
||||
time.sleep(self.conf.zmq_target_update)
|
||||
|
||||
def cleanup(self):
|
||||
self.executor.stop()
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class ConnectionUpdater(UpdaterBase):
|
||||
|
||||
def __init__(self, conf, matchmaker, socket):
|
||||
self.socket = socket
|
||||
super(ConnectionUpdater, self).__init__(
|
||||
conf, matchmaker, self._update_connection)
|
||||
|
||||
@abc.abstractmethod
|
||||
def _update_connection(self):
|
||||
"""Update connection info"""
|
Loading…
Reference in New Issue
Block a user