[zmq] Periodic updates of endpoints connections

* Connect endpoints to a newcomer router-proxy.
* Add TTL expriation to all redis keys.
* Update redis periodically from routers.

Change-Id: I0f73cc47f32f95bd4bb7e7f144adba491e94a346
This commit is contained in:
ozamiatin 2016-05-23 03:31:17 +03:00
parent a4a4a50eb7
commit 956bbb92e7
9 changed files with 152 additions and 40 deletions

View File

@ -72,10 +72,14 @@ zmq_opts = [
help='The default number of seconds that poll should wait. '
'Poll raises timeout exception when timeout expired.'),
cfg.IntOpt('zmq_target_expire', default=120,
cfg.IntOpt('zmq_target_expire', default=300,
help='Expiration timeout in seconds of a name service record '
'about existing target ( < 0 means no timeout).'),
cfg.IntOpt('zmq_target_update', default=180,
help='Update period in seconds of a name service record '
'about existing target.'),
cfg.BoolOpt('use_pub_sub', default=True,
help='Use PUB/SUB pattern for fanout methods. '
'PUB/SUB always uses proxy.'),

View File

@ -22,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._i18n import _LI
zmq = zmq_async.import_zmq()
@ -55,14 +56,9 @@ class UniversalQueueProxy(object):
self.pub_publisher = zmq_pub_publisher.PubPublisherProxy(
conf, matchmaker)
self.matchmaker.register_publisher(
(self.pub_publisher.host, self.fe_router_address))
LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Run PUB publisher"),
{"pub": self.pub_publisher.host,
"router": self.fe_router_address})
self.matchmaker.register_router(self.be_router_address)
LOG.info(_LI("[Backend ROUTER:%(router)s] Run ROUTER"),
{"router": self.be_router_address})
self._router_updater = RouterUpdater(
conf, matchmaker, self.pub_publisher.host, self.fe_router_address,
self.be_router_address)
def run(self):
message, socket = self.poller.poll()
@ -106,7 +102,7 @@ class UniversalQueueProxy(object):
socket.send(b'', zmq.SNDMORE)
socket.send(reply_id, zmq.SNDMORE)
socket.send(six.b(str(message_type)), zmq.SNDMORE)
LOG.debug("Redirecting message %s" % message_id)
LOG.debug("Dispatching message %s" % message_id)
socket.send_multipart(multipart_message)
def cleanup(self):
@ -116,3 +112,29 @@ class UniversalQueueProxy(object):
self.matchmaker.unregister_publisher(
(self.pub_publisher.host, self.fe_router_address))
self.matchmaker.unregister_router(self.be_router_address)
class RouterUpdater(zmq_updater.UpdaterBase):
"""This entity performs periodic async updates
from router proxy to the matchmaker.
"""
def __init__(self, conf, matchmaker, publisher_address, fe_router_address,
be_router_address):
self.publisher_address = publisher_address
self.fe_router_address = fe_router_address
self.be_router_address = be_router_address
super(RouterUpdater, self).__init__(conf, matchmaker,
self._update_records)
def _update_records(self):
self.matchmaker.register_publisher(
(self.publisher_address, self.fe_router_address),
expire=self.conf.zmq_target_expire)
LOG.info(_LI("[PUB:%(pub)s, ROUTER:%(router)s] Update PUB publisher"),
{"pub": self.publisher_address,
"router": self.fe_router_address})
self.matchmaker.register_router(self.be_router_address,
expire=self.conf.zmq_target_expire)
LOG.info(_LI("[Backend ROUTER:%(router)s] Update ROUTER"),
{"router": self.be_router_address})

View File

@ -25,6 +25,7 @@ from oslo_messaging._drivers.zmq_driver.client.publishers \
from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_updater
zmq = zmq_async.import_zmq()
@ -40,6 +41,8 @@ class DealerPublisherProxy(object):
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
self.socket = socket_to_proxy
self.routing_table = RoutingTable(conf, matchmaker)
self.connection_updater = PublisherConnectionUpdater(
conf, matchmaker, self.socket)
def send_request(self, request):
if request.msg_type == zmq_names.CALL_TYPE:
@ -92,6 +95,8 @@ class CallSenderProxy(zmq_dealer_call_publisher.CallSender):
self.socket = self.outbound_sockets.get_socket_to_publishers()
self.reply_waiter.poll_socket(self.socket)
self.routing_table = RoutingTable(conf, matchmaker)
self.connection_updater = PublisherConnectionUpdater(
conf, matchmaker, self.socket)
def _connect_socket(self, target):
return self.socket
@ -170,3 +175,11 @@ class RoutingTable(object):
def _renew_routable_hosts(self, target):
hosts, _ = self.routing_table[str(target)]
self.routable_hosts[str(target)] = list(hosts)
class PublisherConnectionUpdater(zmq_updater.ConnectionUpdater):
def _update_connection(self):
publishers = self.matchmaker.get_publishers()
for pub_address, router_address in publishers:
self.socket.connect_to_host(router_address)

View File

@ -138,9 +138,14 @@ class RedisMatchMaker(base.MatchMakerBase):
"port": self.conf.matchmaker_redis.port,
"password": self.conf.matchmaker_redis.password}
def register_publisher(self, hostname):
def _add_key_with_expire(self, key, value, expire):
self._redis.sadd(key, value)
if expire > 0:
self._redis.expire(key, expire)
def register_publisher(self, hostname, expire=-1):
host_str = ",".join(hostname)
self._redis.sadd(_PUBLISHERS_KEY, host_str)
self._add_key_with_expire(_PUBLISHERS_KEY, host_str, expire)
def unregister_publisher(self, hostname):
host_str = ",".join(hostname)
@ -153,8 +158,8 @@ class RedisMatchMaker(base.MatchMakerBase):
self._get_hosts_by_key(_PUBLISHERS_KEY)])
return hosts
def register_router(self, hostname):
self._redis.sadd(_ROUTERS_KEY, hostname)
def register_router(self, hostname, expire=-1):
self._add_key_with_expire(_ROUTERS_KEY, hostname, expire)
def unregister_router(self, hostname):
self._redis.srem(_ROUTERS_KEY, hostname)
@ -167,22 +172,22 @@ class RedisMatchMaker(base.MatchMakerBase):
def register(self, target, hostname, listener_type, expire=-1):
def register_key(key):
self._redis.sadd(key, hostname)
if expire > 0:
self._redis.expire(key, expire)
if target.topic and target.server:
key = zmq_address.target_to_key(target, listener_type)
register_key(key)
self._add_key_with_expire(key, hostname, expire)
if target.topic:
key = zmq_address.prefix_str(target.topic, listener_type)
register_key(key)
self._add_key_with_expire(key, hostname, expire)
def unregister(self, target, hostname, listener_type):
key = zmq_address.target_to_key(target, listener_type)
self._redis.srem(key, hostname)
if target.topic and target.server:
key = zmq_address.target_to_key(target, listener_type)
self._redis.srem(key, hostname)
if target.topic:
key = zmq_address.prefix_str(target.topic, listener_type)
self._redis.srem(key, hostname)
def get_hosts(self, target, listener_type):
LOG.debug("[Redis] get_hosts for target %s", target)

View File

@ -14,7 +14,6 @@
import abc
import logging
import time
import six
@ -23,6 +22,7 @@ from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_socket
from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__)
@ -60,8 +60,8 @@ class SingleSocketConsumer(ConsumerBase):
self.socket_type = socket_type
self.host = None
self.socket = self.subscribe_socket(socket_type)
self.target_updater = TargetUpdater(conf, self.matchmaker, self.target,
self.host, socket_type)
self.target_updater = TargetUpdater(
conf, self.matchmaker, self.target, self.host, socket_type)
def subscribe_socket(self, socket_type):
try:
@ -96,25 +96,20 @@ class SingleSocketConsumer(ConsumerBase):
super(SingleSocketConsumer, self).cleanup()
class TargetUpdater(object):
class TargetUpdater(zmq_updater.UpdaterBase):
"""This entity performs periodic async updates
to the matchmaker.
"""
def __init__(self, conf, matchmaker, target, host, socket_type):
self.conf = conf
self.matchmaker = matchmaker
self.target = target
self.host = host
self.socket_type = socket_type
self.executor = zmq_async.get_executor(method=self._update_target)
self.executor.execute()
super(TargetUpdater, self).__init__(conf, matchmaker,
self._update_target)
def _update_target(self):
self.matchmaker.register(
self.target, self.host,
zmq_names.socket_type_str(self.socket_type))
time.sleep(self.conf.zmq_target_expire / 2)
def cleanup(self):
self.executor.stop()
zmq_names.socket_type_str(self.socket_type),
expire=self.conf.zmq_target_expire)

View File

@ -25,6 +25,7 @@ from oslo_messaging._drivers.zmq_driver.server.consumers\
import zmq_consumer_base
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._drivers.zmq_driver import zmq_names
from oslo_messaging._drivers.zmq_driver import zmq_updater
from oslo_messaging._i18n import _LE, _LI
LOG = logging.getLogger(__name__)
@ -90,6 +91,8 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
self.target_updater = zmq_consumer_base.TargetUpdater(
conf, self.matchmaker, self.target, self.host,
zmq.DEALER)
self.connection_updater = ConsumerConnectionUpdater(
conf, self.matchmaker, self.socket)
LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
def receive_message(self, socket):
@ -111,6 +114,19 @@ class DealerConsumer(zmq_consumer_base.ConsumerBase):
else:
LOG.error(_LE("Unknown message type: %s"),
zmq_names.message_type_str(message_type))
except (zmq.ZMQError, AssertionError) as e:
LOG.error(_LE("Receiving message failure: %s"), str(e))
def cleanup(self):
LOG.info(_LI("[%s] Destroy DEALER consumer"), self.host)
super(DealerConsumer, self).cleanup()
self.matchmaker.unregister(self.target, self.host,
zmq_names.socket_type_str(zmq.DEALER))
class ConsumerConnectionUpdater(zmq_updater.ConnectionUpdater):
def _update_connection(self):
routers = self.matchmaker.get_routers()
for router_address in routers:
self.socket.connect_to_host(router_address)

View File

@ -48,11 +48,11 @@ class ZmqServer(base.PollStyleListener):
conf, self.poller, self) if conf.use_pub_sub else None
self.consumers = []
if self.router_consumer:
if self.router_consumer is not None:
self.consumers.append(self.router_consumer)
if self.dealer_consumer:
if self.dealer_consumer is not None:
self.consumers.append(self.dealer_consumer)
if self.sub_consumer:
if self.sub_consumer is not None:
self.consumers.append(self.sub_consumer)
@base.batch_poll_helper

View File

@ -96,6 +96,8 @@ class ZmqSocket(object):
self.handle.close(*args, **kwargs)
def connect_to_address(self, address):
if address in self.connections:
return
stype = zmq_names.socket_type_str(self.socket_type)
try:
LOG.info(_LI("Connecting %(stype)s id %(id)s to %(address)s"),

View File

@ -0,0 +1,55 @@
# Copyright 2016 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import time
import six
from oslo_messaging._drivers.zmq_driver import zmq_async
LOG = logging.getLogger(__name__)
zmq = zmq_async.import_zmq()
class UpdaterBase(object):
def __init__(self, conf, matchmaker, update_method):
self.conf = conf
self.matchmaker = matchmaker
self.update_method = update_method
self.executor = zmq_async.get_executor(method=self._update_loop)
self.executor.execute()
def _update_loop(self):
self.update_method()
time.sleep(self.conf.zmq_target_update)
def cleanup(self):
self.executor.stop()
@six.add_metaclass(abc.ABCMeta)
class ConnectionUpdater(UpdaterBase):
def __init__(self, conf, matchmaker, socket):
self.socket = socket
super(ConnectionUpdater, self).__init__(
conf, matchmaker, self._update_connection)
@abc.abstractmethod
def _update_connection(self):
"""Update connection info"""