Merge "[zmq] Add TTL to redis records"
This commit is contained in:
commit
6d9c4815de
@ -68,6 +68,10 @@ zmq_opts = [
|
||||
help='The default number of seconds that poll should wait. '
|
||||
'Poll raises timeout exception when timeout expired.'),
|
||||
|
||||
cfg.IntOpt('zmq_target_expire', default=120,
|
||||
help='Expiration timeout in seconds of a name service record '
|
||||
'about existing target ( < 0 means no timeout).'),
|
||||
|
||||
cfg.BoolOpt('direct_over_proxy', default=True,
|
||||
help='Configures zmq-messaging to use proxy with '
|
||||
'non PUB/SUB patterns.'),
|
||||
|
@ -75,8 +75,10 @@ class MatchMakerBase(object):
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def register(self, target, hostname, listener_type):
|
||||
def register(self, target, hostname, listener_type, expire=-1):
|
||||
"""Register target on nameserver.
|
||||
If record already exists and has expiration timeout it will be
|
||||
updated. Existing records without timeout will stay untouched
|
||||
|
||||
:param target: the target for host
|
||||
:type target: Target
|
||||
@ -84,6 +86,8 @@ class MatchMakerBase(object):
|
||||
:type hostname: String
|
||||
:param listener_type: Listener socket type ROUTER, SUB etc.
|
||||
:type listener_type: String
|
||||
:param expire: Record expiration timeout
|
||||
:type expire: int
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
@ -127,7 +131,7 @@ class DummyMatchMaker(MatchMakerBase):
|
||||
def get_publishers(self):
|
||||
return list(self._publishers)
|
||||
|
||||
def register(self, target, hostname, listener_type):
|
||||
def register(self, target, hostname, listener_type, expire=-1):
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
if hostname not in self._cache[key]:
|
||||
self._cache[key].append(hostname)
|
||||
|
@ -70,22 +70,25 @@ class RedisMatchMaker(base.MatchMakerBase):
|
||||
def _get_hosts_by_key(self, key):
|
||||
return self._redis.lrange(key, 0, -1)
|
||||
|
||||
def register(self, target, hostname, listener_type):
|
||||
def register(self, target, hostname, listener_type, expire=-1):
|
||||
|
||||
def register_key(key):
|
||||
if hostname not in self._get_hosts_by_key(key):
|
||||
self._redis.lpush(key, hostname)
|
||||
if expire > 0:
|
||||
self._redis.expire(key, expire)
|
||||
|
||||
if target.topic and target.server:
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
if hostname not in self._get_hosts_by_key(key):
|
||||
self._redis.lpush(key, hostname)
|
||||
register_key(key)
|
||||
|
||||
if target.topic:
|
||||
key = zmq_address.prefix_str(target.topic, listener_type)
|
||||
if hostname not in self._get_hosts_by_key(key):
|
||||
self._redis.lpush(key, hostname)
|
||||
register_key(key)
|
||||
|
||||
if target.server:
|
||||
key = zmq_address.prefix_str(target.server, listener_type)
|
||||
if hostname not in self._get_hosts_by_key(key):
|
||||
self._redis.lpush(key, hostname)
|
||||
register_key(key)
|
||||
|
||||
def unregister(self, target, hostname, listener_type):
|
||||
key = zmq_address.target_to_key(target, listener_type)
|
||||
|
@ -13,6 +13,8 @@
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers.zmq_driver.server.consumers\
|
||||
@ -54,25 +56,19 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
def __init__(self, conf, poller, server):
|
||||
super(RouterConsumer, self).__init__(conf, poller, server, zmq.ROUTER)
|
||||
self.matchmaker = server.matchmaker
|
||||
self.targets = []
|
||||
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
|
||||
self.port)
|
||||
self.targets = TargetsManager(conf, self.matchmaker, self.host)
|
||||
LOG.info(_LI("[%s] Run ROUTER consumer"), self.host)
|
||||
|
||||
def listen(self, target):
|
||||
|
||||
LOG.info(_LI("[%(host)s] Listen to target %(target)s"),
|
||||
{'host': self.host, 'target': target})
|
||||
|
||||
self.targets.append(target)
|
||||
self.matchmaker.register(target, self.host,
|
||||
zmq_names.socket_type_str(zmq.ROUTER))
|
||||
self.targets.listen(target)
|
||||
|
||||
def cleanup(self):
|
||||
super(RouterConsumer, self).cleanup()
|
||||
for target in self.targets:
|
||||
self.matchmaker.unregister(target, self.host,
|
||||
zmq_names.socket_type_str(zmq.ROUTER))
|
||||
self.targets.cleanup()
|
||||
|
||||
def _receive_request(self, socket):
|
||||
reply_id = socket.recv()
|
||||
@ -119,3 +115,39 @@ class RouterConsumerBroker(RouterConsumer):
|
||||
if zmq_names.FIELD_REPLY_ID in envelope:
|
||||
request.proxy_reply_id = envelope[zmq_names.FIELD_REPLY_ID]
|
||||
return request, reply_id
|
||||
|
||||
|
||||
class TargetsManager(object):
|
||||
|
||||
def __init__(self, conf, matchmaker, host):
|
||||
self.targets = []
|
||||
self.conf = conf
|
||||
self.matchmaker = matchmaker
|
||||
self.host = host
|
||||
self.targets_lock = threading.Lock()
|
||||
self.updater = zmq_async.get_executor(method=self._update_targets) \
|
||||
if conf.zmq_target_expire > 0 else None
|
||||
if self.updater:
|
||||
self.updater.execute()
|
||||
|
||||
def _update_targets(self):
|
||||
with self.targets_lock:
|
||||
for target in self.targets:
|
||||
self.matchmaker.register(
|
||||
target, self.host, zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
# Update target-records once per half expiration time
|
||||
time.sleep(self.conf.zmq_target_expire / 2)
|
||||
|
||||
def listen(self, target):
|
||||
with self.targets_lock:
|
||||
self.targets.append(target)
|
||||
self.matchmaker.register(target, self.host,
|
||||
zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
def cleanup(self):
|
||||
if self.updater:
|
||||
self.updater.stop()
|
||||
for target in self.targets:
|
||||
self.matchmaker.unregister(target, self.host,
|
||||
zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
Loading…
x
Reference in New Issue
Block a user