ZMQ: Initial matchmaker implementation

This patch replaces the old outdated matchmakers and replace it into the
new ones.

Call/Cast test_specific_server() functional tests passes now.

Change-Id: I8635396110d30d26812f39b242fbbabd1a0feaaa
This commit is contained in:
Victor Sergeyev 2015-07-10 16:45:20 +03:00
parent ebcadf3d5e
commit 9e4831c022
22 changed files with 235 additions and 743 deletions

View File

@ -17,6 +17,7 @@ import pprint
import socket
from oslo_config import cfg
from stevedore import driver
from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as rpc_common
@ -39,7 +40,7 @@ zmq_opts = [
# The module.Class to use for matchmaking.
cfg.StrOpt(
'rpc_zmq_matchmaker',
default='local',
default='dummy',
help='MatchMaker driver.',
),
@ -97,7 +98,11 @@ class ZmqDriver(base.BaseDriver):
self.conf = conf
self.server = None
self.client = None
self.matchmaker = None
self.matchmaker = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
self.conf.rpc_zmq_matchmaker,
).driver(self.conf)
super(ZmqDriver, self).__init__(conf, url, default_exchange,
allowed_remote_exmods)

View File

@ -1,321 +0,0 @@
# Copyright 2011 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import contextlib
import logging
import eventlet
from oslo_config import cfg
from oslo_messaging._i18n import _
matchmaker_opts = [
cfg.IntOpt('matchmaker_heartbeat_freq',
default=300,
help='Heartbeat frequency.'),
cfg.IntOpt('matchmaker_heartbeat_ttl',
default=600,
help='Heartbeat time-to-live.'),
]
CONF = cfg.CONF
CONF.register_opts(matchmaker_opts)
LOG = logging.getLogger(__name__)
contextmanager = contextlib.contextmanager
class MatchMakerException(Exception):
"""Signified a match could not be found."""
message = _("Match not found by MatchMaker.")
class Exchange(object):
"""Implements lookups.
Subclass this to support hashtables, dns, etc.
"""
def __init__(self):
pass
def run(self, key):
raise NotImplementedError()
class Binding(object):
"""A binding on which to perform a lookup."""
def __init__(self):
pass
def test(self, key):
raise NotImplementedError()
class MatchMakerBase(object):
"""Match Maker Base Class.
Build off HeartbeatMatchMakerBase if building a heartbeat-capable
MatchMaker.
"""
def __init__(self):
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
self.bindings = []
self.no_heartbeat_msg = _('Matchmaker does not implement '
'registration or heartbeat.')
def register(self, key, host):
"""Register a host on a backend.
Heartbeats, if applicable, may keepalive registration.
"""
pass
def ack_alive(self, key, host):
"""Acknowledge that a key.host is alive.
Used internally for updating heartbeats, but may also be used
publicly to acknowledge a system is alive (i.e. rpc message
successfully sent to host)
"""
pass
def is_alive(self, topic, host):
"""Checks if a host is alive."""
pass
def expire(self, topic, host):
"""Explicitly expire a host's registration."""
pass
def send_heartbeats(self):
"""Send all heartbeats.
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
pass
def unregister(self, key, host):
"""Unregister a topic."""
pass
def start_heartbeat(self):
"""Spawn heartbeat greenthread."""
pass
def stop_heartbeat(self):
"""Destroys the heartbeat greenthread."""
pass
def add_binding(self, binding, rule, last=True):
self.bindings.append((binding, rule, False, last))
# NOTE(ewindisch): kept the following method in case we implement the
# underlying support.
# def add_negate_binding(self, binding, rule, last=True):
# self.bindings.append((binding, rule, True, last))
def queues(self, key):
workers = []
# bit is for negate bindings - if we choose to implement it.
# last stops processing rules if this matches.
for (binding, exchange, bit, last) in self.bindings:
if binding.test(key):
workers.extend(exchange.run(key))
# Support last.
if last:
return workers
return workers
class HeartbeatMatchMakerBase(MatchMakerBase):
"""Base for a heart-beat capable MatchMaker.
Provides common methods for registering, unregistering, and maintaining
heartbeats.
"""
def __init__(self):
self.hosts = set()
self._heart = None
self.host_topic = {}
super(HeartbeatMatchMakerBase, self).__init__()
def send_heartbeats(self):
"""Send all heartbeats.
Use start_heartbeat to spawn a heartbeat greenthread,
which loops this method.
"""
for key, host in self.host_topic.keys():
self.ack_alive(key, host)
def ack_alive(self, key, host):
"""Acknowledge that a host.topic is alive.
Used internally for updating heartbeats, but may also be used
publicly to acknowledge a system is alive (i.e. rpc message
successfully sent to host)
"""
raise NotImplementedError("Must implement ack_alive")
def backend_register(self, key, host):
"""Implements registration logic.
Called by register(self,key,host)
"""
raise NotImplementedError("Must implement backend_register")
def backend_unregister(self, key, key_host):
"""Implements de-registration logic.
Called by unregister(self,key,host)
"""
raise NotImplementedError("Must implement backend_unregister")
def register(self, key, host):
"""Register a host on a backend.
Heartbeats, if applicable, may keepalive registration.
"""
self.hosts.add(host)
self.host_topic[(key, host)] = host
key_host = '.'.join((key, host))
self.backend_register(key, key_host)
self.ack_alive(key, host)
def unregister(self, key, host):
"""Unregister a topic."""
if (key, host) in self.host_topic:
del self.host_topic[(key, host)]
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))
LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
{'key': key, 'host': host})
def start_heartbeat(self):
"""Implementation of MatchMakerBase.start_heartbeat.
Launches greenthread looping send_heartbeats(),
yielding for CONF.matchmaker_heartbeat_freq seconds
between iterations.
"""
if not self.hosts:
raise MatchMakerException(
_("Register before starting heartbeat."))
def do_heartbeat():
while True:
self.send_heartbeats()
eventlet.sleep(CONF.matchmaker_heartbeat_freq)
self._heart = eventlet.spawn(do_heartbeat)
def stop_heartbeat(self):
"""Destroys the heartbeat greenthread."""
if self._heart:
self._heart.kill()
class DirectBinding(Binding):
"""Specifies a host in the key via a '.' character.
Although dots are used in the key, the behavior here is
that it maps directly to a host, thus direct.
"""
def test(self, key):
return '.' in key
class TopicBinding(Binding):
"""Where a 'bare' key without dots.
AMQP generally considers topic exchanges to be those *with* dots,
but we deviate here in terminology as the behavior here matches
that of a topic exchange (whereas where there are dots, behavior
matches that of a direct exchange.
"""
def test(self, key):
return '.' not in key
class FanoutBinding(Binding):
"""Match on fanout keys, where key starts with 'fanout.' string."""
def test(self, key):
return key.startswith('fanout~')
class StubExchange(Exchange):
"""Exchange that does nothing."""
def run(self, key):
return [(key, None)]
class LocalhostExchange(Exchange):
"""Exchange where all direct topics are local."""
def __init__(self, host='localhost'):
self.host = host
super(Exchange, self).__init__()
def run(self, key):
return [('.'.join((key.split('.')[0], self.host)), self.host)]
class DirectExchange(Exchange):
"""Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute.host" running on "host"
"""
def __init__(self):
super(Exchange, self).__init__()
def run(self, key):
e = key.split('.', 1)[1]
return [(key, e)]
class MatchMakerLocalhost(MatchMakerBase):
"""Match Maker where all bare topics resolve to localhost.
Useful for testing.
"""
def __init__(self, host='localhost'):
super(MatchMakerLocalhost, self).__init__()
self.add_binding(FanoutBinding(), LocalhostExchange(host))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange(host))
class MatchMakerStub(MatchMakerBase):
"""Match Maker where topics are untouched.
Useful for testing, or for AMQP/brokered queues.
Will not work where knowledge of hosts is known (i.e. zeromq)
"""
def __init__(self):
super(MatchMakerStub, self).__init__()
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
self.add_binding(TopicBinding(), StubExchange())

View File

@ -1,145 +0,0 @@
# Copyright 2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
from oslo_config import cfg
from oslo_utils import importutils
from oslo_messaging._drivers import matchmaker as mm_common
redis = importutils.try_import('redis')
matchmaker_redis_opts = [
cfg.StrOpt('host',
default='127.0.0.1',
help='Host to locate redis.'),
cfg.IntOpt('port',
default=6379,
help='Use this port to connect to redis host.'),
cfg.StrOpt('password',
help='Password for Redis server (optional).'),
]
CONF = cfg.CONF
opt_group = cfg.OptGroup(name='matchmaker_redis',
title='Options for Redis-based MatchMaker')
CONF.register_group(opt_group)
CONF.register_opts(matchmaker_redis_opts, opt_group)
class RedisExchange(mm_common.Exchange):
def __init__(self, matchmaker):
self.matchmaker = matchmaker
self.redis = matchmaker.redis
super(RedisExchange, self).__init__()
class RedisTopicExchange(RedisExchange):
"""Exchange where all topic keys are split, sending to second half.
i.e. "compute.host" sends a message to "compute" running on "host"
"""
def run(self, topic):
while True:
member_name = self.redis.srandmember(topic)
if not member_name:
# If this happens, there are no
# longer any members.
break
if not self.matchmaker.is_alive(topic, member_name):
continue
host = member_name.split('.', 1)[1]
return [(member_name, host)]
return []
class RedisFanoutExchange(RedisExchange):
"""Return a list of all hosts."""
def run(self, topic):
topic = topic.split('~', 1)[1]
hosts = self.redis.smembers(topic)
good_hosts = filter(
lambda host: self.matchmaker.is_alive(topic, host), hosts)
return [(x, x.split('.', 1)[1]) for x in good_hosts]
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
"""MatchMaker registering and looking-up hosts with a Redis server."""
def __init__(self):
super(MatchMakerRedis, self).__init__()
if not redis:
raise ImportError("Failed to import module redis.")
self.redis = redis.StrictRedis(
host=CONF.matchmaker_redis.host,
port=CONF.matchmaker_redis.port,
password=CONF.matchmaker_redis.password)
self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
def ack_alive(self, key, host):
topic = "%s.%s" % (key, host)
if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
# If we could not update the expiration, the key
# might have been pruned. Re-register, creating a new
# key in Redis.
self.register(key, host)
def is_alive(self, topic, host):
# After redis 2.8, if the specialized key doesn't exist,
# TTL fuction would return -2. If key exists,
# but doesn't have expiration associated,
# TTL func would return -1. For more information,
# please visit http://redis.io/commands/ttl
if self.redis.ttl(host) == -2:
self.expire(topic, host)
return False
return True
def expire(self, topic, host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.delete(host)
pipe.srem(topic, host)
pipe.execute()
def backend_register(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.sadd(key, key_host)
# No value is needed, we just
# care if it exists. Sets aren't viable
# because only keys can expire.
pipe.sadd(key_host, '')
pipe.execute()
def backend_unregister(self, key, key_host):
with self.redis.pipeline() as pipe:
pipe.multi()
pipe.srem(key, key_host)
pipe.delete(key_host)
pipe.execute()

View File

@ -1,105 +0,0 @@
# Copyright 2011-2013 Cloudscaling Group, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
"""
import itertools
import json
import logging
from oslo_config import cfg
from oslo_messaging._drivers import matchmaker as mm
from oslo_messaging._i18n import _
matchmaker_opts = [
# Matchmaker ring file
cfg.StrOpt('ringfile',
deprecated_name='matchmaker_ringfile',
deprecated_group='DEFAULT',
default='/etc/oslo/matchmaker_ring.json',
help='Matchmaker ring file (JSON).'),
]
CONF = cfg.CONF
CONF.register_opts(matchmaker_opts, 'matchmaker_ring')
LOG = logging.getLogger(__name__)
class RingExchange(mm.Exchange):
"""Match Maker where hosts are loaded from a static JSON formatted file.
__init__ takes optional ring dictionary argument, otherwise
loads the ringfile from CONF.mathcmaker_ringfile.
"""
def __init__(self, ring=None):
super(RingExchange, self).__init__()
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ring.ringfile, 'r')
self.ring = json.load(fh)
fh.close()
self.ring0 = {}
for k in self.ring.keys():
self.ring0[k] = itertools.cycle(self.ring[k])
def _ring_has(self, key):
return key in self.ring0
class RoundRobinRingExchange(RingExchange):
"""A Topic Exchange based on a hashmap."""
def __init__(self, ring=None):
super(RoundRobinRingExchange, self).__init__(ring)
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile"), key
)
return []
host = next(self.ring0[key])
return [(key + '.' + host, host)]
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(FanoutRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "fanout~", strip it for lookup.
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile"), nkey
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class MatchMakerRing(mm.MatchMakerBase):
"""Match Maker where hosts are loaded from a static hashmap."""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(mm.FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(mm.DirectBinding(), mm.DirectExchange())
self.add_binding(mm.TopicBinding(), RoundRobinRingExchange(ring))

View File

@ -0,0 +1,70 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import collections
import logging
import six
from oslo_messaging._i18n import _LI, _LW
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class MatchMakerBase(object):
def __init__(self, conf, *args, **kwargs):
super(MatchMakerBase, self).__init__(*args, **kwargs)
self.conf = conf
@abc.abstractmethod
def register(self, topic, hostname):
"""Register topic on nameserver"""
@abc.abstractmethod
def get_hosts(self, topic):
"""Get hosts from nameserver by topic"""
def get_single_host(self, topic):
"""Get a single host by topic"""
hosts = self.get_hosts(topic)
if len(hosts) == 0:
LOG.warning(_LW("No hosts were found for topic %s. Using "
"localhost") % topic)
return "localhost"
elif len(hosts) == 1:
LOG.info(_LI("A single host found for topic %s.") % topic)
return hosts[0]
else:
LOG.warning(_LW("Multiple hosts were found for topic %s. Using "
"the first one.") % topic)
return hosts[0]
class DummyMatchMaker(MatchMakerBase):
def __init__(self, conf, *args, **kwargs):
super(DummyMatchMaker, self).__init__(conf, *args, **kwargs)
self._cache = collections.defaultdict(list)
def register(self, topic, hostname):
if hostname not in self._cache[topic]:
self._cache[topic].append(hostname)
def get_hosts(self, topic):
return self._cache[topic]

View File

@ -0,0 +1,55 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from oslo_config import cfg
import redis
from oslo_messaging._drivers.zmq_driver.matchmaker import base
LOG = logging.getLogger(__name__)
matchmaker_redis_opts = [
cfg.StrOpt('host',
default='127.0.0.1',
help='Host to locate redis.'),
cfg.IntOpt('port',
default=6379,
help='Use this port to connect to redis host.'),
cfg.StrOpt('password',
default='',
secret=True,
help='Password for Redis server (optional).'),
]
class RedisMatchMaker(base.MatchMakerBase):
def __init__(self, conf, *args, **kwargs):
super(RedisMatchMaker, self).__init__(conf, *args, **kwargs)
self._redis = redis.StrictRedis(
host=self.conf.matchmaker_redis.host,
port=self.conf.matchmaker_redis.port,
password=self.conf.matchmaker_redis.password,
)
def register(self, topic, hostname):
if hostname not in self.get_hosts(topic):
self._redis.lpush(topic, hostname)
def get_hosts(self, topic):
return self._redis.lrange(topic, 0, -1)[::-1]

View File

@ -30,8 +30,9 @@ zmq = zmq_async.import_zmq()
class CallRequest(Request):
def __init__(self, conf, target, context, message, timeout=None,
retry=None, allowed_remote_exmods=None):
retry=None, allowed_remote_exmods=None, matchmaker=None):
self.allowed_remote_exmods = allowed_remote_exmods or []
self.matchmaker = matchmaker
try:
self.zmq_context = zmq.Context()
@ -41,8 +42,9 @@ class CallRequest(Request):
zmq_serializer.CALL_TYPE,
timeout, retry)
self.host = self.matchmaker.get_single_host(self.topic.topic)
self.connect_address = zmq_topic.get_tcp_address_call(conf,
self.topic)
self.host)
LOG.info(_LI("Connecting REQ to %s") % self.connect_address)
self.socket.connect(self.connect_address)
except zmq.ZMQError as e:

View File

@ -58,7 +58,8 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase):
def cast(self, target, context,
message, timeout=None, retry=None):
topic = zmq_topic.Topic.from_target(self.conf, target)
connect_address = zmq_topic.get_tcp_address_call(self.conf, topic)
host = self.matchmaker.get_single_host(topic.topic)
connect_address = zmq_topic.get_tcp_address_call(self.conf, host)
dealer_socket = self._create_socket(connect_address)
request = CastRequest(self.conf, target, context, message,
dealer_socket, connect_address, timeout, retry)
@ -71,6 +72,7 @@ class DealerCastPublisher(zmq_cast_publisher.CastPublisherBase):
dealer_socket = self.zmq_context.socket(zmq.DEALER)
LOG.info(_LI("Connecting DEALER to %s") % address)
dealer_socket.connect(address)
return dealer_socket
except zmq.ZMQError:
LOG.error(_LE("Failed connecting DEALER to %s") % address)
return dealer_socket
raise

View File

@ -21,6 +21,7 @@ class ZmqClient(object):
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
self.conf = conf
self.matchmaker = matchmaker
self.allowed_remote_exmods = allowed_remote_exmods or []
self.cast_publisher = zmq_cast_dealer.DealerCastPublisher(conf,
matchmaker)
@ -28,7 +29,7 @@ class ZmqClient(object):
def call(self, target, context, message, timeout=None, retry=None):
request = zmq_call_request.CallRequest(
self.conf, target, context, message, timeout, retry,
self.allowed_remote_exmods)
self.allowed_remote_exmods, self.matchmaker)
return request()
def cast(self, target, context, message, timeout=None, retry=None):

View File

@ -82,7 +82,7 @@ class CallResponder(zmq_base_consumer.ConsumerBase):
self.poller)
return incoming
except zmq.ZMQError as e:
LOG.error(_LE("Receiving message failed ... {}"), e)
LOG.error(_LE("Receiving message failed ... %s") % str(e))
def listen(self, target):
topic = topic_utils.Topic.from_target(self.conf, target)

View File

@ -31,6 +31,7 @@ class ZmqServer(base.Listener):
self.conf = conf
self.context = zmq.Context()
self.poller = zmq_async.get_reply_poller()
self.matchmaker = matchmaker
self.call_resp = zmq_call_responder.CallResponder(self, conf,
self.poller,
self.context)
@ -50,6 +51,9 @@ class ZmqServer(base.Listener):
def listen(self, target):
LOG.info("[Server] Listen to Target %s" % target)
self.matchmaker.register(topic=target.topic,
hostname=self.conf.rpc_zmq_host)
if target.fanout:
self.fanout_resp.listen(target)
else:

View File

@ -21,8 +21,8 @@ def get_tcp_bind_address(port):
return "tcp://*:%s" % port
def get_tcp_address_call(conf, topic):
return "tcp://%s:%s" % (topic.server, conf.rpc_zmq_port)
def get_tcp_address_call(conf, host):
return "tcp://%s:%s" % (host, conf.rpc_zmq_port)
def get_ipc_address_cast(conf, topic):

View File

@ -59,7 +59,8 @@ class ConfFixture(fixtures.Fixture):
_import_opts(self.conf,
'oslo_messaging._drivers.impl_zmq', 'zmq_opts')
_import_opts(self.conf,
'oslo_messaging._drivers.matchmaker_redis',
'oslo_messaging._drivers.zmq_driver.'
'matchmaker.matchmaker_redis',
'matchmaker_redis_opts',
'matchmaker_redis')
_import_opts(self.conf, 'oslo_messaging.rpc.client', '_client_opts')

View File

@ -25,10 +25,8 @@ from oslo_messaging._drivers import base as drivers_base
from oslo_messaging._drivers import impl_qpid
from oslo_messaging._drivers import impl_rabbit
from oslo_messaging._drivers import impl_zmq
from oslo_messaging._drivers import matchmaker
from oslo_messaging._drivers import matchmaker_redis
from oslo_messaging._drivers import matchmaker_ring
from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts
from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis
from oslo_messaging._executors import base
from oslo_messaging.notify import notifier
from oslo_messaging.rpc import client
@ -37,7 +35,6 @@ from oslo_messaging import transport
_global_opt_lists = [
drivers_base.base_opts,
impl_zmq.zmq_opts,
matchmaker.matchmaker_opts,
base._pool_opts,
notifier._notifier_opts,
client._client_opts,
@ -47,7 +44,6 @@ _global_opt_lists = [
_opts = [
(None, list(itertools.chain(*_global_opt_lists))),
('matchmaker_redis', matchmaker_redis.matchmaker_redis_opts),
('matchmaker_ring', matchmaker_ring.matchmaker_opts),
('oslo_messaging_amqp', amqp_opts.amqp1_opts),
('oslo_messaging_rabbit', list(itertools.chain(amqp.amqp_opts,
impl_rabbit.rabbit_opts))),

View File

@ -0,0 +1,75 @@
# Copyright 2014 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from stevedore import driver
import testscenarios
from oslo_messaging.tests import utils as test_utils
load_tests = testscenarios.load_tests_apply_scenarios
class TestImplMatchmaker(test_utils.BaseTestCase):
scenarios = [
("dummy", {"rpc_zmq_matchmaker": "dummy"}),
("redis", {"rpc_zmq_matchmaker": "redis"}),
]
def setUp(self):
super(TestImplMatchmaker, self).setUp()
self.test_matcher = driver.DriverManager(
'oslo.messaging.zmq.matchmaker',
self.rpc_zmq_matchmaker,
).driver(self.conf)
if self.rpc_zmq_matchmaker == "redis":
self.addCleanup(self.test_matcher._redis.flushdb)
self.topic = "test_topic"
self.host1 = b"test_host1"
self.host2 = b"test_host2"
def test_register(self):
self.test_matcher.register(self.topic, self.host1)
self.assertEqual(self.test_matcher.get_hosts(self.topic), [self.host1])
self.assertEqual(self.test_matcher.get_single_host(self.topic),
self.host1)
def test_register_two_hosts(self):
self.test_matcher.register(self.topic, self.host1)
self.test_matcher.register(self.topic, self.host2)
self.assertEqual(self.test_matcher.get_hosts(self.topic),
[self.host1, self.host2])
self.assertIn(self.test_matcher.get_single_host(self.topic),
[self.host1, self.host2])
def test_register_two_same_hosts(self):
self.test_matcher.register(self.topic, self.host1)
self.test_matcher.register(self.topic, self.host1)
self.assertEqual(self.test_matcher.get_hosts(self.topic), [self.host1])
self.assertEqual(self.test_matcher.get_single_host(self.topic),
self.host1)
def test_get_hosts_wrong_topic(self):
self.assertEqual(self.test_matcher.get_hosts("no_such_topic"), [])
def test_get_single_host_wrong_topic(self):
self.assertEqual(self.test_matcher.get_single_host("no_such_topic"),
"localhost")

View File

@ -1,69 +0,0 @@
# Copyright 2014 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import importutils
import testtools
from oslo_messaging.tests import utils as test_utils
# NOTE(jamespage) matchmaker tied directly to eventlet
# which is not yet py3 compatible - skip if import fails
matchmaker = (
importutils.try_import('oslo_messaging._drivers.matchmaker'))
@testtools.skipIf(not matchmaker, "matchmaker/eventlet unavailable")
class MatchmakerTest(test_utils.BaseTestCase):
def test_fanout_binding(self):
matcher = matchmaker.MatchMakerBase()
matcher.add_binding(
matchmaker.FanoutBinding(), matchmaker.DirectExchange())
self.assertEqual(matcher.queues('hello.world'), [])
self.assertEqual(
matcher.queues('fanout~fantasy.unicorn'),
[('fanout~fantasy.unicorn', 'unicorn')])
self.assertEqual(
matcher.queues('fanout~fantasy.pony'),
[('fanout~fantasy.pony', 'pony')])
def test_topic_binding(self):
matcher = matchmaker.MatchMakerBase()
matcher.add_binding(
matchmaker.TopicBinding(), matchmaker.StubExchange())
self.assertEqual(
matcher.queues('hello-world'), [('hello-world', None)])
def test_direct_binding(self):
matcher = matchmaker.MatchMakerBase()
matcher.add_binding(
matchmaker.DirectBinding(), matchmaker.StubExchange())
self.assertEqual(
matcher.queues('hello.server'), [('hello.server', None)])
self.assertEqual(matcher.queues('hello-world'), [])
def test_localhost_match(self):
matcher = matchmaker.MatchMakerLocalhost()
self.assertEqual(
matcher.queues('hello.server'), [('hello.server', 'server')])
# Gets remapped due to localhost exchange
# all bindings default to first match.
self.assertEqual(
matcher.queues('fanout~testing.server'),
[('fanout~testing.localhost', 'localhost')])
self.assertEqual(
matcher.queues('hello-world'),
[('hello-world.localhost', 'localhost')])

View File

@ -1,73 +0,0 @@
# Copyright 2014 Canonical, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import importutils
import testtools
from oslo_messaging.tests import utils as test_utils
# NOTE(jamespage) matchmaker tied directly to eventlet
# which is not yet py3 compatible - skip if import fails
matchmaker_ring = (
importutils.try_import('oslo_messaging._drivers.matchmaker_ring'))
@testtools.skipIf(not matchmaker_ring, "matchmaker/eventlet unavailable")
class MatchmakerRingTest(test_utils.BaseTestCase):
def setUp(self):
super(MatchmakerRingTest, self).setUp()
self.ring_data = {
"conductor": ["controller1", "node1", "node2", "node3"],
"scheduler": ["controller1", "node1", "node2", "node3"],
"network": ["controller1", "node1", "node2", "node3"],
"cert": ["controller1"],
"console": ["controller1"],
"consoleauth": ["controller1"]}
self.matcher = matchmaker_ring.MatchMakerRing(self.ring_data)
def test_direct(self):
self.assertEqual(
self.matcher.queues('cert.controller1'),
[('cert.controller1', 'controller1')])
self.assertEqual(
self.matcher.queues('conductor.node1'),
[('conductor.node1', 'node1')])
def test_fanout(self):
self.assertEqual(
self.matcher.queues('fanout~conductor'),
[('fanout~conductor.controller1', 'controller1'),
('fanout~conductor.node1', 'node1'),
('fanout~conductor.node2', 'node2'),
('fanout~conductor.node3', 'node3')])
def test_bare_topic(self):
# Round robins through the hosts on the topic
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.controller1', 'controller1')])
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.node1', 'node1')])
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.node2', 'node2')])
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.node3', 'node3')])
# Cycles loop
self.assertEqual(
self.matcher.queues('scheduler'),
[('scheduler.controller1', 'controller1')])

View File

@ -125,13 +125,8 @@ class RpcServerGroupFixture(fixtures.Fixture):
# NOTE(sileht): topic and servier_name must be uniq
# to be able to run all tests in parallel
self.topic = topic or str(uuid.uuid4())
if self.url.startswith('zmq'):
# NOTE(viktors): We need to pass correct hots name to the to
# get_tcp_.*() methods. Should we use nameserver here?
self.names = names or [cfg.CONF.rpc_zmq_host for i in range(3)]
else:
self.names = names or ["server_%i_%s" % (i, uuid.uuid4())
for i in range(3)]
self.names = names or ["server_%i_%s" % (i, str(uuid.uuid4())[:8])
for i in range(3)]
self.exchange = exchange
self.targets = [self._target(server=n) for n in self.names]
self.use_fanout_ctrl = use_fanout_ctrl

View File

@ -29,11 +29,10 @@ class OptsTestCase(test_utils.BaseTestCase):
super(OptsTestCase, self).setUp()
def _test_list_opts(self, result):
self.assertEqual(6, len(result))
self.assertEqual(5, len(result))
groups = [g for (g, l) in result]
self.assertIn(None, groups)
self.assertIn('matchmaker_ring', groups)
self.assertIn('matchmaker_redis', groups)
self.assertIn('oslo_messaging_amqp', groups)
self.assertIn('oslo_messaging_rabbit', groups)

View File

@ -57,9 +57,8 @@ oslo.messaging.notify.drivers =
oslo.messaging.zmq.matchmaker =
# Matchmakers for ZeroMQ
redis = oslo_messaging._drivers.matchmaker_redis:MatchMakerRedis
ring = oslo_messaging._drivers.matchmaker_ring:MatchMakerRing
local = oslo_messaging._drivers.matchmaker:MatchMakerLocalhost
dummy = oslo_messaging._drivers.zmq_driver.matchmaker.base:DummyMatchMaker
redis = oslo_messaging._drivers.zmq_driver.matchmaker.matchmaker_redis:RedisMatchMaker
oslo.config.opts =
oslo.messaging = oslo_messaging.opts:list_opts

View File

@ -43,8 +43,9 @@ commands = {toxinidir}/setup-test-env-qpid.sh python setup.py testr --slowest --
[testenv:py27-func-zeromq]
commands = {toxinidir}/setup-test-env-zmq.sh python -m testtools.run \
oslo_messaging.tests.functional.test_functional.CallTestCase.test_exception \
oslo_messaging.tests.functional.test_functional.CallTestCase.test_timeout
# commands = {toxinidir}/setup-test-env-zmq.sh python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
oslo_messaging.tests.functional.test_functional.CallTestCase.test_timeout \
oslo_messaging.tests.functional.test_functional.CallTestCase.test_specific_server \
oslo_messaging.tests.functional.test_functional.CastTestCase.test_specific_server
[flake8]
show-source = True