From f286ef1114aad1e13f2e9c14c4ee275e1aa1e97c Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Wed, 21 Jan 2015 12:22:09 +0100 Subject: [PATCH] Fix functionnal tests This removes the functionnal tests from the legacy directory and run the ones in the new directory with tox. Change-Id: Ibbfd6c946e25435ff37e459bf5e82565a1e21778 --- tests/functional/__init__.py | 0 tests/functional/test_functional.py | 279 ---------------------- tests/functional/utils.py | 343 ---------------------------- tox.ini | 6 +- 4 files changed, 3 insertions(+), 625 deletions(-) delete mode 100644 tests/functional/__init__.py delete mode 100644 tests/functional/test_functional.py delete mode 100644 tests/functional/utils.py diff --git a/tests/functional/__init__.py b/tests/functional/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/functional/test_functional.py b/tests/functional/test_functional.py deleted file mode 100644 index 5c4077478..000000000 --- a/tests/functional/test_functional.py +++ /dev/null @@ -1,279 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo import messaging - -from testtools import matchers - -from tests.functional.utils import ClientStub -from tests.functional.utils import IsValidDistributionOf -from tests.functional.utils import NotificationFixture -from tests.functional.utils import RpcServerGroupFixture -from tests.functional.utils import SkipIfNoTransportURL -from tests.functional.utils import TransportFixture - - -class CallTestCase(SkipIfNoTransportURL): - def test_specific_server(self): - group = self.useFixture(RpcServerGroupFixture(self.url)) - client = group.client(1) - client.append(text='open') - self.assertEqual('openstack', client.append(text='stack')) - client.add(increment=2) - self.assertEqual(12, client.add(increment=10)) - self.assertEqual(9, client.subtract(increment=3)) - self.assertEqual('openstack', group.servers[1].endpoint.sval) - self.assertEqual(9, group.servers[1].endpoint.ival) - for i in [0, 2]: - self.assertEqual('', group.servers[i].endpoint.sval) - self.assertEqual(0, group.servers[i].endpoint.ival) - - def test_server_in_group(self): - group = self.useFixture(RpcServerGroupFixture(self.url)) - - client = group.client() - data = [c for c in 'abcdefghijklmn'] - for i in data: - client.append(text=i) - - for s in group.servers: - self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0)) - actual = [[c for c in s.endpoint.sval] for s in group.servers] - self.assertThat(actual, IsValidDistributionOf(data)) - - def test_different_exchanges(self): - t = self.useFixture(TransportFixture(self.url)) - # If the different exchanges are not honoured, then the - # teardown may hang unless we broadcast all control messages - # to each server - group1 = self.useFixture(RpcServerGroupFixture(self.url, transport=t, - use_fanout_ctrl=True)) - group2 = self.useFixture(RpcServerGroupFixture(self.url, exchange="a", - transport=t, - use_fanout_ctrl=True)) - group3 = self.useFixture(RpcServerGroupFixture(self.url, exchange="b", - transport=t, - use_fanout_ctrl=True)) - - client1 = group1.client(1) - data1 = [c for c in 'abcdefghijklmn'] - for i in data1: - client1.append(text=i) - - client2 = group2.client() - data2 = [c for c in 'opqrstuvwxyz'] - for i in data2: - client2.append(text=i) - - actual1 = [[c for c in s.endpoint.sval] for s in group1.servers] - self.assertThat(actual1, IsValidDistributionOf(data1)) - actual1 = [c for c in group1.servers[1].endpoint.sval] - self.assertThat([actual1], IsValidDistributionOf(data1)) - for s in group1.servers: - expected = len(data1) if group1.servers.index(s) == 1 else 0 - self.assertEqual(expected, len(s.endpoint.sval)) - self.assertEqual(0, s.endpoint.ival) - - actual2 = [[c for c in s.endpoint.sval] for s in group2.servers] - for s in group2.servers: - self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0)) - self.assertEqual(0, s.endpoint.ival) - self.assertThat(actual2, IsValidDistributionOf(data2)) - - for s in group3.servers: - self.assertEqual(0, len(s.endpoint.sval)) - self.assertEqual(0, s.endpoint.ival) - - def test_timeout(self): - transport = self.useFixture(TransportFixture(self.url)) - target = messaging.Target(topic="no_such_topic") - c = ClientStub(transport.transport, target, timeout=1) - self.assertThat(c.ping, matchers.raises(messaging.MessagingTimeout)) - - def test_exception(self): - group = self.useFixture(RpcServerGroupFixture(self.url)) - client = group.client(1) - client.add(increment=2) - f = lambda: client.subtract(increment=3) - self.assertThat(f, matchers.raises(ValueError)) - - -class CastTestCase(SkipIfNoTransportURL): - # Note: casts return immediately, so these tests utilise a special - # internal sync() cast to ensure prior casts are complete before - # making the necessary assertions. - - def test_specific_server(self): - group = self.useFixture(RpcServerGroupFixture(self.url)) - client = group.client(1, cast=True) - client.append(text='open') - client.append(text='stack') - client.add(increment=2) - client.add(increment=10) - group.sync() - - self.assertEqual('openstack', group.servers[1].endpoint.sval) - self.assertEqual(12, group.servers[1].endpoint.ival) - for i in [0, 2]: - self.assertEqual('', group.servers[i].endpoint.sval) - self.assertEqual(0, group.servers[i].endpoint.ival) - - def test_server_in_group(self): - group = self.useFixture(RpcServerGroupFixture(self.url)) - client = group.client(cast=True) - for i in range(20): - client.add(increment=1) - group.sync() - total = 0 - for s in group.servers: - ival = s.endpoint.ival - self.assertThat(ival, matchers.GreaterThan(0)) - self.assertThat(ival, matchers.LessThan(20)) - total += ival - self.assertEqual(20, total) - - def test_fanout(self): - group = self.useFixture(RpcServerGroupFixture(self.url)) - client = group.client('all', cast=True) - client.append(text='open') - client.append(text='stack') - client.add(increment=2) - client.add(increment=10) - group.sync(server='all') - for s in group.servers: - self.assertEqual('openstack', s.endpoint.sval) - self.assertEqual(12, s.endpoint.ival) - - -class NotifyTestCase(SkipIfNoTransportURL): - # NOTE(sileht): Each test must not use the same topics - # to be run in parallel - - def test_simple(self): - transport = self.useFixture(TransportFixture(self.url)) - listener = self.useFixture(NotificationFixture(transport.transport, - ['test_simple'])) - transport.wait() - notifier = listener.notifier('abc') - - notifier.info({}, 'test', 'Hello World!') - event = listener.events.get(timeout=1) - self.assertEqual('info', event[0]) - self.assertEqual('test', event[1]) - self.assertEqual('Hello World!', event[2]) - self.assertEqual('abc', event[3]) - - def test_multiple_topics(self): - transport = self.useFixture(TransportFixture(self.url)) - listener = self.useFixture(NotificationFixture(transport.transport, - ['a', 'b'])) - transport.wait() - a = listener.notifier('pub-a', topic='a') - b = listener.notifier('pub-b', topic='b') - - sent = { - 'pub-a': [a, 'test-a', 'payload-a'], - 'pub-b': [b, 'test-b', 'payload-b'] - } - for e in sent.values(): - e[0].info({}, e[1], e[2]) - - received = {} - while len(received) < len(sent): - e = listener.events.get(timeout=1) - received[e[3]] = e - - for key in received: - actual = received[key] - expected = sent[key] - self.assertEqual('info', actual[0]) - self.assertEqual(expected[1], actual[1]) - self.assertEqual(expected[2], actual[2]) - - def test_multiple_servers(self): - transport = self.useFixture(TransportFixture(self.url)) - listener_a = self.useFixture(NotificationFixture(transport.transport, - ['test-topic'])) - listener_b = self.useFixture(NotificationFixture(transport.transport, - ['test-topic'])) - transport.wait() - n = listener_a.notifier('pub') - - events_out = [('test-%s' % c, 'payload-%s' % c) for c in 'abcdefgh'] - - for event_type, payload in events_out: - n.info({}, event_type, payload) - - events_in = [[(e[1], e[2]) for e in listener_a.get_events()], - [(e[1], e[2]) for e in listener_b.get_events()]] - - self.assertThat(events_in, IsValidDistributionOf(events_out)) - for stream in events_in: - self.assertThat(len(stream), matchers.GreaterThan(0)) - - def test_independent_topics(self): - transport = self.useFixture(TransportFixture(self.url)) - listener_a = self.useFixture(NotificationFixture(transport.transport, - ['1'])) - listener_b = self.useFixture(NotificationFixture(transport.transport, - ['2'])) - transport.wait() - - a = listener_a.notifier('pub-1', topic='1') - b = listener_b.notifier('pub-2', topic='2') - - a_out = [('test-1-%s' % c, 'payload-1-%s' % c) for c in 'abcdefgh'] - for event_type, payload in a_out: - a.info({}, event_type, payload) - - b_out = [('test-2-%s' % c, 'payload-2-%s' % c) for c in 'ijklmnop'] - for event_type, payload in b_out: - b.info({}, event_type, payload) - - for expected in a_out: - actual = listener_a.events.get(timeout=0.5) - self.assertEqual('info', actual[0]) - self.assertEqual(expected[0], actual[1]) - self.assertEqual(expected[1], actual[2]) - self.assertEqual('pub-1', actual[3]) - - for expected in b_out: - actual = listener_b.events.get(timeout=0.5) - self.assertEqual('info', actual[0]) - self.assertEqual(expected[0], actual[1]) - self.assertEqual(expected[1], actual[2]) - self.assertEqual('pub-2', actual[3]) - - def test_all_categories(self): - transport = self.useFixture(TransportFixture(self.url)) - listener = self.useFixture(NotificationFixture( - transport.transport, ['test_all_categories'])) - transport.wait() - n = listener.notifier('abc') - - cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical'] - events = [(getattr(n, c), c, 'type-' + c, c + '-data') for c in cats] - for e in events: - e[0]({}, e[2], e[3]) - - # order between events with different categories is not guaranteed - received = {} - for expected in events: - e = listener.events.get(timeout=0.5) - received[e[0]] = e - - for expected in events: - actual = received[expected[1]] - self.assertEqual(expected[1], actual[0]) - self.assertEqual(expected[2], actual[1]) - self.assertEqual(expected[3], actual[2]) diff --git a/tests/functional/utils.py b/tests/functional/utils.py deleted file mode 100644 index fc0c78025..000000000 --- a/tests/functional/utils.py +++ /dev/null @@ -1,343 +0,0 @@ -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import threading -import time -import uuid - -import fixtures -from six import moves - -from oslo.config import cfg -from oslo import messaging -from oslo.messaging.notify import notifier -from oslo_messaging.tests import utils as test_utils - - -class TestServerEndpoint(object): - """This MessagingServer that will be used during functional testing.""" - - def __init__(self): - self.ival = 0 - self.sval = '' - - def add(self, ctxt, increment): - self.ival += increment - return self.ival - - def subtract(self, ctxt, increment): - if self.ival < increment: - raise ValueError("ival can't go negative!") - self.ival -= increment - return self.ival - - def append(self, ctxt, text): - self.sval += text - return self.sval - - -class TransportFixture(fixtures.Fixture): - """Fixture defined to setup the oslo.messaging transport.""" - - def __init__(self, url): - self.url = url - - def setUp(self): - super(TransportFixture, self).setUp() - self.transport = messaging.get_transport(cfg.CONF, url=self.url) - - def cleanUp(self): - self.transport.cleanup() - super(TransportFixture, self).cleanUp() - - def wait(self): - if self.url.startswith("rabbit") or self.url.startswith("qpid"): - time.sleep(0.5) - - -class RpcServerFixture(fixtures.Fixture): - """Fixture to setup the TestServerEndpoint.""" - - def __init__(self, transport, target, endpoint=None, ctrl_target=None): - super(RpcServerFixture, self).__init__() - self.transport = transport - self.target = target - self.endpoint = endpoint or TestServerEndpoint() - self.syncq = moves.queue.Queue() - self.ctrl_target = ctrl_target or self.target - - def setUp(self): - super(RpcServerFixture, self).setUp() - endpoints = [self.endpoint, self] - self.server = messaging.get_rpc_server(self.transport, - self.target, - endpoints) - self._ctrl = messaging.RPCClient(self.transport, self.ctrl_target) - self._start() - - def cleanUp(self): - self._stop() - super(RpcServerFixture, self).cleanUp() - - def _start(self): - self.thread = threading.Thread(target=self.server.start) - self.thread.daemon = True - self.thread.start() - - def _stop(self): - self.server.stop() - self._ctrl.cast({}, 'ping') - self.server.wait() - self.thread.join() - - def ping(self, ctxt): - pass - - def sync(self, ctxt, item): - self.syncq.put(item) - - -class RpcServerGroupFixture(fixtures.Fixture): - def __init__(self, url, topic=None, names=None, exchange=None, - transport=None, use_fanout_ctrl=False): - self.url = url - # NOTE(sileht): topic and servier_name must be uniq - # to be able to run all tests in parallel - self.topic = topic or str(uuid.uuid4()) - self.names = names or ["server_%i_%s" % (i, uuid.uuid4()) - for i in range(3)] - self.exchange = exchange - self.targets = [self._target(server=n) for n in self.names] - self.transport = transport - self.use_fanout_ctrl = use_fanout_ctrl - - def setUp(self): - super(RpcServerGroupFixture, self).setUp() - if not self.transport: - self.transport = self.useFixture(TransportFixture(self.url)) - self.servers = [self.useFixture(self._server(t)) for t in self.targets] - self.transport.wait() - - def _target(self, server=None, fanout=False): - t = messaging.Target(exchange=self.exchange, topic=self.topic) - t.server = server - t.fanout = fanout - return t - - def _server(self, target): - ctrl = None - if self.use_fanout_ctrl: - ctrl = self._target(fanout=True) - return RpcServerFixture(self.transport.transport, target, - ctrl_target=ctrl) - - def client(self, server=None, cast=False): - if server: - if server == 'all': - target = self._target(fanout=True) - elif server >= 0 and server < len(self.targets): - target = self.targets[server] - else: - raise ValueError("Invalid value for server: %r" % server) - else: - target = self._target() - return ClientStub(self.transport.transport, target, cast=cast, - timeout=5) - - def sync(self, server=None): - if server: - if server == 'all': - c = self.client(server='all', cast=True) - c.sync(item='x') - for s in self.servers: - s.syncq.get(timeout=5) - elif server >= 0 and server < len(self.targets): - c = self.client(server=server, cast=True) - c.sync(item='x') - self.servers[server].syncq.get(timeout=5) - else: - raise ValueError("Invalid value for server: %r" % server) - else: - for i in range(len(self.servers)): - self.client(i).ping() - - -class RpcCall(object): - def __init__(self, client, method, context): - self.client = client - self.method = method - self.context = context - - def __call__(self, **kwargs): - self.context['time'] = time.ctime() - self.context['cast'] = False - result = self.client.call(self.context, self.method, **kwargs) - return result - - -class RpcCast(RpcCall): - def __call__(self, **kwargs): - self.context['time'] = time.ctime() - self.context['cast'] = True - self.client.cast(self.context, self.method, **kwargs) - - -class ClientStub(object): - def __init__(self, transport, target, cast=False, name=None, **kwargs): - self.name = name or "functional-tests" - self.cast = cast - self.client = messaging.RPCClient(transport, target, **kwargs) - - def __getattr__(self, name): - context = {"application": self.name} - if self.cast: - return RpcCast(self.client, name, context) - else: - return RpcCall(self.client, name, context) - - -class InvalidDistribution(object): - def __init__(self, original, received): - self.original = original - self.received = received - self.missing = [] - self.extra = [] - self.wrong_order = [] - - def describe(self): - text = "Sent %s, got %s; " % (self.original, self.received) - e1 = ["%r was missing" % m for m in self.missing] - e2 = ["%r was not expected" % m for m in self.extra] - e3 = ["%r expected before %r" % (m[0], m[1]) for m in self.wrong_order] - return text + ", ".join(e1 + e2 + e3) - - def __len__(self): - return len(self.extra) + len(self.missing) + len(self.wrong_order) - - def get_details(self): - return {} - - -class IsValidDistributionOf(object): - """Test whether a given list can be split into particular - sub-lists. All items in the original list must be in exactly one - sub-list, and must appear in that sub-list in the same order with - respect to any other items as in the original list. - """ - def __init__(self, original): - self.original = original - - def __str__(self): - return 'IsValidDistribution(%s)' % self.original - - def match(self, actual): - errors = InvalidDistribution(self.original, actual) - received = [[i for i in l] for l in actual] - - def _remove(obj, lists): - for l in lists: - if obj in l: - front = l[0] - l.remove(obj) - return front - return None - - for item in self.original: - o = _remove(item, received) - if not o: - errors.missing += item - elif item != o: - errors.wrong_order.append([item, o]) - for l in received: - errors.extra += l - return errors or None - - -class SkipIfNoTransportURL(test_utils.BaseTestCase): - def setUp(self): - super(SkipIfNoTransportURL, self).setUp() - self.url = os.environ.get('TRANSPORT_URL') - if not self.url: - self.skipTest("No transport url configured") - - -class NotificationFixture(fixtures.Fixture): - def __init__(self, transport, topics): - super(NotificationFixture, self).__init__() - self.transport = transport - self.topics = topics - self.events = moves.queue.Queue() - self.name = str(id(self)) - - def setUp(self): - super(NotificationFixture, self).setUp() - targets = [messaging.Target(topic=t) for t in self.topics] - # add a special topic for internal notifications - targets.append(messaging.Target(topic=self.name)) - self.server = messaging.get_notification_listener(self.transport, - targets, - [self]) - self._ctrl = self.notifier('internal', topic=self.name) - self._start() - - def cleanUp(self): - self._stop() - super(NotificationFixture, self).cleanUp() - - def _start(self): - self.thread = threading.Thread(target=self.server.start) - self.thread.daemon = True - self.thread.start() - - def _stop(self): - self.server.stop() - self._ctrl.sample({}, 'shutdown', 'shutdown') - self.server.wait() - self.thread.join() - - def notifier(self, publisher, topic=None): - return notifier.Notifier(self.transport, - publisher, - driver='messaging', - topic=topic or self.topics[0]) - - def debug(self, ctxt, publisher, event_type, payload, metadata): - self.events.put(['debug', event_type, payload, publisher]) - - def audit(self, ctxt, publisher, event_type, payload, metadata): - self.events.put(['audit', event_type, payload, publisher]) - - def info(self, ctxt, publisher, event_type, payload, metadata): - self.events.put(['info', event_type, payload, publisher]) - - def warn(self, ctxt, publisher, event_type, payload, metadata): - self.events.put(['warn', event_type, payload, publisher]) - - def error(self, ctxt, publisher, event_type, payload, metadata): - self.events.put(['error', event_type, payload, publisher]) - - def critical(self, ctxt, publisher, event_type, payload, metadata): - self.events.put(['critical', event_type, payload, publisher]) - - def sample(self, ctxt, publisher, event_type, payload, metadata): - pass # Just used for internal shutdown control - - def get_events(self, timeout=0.5): - results = [] - try: - while True: - results.append(self.events.get(timeout=timeout)) - except moves.queue.Empty: - pass - return results diff --git a/tox.ini b/tox.ini index 428b051bf..04971209c 100644 --- a/tox.ini +++ b/tox.ini @@ -33,11 +33,11 @@ deps = -r{toxinidir}/requirements-py3.txt [testenv:py27-func-qpid] setenv = TRANSPORT_URL=qpid://guest:password@localhost// -commands = python setup.py testr --slowest --testr-args='tests.functional' +commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-rabbit] setenv = TRANSPORT_URL=rabbit://guest:password@localhost// -commands = python setup.py testr --slowest --testr-args='tests.functional' +commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.functional' [testenv:py27-func-amqp1] setenv = TRANSPORT_URL=amqp://guest:password@localhost// @@ -45,7 +45,7 @@ deps = -r{toxinidir}/amqp1-requirements.txt {[testenv]deps} # NOTE(sileht): until ubuntu get proto packages, we run amqp_driver tests here # because this is the only target to run fedora 20 in gate -commands = python setup.py testr --slowest --testr-args='tests.(functional|test_amqp_driver)' +commands = python setup.py testr --slowest --testr-args='oslo_messaging.tests.(functional|test_amqp_driver)' [flake8] show-source = True