Merge "[zmq] Dummy add value aging mechanism"

This commit is contained in:
Jenkins 2017-02-07 03:13:57 +00:00 committed by Gerrit Code Review
commit f3cc165dba
3 changed files with 74 additions and 13 deletions

View File

@ -16,9 +16,11 @@ import collections
import logging import logging
import six import six
import time
from oslo_messaging._drivers import common as rpc_common from oslo_messaging._drivers import common as rpc_common
from oslo_messaging._drivers.zmq_driver import zmq_address from oslo_messaging._drivers.zmq_driver import zmq_address
from oslo_messaging._drivers.zmq_driver import zmq_async
from oslo_messaging._i18n import _LE from oslo_messaging._i18n import _LE
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -181,28 +183,48 @@ class MatchmakerDummy(MatchmakerBase):
self._cache = collections.defaultdict(list) self._cache = collections.defaultdict(list)
self._publishers = set() self._publishers = set()
self._routers = set() self._routers = set()
self._address = {}
self.executor = zmq_async.get_executor(method=self._loop)
self.executor.execute()
def register_publisher(self, hostname, expire=-1): def register_publisher(self, hostname, expire=-1):
if hostname not in self._publishers: if hostname not in self._publishers:
self._publishers.add(hostname) self._publishers.add(hostname)
self._address[hostname] = expire
def unregister_publisher(self, hostname): def unregister_publisher(self, hostname):
if hostname in self._publishers: if hostname in self._publishers:
self._publishers.remove(hostname) self._publishers.remove(hostname)
if hostname in self._address:
self._address.pop(hostname)
def get_publishers(self): def get_publishers(self):
return list(self._publishers) hosts = [host for host in self._publishers
if self._address[host] > 0]
return hosts
def register_router(self, hostname, expire=-1): def register_router(self, hostname, expire=-1):
if hostname not in self._routers: if hostname not in self._routers:
self._routers.add(hostname) self._routers.add(hostname)
self._address[hostname] = expire
def unregister_router(self, hostname): def unregister_router(self, hostname):
if hostname in self._routers: if hostname in self._routers:
self._routers.remove(hostname) self._routers.remove(hostname)
if hostname in self._address:
self._address.pop(hostname)
def get_routers(self): def get_routers(self):
return list(self._routers) hosts = [host for host in self._routers
if self._address[host] > 0]
return hosts
def _loop(self):
for hostname in self._address:
expire = self._address[hostname]
if expire > 0:
self._address[hostname] = expire - 1
time.sleep(1)
def register(self, target, hostname, listener_type, expire=-1): def register(self, target, hostname, listener_type, expire=-1):
if target.server: if target.server:
@ -214,6 +236,8 @@ class MatchmakerDummy(MatchmakerBase):
if hostname not in self._cache[key]: if hostname not in self._cache[key]:
self._cache[key].append(hostname) self._cache[key].append(hostname)
self._address[hostname] = expire
def unregister(self, target, hostname, listener_type): def unregister(self, target, hostname, listener_type):
if target.server: if target.server:
key = zmq_address.target_to_key(target, listener_type) key = zmq_address.target_to_key(target, listener_type)
@ -224,16 +248,21 @@ class MatchmakerDummy(MatchmakerBase):
if hostname in self._cache[key]: if hostname in self._cache[key]:
self._cache[key].remove(hostname) self._cache[key].remove(hostname)
if hostname in self._address:
self._address.pop(hostname)
def get_hosts(self, target, listener_type): def get_hosts(self, target, listener_type):
hosts = [] hosts = []
if target.server: if target.server:
key = zmq_address.target_to_key(target, listener_type) key = zmq_address.target_to_key(target, listener_type)
hosts.extend(self._cache[key]) hosts.extend([host for host in self._cache[key]
if self._address[host] > 0])
if not hosts: if not hosts:
key = zmq_address.prefix_str(target.topic, listener_type) key = zmq_address.prefix_str(target.topic, listener_type)
hosts.extend(self._cache[key]) hosts.extend([host for host in self._cache[key]
if self._address[host] > 0])
LOG.debug("[Dummy] get_hosts for target %(target)s: %(hosts)s", LOG.debug("[Dummy] get_hosts for target %(target)s: %(hosts)s",
{"target": target, "hosts": hosts}) {"target": target, "hosts": hosts})
@ -246,8 +275,10 @@ class MatchmakerDummy(MatchmakerBase):
return self.get_hosts(target, listener_type) return self.get_hosts(target, listener_type)
def get_hosts_fanout(self, target, listener_type): def get_hosts_fanout(self, target, listener_type):
hosts = []
key = zmq_address.target_to_key(target, listener_type) key = zmq_address.target_to_key(target, listener_type)
hosts = list(self._cache[key]) hosts.extend([host for host in self._cache[key]
if self._address[host] > 0])
LOG.debug("[Dummy] get_hosts_fanout for target %(target)s: %(hosts)s", LOG.debug("[Dummy] get_hosts_fanout for target %(target)s: %(hosts)s",
{"target": target, "hosts": hosts}) {"target": target, "hosts": hosts})

View File

@ -65,21 +65,41 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
self.host2 = b"test_host2" self.host2 = b"test_host2"
def test_register(self): def test_register(self):
self.test_matcher.register(self.target, self.host1, "test") self.test_matcher.register(
self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.assertEqual([self.host1], self.assertEqual([self.host1],
self.test_matcher.get_hosts(self.target, "test")) self.test_matcher.get_hosts(self.target, "test"))
def test_register_two_hosts(self): def test_register_two_hosts(self):
self.test_matcher.register(self.target, self.host1, "test") self.test_matcher.register(
self.test_matcher.register(self.target, self.host2, "test") self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.test_matcher.register(
self.target,
self.host2,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"), self.assertItemsEqual(self.test_matcher.get_hosts(self.target, "test"),
[self.host1, self.host2]) [self.host1, self.host2])
def test_register_unregister(self): def test_register_unregister(self):
self.test_matcher.register(self.target, self.host1, "test") self.test_matcher.register(
self.test_matcher.register(self.target, self.host2, "test") self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.test_matcher.register(
self.target,
self.host2,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.test_matcher.unregister(self.target, self.host2, "test") self.test_matcher.unregister(self.target, self.host2, "test")
@ -87,8 +107,16 @@ class TestImplMatchmaker(test_utils.BaseTestCase):
[self.host1]) [self.host1])
def test_register_two_same_hosts(self): def test_register_two_same_hosts(self):
self.test_matcher.register(self.target, self.host1, "test") self.test_matcher.register(
self.test_matcher.register(self.target, self.host1, "test") self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.test_matcher.register(
self.target,
self.host1,
"test",
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.assertEqual([self.host1], self.assertEqual([self.host1],
self.test_matcher.get_hosts(self.target, "test")) self.test_matcher.get_hosts(self.target, "test"))

4
oslo_messaging/tests/drivers/zmq/test_pub_sub.py Normal file → Executable file
View File

@ -63,7 +63,9 @@ class TestPubSub(zmq_common.ZmqBaseTestCase):
self.publisher = zmq_publisher_proxy.PublisherProxy( self.publisher = zmq_publisher_proxy.PublisherProxy(
self.conf, self.driver.matchmaker) self.conf, self.driver.matchmaker)
self.driver.matchmaker.register_publisher((self.publisher.host, '')) self.driver.matchmaker.register_publisher(
(self.publisher.host, ''),
expire=self.conf.oslo_messaging_zmq.zmq_target_expire)
self.listeners = [] self.listeners = []
for _ in range(self.LISTENERS_COUNT): for _ in range(self.LISTENERS_COUNT):