diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index fbd9f081f..69d2bf5c8 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -17,6 +17,7 @@ import pprint import socket from oslo_config import cfg +from stevedore import driver from oslo_messaging._drivers import base from oslo_messaging._drivers import common as rpc_common @@ -39,7 +40,7 @@ zmq_opts = [ # The module.Class to use for matchmaking. cfg.StrOpt( 'rpc_zmq_matchmaker', - default='local', + default='dummy', help='MatchMaker driver.', ), @@ -97,7 +98,11 @@ class ZmqDriver(base.BaseDriver): self.conf = conf self.server = None self.client = None - self.matchmaker = None + self.matchmaker = driver.DriverManager( + 'oslo.messaging.zmq.matchmaker', + self.conf.rpc_zmq_matchmaker, + ).driver(self.conf) + super(ZmqDriver, self).__init__(conf, url, default_exchange, allowed_remote_exmods) diff --git a/oslo_messaging/_drivers/matchmaker.py b/oslo_messaging/_drivers/matchmaker.py deleted file mode 100644 index 82b0fbd2b..000000000 --- a/oslo_messaging/_drivers/matchmaker.py +++ /dev/null @@ -1,321 +0,0 @@ -# Copyright 2011 Cloudscaling Group, Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -The MatchMaker classes should except a Topic or Fanout exchange key and -return keys for direct exchanges, per (approximate) AMQP parlance. -""" - -import contextlib -import logging - -import eventlet -from oslo_config import cfg - -from oslo_messaging._i18n import _ - -matchmaker_opts = [ - cfg.IntOpt('matchmaker_heartbeat_freq', - default=300, - help='Heartbeat frequency.'), - cfg.IntOpt('matchmaker_heartbeat_ttl', - default=600, - help='Heartbeat time-to-live.'), -] - -CONF = cfg.CONF -CONF.register_opts(matchmaker_opts) -LOG = logging.getLogger(__name__) -contextmanager = contextlib.contextmanager - - -class MatchMakerException(Exception): - """Signified a match could not be found.""" - message = _("Match not found by MatchMaker.") - - -class Exchange(object): - """Implements lookups. - - Subclass this to support hashtables, dns, etc. - """ - def __init__(self): - pass - - def run(self, key): - raise NotImplementedError() - - -class Binding(object): - """A binding on which to perform a lookup.""" - def __init__(self): - pass - - def test(self, key): - raise NotImplementedError() - - -class MatchMakerBase(object): - """Match Maker Base Class. - - Build off HeartbeatMatchMakerBase if building a heartbeat-capable - MatchMaker. - """ - def __init__(self): - # Array of tuples. Index [2] toggles negation, [3] is last-if-true - self.bindings = [] - - self.no_heartbeat_msg = _('Matchmaker does not implement ' - 'registration or heartbeat.') - - def register(self, key, host): - """Register a host on a backend. - - Heartbeats, if applicable, may keepalive registration. - """ - pass - - def ack_alive(self, key, host): - """Acknowledge that a key.host is alive. - - Used internally for updating heartbeats, but may also be used - publicly to acknowledge a system is alive (i.e. rpc message - successfully sent to host) - """ - pass - - def is_alive(self, topic, host): - """Checks if a host is alive.""" - pass - - def expire(self, topic, host): - """Explicitly expire a host's registration.""" - pass - - def send_heartbeats(self): - """Send all heartbeats. - - Use start_heartbeat to spawn a heartbeat greenthread, - which loops this method. - """ - pass - - def unregister(self, key, host): - """Unregister a topic.""" - pass - - def start_heartbeat(self): - """Spawn heartbeat greenthread.""" - pass - - def stop_heartbeat(self): - """Destroys the heartbeat greenthread.""" - pass - - def add_binding(self, binding, rule, last=True): - self.bindings.append((binding, rule, False, last)) - - # NOTE(ewindisch): kept the following method in case we implement the - # underlying support. - # def add_negate_binding(self, binding, rule, last=True): - # self.bindings.append((binding, rule, True, last)) - - def queues(self, key): - workers = [] - - # bit is for negate bindings - if we choose to implement it. - # last stops processing rules if this matches. - for (binding, exchange, bit, last) in self.bindings: - if binding.test(key): - workers.extend(exchange.run(key)) - - # Support last. - if last: - return workers - return workers - - -class HeartbeatMatchMakerBase(MatchMakerBase): - """Base for a heart-beat capable MatchMaker. - - Provides common methods for registering, unregistering, and maintaining - heartbeats. - """ - def __init__(self): - self.hosts = set() - self._heart = None - self.host_topic = {} - - super(HeartbeatMatchMakerBase, self).__init__() - - def send_heartbeats(self): - """Send all heartbeats. - - Use start_heartbeat to spawn a heartbeat greenthread, - which loops this method. - """ - for key, host in self.host_topic.keys(): - self.ack_alive(key, host) - - def ack_alive(self, key, host): - """Acknowledge that a host.topic is alive. - - Used internally for updating heartbeats, but may also be used - publicly to acknowledge a system is alive (i.e. rpc message - successfully sent to host) - """ - raise NotImplementedError("Must implement ack_alive") - - def backend_register(self, key, host): - """Implements registration logic. - - Called by register(self,key,host) - """ - raise NotImplementedError("Must implement backend_register") - - def backend_unregister(self, key, key_host): - """Implements de-registration logic. - - Called by unregister(self,key,host) - """ - raise NotImplementedError("Must implement backend_unregister") - - def register(self, key, host): - """Register a host on a backend. - - Heartbeats, if applicable, may keepalive registration. - """ - self.hosts.add(host) - self.host_topic[(key, host)] = host - key_host = '.'.join((key, host)) - - self.backend_register(key, key_host) - - self.ack_alive(key, host) - - def unregister(self, key, host): - """Unregister a topic.""" - if (key, host) in self.host_topic: - del self.host_topic[(key, host)] - - self.hosts.discard(host) - self.backend_unregister(key, '.'.join((key, host))) - - LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"), - {'key': key, 'host': host}) - - def start_heartbeat(self): - """Implementation of MatchMakerBase.start_heartbeat. - - Launches greenthread looping send_heartbeats(), - yielding for CONF.matchmaker_heartbeat_freq seconds - between iterations. - """ - if not self.hosts: - raise MatchMakerException( - _("Register before starting heartbeat.")) - - def do_heartbeat(): - while True: - self.send_heartbeats() - eventlet.sleep(CONF.matchmaker_heartbeat_freq) - - self._heart = eventlet.spawn(do_heartbeat) - - def stop_heartbeat(self): - """Destroys the heartbeat greenthread.""" - if self._heart: - self._heart.kill() - - -class DirectBinding(Binding): - """Specifies a host in the key via a '.' character. - - Although dots are used in the key, the behavior here is - that it maps directly to a host, thus direct. - """ - def test(self, key): - return '.' in key - - -class TopicBinding(Binding): - """Where a 'bare' key without dots. - - AMQP generally considers topic exchanges to be those *with* dots, - but we deviate here in terminology as the behavior here matches - that of a topic exchange (whereas where there are dots, behavior - matches that of a direct exchange. - """ - def test(self, key): - return '.' not in key - - -class FanoutBinding(Binding): - """Match on fanout keys, where key starts with 'fanout.' string.""" - def test(self, key): - return key.startswith('fanout~') - - -class StubExchange(Exchange): - """Exchange that does nothing.""" - def run(self, key): - return [(key, None)] - - -class LocalhostExchange(Exchange): - """Exchange where all direct topics are local.""" - def __init__(self, host='localhost'): - self.host = host - super(Exchange, self).__init__() - - def run(self, key): - return [('.'.join((key.split('.')[0], self.host)), self.host)] - - -class DirectExchange(Exchange): - """Exchange where all topic keys are split, sending to second half. - - i.e. "compute.host" sends a message to "compute.host" running on "host" - """ - def __init__(self): - super(Exchange, self).__init__() - - def run(self, key): - e = key.split('.', 1)[1] - return [(key, e)] - - -class MatchMakerLocalhost(MatchMakerBase): - """Match Maker where all bare topics resolve to localhost. - - Useful for testing. - """ - def __init__(self, host='localhost'): - super(MatchMakerLocalhost, self).__init__() - self.add_binding(FanoutBinding(), LocalhostExchange(host)) - self.add_binding(DirectBinding(), DirectExchange()) - self.add_binding(TopicBinding(), LocalhostExchange(host)) - - -class MatchMakerStub(MatchMakerBase): - """Match Maker where topics are untouched. - - Useful for testing, or for AMQP/brokered queues. - Will not work where knowledge of hosts is known (i.e. zeromq) - """ - def __init__(self): - super(MatchMakerStub, self).__init__() - - self.add_binding(FanoutBinding(), StubExchange()) - self.add_binding(DirectBinding(), StubExchange()) - self.add_binding(TopicBinding(), StubExchange()) diff --git a/oslo_messaging/_drivers/matchmaker_redis.py b/oslo_messaging/_drivers/matchmaker_redis.py deleted file mode 100644 index 290b60351..000000000 --- a/oslo_messaging/_drivers/matchmaker_redis.py +++ /dev/null @@ -1,145 +0,0 @@ -# Copyright 2013 Cloudscaling Group, Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -The MatchMaker classes should accept a Topic or Fanout exchange key and -return keys for direct exchanges, per (approximate) AMQP parlance. -""" - -from oslo_config import cfg -from oslo_utils import importutils - -from oslo_messaging._drivers import matchmaker as mm_common - -redis = importutils.try_import('redis') - - -matchmaker_redis_opts = [ - cfg.StrOpt('host', - default='127.0.0.1', - help='Host to locate redis.'), - cfg.IntOpt('port', - default=6379, - help='Use this port to connect to redis host.'), - cfg.StrOpt('password', - help='Password for Redis server (optional).'), -] - -CONF = cfg.CONF -opt_group = cfg.OptGroup(name='matchmaker_redis', - title='Options for Redis-based MatchMaker') -CONF.register_group(opt_group) -CONF.register_opts(matchmaker_redis_opts, opt_group) - - -class RedisExchange(mm_common.Exchange): - def __init__(self, matchmaker): - self.matchmaker = matchmaker - self.redis = matchmaker.redis - super(RedisExchange, self).__init__() - - -class RedisTopicExchange(RedisExchange): - """Exchange where all topic keys are split, sending to second half. - - i.e. "compute.host" sends a message to "compute" running on "host" - """ - def run(self, topic): - while True: - member_name = self.redis.srandmember(topic) - - if not member_name: - # If this happens, there are no - # longer any members. - break - - if not self.matchmaker.is_alive(topic, member_name): - continue - - host = member_name.split('.', 1)[1] - return [(member_name, host)] - return [] - - -class RedisFanoutExchange(RedisExchange): - """Return a list of all hosts.""" - def run(self, topic): - topic = topic.split('~', 1)[1] - hosts = self.redis.smembers(topic) - good_hosts = filter( - lambda host: self.matchmaker.is_alive(topic, host), hosts) - - return [(x, x.split('.', 1)[1]) for x in good_hosts] - - -class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase): - """MatchMaker registering and looking-up hosts with a Redis server.""" - def __init__(self): - super(MatchMakerRedis, self).__init__() - - if not redis: - raise ImportError("Failed to import module redis.") - - self.redis = redis.StrictRedis( - host=CONF.matchmaker_redis.host, - port=CONF.matchmaker_redis.port, - password=CONF.matchmaker_redis.password) - - self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self)) - self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange()) - self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self)) - - def ack_alive(self, key, host): - topic = "%s.%s" % (key, host) - if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl): - # If we could not update the expiration, the key - # might have been pruned. Re-register, creating a new - # key in Redis. - self.register(key, host) - - def is_alive(self, topic, host): - # After redis 2.8, if the specialized key doesn't exist, - # TTL fuction would return -2. If key exists, - # but doesn't have expiration associated, - # TTL func would return -1. For more information, - # please visit http://redis.io/commands/ttl - if self.redis.ttl(host) == -2: - self.expire(topic, host) - return False - return True - - def expire(self, topic, host): - with self.redis.pipeline() as pipe: - pipe.multi() - pipe.delete(host) - pipe.srem(topic, host) - pipe.execute() - - def backend_register(self, key, key_host): - with self.redis.pipeline() as pipe: - pipe.multi() - pipe.sadd(key, key_host) - - # No value is needed, we just - # care if it exists. Sets aren't viable - # because only keys can expire. - pipe.sadd(key_host, '') - - pipe.execute() - - def backend_unregister(self, key, key_host): - with self.redis.pipeline() as pipe: - pipe.multi() - pipe.srem(key, key_host) - pipe.delete(key_host) - pipe.execute() diff --git a/oslo_messaging/_drivers/matchmaker_ring.py b/oslo_messaging/_drivers/matchmaker_ring.py deleted file mode 100644 index 0fd918cb5..000000000 --- a/oslo_messaging/_drivers/matchmaker_ring.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright 2011-2013 Cloudscaling Group, Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -The MatchMaker classes should except a Topic or Fanout exchange key and -return keys for direct exchanges, per (approximate) AMQP parlance. -""" - -import itertools -import json -import logging - -from oslo_config import cfg - -from oslo_messaging._drivers import matchmaker as mm -from oslo_messaging._i18n import _ - -matchmaker_opts = [ - # Matchmaker ring file - cfg.StrOpt('ringfile', - deprecated_name='matchmaker_ringfile', - deprecated_group='DEFAULT', - default='/etc/oslo/matchmaker_ring.json', - help='Matchmaker ring file (JSON).'), -] - -CONF = cfg.CONF -CONF.register_opts(matchmaker_opts, 'matchmaker_ring') -LOG = logging.getLogger(__name__) - - -class RingExchange(mm.Exchange): - """Match Maker where hosts are loaded from a static JSON formatted file. - - __init__ takes optional ring dictionary argument, otherwise - loads the ringfile from CONF.mathcmaker_ringfile. - """ - def __init__(self, ring=None): - super(RingExchange, self).__init__() - - if ring: - self.ring = ring - else: - fh = open(CONF.matchmaker_ring.ringfile, 'r') - self.ring = json.load(fh) - fh.close() - - self.ring0 = {} - for k in self.ring.keys(): - self.ring0[k] = itertools.cycle(self.ring[k]) - - def _ring_has(self, key): - return key in self.ring0 - - -class RoundRobinRingExchange(RingExchange): - """A Topic Exchange based on a hashmap.""" - def __init__(self, ring=None): - super(RoundRobinRingExchange, self).__init__(ring) - - def run(self, key): - if not self._ring_has(key): - LOG.warn( - _("No key defining hosts for topic '%s', " - "see ringfile"), key - ) - return [] - host = next(self.ring0[key]) - return [(key + '.' + host, host)] - - -class FanoutRingExchange(RingExchange): - """Fanout Exchange based on a hashmap.""" - def __init__(self, ring=None): - super(FanoutRingExchange, self).__init__(ring) - - def run(self, key): - # Assume starts with "fanout~", strip it for lookup. - nkey = key.split('fanout~')[1:][0] - if not self._ring_has(nkey): - LOG.warn( - _("No key defining hosts for topic '%s', " - "see ringfile"), nkey - ) - return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey]) - - -class MatchMakerRing(mm.MatchMakerBase): - """Match Maker where hosts are loaded from a static hashmap.""" - def __init__(self, ring=None): - super(MatchMakerRing, self).__init__() - self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring)) - self.add_binding(mm.DirectBinding(), mm.DirectExchange()) - self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring)) diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/__init__.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py new file mode 100644 index 000000000..29e9d52a2 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/base.py @@ -0,0 +1,70 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import collections +import logging + +import six + +from oslo_messaging._i18n import _LI, _LW + + +LOG = logging.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class MatchMakerBase(object): + + def __init__(self, conf, *args, **kwargs): + super(MatchMakerBase, self).__init__(*args, **kwargs) + + self.conf = conf + + @abc.abstractmethod + def register(self, topic, hostname): + """Register topic on nameserver""" + + @abc.abstractmethod + def get_hosts(self, topic): + """Get hosts from nameserver by topic""" + + def get_single_host(self, topic): + """Get a single host by topic""" + hosts = self.get_hosts(topic) + if len(hosts) == 0: + LOG.warning(_LW("No hosts were found for topic %s. Using " + "localhost") % topic) + return "localhost" + elif len(hosts) == 1: + LOG.info(_LI("A single host found for topic %s.") % topic) + return hosts[0] + else: + LOG.warning(_LW("Multiple hosts were found for topic %s. Using " + "the first one.") % topic) + return hosts[0] + + +class DummyMatchMaker(MatchMakerBase): + + def __init__(self, conf, *args, **kwargs): + super(DummyMatchMaker, self).__init__(conf, *args, **kwargs) + + self._cache = collections.defaultdict(list) + + def register(self, topic, hostname): + if hostname not in self._cache[topic]: + self._cache[topic].append(hostname) + + def get_hosts(self, topic): + return self._cache[topic] diff --git a/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py new file mode 100644 index 000000000..f1a6f3827 --- /dev/null +++ b/oslo_messaging/_drivers/zmq_driver/matchmaker/matchmaker_redis.py @@ -0,0 +1,55 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +from oslo_config import cfg +import redis + +from oslo_messaging._drivers.zmq_driver.matchmaker import base + + +LOG = logging.getLogger(__name__) + + +matchmaker_redis_opts = [ + cfg.StrOpt('host', + default='127.0.0.1', + help='Host to locate redis.'), + cfg.IntOpt('port', + default=6379, + help='Use this port to connect to redis host.'), + cfg.StrOpt('password', + default='', + secret=True, + help='Password for Redis server (optional).'), +] + + +class RedisMatchMaker(base.MatchMakerBase): + + def __init__(self, conf, *args, **kwargs): + super(RedisMatchMaker, self).__init__(conf, *args, **kwargs) + + self._redis = redis.StrictRedis( + host=self.conf.matchmaker_redis.host, + port=self.conf.matchmaker_redis.port, + password=self.conf.matchmaker_redis.password, + ) + + def register(self, topic, hostname): + if hostname not in self.get_hosts(topic): + self._redis.lpush(topic, hostname) + + def get_hosts(self, topic): + return self._redis.lrange(topic, 0, -1)[::-1] diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py index 47a87d1d2..11c190bf9 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_call_request.py @@ -30,8 +30,9 @@ zmq = zmq_async.import_zmq() class CallRequest(Request): def __init__(self, conf, target, context, message, timeout=None, - retry=None, allowed_remote_exmods=None): + retry=None, allowed_remote_exmods=None, matchmaker=None): self.allowed_remote_exmods = allowed_remote_exmods or [] + self.matchmaker = matchmaker try: self.zmq_context = zmq.Context() @@ -41,8 +42,9 @@ class CallRequest(Request): zmq_serializer.CALL_TYPE, timeout, retry) + self.host = self.matchmaker.get_single_host(self.topic.topic) self.connect_address = zmq_topic.get_tcp_address_call(conf, - self.topic) + self.host) LOG.info(_LI("Connecting REQ to %s") % self.connect_address) self.socket.connect(self.connect_address) except zmq.ZMQError as e: diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py index 30a117c1e..6f6640575 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_cast_dealer.py @@ -58,7 +58,8 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase): def cast(self, target, context, message, timeout=None, retry=None): topic = zmq_topic.Topic.from_target(self.conf, target) - connect_address = zmq_topic.get_tcp_address_call(self.conf, topic) + host = self.matchmaker.get_single_host(topic.topic) + connect_address = zmq_topic.get_tcp_address_call(self.conf, host) dealer_socket = self._create_socket(connect_address) request = CastRequest(self.conf, target, context, message, dealer_socket, connect_address, timeout, retry) @@ -71,6 +72,7 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase): dealer_socket = self.zmq_context.socket(zmq.DEALER) LOG.info(_LI("Connecting DEALER to %s") % address) dealer_socket.connect(address) + return dealer_socket except zmq.ZMQError: LOG.error(_LE("Failed connecting DEALER to %s") % address) - return dealer_socket + raise diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py index ec00cb912..cdd291b1f 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/client/zmq_client.py @@ -21,6 +21,7 @@ class ZmqClient(object): def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None): self.conf = conf + self.matchmaker = matchmaker self.allowed_remote_exmods = allowed_remote_exmods or [] self.cast_publisher = zmq_cast_dealer.DealerCastPublisher(conf, matchmaker) @@ -28,7 +29,7 @@ class ZmqClient(object): def call(self, target, context, message, timeout=None, retry=None): request = zmq_call_request.CallRequest( self.conf, target, context, message, timeout, retry, - self.allowed_remote_exmods) + self.allowed_remote_exmods, self.matchmaker) return request() def cast(self, target, context, message, timeout=None, retry=None): diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py index 7f7ec57a3..38bc43207 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_call_responder.py @@ -82,7 +82,7 @@ class CallResponder(zmq_base_consumer.ConsumerBase): self.poller) return incoming except zmq.ZMQError as e: - LOG.error(_LE("Receiving message failed ... {}"), e) + LOG.error(_LE("Receiving message failed ... %s") % str(e)) def listen(self, target): topic = topic_utils.Topic.from_target(self.conf, target) diff --git a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py index b51ff0187..778f3b273 100644 --- a/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py +++ b/oslo_messaging/_drivers/zmq_driver/rpc/server/zmq_server.py @@ -31,6 +31,7 @@ class ZmqServer(base.Listener): self.conf = conf self.context = zmq.Context() self.poller = zmq_async.get_reply_poller() + self.matchmaker = matchmaker self.call_resp = zmq_call_responder.CallResponder(self, conf, self.poller, self.context) @@ -50,6 +51,9 @@ class ZmqServer(base.Listener): def listen(self, target): LOG.info("[Server] Listen to Target %s" % target) + + self.matchmaker.register(topic=target.topic, + hostname=self.conf.rpc_zmq_host) if target.fanout: self.fanout_resp.listen(target) else: diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_topic.py b/oslo_messaging/_drivers/zmq_driver/zmq_topic.py index 332c81912..f89f5073b 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_topic.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_topic.py @@ -21,8 +21,8 @@ def get_tcp_bind_address(port): return "tcp://*:%s" % port -def get_tcp_address_call(conf, topic): - return "tcp://%s:%s" % (topic.server, conf.rpc_zmq_port) +def get_tcp_address_call(conf, host): + return "tcp://%s:%s" % (host, conf.rpc_zmq_port) def get_ipc_address_cast(conf, topic): diff --git a/oslo_messaging/conffixture.py b/oslo_messaging/conffixture.py index 8a44f9e62..bbf92caf0 100644 --- a/oslo_messaging/conffixture.py +++ b/oslo_messaging/conffixture.py @@ -59,7 +59,8 @@ class ConfFixture(fixtures.Fixture): _import_opts(self.conf, 'oslo_messaging._drivers.impl_zmq', 'zmq_opts') _import_opts(self.conf, - 'oslo_messaging._drivers.matchmaker_redis', + 'oslo_messaging._drivers.zmq_driver.' + 'matchmaker.matchmaker_redis', 'matchmaker_redis_opts', 'matchmaker_redis') _import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts') diff --git a/oslo_messaging/opts.py b/oslo_messaging/opts.py index 5911b69d7..0accf30a0 100644 --- a/oslo_messaging/opts.py +++ b/oslo_messaging/opts.py @@ -25,10 +25,8 @@ from oslo_messaging._drivers import base as drivers_base from oslo_messaging._drivers import impl_qpid from oslo_messaging._drivers import impl_rabbit from oslo_messaging._drivers import impl_zmq -from oslo_messaging._drivers import matchmaker -from oslo_messaging._drivers import matchmaker_redis -from oslo_messaging._drivers import matchmaker_ring from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts +from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis from oslo_messaging._executors import base from oslo_messaging.notify import notifier from oslo_messaging.rpc import client @@ -37,7 +35,6 @@ from oslo_messaging import transport _global_opt_lists = [ drivers_base.base_opts, impl_zmq.zmq_opts, - matchmaker.matchmaker_opts, base._pool_opts, notifier._notifier_opts, client._client_opts, @@ -47,7 +44,6 @@ _global_opt_lists = [ _opts = [ (None, list(itertools.chain(*_global_opt_lists))), ('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts), - ('matchmaker_ring', matchmaker_ring.matchmaker_opts), ('oslo_messaging_amqp', amqp_opts.amqp1_opts), ('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts))), diff --git a/oslo_messaging/tests/drivers/test_impl_matchmaker.py b/oslo_messaging/tests/drivers/test_impl_matchmaker.py new file mode 100644 index 000000000..8fa82c2a4 --- /dev/null +++ b/oslo_messaging/tests/drivers/test_impl_matchmaker.py @@ -0,0 +1,75 @@ +# Copyright 2014 Canonical, Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from stevedore import driver +import testscenarios + +from oslo_messaging.tests import utils as test_utils + + +load_tests = testscenarios.load_tests_apply_scenarios + + +class TestImplMatchmaker(test_utils.BaseTestCase): + + scenarios = [ + ("dummy", {"rpc_zmq_matchmaker": "dummy"}), + ("redis", {"rpc_zmq_matchmaker": "redis"}), + ] + + def setUp(self): + super(TestImplMatchmaker, self).setUp() + + self.test_matcher = driver.DriverManager( + 'oslo.messaging.zmq.matchmaker', + self.rpc_zmq_matchmaker, + ).driver(self.conf) + + if self.rpc_zmq_matchmaker == "redis": + self.addCleanup(self.test_matcher._redis.flushdb) + + self.topic = "test_topic" + self.host1 = b"test_host1" + self.host2 = b"test_host2" + + def test_register(self): + self.test_matcher.register(self.topic, self.host1) + + self.assertEqual(self.test_matcher.get_hosts(self.topic), [self.host1]) + self.assertEqual(self.test_matcher.get_single_host(self.topic), + self.host1) + + def test_register_two_hosts(self): + self.test_matcher.register(self.topic, self.host1) + self.test_matcher.register(self.topic, self.host2) + + self.assertEqual(self.test_matcher.get_hosts(self.topic), + [self.host1, self.host2]) + self.assertIn(self.test_matcher.get_single_host(self.topic), + [self.host1, self.host2]) + + def test_register_two_same_hosts(self): + self.test_matcher.register(self.topic, self.host1) + self.test_matcher.register(self.topic, self.host1) + + self.assertEqual(self.test_matcher.get_hosts(self.topic), [self.host1]) + self.assertEqual(self.test_matcher.get_single_host(self.topic), + self.host1) + + def test_get_hosts_wrong_topic(self): + self.assertEqual(self.test_matcher.get_hosts("no_such_topic"), []) + + def test_get_single_host_wrong_topic(self): + self.assertEqual(self.test_matcher.get_single_host("no_such_topic"), + "localhost") diff --git a/oslo_messaging/tests/drivers/test_matchmaker.py b/oslo_messaging/tests/drivers/test_matchmaker.py deleted file mode 100644 index 61c37a92b..000000000 --- a/oslo_messaging/tests/drivers/test_matchmaker.py +++ /dev/null @@ -1,69 +0,0 @@ -# Copyright 2014 Canonical, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_utils import importutils -import testtools - -from oslo_messaging.tests import utils as test_utils - -# NOTE(jamespage) matchmaker tied directly to eventlet -# which is not yet py3 compatible - skip if import fails -matchmaker = ( - importutils.try_import('oslo_messaging._drivers.matchmaker')) - - -@testtools.skipIf(not matchmaker, "matchmaker/eventlet unavailable") -class MatchmakerTest(test_utils.BaseTestCase): - - def test_fanout_binding(self): - matcher = matchmaker.MatchMakerBase() - matcher.add_binding( - matchmaker.FanoutBinding(), matchmaker.DirectExchange()) - self.assertEqual(matcher.queues('hello.world'), []) - self.assertEqual( - matcher.queues('fanout~fantasy.unicorn'), - [('fanout~fantasy.unicorn', 'unicorn')]) - self.assertEqual( - matcher.queues('fanout~fantasy.pony'), - [('fanout~fantasy.pony', 'pony')]) - - def test_topic_binding(self): - matcher = matchmaker.MatchMakerBase() - matcher.add_binding( - matchmaker.TopicBinding(), matchmaker.StubExchange()) - self.assertEqual( - matcher.queues('hello-world'), [('hello-world', None)]) - - def test_direct_binding(self): - matcher = matchmaker.MatchMakerBase() - matcher.add_binding( - matchmaker.DirectBinding(), matchmaker.StubExchange()) - self.assertEqual( - matcher.queues('hello.server'), [('hello.server', None)]) - self.assertEqual(matcher.queues('hello-world'), []) - - def test_localhost_match(self): - matcher = matchmaker.MatchMakerLocalhost() - self.assertEqual( - matcher.queues('hello.server'), [('hello.server', 'server')]) - - # Gets remapped due to localhost exchange - # all bindings default to first match. - self.assertEqual( - matcher.queues('fanout~testing.server'), - [('fanout~testing.localhost', 'localhost')]) - - self.assertEqual( - matcher.queues('hello-world'), - [('hello-world.localhost', 'localhost')]) diff --git a/oslo_messaging/tests/drivers/test_matchmaker_ring.py b/oslo_messaging/tests/drivers/test_matchmaker_ring.py deleted file mode 100644 index 5f156007a..000000000 --- a/oslo_messaging/tests/drivers/test_matchmaker_ring.py +++ /dev/null @@ -1,73 +0,0 @@ -# Copyright 2014 Canonical, Ltd. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_utils import importutils -import testtools - -from oslo_messaging.tests import utils as test_utils - -# NOTE(jamespage) matchmaker tied directly to eventlet -# which is not yet py3 compatible - skip if import fails -matchmaker_ring = ( - importutils.try_import('oslo_messaging._drivers.matchmaker_ring')) - - -@testtools.skipIf(not matchmaker_ring, "matchmaker/eventlet unavailable") -class MatchmakerRingTest(test_utils.BaseTestCase): - - def setUp(self): - super(MatchmakerRingTest, self).setUp() - self.ring_data = { - "conductor": ["controller1", "node1", "node2", "node3"], - "scheduler": ["controller1", "node1", "node2", "node3"], - "network": ["controller1", "node1", "node2", "node3"], - "cert": ["controller1"], - "console": ["controller1"], - "consoleauth": ["controller1"]} - self.matcher = matchmaker_ring.MatchMakerRing(self.ring_data) - - def test_direct(self): - self.assertEqual( - self.matcher.queues('cert.controller1'), - [('cert.controller1', 'controller1')]) - self.assertEqual( - self.matcher.queues('conductor.node1'), - [('conductor.node1', 'node1')]) - - def test_fanout(self): - self.assertEqual( - self.matcher.queues('fanout~conductor'), - [('fanout~conductor.controller1', 'controller1'), - ('fanout~conductor.node1', 'node1'), - ('fanout~conductor.node2', 'node2'), - ('fanout~conductor.node3', 'node3')]) - - def test_bare_topic(self): - # Round robins through the hosts on the topic - self.assertEqual( - self.matcher.queues('scheduler'), - [('scheduler.controller1', 'controller1')]) - self.assertEqual( - self.matcher.queues('scheduler'), - [('scheduler.node1', 'node1')]) - self.assertEqual( - self.matcher.queues('scheduler'), - [('scheduler.node2', 'node2')]) - self.assertEqual( - self.matcher.queues('scheduler'), - [('scheduler.node3', 'node3')]) - # Cycles loop - self.assertEqual( - self.matcher.queues('scheduler'), - [('scheduler.controller1', 'controller1')]) diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index de1673839..cc54ee5a7 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -125,13 +125,8 @@ class RpcServerGroupFixture(fixtures.Fixture): # NOTE(sileht): topic and servier_name must be uniq # to be able to run all tests in parallel self.topic = topic or str(uuid.uuid4()) - if self.url.startswith('zmq'): - # NOTE(viktors): We need to pass correct hots name to the to - # get_tcp_.*() methods. Should we use nameserver here? - self.names = names or [cfg.CONF.rpc_zmq_host for i in range(3)] - else: - self.names = names or ["server_%i_%s" % (i, uuid.uuid4()) - for i in range(3)] + self.names = names or ["server_%i_%s" % (i, str(uuid.uuid4())[:8]) + for i in range(3)] self.exchange = exchange self.targets = [self._target(server=n) for n in self.names] self.use_fanout_ctrl = use_fanout_ctrl diff --git a/oslo_messaging/tests/test_opts.py b/oslo_messaging/tests/test_opts.py index d1c75a0bc..e170eff9e 100644 --- a/oslo_messaging/tests/test_opts.py +++ b/oslo_messaging/tests/test_opts.py @@ -29,11 +29,10 @@ class OptsTestCase(test_utils.BaseTestCase): super(OptsTestCase, self).setUp() def _test_list_opts(self, result): - self.assertEqual(6, len(result)) + self.assertEqual(5, len(result)) groups = [g for (g, l) in result] self.assertIn(None, groups) - self.assertIn('matchmaker_ring', groups) self.assertIn('matchmaker_redis', groups) self.assertIn('oslo_messaging_amqp', groups) self.assertIn('oslo_messaging_rabbit', groups) diff --git a/setup.cfg b/setup.cfg index 17c228982..ce73f1a0b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -57,9 +57,8 @@ oslo.messaging.notify.drivers = oslo.messaging.zmq.matchmaker = # Matchmakers for ZeroMQ - redis = oslo_messaging._drivers.matchmaker_redis:MatchMakerRedis - ring = oslo_messaging._drivers.matchmaker_ring:MatchMakerRing - local = oslo_messaging._drivers.matchmaker:MatchMakerLocalhost + dummy = oslo_messaging._drivers.zmq_driver.matchmaker.base:DummyMatchMaker + redis = oslo_messaging._drivers.zmq_driver.matchmaker.matchmaker_redis:RedisMatchMaker oslo.config.opts = oslo.messaging = oslo_messaging.opts:list_opts diff --git a/tox.ini b/tox.ini index d91508bcf..7bc79ce52 100644 --- a/tox.ini +++ b/tox.ini @@ -43,8 +43,9 @@ commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest -- [testenv:py27-func-zeromq] commands = {toxinidir}/setup-test-env-zmq.sh python -m testtools.run \ oslo_messaging.tests.functional.test_functional.CallTestCase.test_exception \ - oslo_messaging.tests.functional.test_functional.CallTestCase.test_timeout -# commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' + oslo_messaging.tests.functional.test_functional.CallTestCase.test_timeout \ + oslo_messaging.tests.functional.test_functional.CallTestCase.test_specific_server \ + oslo_messaging.tests.functional.test_functional.CastTestCase.test_specific_server [flake8] show-source = True