diff --git a/nova/rpc/matchmaker.py b/nova/rpc/matchmaker.py new file mode 100644 index 00000000..f59e2555 --- /dev/null +++ b/nova/rpc/matchmaker.py @@ -0,0 +1,257 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +The MatchMaker classes should except a Topic or Fanout exchange key and +return keys for direct exchanges, per (approximate) AMQP parlance. +""" + +import contextlib +import itertools +import json +import logging + +from nova.openstack.common import cfg + + +matchmaker_opts = [ + # Matchmaker ring file + cfg.StrOpt('matchmaker_ringfile', + default='/etc/nova/matchmaker_ring.json', + help='Matchmaker ring file (JSON)'), +] + +CONF = cfg.CONF +CONF.register_opts(matchmaker_opts) +LOG = logging.getLogger(__name__) +contextmanager = contextlib.contextmanager + + +class MatchMakerException(Exception): + """Signified a match could not be found.""" + message = _("Match not found by MatchMaker.") + + +class Exchange(object): + """ + Implements lookups. + Subclass this to support hashtables, dns, etc. + """ + def __init__(self): + pass + + def run(self, key): + raise NotImplementedError() + + +class Binding(object): + """ + A binding on which to perform a lookup. + """ + def __init__(self): + pass + + def test(self, key): + raise NotImplementedError() + + +class MatchMakerBase(object): + """Match Maker Base Class.""" + + def __init__(self): + # Array of tuples. Index [2] toggles negation, [3] is last-if-true + self.bindings = [] + + def add_binding(self, binding, rule, last=True): + self.bindings.append((binding, rule, False, last)) + + #NOTE(ewindisch): kept the following method in case we implement the + # underlying support. + #def add_negate_binding(self, binding, rule, last=True): + # self.bindings.append((binding, rule, True, last)) + + def queues(self, key): + workers = [] + + # bit is for negate bindings - if we choose to implement it. + # last stops processing rules if this matches. + for (binding, exchange, bit, last) in self.bindings: + if binding.test(key): + workers.extend(exchange.run(key)) + + # Support last. + if last: + return workers + return workers + + +class DirectBinding(Binding): + """ + Specifies a host in the key via a '.' character + Although dots are used in the key, the behavior here is + that it maps directly to a host, thus direct. + """ + def test(self, key): + if '.' in key: + return True + return False + + +class TopicBinding(Binding): + """ + Where a 'bare' key without dots. + AMQP generally considers topic exchanges to be those *with* dots, + but we deviate here in terminology as the behavior here matches + that of a topic exchange (whereas where there are dots, behavior + matches that of a direct exchange. + """ + def test(self, key): + if '.' not in key: + return True + return False + + +class FanoutBinding(Binding): + """Match on fanout keys, where key starts with 'fanout.' string.""" + def test(self, key): + if key.startswith('fanout~'): + return True + return False + + +class StubExchange(Exchange): + """Exchange that does nothing.""" + def run(self, key): + return [(key, None)] + + +class RingExchange(Exchange): + """ + Match Maker where hosts are loaded from a static file containing + a hashmap (JSON formatted). + + __init__ takes optional ring dictionary argument, otherwise + loads the ringfile from CONF.mathcmaker_ringfile. + """ + def __init__(self, ring=None): + super(RingExchange, self).__init__() + + if ring: + self.ring = ring + else: + fh = open(CONF.matchmaker_ringfile, 'r') + self.ring = json.load(fh) + fh.close() + + self.ring0 = {} + for k in self.ring.keys(): + self.ring0[k] = itertools.cycle(self.ring[k]) + + def _ring_has(self, key): + if key in self.ring0: + return True + return False + + +class RoundRobinRingExchange(RingExchange): + """A Topic Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(RoundRobinRingExchange, self).__init__(ring) + + def run(self, key): + if not self._ring_has(key): + LOG.warn( + _("No key defining hosts for topic '%s', " + "see ringfile") % (key, ) + ) + return [] + host = next(self.ring0[key]) + return [(key + '.' + host, host)] + + +class FanoutRingExchange(RingExchange): + """Fanout Exchange based on a hashmap.""" + def __init__(self, ring=None): + super(FanoutRingExchange, self).__init__(ring) + + def run(self, key): + # Assume starts with "fanout~", strip it for lookup. + nkey = key.split('fanout~')[1:][0] + if not self._ring_has(nkey): + LOG.warn( + _("No key defining hosts for topic '%s', " + "see ringfile") % (nkey, ) + ) + return [] + return map(lambda x: (key + '.' + x, x), self.ring[nkey]) + + +class LocalhostExchange(Exchange): + """Exchange where all direct topics are local.""" + def __init__(self): + super(Exchange, self).__init__() + + def run(self, key): + return [(key.split('.')[0] + '.localhost', 'localhost')] + + +class DirectExchange(Exchange): + """ + Exchange where all topic keys are split, sending to second half. + i.e. "compute.host" sends a message to "compute" running on "host" + """ + def __init__(self): + super(Exchange, self).__init__() + + def run(self, key): + b, e = key.split('.', 1) + return [(b, e)] + + +class MatchMakerRing(MatchMakerBase): + """ + Match Maker where hosts are loaded from a static hashmap. + """ + def __init__(self, ring=None): + super(MatchMakerRing, self).__init__() + self.add_binding(FanoutBinding(), FanoutRingExchange(ring)) + self.add_binding(DirectBinding(), DirectExchange()) + self.add_binding(TopicBinding(), RoundRobinRingExchange(ring)) + + +class MatchMakerLocalhost(MatchMakerBase): + """ + Match Maker where all bare topics resolve to localhost. + Useful for testing. + """ + def __init__(self): + super(MatchMakerLocalhost, self).__init__() + self.add_binding(FanoutBinding(), LocalhostExchange()) + self.add_binding(DirectBinding(), DirectExchange()) + self.add_binding(TopicBinding(), LocalhostExchange()) + + +class MatchMakerStub(MatchMakerBase): + """ + Match Maker where topics are untouched. + Useful for testing, or for AMQP/brokered queues. + Will not work where knowledge of hosts is known (i.e. zeromq) + """ + def __init__(self): + super(MatchMakerLocalhost, self).__init__() + + self.add_binding(FanoutBinding(), StubExchange()) + self.add_binding(DirectBinding(), StubExchange()) + self.add_binding(TopicBinding(), StubExchange()) diff --git a/nova/tests/rpc/test_matchmaker.py b/nova/tests/rpc/test_matchmaker.py new file mode 100644 index 00000000..f7bffa67 --- /dev/null +++ b/nova/tests/rpc/test_matchmaker.py @@ -0,0 +1,58 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from nova import log as logging +from nova.rpc import matchmaker +from nova import test + +LOG = logging.getLogger(__name__) + + +class _MatchMakerTestCase(test.TestCase): + def test_valid_host_matches(self): + queues = self.driver.queues(self.topic) + matched_hosts = map(lambda x: x[1], queues) + + for host in matched_hosts: + self.assertIn(host, self.hosts) + + def test_fanout_host_matches(self): + """For known hosts, see if they're in fanout.""" + queues = self.driver.queues("fanout~" + self.topic) + matched_hosts = map(lambda x: x[1], queues) + + LOG.info("Received result from matchmaker: %s", queues) + for host in self.hosts: + self.assertIn(host, matched_hosts) + + +class MatchMakerFileTestCase(_MatchMakerTestCase): + def setUp(self): + self.topic = "test" + self.hosts = ['hello', 'world', 'foo', 'bar', 'baz'] + ring = { + self.topic: self.hosts + } + self.driver = matchmaker.MatchMakerRing(ring) + super(MatchMakerFileTestCase, self).setUp() + + +class MatchMakerLocalhostTestCase(_MatchMakerTestCase): + def setUp(self): + self.driver = matchmaker.MatchMakerLocalhost() + self.topic = "test" + self.hosts = ['localhost'] + super(MatchMakerLocalhostTestCase, self).setUp()