Create a unique transport for each server in the functional tests

Change-Id: I77a3670b3bdd3a4697b5a8b559936220cdba41ae
Closes-bug: #1421397
This commit is contained in:
Kenneth Giusti 2015-02-12 15:35:12 -05:00
parent 5d5c1d8d20
commit 3f967effe2
2 changed files with 53 additions and 62 deletions

View File

@ -34,8 +34,6 @@ class CallTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(0, group.servers[i].endpoint.ival) self.assertEqual(0, group.servers[i].endpoint.ival)
def test_server_in_group(self): def test_server_in_group(self):
if self.url.startswith("amqp:"):
self.skipTest("QPID-6307")
group = self.useFixture(utils.RpcServerGroupFixture(self.url)) group = self.useFixture(utils.RpcServerGroupFixture(self.url))
client = group.client() client = group.client()
@ -49,22 +47,17 @@ class CallTestCase(utils.SkipIfNoTransportURL):
self.assertThat(actual, utils.IsValidDistributionOf(data)) self.assertThat(actual, utils.IsValidDistributionOf(data))
def test_different_exchanges(self): def test_different_exchanges(self):
if self.url.startswith("amqp:"):
self.skipTest("QPID-6307")
t = self.useFixture(utils.TransportFixture(self.url))
# If the different exchanges are not honoured, then the # If the different exchanges are not honoured, then the
# teardown may hang unless we broadcast all control messages # teardown may hang unless we broadcast all control messages
# to each server # to each server
group1 = self.useFixture( group1 = self.useFixture(
utils.RpcServerGroupFixture(self.url, transport=t, utils.RpcServerGroupFixture(self.url,
use_fanout_ctrl=True)) use_fanout_ctrl=True))
group2 = self.useFixture( group2 = self.useFixture(
utils.RpcServerGroupFixture(self.url, exchange="a", utils.RpcServerGroupFixture(self.url, exchange="a",
transport=t,
use_fanout_ctrl=True)) use_fanout_ctrl=True))
group3 = self.useFixture( group3 = self.useFixture(
utils.RpcServerGroupFixture(self.url, exchange="b", utils.RpcServerGroupFixture(self.url, exchange="b",
transport=t,
use_fanout_ctrl=True)) use_fanout_ctrl=True))
client1 = group1.client(1) client1 = group1.client(1)
@ -123,8 +116,9 @@ class CastTestCase(utils.SkipIfNoTransportURL):
client.append(text='stack') client.append(text='stack')
client.add(increment=2) client.add(increment=2)
client.add(increment=10) client.add(increment=10)
group.sync() client.sync()
group.sync(1)
self.assertEqual('openstack', group.servers[1].endpoint.sval) self.assertEqual('openstack', group.servers[1].endpoint.sval)
self.assertEqual(12, group.servers[1].endpoint.ival) self.assertEqual(12, group.servers[1].endpoint.ival)
for i in [0, 2]: for i in [0, 2]:
@ -132,11 +126,16 @@ class CastTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(0, group.servers[i].endpoint.ival) self.assertEqual(0, group.servers[i].endpoint.ival)
def test_server_in_group(self): def test_server_in_group(self):
if self.url.startswith("amqp:"):
self.skipTest("QPID-6307")
group = self.useFixture(utils.RpcServerGroupFixture(self.url)) group = self.useFixture(utils.RpcServerGroupFixture(self.url))
client = group.client(cast=True) client = group.client(cast=True)
for i in range(20): for i in range(20):
client.add(increment=1) client.add(increment=1)
group.sync() for i in range(len(group.servers)):
# expect each server to get a sync
client.sync()
group.sync(server="all")
total = 0 total = 0
for s in group.servers: for s in group.servers:
ival = s.endpoint.ival ival = s.endpoint.ival
@ -152,6 +151,7 @@ class CastTestCase(utils.SkipIfNoTransportURL):
client.append(text='stack') client.append(text='stack')
client.add(increment=2) client.add(increment=2)
client.add(increment=10) client.add(increment=10)
client.sync()
group.sync(server='all') group.sync(server='all')
for s in group.servers: for s in group.servers:
self.assertEqual('openstack', s.endpoint.sval) self.assertEqual('openstack', s.endpoint.sval)
@ -163,11 +163,8 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
# to be run in parallel # to be run in parallel
def test_simple(self): def test_simple(self):
transport = self.useFixture(utils.TransportFixture(self.url))
listener = self.useFixture( listener = self.useFixture(
utils.NotificationFixture(transport.transport, utils.NotificationFixture(self.url, ['test_simple']))
['test_simple']))
transport.wait()
notifier = listener.notifier('abc') notifier = listener.notifier('abc')
notifier.info({}, 'test', 'Hello World!') notifier.info({}, 'test', 'Hello World!')
@ -178,11 +175,8 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual('abc', event[3]) self.assertEqual('abc', event[3])
def test_multiple_topics(self): def test_multiple_topics(self):
transport = self.useFixture(utils.TransportFixture(self.url))
listener = self.useFixture( listener = self.useFixture(
utils.NotificationFixture(transport.transport, utils.NotificationFixture(self.url, ['a', 'b']))
['a', 'b']))
transport.wait()
a = listener.notifier('pub-a', topic='a') a = listener.notifier('pub-a', topic='a')
b = listener.notifier('pub-b', topic='b') b = listener.notifier('pub-b', topic='b')
@ -206,18 +200,17 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(expected[2], actual[2]) self.assertEqual(expected[2], actual[2])
def test_multiple_servers(self): def test_multiple_servers(self):
transport = self.useFixture(utils.TransportFixture(self.url)) if self.url.startswith("amqp:"):
self.skipTest("QPID-6307")
listener_a = self.useFixture( listener_a = self.useFixture(
utils.NotificationFixture(transport.transport, utils.NotificationFixture(self.url, ['test-topic']))
['test-topic']))
listener_b = self.useFixture( listener_b = self.useFixture(
utils.NotificationFixture(transport.transport, utils.NotificationFixture(self.url, ['test-topic']))
['test-topic']))
transport.wait()
n = listener_a.notifier('pub') n = listener_a.notifier('pub')
events_out = [('test-%s' % c, 'payload-%s' % c) for c in 'abcdefgh'] events_out = [('test-%s' % c, 'payload-%s' % c) for c in 'abcdefgh']
for event_type, payload in events_out: for event_type, payload in events_out:
n.info({}, event_type, payload) n.info({}, event_type, payload)
@ -229,14 +222,10 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertThat(len(stream), matchers.GreaterThan(0)) self.assertThat(len(stream), matchers.GreaterThan(0))
def test_independent_topics(self): def test_independent_topics(self):
transport = self.useFixture(utils.TransportFixture(self.url))
listener_a = self.useFixture( listener_a = self.useFixture(
utils.NotificationFixture(transport.transport, utils.NotificationFixture(self.url, ['1']))
['1']))
listener_b = self.useFixture( listener_b = self.useFixture(
utils.NotificationFixture(transport.transport, utils.NotificationFixture(self.url, ['2']))
['2']))
transport.wait()
a = listener_a.notifier('pub-1', topic='1') a = listener_a.notifier('pub-1', topic='1')
b = listener_b.notifier('pub-2', topic='2') b = listener_b.notifier('pub-2', topic='2')
@ -264,10 +253,8 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual('pub-2', actual[3]) self.assertEqual('pub-2', actual[3])
def test_all_categories(self): def test_all_categories(self):
transport = self.useFixture(utils.TransportFixture(self.url))
listener = self.useFixture(utils.NotificationFixture( listener = self.useFixture(utils.NotificationFixture(
transport.transport, ['test_all_categories'])) self.url, ['test_all_categories']))
transport.wait()
n = listener.notifier('abc') n = listener.notifier('abc')
cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical'] cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical']

View File

@ -62,16 +62,16 @@ class TransportFixture(fixtures.Fixture):
super(TransportFixture, self).cleanUp() super(TransportFixture, self).cleanUp()
def wait(self): def wait(self):
if self.url.startswith("rabbit") or self.url.startswith("qpid"): # allow time for the server to connect to the broker
time.sleep(0.5) time.sleep(0.5)
class RpcServerFixture(fixtures.Fixture): class RpcServerFixture(fixtures.Fixture):
"""Fixture to setup the TestServerEndpoint.""" """Fixture to setup the TestServerEndpoint."""
def __init__(self, transport, target, endpoint=None, ctrl_target=None): def __init__(self, url, target, endpoint=None, ctrl_target=None):
super(RpcServerFixture, self).__init__() super(RpcServerFixture, self).__init__()
self.transport = transport self.url = url
self.target = target self.target = target
self.endpoint = endpoint or TestServerEndpoint() self.endpoint = endpoint or TestServerEndpoint()
self.syncq = moves.queue.Queue() self.syncq = moves.queue.Queue()
@ -80,11 +80,14 @@ class RpcServerFixture(fixtures.Fixture):
def setUp(self): def setUp(self):
super(RpcServerFixture, self).setUp() super(RpcServerFixture, self).setUp()
endpoints = [self.endpoint, self] endpoints = [self.endpoint, self]
self.server = oslo_messaging.get_rpc_server(self.transport, transport = self.useFixture(TransportFixture(self.url))
self.server = oslo_messaging.get_rpc_server(transport.transport,
self.target, self.target,
endpoints) endpoints)
self._ctrl = oslo_messaging.RPCClient(self.transport, self.ctrl_target) self._ctrl = oslo_messaging.RPCClient(transport.transport,
self.ctrl_target)
self._start() self._start()
transport.wait()
def cleanUp(self): def cleanUp(self):
self._stop() self._stop()
@ -104,13 +107,13 @@ class RpcServerFixture(fixtures.Fixture):
def ping(self, ctxt): def ping(self, ctxt):
pass pass
def sync(self, ctxt, item): def sync(self, ctxt):
self.syncq.put(item) self.syncq.put('x')
class RpcServerGroupFixture(fixtures.Fixture): class RpcServerGroupFixture(fixtures.Fixture):
def __init__(self, url, topic=None, names=None, exchange=None, def __init__(self, url, topic=None, names=None, exchange=None,
transport=None, use_fanout_ctrl=False): use_fanout_ctrl=False):
self.url = url self.url = url
# NOTE(sileht): topic and servier_name must be uniq # NOTE(sileht): topic and servier_name must be uniq
# to be able to run all tests in parallel # to be able to run all tests in parallel
@ -119,15 +122,11 @@ class RpcServerGroupFixture(fixtures.Fixture):
for i in range(3)] for i in range(3)]
self.exchange = exchange self.exchange = exchange
self.targets = [self._target(server=n) for n in self.names] self.targets = [self._target(server=n) for n in self.names]
self.transport = transport
self.use_fanout_ctrl = use_fanout_ctrl self.use_fanout_ctrl = use_fanout_ctrl
def setUp(self): def setUp(self):
super(RpcServerGroupFixture, self).setUp() super(RpcServerGroupFixture, self).setUp()
if not self.transport:
self.transport = self.useFixture(TransportFixture(self.url))
self.servers = [self.useFixture(self._server(t)) for t in self.targets] self.servers = [self.useFixture(self._server(t)) for t in self.targets]
self.transport.wait()
def _target(self, server=None, fanout=False): def _target(self, server=None, fanout=False):
t = oslo_messaging.Target(exchange=self.exchange, topic=self.topic) t = oslo_messaging.Target(exchange=self.exchange, topic=self.topic)
@ -139,8 +138,8 @@ class RpcServerGroupFixture(fixtures.Fixture):
ctrl = None ctrl = None
if self.use_fanout_ctrl: if self.use_fanout_ctrl:
ctrl = self._target(fanout=True) ctrl = self._target(fanout=True)
return RpcServerFixture(self.transport.transport, target, server = RpcServerFixture(self.url, target, ctrl_target=ctrl)
ctrl_target=ctrl) return server
def client(self, server=None, cast=False): def client(self, server=None, cast=False):
if server is None: if server is None:
@ -152,8 +151,12 @@ class RpcServerGroupFixture(fixtures.Fixture):
target = self.targets[server] target = self.targets[server]
else: else:
raise ValueError("Invalid value for server: %r" % server) raise ValueError("Invalid value for server: %r" % server)
return ClientStub(self.transport.transport, target, cast=cast,
timeout=5) transport = self.useFixture(TransportFixture(self.url))
client = ClientStub(transport.transport, target, cast=cast,
timeout=5)
transport.wait()
return client
def sync(self, server=None): def sync(self, server=None):
if server is None: if server is None:
@ -161,13 +164,9 @@ class RpcServerGroupFixture(fixtures.Fixture):
self.client(i).ping() self.client(i).ping()
else: else:
if server == 'all': if server == 'all':
c = self.client(server='all', cast=True)
c.sync(item='x')
for s in self.servers: for s in self.servers:
s.syncq.get(timeout=5) s.syncq.get(timeout=5)
elif server >= 0 and server < len(self.targets): elif server >= 0 and server < len(self.targets):
c = self.client(server=server, cast=True)
c.sync(item='x')
self.servers[server].syncq.get(timeout=5) self.servers[server].syncq.get(timeout=5)
else: else:
raise ValueError("Invalid value for server: %r" % server) raise ValueError("Invalid value for server: %r" % server)
@ -276,9 +275,9 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
class NotificationFixture(fixtures.Fixture): class NotificationFixture(fixtures.Fixture):
def __init__(self, transport, topics): def __init__(self, url, topics):
super(NotificationFixture, self).__init__() super(NotificationFixture, self).__init__()
self.transport = transport self.url = url
self.topics = topics self.topics = topics
self.events = moves.queue.Queue() self.events = moves.queue.Queue()
self.name = str(id(self)) self.name = str(id(self))
@ -288,12 +287,14 @@ class NotificationFixture(fixtures.Fixture):
targets = [oslo_messaging.Target(topic=t) for t in self.topics] targets = [oslo_messaging.Target(topic=t) for t in self.topics]
# add a special topic for internal notifications # add a special topic for internal notifications
targets.append(oslo_messaging.Target(topic=self.name)) targets.append(oslo_messaging.Target(topic=self.name))
transport = self.useFixture(TransportFixture(self.url))
self.server = oslo_messaging.get_notification_listener( self.server = oslo_messaging.get_notification_listener(
self.transport, transport.transport,
targets, targets,
[self]) [self])
self._ctrl = self.notifier('internal', topic=self.name) self._ctrl = self.notifier('internal', topic=self.name)
self._start() self._start()
transport.wait()
def cleanUp(self): def cleanUp(self):
self._stop() self._stop()
@ -311,10 +312,13 @@ class NotificationFixture(fixtures.Fixture):
self.thread.join() self.thread.join()
def notifier(self, publisher, topic=None): def notifier(self, publisher, topic=None):
return notifier.Notifier(self.transport, transport = self.useFixture(TransportFixture(self.url))
publisher, n = notifier.Notifier(transport.transport,
driver='messaging', publisher,
topic=topic or self.topics[0]) driver='messaging',
topic=topic or self.topics[0])
transport.wait()
return n
def debug(self, ctxt, publisher, event_type, payload, metadata): def debug(self, ctxt, publisher, event_type, payload, metadata):
self.events.put(['debug', event_type, payload, publisher]) self.events.put(['debug', event_type, payload, publisher])