Merge "Fix functionnal tests"
This commit is contained in:
		@@ -1,279 +0,0 @@
 | 
			
		||||
#
 | 
			
		||||
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
			
		||||
#    not use this file except in compliance with the License. You may obtain
 | 
			
		||||
#    a copy of the License at
 | 
			
		||||
#
 | 
			
		||||
#         http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
#
 | 
			
		||||
#    Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
			
		||||
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
			
		||||
#    License for the specific language governing permissions and limitations
 | 
			
		||||
#    under the License.
 | 
			
		||||
 | 
			
		||||
from oslo import messaging
 | 
			
		||||
 | 
			
		||||
from testtools import matchers
 | 
			
		||||
 | 
			
		||||
from tests.functional.utils import ClientStub
 | 
			
		||||
from tests.functional.utils import IsValidDistributionOf
 | 
			
		||||
from tests.functional.utils import NotificationFixture
 | 
			
		||||
from tests.functional.utils import RpcServerGroupFixture
 | 
			
		||||
from tests.functional.utils import SkipIfNoTransportURL
 | 
			
		||||
from tests.functional.utils import TransportFixture
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class CallTestCase(SkipIfNoTransportURL):
 | 
			
		||||
    def test_specific_server(self):
 | 
			
		||||
        group = self.useFixture(RpcServerGroupFixture(self.url))
 | 
			
		||||
        client = group.client(1)
 | 
			
		||||
        client.append(text='open')
 | 
			
		||||
        self.assertEqual('openstack', client.append(text='stack'))
 | 
			
		||||
        client.add(increment=2)
 | 
			
		||||
        self.assertEqual(12, client.add(increment=10))
 | 
			
		||||
        self.assertEqual(9, client.subtract(increment=3))
 | 
			
		||||
        self.assertEqual('openstack', group.servers[1].endpoint.sval)
 | 
			
		||||
        self.assertEqual(9, group.servers[1].endpoint.ival)
 | 
			
		||||
        for i in [0, 2]:
 | 
			
		||||
            self.assertEqual('', group.servers[i].endpoint.sval)
 | 
			
		||||
            self.assertEqual(0, group.servers[i].endpoint.ival)
 | 
			
		||||
 | 
			
		||||
    def test_server_in_group(self):
 | 
			
		||||
        group = self.useFixture(RpcServerGroupFixture(self.url))
 | 
			
		||||
 | 
			
		||||
        client = group.client()
 | 
			
		||||
        data = [c for c in 'abcdefghijklmn']
 | 
			
		||||
        for i in data:
 | 
			
		||||
            client.append(text=i)
 | 
			
		||||
 | 
			
		||||
        for s in group.servers:
 | 
			
		||||
            self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0))
 | 
			
		||||
        actual = [[c for c in s.endpoint.sval] for s in group.servers]
 | 
			
		||||
        self.assertThat(actual, IsValidDistributionOf(data))
 | 
			
		||||
 | 
			
		||||
    def test_different_exchanges(self):
 | 
			
		||||
        t = self.useFixture(TransportFixture(self.url))
 | 
			
		||||
        # If the different exchanges are not honoured, then the
 | 
			
		||||
        # teardown may hang unless we broadcast all control messages
 | 
			
		||||
        # to each server
 | 
			
		||||
        group1 = self.useFixture(RpcServerGroupFixture(self.url, transport=t,
 | 
			
		||||
                                                       use_fanout_ctrl=True))
 | 
			
		||||
        group2 = self.useFixture(RpcServerGroupFixture(self.url, exchange="a",
 | 
			
		||||
                                                       transport=t,
 | 
			
		||||
                                                       use_fanout_ctrl=True))
 | 
			
		||||
        group3 = self.useFixture(RpcServerGroupFixture(self.url, exchange="b",
 | 
			
		||||
                                                       transport=t,
 | 
			
		||||
                                                       use_fanout_ctrl=True))
 | 
			
		||||
 | 
			
		||||
        client1 = group1.client(1)
 | 
			
		||||
        data1 = [c for c in 'abcdefghijklmn']
 | 
			
		||||
        for i in data1:
 | 
			
		||||
            client1.append(text=i)
 | 
			
		||||
 | 
			
		||||
        client2 = group2.client()
 | 
			
		||||
        data2 = [c for c in 'opqrstuvwxyz']
 | 
			
		||||
        for i in data2:
 | 
			
		||||
            client2.append(text=i)
 | 
			
		||||
 | 
			
		||||
        actual1 = [[c for c in s.endpoint.sval] for s in group1.servers]
 | 
			
		||||
        self.assertThat(actual1, IsValidDistributionOf(data1))
 | 
			
		||||
        actual1 = [c for c in group1.servers[1].endpoint.sval]
 | 
			
		||||
        self.assertThat([actual1], IsValidDistributionOf(data1))
 | 
			
		||||
        for s in group1.servers:
 | 
			
		||||
            expected = len(data1) if group1.servers.index(s) == 1 else 0
 | 
			
		||||
            self.assertEqual(expected, len(s.endpoint.sval))
 | 
			
		||||
            self.assertEqual(0, s.endpoint.ival)
 | 
			
		||||
 | 
			
		||||
        actual2 = [[c for c in s.endpoint.sval] for s in group2.servers]
 | 
			
		||||
        for s in group2.servers:
 | 
			
		||||
            self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0))
 | 
			
		||||
            self.assertEqual(0, s.endpoint.ival)
 | 
			
		||||
        self.assertThat(actual2, IsValidDistributionOf(data2))
 | 
			
		||||
 | 
			
		||||
        for s in group3.servers:
 | 
			
		||||
            self.assertEqual(0, len(s.endpoint.sval))
 | 
			
		||||
            self.assertEqual(0, s.endpoint.ival)
 | 
			
		||||
 | 
			
		||||
    def test_timeout(self):
 | 
			
		||||
        transport = self.useFixture(TransportFixture(self.url))
 | 
			
		||||
        target = messaging.Target(topic="no_such_topic")
 | 
			
		||||
        c = ClientStub(transport.transport, target, timeout=1)
 | 
			
		||||
        self.assertThat(c.ping, matchers.raises(messaging.MessagingTimeout))
 | 
			
		||||
 | 
			
		||||
    def test_exception(self):
 | 
			
		||||
        group = self.useFixture(RpcServerGroupFixture(self.url))
 | 
			
		||||
        client = group.client(1)
 | 
			
		||||
        client.add(increment=2)
 | 
			
		||||
        f = lambda: client.subtract(increment=3)
 | 
			
		||||
        self.assertThat(f, matchers.raises(ValueError))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class CastTestCase(SkipIfNoTransportURL):
 | 
			
		||||
    # Note: casts return immediately, so these tests utilise a special
 | 
			
		||||
    # internal sync() cast to ensure prior casts are complete before
 | 
			
		||||
    # making the necessary assertions.
 | 
			
		||||
 | 
			
		||||
    def test_specific_server(self):
 | 
			
		||||
        group = self.useFixture(RpcServerGroupFixture(self.url))
 | 
			
		||||
        client = group.client(1, cast=True)
 | 
			
		||||
        client.append(text='open')
 | 
			
		||||
        client.append(text='stack')
 | 
			
		||||
        client.add(increment=2)
 | 
			
		||||
        client.add(increment=10)
 | 
			
		||||
        group.sync()
 | 
			
		||||
 | 
			
		||||
        self.assertEqual('openstack', group.servers[1].endpoint.sval)
 | 
			
		||||
        self.assertEqual(12, group.servers[1].endpoint.ival)
 | 
			
		||||
        for i in [0, 2]:
 | 
			
		||||
            self.assertEqual('', group.servers[i].endpoint.sval)
 | 
			
		||||
            self.assertEqual(0, group.servers[i].endpoint.ival)
 | 
			
		||||
 | 
			
		||||
    def test_server_in_group(self):
 | 
			
		||||
        group = self.useFixture(RpcServerGroupFixture(self.url))
 | 
			
		||||
        client = group.client(cast=True)
 | 
			
		||||
        for i in range(20):
 | 
			
		||||
            client.add(increment=1)
 | 
			
		||||
        group.sync()
 | 
			
		||||
        total = 0
 | 
			
		||||
        for s in group.servers:
 | 
			
		||||
            ival = s.endpoint.ival
 | 
			
		||||
            self.assertThat(ival, matchers.GreaterThan(0))
 | 
			
		||||
            self.assertThat(ival, matchers.LessThan(20))
 | 
			
		||||
            total += ival
 | 
			
		||||
        self.assertEqual(20, total)
 | 
			
		||||
 | 
			
		||||
    def test_fanout(self):
 | 
			
		||||
        group = self.useFixture(RpcServerGroupFixture(self.url))
 | 
			
		||||
        client = group.client('all', cast=True)
 | 
			
		||||
        client.append(text='open')
 | 
			
		||||
        client.append(text='stack')
 | 
			
		||||
        client.add(increment=2)
 | 
			
		||||
        client.add(increment=10)
 | 
			
		||||
        group.sync(server='all')
 | 
			
		||||
        for s in group.servers:
 | 
			
		||||
            self.assertEqual('openstack', s.endpoint.sval)
 | 
			
		||||
            self.assertEqual(12, s.endpoint.ival)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class NotifyTestCase(SkipIfNoTransportURL):
 | 
			
		||||
    # NOTE(sileht): Each test must not use the same topics
 | 
			
		||||
    # to be run in parallel
 | 
			
		||||
 | 
			
		||||
    def test_simple(self):
 | 
			
		||||
        transport = self.useFixture(TransportFixture(self.url))
 | 
			
		||||
        listener = self.useFixture(NotificationFixture(transport.transport,
 | 
			
		||||
                                                       ['test_simple']))
 | 
			
		||||
        transport.wait()
 | 
			
		||||
        notifier = listener.notifier('abc')
 | 
			
		||||
 | 
			
		||||
        notifier.info({}, 'test', 'Hello World!')
 | 
			
		||||
        event = listener.events.get(timeout=1)
 | 
			
		||||
        self.assertEqual('info', event[0])
 | 
			
		||||
        self.assertEqual('test', event[1])
 | 
			
		||||
        self.assertEqual('Hello World!', event[2])
 | 
			
		||||
        self.assertEqual('abc', event[3])
 | 
			
		||||
 | 
			
		||||
    def test_multiple_topics(self):
 | 
			
		||||
        transport = self.useFixture(TransportFixture(self.url))
 | 
			
		||||
        listener = self.useFixture(NotificationFixture(transport.transport,
 | 
			
		||||
                                                       ['a', 'b']))
 | 
			
		||||
        transport.wait()
 | 
			
		||||
        a = listener.notifier('pub-a', topic='a')
 | 
			
		||||
        b = listener.notifier('pub-b', topic='b')
 | 
			
		||||
 | 
			
		||||
        sent = {
 | 
			
		||||
            'pub-a': [a, 'test-a', 'payload-a'],
 | 
			
		||||
            'pub-b': [b, 'test-b', 'payload-b']
 | 
			
		||||
        }
 | 
			
		||||
        for e in sent.values():
 | 
			
		||||
            e[0].info({}, e[1], e[2])
 | 
			
		||||
 | 
			
		||||
        received = {}
 | 
			
		||||
        while len(received) < len(sent):
 | 
			
		||||
            e = listener.events.get(timeout=1)
 | 
			
		||||
            received[e[3]] = e
 | 
			
		||||
 | 
			
		||||
        for key in received:
 | 
			
		||||
            actual = received[key]
 | 
			
		||||
            expected = sent[key]
 | 
			
		||||
            self.assertEqual('info', actual[0])
 | 
			
		||||
            self.assertEqual(expected[1], actual[1])
 | 
			
		||||
            self.assertEqual(expected[2], actual[2])
 | 
			
		||||
 | 
			
		||||
    def test_multiple_servers(self):
 | 
			
		||||
        transport = self.useFixture(TransportFixture(self.url))
 | 
			
		||||
        listener_a = self.useFixture(NotificationFixture(transport.transport,
 | 
			
		||||
                                                         ['test-topic']))
 | 
			
		||||
        listener_b = self.useFixture(NotificationFixture(transport.transport,
 | 
			
		||||
                                                         ['test-topic']))
 | 
			
		||||
        transport.wait()
 | 
			
		||||
        n = listener_a.notifier('pub')
 | 
			
		||||
 | 
			
		||||
        events_out = [('test-%s' % c, 'payload-%s' % c) for c in 'abcdefgh']
 | 
			
		||||
 | 
			
		||||
        for event_type, payload in events_out:
 | 
			
		||||
            n.info({}, event_type, payload)
 | 
			
		||||
 | 
			
		||||
        events_in = [[(e[1], e[2]) for e in listener_a.get_events()],
 | 
			
		||||
                     [(e[1], e[2]) for e in listener_b.get_events()]]
 | 
			
		||||
 | 
			
		||||
        self.assertThat(events_in, IsValidDistributionOf(events_out))
 | 
			
		||||
        for stream in events_in:
 | 
			
		||||
            self.assertThat(len(stream), matchers.GreaterThan(0))
 | 
			
		||||
 | 
			
		||||
    def test_independent_topics(self):
 | 
			
		||||
        transport = self.useFixture(TransportFixture(self.url))
 | 
			
		||||
        listener_a = self.useFixture(NotificationFixture(transport.transport,
 | 
			
		||||
                                                         ['1']))
 | 
			
		||||
        listener_b = self.useFixture(NotificationFixture(transport.transport,
 | 
			
		||||
                                                         ['2']))
 | 
			
		||||
        transport.wait()
 | 
			
		||||
 | 
			
		||||
        a = listener_a.notifier('pub-1', topic='1')
 | 
			
		||||
        b = listener_b.notifier('pub-2', topic='2')
 | 
			
		||||
 | 
			
		||||
        a_out = [('test-1-%s' % c, 'payload-1-%s' % c) for c in 'abcdefgh']
 | 
			
		||||
        for event_type, payload in a_out:
 | 
			
		||||
            a.info({}, event_type, payload)
 | 
			
		||||
 | 
			
		||||
        b_out = [('test-2-%s' % c, 'payload-2-%s' % c) for c in 'ijklmnop']
 | 
			
		||||
        for event_type, payload in b_out:
 | 
			
		||||
            b.info({}, event_type, payload)
 | 
			
		||||
 | 
			
		||||
        for expected in a_out:
 | 
			
		||||
            actual = listener_a.events.get(timeout=0.5)
 | 
			
		||||
            self.assertEqual('info', actual[0])
 | 
			
		||||
            self.assertEqual(expected[0], actual[1])
 | 
			
		||||
            self.assertEqual(expected[1], actual[2])
 | 
			
		||||
            self.assertEqual('pub-1', actual[3])
 | 
			
		||||
 | 
			
		||||
        for expected in b_out:
 | 
			
		||||
            actual = listener_b.events.get(timeout=0.5)
 | 
			
		||||
            self.assertEqual('info', actual[0])
 | 
			
		||||
            self.assertEqual(expected[0], actual[1])
 | 
			
		||||
            self.assertEqual(expected[1], actual[2])
 | 
			
		||||
            self.assertEqual('pub-2', actual[3])
 | 
			
		||||
 | 
			
		||||
    def test_all_categories(self):
 | 
			
		||||
        transport = self.useFixture(TransportFixture(self.url))
 | 
			
		||||
        listener = self.useFixture(NotificationFixture(
 | 
			
		||||
            transport.transport, ['test_all_categories']))
 | 
			
		||||
        transport.wait()
 | 
			
		||||
        n = listener.notifier('abc')
 | 
			
		||||
 | 
			
		||||
        cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical']
 | 
			
		||||
        events = [(getattr(n, c), c, 'type-' + c, c + '-data') for c in cats]
 | 
			
		||||
        for e in events:
 | 
			
		||||
            e[0]({}, e[2], e[3])
 | 
			
		||||
 | 
			
		||||
        # order between events with different categories is not guaranteed
 | 
			
		||||
        received = {}
 | 
			
		||||
        for expected in events:
 | 
			
		||||
            e = listener.events.get(timeout=0.5)
 | 
			
		||||
            received[e[0]] = e
 | 
			
		||||
 | 
			
		||||
        for expected in events:
 | 
			
		||||
            actual = received[expected[1]]
 | 
			
		||||
            self.assertEqual(expected[1], actual[0])
 | 
			
		||||
            self.assertEqual(expected[2], actual[1])
 | 
			
		||||
            self.assertEqual(expected[3], actual[2])
 | 
			
		||||
@@ -1,343 +0,0 @@
 | 
			
		||||
#
 | 
			
		||||
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
			
		||||
#    not use this file except in compliance with the License. You may obtain
 | 
			
		||||
#    a copy of the License at
 | 
			
		||||
#
 | 
			
		||||
#         http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
#
 | 
			
		||||
#    Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
			
		||||
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
			
		||||
#    License for the specific language governing permissions and limitations
 | 
			
		||||
#    under the License.
 | 
			
		||||
 | 
			
		||||
import os
 | 
			
		||||
import threading
 | 
			
		||||
import time
 | 
			
		||||
import uuid
 | 
			
		||||
 | 
			
		||||
import fixtures
 | 
			
		||||
from six import moves
 | 
			
		||||
 | 
			
		||||
from oslo.config import cfg
 | 
			
		||||
from oslo import messaging
 | 
			
		||||
from oslo.messaging.notify import notifier
 | 
			
		||||
from oslo_messaging.tests import utils as test_utils
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TestServerEndpoint(object):
 | 
			
		||||
    """This MessagingServer that will be used during functional testing."""
 | 
			
		||||
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.ival = 0
 | 
			
		||||
        self.sval = ''
 | 
			
		||||
 | 
			
		||||
    def add(self, ctxt, increment):
 | 
			
		||||
        self.ival += increment
 | 
			
		||||
        return self.ival
 | 
			
		||||
 | 
			
		||||
    def subtract(self, ctxt, increment):
 | 
			
		||||
        if self.ival < increment:
 | 
			
		||||
            raise ValueError("ival can't go negative!")
 | 
			
		||||
        self.ival -= increment
 | 
			
		||||
        return self.ival
 | 
			
		||||
 | 
			
		||||
    def append(self, ctxt, text):
 | 
			
		||||
        self.sval += text
 | 
			
		||||
        return self.sval
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class TransportFixture(fixtures.Fixture):
 | 
			
		||||
    """Fixture defined to setup the oslo.messaging transport."""
 | 
			
		||||
 | 
			
		||||
    def __init__(self, url):
 | 
			
		||||
        self.url = url
 | 
			
		||||
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super(TransportFixture, self).setUp()
 | 
			
		||||
        self.transport = messaging.get_transport(cfg.CONF, url=self.url)
 | 
			
		||||
 | 
			
		||||
    def cleanUp(self):
 | 
			
		||||
        self.transport.cleanup()
 | 
			
		||||
        super(TransportFixture, self).cleanUp()
 | 
			
		||||
 | 
			
		||||
    def wait(self):
 | 
			
		||||
        if self.url.startswith("rabbit") or self.url.startswith("qpid"):
 | 
			
		||||
            time.sleep(0.5)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class RpcServerFixture(fixtures.Fixture):
 | 
			
		||||
    """Fixture to setup the TestServerEndpoint."""
 | 
			
		||||
 | 
			
		||||
    def __init__(self, transport, target, endpoint=None, ctrl_target=None):
 | 
			
		||||
        super(RpcServerFixture, self).__init__()
 | 
			
		||||
        self.transport = transport
 | 
			
		||||
        self.target = target
 | 
			
		||||
        self.endpoint = endpoint or TestServerEndpoint()
 | 
			
		||||
        self.syncq = moves.queue.Queue()
 | 
			
		||||
        self.ctrl_target = ctrl_target or self.target
 | 
			
		||||
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super(RpcServerFixture, self).setUp()
 | 
			
		||||
        endpoints = [self.endpoint, self]
 | 
			
		||||
        self.server = messaging.get_rpc_server(self.transport,
 | 
			
		||||
                                               self.target,
 | 
			
		||||
                                               endpoints)
 | 
			
		||||
        self._ctrl = messaging.RPCClient(self.transport, self.ctrl_target)
 | 
			
		||||
        self._start()
 | 
			
		||||
 | 
			
		||||
    def cleanUp(self):
 | 
			
		||||
        self._stop()
 | 
			
		||||
        super(RpcServerFixture, self).cleanUp()
 | 
			
		||||
 | 
			
		||||
    def _start(self):
 | 
			
		||||
        self.thread = threading.Thread(target=self.server.start)
 | 
			
		||||
        self.thread.daemon = True
 | 
			
		||||
        self.thread.start()
 | 
			
		||||
 | 
			
		||||
    def _stop(self):
 | 
			
		||||
        self.server.stop()
 | 
			
		||||
        self._ctrl.cast({}, 'ping')
 | 
			
		||||
        self.server.wait()
 | 
			
		||||
        self.thread.join()
 | 
			
		||||
 | 
			
		||||
    def ping(self, ctxt):
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
    def sync(self, ctxt, item):
 | 
			
		||||
        self.syncq.put(item)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class RpcServerGroupFixture(fixtures.Fixture):
 | 
			
		||||
    def __init__(self, url, topic=None, names=None, exchange=None,
 | 
			
		||||
                 transport=None, use_fanout_ctrl=False):
 | 
			
		||||
        self.url = url
 | 
			
		||||
        # NOTE(sileht): topic and servier_name must be uniq
 | 
			
		||||
        # to be able to run all tests in parallel
 | 
			
		||||
        self.topic = topic or str(uuid.uuid4())
 | 
			
		||||
        self.names = names or ["server_%i_%s" % (i, uuid.uuid4())
 | 
			
		||||
                               for i in range(3)]
 | 
			
		||||
        self.exchange = exchange
 | 
			
		||||
        self.targets = [self._target(server=n) for n in self.names]
 | 
			
		||||
        self.transport = transport
 | 
			
		||||
        self.use_fanout_ctrl = use_fanout_ctrl
 | 
			
		||||
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super(RpcServerGroupFixture, self).setUp()
 | 
			
		||||
        if not self.transport:
 | 
			
		||||
            self.transport = self.useFixture(TransportFixture(self.url))
 | 
			
		||||
        self.servers = [self.useFixture(self._server(t)) for t in self.targets]
 | 
			
		||||
        self.transport.wait()
 | 
			
		||||
 | 
			
		||||
    def _target(self, server=None, fanout=False):
 | 
			
		||||
        t = messaging.Target(exchange=self.exchange, topic=self.topic)
 | 
			
		||||
        t.server = server
 | 
			
		||||
        t.fanout = fanout
 | 
			
		||||
        return t
 | 
			
		||||
 | 
			
		||||
    def _server(self, target):
 | 
			
		||||
        ctrl = None
 | 
			
		||||
        if self.use_fanout_ctrl:
 | 
			
		||||
            ctrl = self._target(fanout=True)
 | 
			
		||||
        return RpcServerFixture(self.transport.transport, target,
 | 
			
		||||
                                ctrl_target=ctrl)
 | 
			
		||||
 | 
			
		||||
    def client(self, server=None, cast=False):
 | 
			
		||||
        if server:
 | 
			
		||||
            if server == 'all':
 | 
			
		||||
                target = self._target(fanout=True)
 | 
			
		||||
            elif server >= 0 and server < len(self.targets):
 | 
			
		||||
                target = self.targets[server]
 | 
			
		||||
            else:
 | 
			
		||||
                raise ValueError("Invalid value for server: %r" % server)
 | 
			
		||||
        else:
 | 
			
		||||
            target = self._target()
 | 
			
		||||
        return ClientStub(self.transport.transport, target, cast=cast,
 | 
			
		||||
                          timeout=5)
 | 
			
		||||
 | 
			
		||||
    def sync(self, server=None):
 | 
			
		||||
        if server:
 | 
			
		||||
            if server == 'all':
 | 
			
		||||
                c = self.client(server='all', cast=True)
 | 
			
		||||
                c.sync(item='x')
 | 
			
		||||
                for s in self.servers:
 | 
			
		||||
                    s.syncq.get(timeout=5)
 | 
			
		||||
            elif server >= 0 and server < len(self.targets):
 | 
			
		||||
                c = self.client(server=server, cast=True)
 | 
			
		||||
                c.sync(item='x')
 | 
			
		||||
                self.servers[server].syncq.get(timeout=5)
 | 
			
		||||
            else:
 | 
			
		||||
                raise ValueError("Invalid value for server: %r" % server)
 | 
			
		||||
        else:
 | 
			
		||||
            for i in range(len(self.servers)):
 | 
			
		||||
                self.client(i).ping()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class RpcCall(object):
 | 
			
		||||
    def __init__(self, client, method, context):
 | 
			
		||||
        self.client = client
 | 
			
		||||
        self.method = method
 | 
			
		||||
        self.context = context
 | 
			
		||||
 | 
			
		||||
    def __call__(self, **kwargs):
 | 
			
		||||
        self.context['time'] = time.ctime()
 | 
			
		||||
        self.context['cast'] = False
 | 
			
		||||
        result = self.client.call(self.context, self.method, **kwargs)
 | 
			
		||||
        return result
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class RpcCast(RpcCall):
 | 
			
		||||
    def __call__(self, **kwargs):
 | 
			
		||||
        self.context['time'] = time.ctime()
 | 
			
		||||
        self.context['cast'] = True
 | 
			
		||||
        self.client.cast(self.context, self.method, **kwargs)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ClientStub(object):
 | 
			
		||||
    def __init__(self, transport, target, cast=False, name=None, **kwargs):
 | 
			
		||||
        self.name = name or "functional-tests"
 | 
			
		||||
        self.cast = cast
 | 
			
		||||
        self.client = messaging.RPCClient(transport, target, **kwargs)
 | 
			
		||||
 | 
			
		||||
    def __getattr__(self, name):
 | 
			
		||||
        context = {"application": self.name}
 | 
			
		||||
        if self.cast:
 | 
			
		||||
            return RpcCast(self.client, name, context)
 | 
			
		||||
        else:
 | 
			
		||||
            return RpcCall(self.client, name, context)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class InvalidDistribution(object):
 | 
			
		||||
    def __init__(self, original, received):
 | 
			
		||||
        self.original = original
 | 
			
		||||
        self.received = received
 | 
			
		||||
        self.missing = []
 | 
			
		||||
        self.extra = []
 | 
			
		||||
        self.wrong_order = []
 | 
			
		||||
 | 
			
		||||
    def describe(self):
 | 
			
		||||
        text = "Sent %s, got %s; " % (self.original, self.received)
 | 
			
		||||
        e1 = ["%r was missing" % m for m in self.missing]
 | 
			
		||||
        e2 = ["%r was not expected" % m for m in self.extra]
 | 
			
		||||
        e3 = ["%r expected before %r" % (m[0], m[1]) for m in self.wrong_order]
 | 
			
		||||
        return text + ", ".join(e1 + e2 + e3)
 | 
			
		||||
 | 
			
		||||
    def __len__(self):
 | 
			
		||||
        return len(self.extra) + len(self.missing) + len(self.wrong_order)
 | 
			
		||||
 | 
			
		||||
    def get_details(self):
 | 
			
		||||
        return {}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class IsValidDistributionOf(object):
 | 
			
		||||
    """Test whether a given list can be split into particular
 | 
			
		||||
    sub-lists. All items in the original list must be in exactly one
 | 
			
		||||
    sub-list, and must appear in that sub-list in the same order with
 | 
			
		||||
    respect to any other items as in the original list.
 | 
			
		||||
    """
 | 
			
		||||
    def __init__(self, original):
 | 
			
		||||
        self.original = original
 | 
			
		||||
 | 
			
		||||
    def __str__(self):
 | 
			
		||||
        return 'IsValidDistribution(%s)' % self.original
 | 
			
		||||
 | 
			
		||||
    def match(self, actual):
 | 
			
		||||
        errors = InvalidDistribution(self.original, actual)
 | 
			
		||||
        received = [[i for i in l] for l in actual]
 | 
			
		||||
 | 
			
		||||
        def _remove(obj, lists):
 | 
			
		||||
            for l in lists:
 | 
			
		||||
                if obj in l:
 | 
			
		||||
                    front = l[0]
 | 
			
		||||
                    l.remove(obj)
 | 
			
		||||
                    return front
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
        for item in self.original:
 | 
			
		||||
            o = _remove(item, received)
 | 
			
		||||
            if not o:
 | 
			
		||||
                errors.missing += item
 | 
			
		||||
            elif item != o:
 | 
			
		||||
                errors.wrong_order.append([item, o])
 | 
			
		||||
        for l in received:
 | 
			
		||||
            errors.extra += l
 | 
			
		||||
        return errors or None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SkipIfNoTransportURL(test_utils.BaseTestCase):
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super(SkipIfNoTransportURL, self).setUp()
 | 
			
		||||
        self.url = os.environ.get('TRANSPORT_URL')
 | 
			
		||||
        if not self.url:
 | 
			
		||||
            self.skipTest("No transport url configured")
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class NotificationFixture(fixtures.Fixture):
 | 
			
		||||
    def __init__(self, transport, topics):
 | 
			
		||||
        super(NotificationFixture, self).__init__()
 | 
			
		||||
        self.transport = transport
 | 
			
		||||
        self.topics = topics
 | 
			
		||||
        self.events = moves.queue.Queue()
 | 
			
		||||
        self.name = str(id(self))
 | 
			
		||||
 | 
			
		||||
    def setUp(self):
 | 
			
		||||
        super(NotificationFixture, self).setUp()
 | 
			
		||||
        targets = [messaging.Target(topic=t) for t in self.topics]
 | 
			
		||||
        # add a special topic for internal notifications
 | 
			
		||||
        targets.append(messaging.Target(topic=self.name))
 | 
			
		||||
        self.server = messaging.get_notification_listener(self.transport,
 | 
			
		||||
                                                          targets,
 | 
			
		||||
                                                          [self])
 | 
			
		||||
        self._ctrl = self.notifier('internal', topic=self.name)
 | 
			
		||||
        self._start()
 | 
			
		||||
 | 
			
		||||
    def cleanUp(self):
 | 
			
		||||
        self._stop()
 | 
			
		||||
        super(NotificationFixture, self).cleanUp()
 | 
			
		||||
 | 
			
		||||
    def _start(self):
 | 
			
		||||
        self.thread = threading.Thread(target=self.server.start)
 | 
			
		||||
        self.thread.daemon = True
 | 
			
		||||
        self.thread.start()
 | 
			
		||||
 | 
			
		||||
    def _stop(self):
 | 
			
		||||
        self.server.stop()
 | 
			
		||||
        self._ctrl.sample({}, 'shutdown', 'shutdown')
 | 
			
		||||
        self.server.wait()
 | 
			
		||||
        self.thread.join()
 | 
			
		||||
 | 
			
		||||
    def notifier(self, publisher, topic=None):
 | 
			
		||||
        return notifier.Notifier(self.transport,
 | 
			
		||||
                                 publisher,
 | 
			
		||||
                                 driver='messaging',
 | 
			
		||||
                                 topic=topic or self.topics[0])
 | 
			
		||||
 | 
			
		||||
    def debug(self, ctxt, publisher, event_type, payload, metadata):
 | 
			
		||||
        self.events.put(['debug', event_type, payload, publisher])
 | 
			
		||||
 | 
			
		||||
    def audit(self, ctxt, publisher, event_type, payload, metadata):
 | 
			
		||||
        self.events.put(['audit', event_type, payload, publisher])
 | 
			
		||||
 | 
			
		||||
    def info(self, ctxt, publisher, event_type, payload, metadata):
 | 
			
		||||
        self.events.put(['info', event_type, payload, publisher])
 | 
			
		||||
 | 
			
		||||
    def warn(self, ctxt, publisher, event_type, payload, metadata):
 | 
			
		||||
        self.events.put(['warn', event_type, payload, publisher])
 | 
			
		||||
 | 
			
		||||
    def error(self, ctxt, publisher, event_type, payload, metadata):
 | 
			
		||||
        self.events.put(['error', event_type, payload, publisher])
 | 
			
		||||
 | 
			
		||||
    def critical(self, ctxt, publisher, event_type, payload, metadata):
 | 
			
		||||
        self.events.put(['critical', event_type, payload, publisher])
 | 
			
		||||
 | 
			
		||||
    def sample(self, ctxt, publisher, event_type, payload, metadata):
 | 
			
		||||
        pass  # Just used for internal shutdown control
 | 
			
		||||
 | 
			
		||||
    def get_events(self, timeout=0.5):
 | 
			
		||||
        results = []
 | 
			
		||||
        try:
 | 
			
		||||
            while True:
 | 
			
		||||
                results.append(self.events.get(timeout=timeout))
 | 
			
		||||
        except moves.queue.Empty:
 | 
			
		||||
            pass
 | 
			
		||||
        return results
 | 
			
		||||
							
								
								
									
										6
									
								
								tox.ini
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								tox.ini
									
									
									
									
									
								
							@@ -33,11 +33,11 @@ deps = -r{toxinidir}/requirements-py3.txt
 | 
			
		||||
 | 
			
		||||
[testenv:py27-func-qpid]
 | 
			
		||||
setenv = TRANSPORT_URL=qpid://guest:password@localhost//
 | 
			
		||||
commands = python setup.py testr --slowest --testr-args='tests.functional'
 | 
			
		||||
commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
 | 
			
		||||
 | 
			
		||||
[testenv:py27-func-rabbit]
 | 
			
		||||
setenv = TRANSPORT_URL=rabbit://guest:password@localhost//
 | 
			
		||||
commands = python setup.py testr --slowest --testr-args='tests.functional'
 | 
			
		||||
commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional'
 | 
			
		||||
 | 
			
		||||
[testenv:py27-func-amqp1]
 | 
			
		||||
setenv = TRANSPORT_URL=amqp://guest:password@localhost//
 | 
			
		||||
@@ -45,7 +45,7 @@ deps = -r{toxinidir}/amqp1-requirements.txt
 | 
			
		||||
        {[testenv]deps}
 | 
			
		||||
# NOTE(sileht): until ubuntu get proto packages, we run amqp_driver tests here
 | 
			
		||||
# because this is the only target to run fedora 20 in gate
 | 
			
		||||
commands = python setup.py testr --slowest --testr-args='tests.(functional|test_amqp_driver)'
 | 
			
		||||
commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.(functional|test_amqp_driver)'
 | 
			
		||||
 | 
			
		||||
[flake8]
 | 
			
		||||
show-source = True
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user