diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index 9e9ec9945..f474ec75f 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -34,8 +34,6 @@ class CallTestCase(utils.SkipIfNoTransportURL): self.assertEqual(0, group.servers[i].endpoint.ival) def test_server_in_group(self): - if self.url.startswith("amqp:"): - self.skipTest("QPID-6307") group = self.useFixture(utils.RpcServerGroupFixture(self.url)) client = group.client() @@ -49,22 +47,17 @@ class CallTestCase(utils.SkipIfNoTransportURL): self.assertThat(actual, utils.IsValidDistributionOf(data)) def test_different_exchanges(self): - if self.url.startswith("amqp:"): - self.skipTest("QPID-6307") - t = self.useFixture(utils.TransportFixture(self.url)) # If the different exchanges are not honoured, then the # teardown may hang unless we broadcast all control messages # to each server group1 = self.useFixture( - utils.RpcServerGroupFixture(self.url, transport=t, + utils.RpcServerGroupFixture(self.url, use_fanout_ctrl=True)) group2 = self.useFixture( utils.RpcServerGroupFixture(self.url, exchange="a", - transport=t, use_fanout_ctrl=True)) group3 = self.useFixture( utils.RpcServerGroupFixture(self.url, exchange="b", - transport=t, use_fanout_ctrl=True)) client1 = group1.client(1) @@ -123,8 +116,9 @@ class CastTestCase(utils.SkipIfNoTransportURL): client.append(text='stack') client.add(increment=2) client.add(increment=10) - group.sync() + client.sync() + group.sync(1) self.assertEqual('openstack', group.servers[1].endpoint.sval) self.assertEqual(12, group.servers[1].endpoint.ival) for i in [0, 2]: @@ -132,11 +126,16 @@ class CastTestCase(utils.SkipIfNoTransportURL): self.assertEqual(0, group.servers[i].endpoint.ival) def test_server_in_group(self): + if self.url.startswith("amqp:"): + self.skipTest("QPID-6307") group = self.useFixture(utils.RpcServerGroupFixture(self.url)) client = group.client(cast=True) for i in range(20): client.add(increment=1) - group.sync() + for i in range(len(group.servers)): + # expect each server to get a sync + client.sync() + group.sync(server="all") total = 0 for s in group.servers: ival = s.endpoint.ival @@ -152,6 +151,7 @@ class CastTestCase(utils.SkipIfNoTransportURL): client.append(text='stack') client.add(increment=2) client.add(increment=10) + client.sync() group.sync(server='all') for s in group.servers: self.assertEqual('openstack', s.endpoint.sval) @@ -163,11 +163,8 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): # to be run in parallel def test_simple(self): - transport = self.useFixture(utils.TransportFixture(self.url)) listener = self.useFixture( - utils.NotificationFixture(transport.transport, - ['test_simple'])) - transport.wait() + utils.NotificationFixture(self.url, ['test_simple'])) notifier = listener.notifier('abc') notifier.info({}, 'test', 'Hello World!') @@ -178,11 +175,8 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): self.assertEqual('abc', event[3]) def test_multiple_topics(self): - transport = self.useFixture(utils.TransportFixture(self.url)) listener = self.useFixture( - utils.NotificationFixture(transport.transport, - ['a', 'b'])) - transport.wait() + utils.NotificationFixture(self.url, ['a', 'b'])) a = listener.notifier('pub-a', topic='a') b = listener.notifier('pub-b', topic='b') @@ -206,18 +200,17 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): self.assertEqual(expected[2], actual[2]) def test_multiple_servers(self): - transport = self.useFixture(utils.TransportFixture(self.url)) + if self.url.startswith("amqp:"): + self.skipTest("QPID-6307") listener_a = self.useFixture( - utils.NotificationFixture(transport.transport, - ['test-topic'])) + utils.NotificationFixture(self.url, ['test-topic'])) + listener_b = self.useFixture( - utils.NotificationFixture(transport.transport, - ['test-topic'])) - transport.wait() + utils.NotificationFixture(self.url, ['test-topic'])) + n = listener_a.notifier('pub') events_out = [('test-%s' % c, 'payload-%s' % c) for c in 'abcdefgh'] - for event_type, payload in events_out: n.info({}, event_type, payload) @@ -229,14 +222,10 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): self.assertThat(len(stream), matchers.GreaterThan(0)) def test_independent_topics(self): - transport = self.useFixture(utils.TransportFixture(self.url)) listener_a = self.useFixture( - utils.NotificationFixture(transport.transport, - ['1'])) + utils.NotificationFixture(self.url, ['1'])) listener_b = self.useFixture( - utils.NotificationFixture(transport.transport, - ['2'])) - transport.wait() + utils.NotificationFixture(self.url, ['2'])) a = listener_a.notifier('pub-1', topic='1') b = listener_b.notifier('pub-2', topic='2') @@ -264,10 +253,8 @@ class NotifyTestCase(utils.SkipIfNoTransportURL): self.assertEqual('pub-2', actual[3]) def test_all_categories(self): - transport = self.useFixture(utils.TransportFixture(self.url)) listener = self.useFixture(utils.NotificationFixture( - transport.transport, ['test_all_categories'])) - transport.wait() + self.url, ['test_all_categories'])) n = listener.notifier('abc') cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical'] diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 1617506c8..3aaa2d607 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -62,16 +62,16 @@ class TransportFixture(fixtures.Fixture): super(TransportFixture, self).cleanUp() def wait(self): - if self.url.startswith("rabbit") or self.url.startswith("qpid"): - time.sleep(0.5) + # allow time for the server to connect to the broker + time.sleep(0.5) class RpcServerFixture(fixtures.Fixture): """Fixture to setup the TestServerEndpoint.""" - def __init__(self, transport, target, endpoint=None, ctrl_target=None): + def __init__(self, url, target, endpoint=None, ctrl_target=None): super(RpcServerFixture, self).__init__() - self.transport = transport + self.url = url self.target = target self.endpoint = endpoint or TestServerEndpoint() self.syncq = moves.queue.Queue() @@ -80,11 +80,14 @@ class RpcServerFixture(fixtures.Fixture): def setUp(self): super(RpcServerFixture, self).setUp() endpoints = [self.endpoint, self] - self.server = oslo_messaging.get_rpc_server(self.transport, + transport = self.useFixture(TransportFixture(self.url)) + self.server = oslo_messaging.get_rpc_server(transport.transport, self.target, endpoints) - self._ctrl = oslo_messaging.RPCClient(self.transport, self.ctrl_target) + self._ctrl = oslo_messaging.RPCClient(transport.transport, + self.ctrl_target) self._start() + transport.wait() def cleanUp(self): self._stop() @@ -104,13 +107,13 @@ class RpcServerFixture(fixtures.Fixture): def ping(self, ctxt): pass - def sync(self, ctxt, item): - self.syncq.put(item) + def sync(self, ctxt): + self.syncq.put('x') class RpcServerGroupFixture(fixtures.Fixture): def __init__(self, url, topic=None, names=None, exchange=None, - transport=None, use_fanout_ctrl=False): + use_fanout_ctrl=False): self.url = url # NOTE(sileht): topic and servier_name must be uniq # to be able to run all tests in parallel @@ -119,15 +122,11 @@ class RpcServerGroupFixture(fixtures.Fixture): for i in range(3)] self.exchange = exchange self.targets = [self._target(server=n) for n in self.names] - self.transport = transport self.use_fanout_ctrl = use_fanout_ctrl def setUp(self): super(RpcServerGroupFixture, self).setUp() - if not self.transport: - self.transport = self.useFixture(TransportFixture(self.url)) self.servers = [self.useFixture(self._server(t)) for t in self.targets] - self.transport.wait() def _target(self, server=None, fanout=False): t = oslo_messaging.Target(exchange=self.exchange, topic=self.topic) @@ -139,8 +138,8 @@ class RpcServerGroupFixture(fixtures.Fixture): ctrl = None if self.use_fanout_ctrl: ctrl = self._target(fanout=True) - return RpcServerFixture(self.transport.transport, target, - ctrl_target=ctrl) + server = RpcServerFixture(self.url, target, ctrl_target=ctrl) + return server def client(self, server=None, cast=False): if server is None: @@ -152,8 +151,12 @@ class RpcServerGroupFixture(fixtures.Fixture): target = self.targets[server] else: raise ValueError("Invalid value for server: %r" % server) - return ClientStub(self.transport.transport, target, cast=cast, - timeout=5) + + transport = self.useFixture(TransportFixture(self.url)) + client = ClientStub(transport.transport, target, cast=cast, + timeout=5) + transport.wait() + return client def sync(self, server=None): if server is None: @@ -161,13 +164,9 @@ class RpcServerGroupFixture(fixtures.Fixture): self.client(i).ping() else: if server == 'all': - c = self.client(server='all', cast=True) - c.sync(item='x') for s in self.servers: s.syncq.get(timeout=5) elif server >= 0 and server < len(self.targets): - c = self.client(server=server, cast=True) - c.sync(item='x') self.servers[server].syncq.get(timeout=5) else: raise ValueError("Invalid value for server: %r" % server) @@ -276,9 +275,9 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): class NotificationFixture(fixtures.Fixture): - def __init__(self, transport, topics): + def __init__(self, url, topics): super(NotificationFixture, self).__init__() - self.transport = transport + self.url = url self.topics = topics self.events = moves.queue.Queue() self.name = str(id(self)) @@ -288,12 +287,14 @@ class NotificationFixture(fixtures.Fixture): targets = [oslo_messaging.Target(topic=t) for t in self.topics] # add a special topic for internal notifications targets.append(oslo_messaging.Target(topic=self.name)) + transport = self.useFixture(TransportFixture(self.url)) self.server = oslo_messaging.get_notification_listener( - self.transport, + transport.transport, targets, [self]) self._ctrl = self.notifier('internal', topic=self.name) self._start() + transport.wait() def cleanUp(self): self._stop() @@ -311,10 +312,13 @@ class NotificationFixture(fixtures.Fixture): self.thread.join() def notifier(self, publisher, topic=None): - return notifier.Notifier(self.transport, - publisher, - driver='messaging', - topic=topic or self.topics[0]) + transport = self.useFixture(TransportFixture(self.url)) + n = notifier.Notifier(transport.transport, + publisher, + driver='messaging', + topic=topic or self.topics[0]) + transport.wait() + return n def debug(self, ctxt, publisher, event_type, payload, metadata): self.events.put(['debug', event_type, payload, publisher])