#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import os
import time
import uuid

import concurrent.futures
from oslo_config import cfg
import six.moves
from testtools import matchers

import oslo_messaging
from oslo_messaging.tests.functional import utils


class CallTestCase(utils.SkipIfNoTransportURL):
    # Functional tests for blocking RPC calls: every client invocation
    # waits for the server's reply, so results can be asserted directly.

    def setUp(self):
        # Each test gets its own ConfigOpts instance.
        super(CallTestCase, self).setUp(conf=cfg.ConfigOpts())
        if self.url.startswith("kafka://"):
            self.skipTest("kafka does not support RPC API")

        self.conf.prog = "test_prog"
        self.conf.project = "test_project"

        self.config(heartbeat_timeout_threshold=0,
                    group='oslo_messaging_rabbit')

    def test_specific_server(self):
        # Calls addressed to server 1 must only change server 1's state.
        group = self.useFixture(
            utils.RpcServerGroupFixture(self.conf, self.url)
        )
        client = group.client(1)
        client.append(text='open')
        self.assertEqual('openstack', client.append(text='stack'))
        client.add(increment=2)
        self.assertEqual(12, client.add(increment=10))
        self.assertEqual(9, client.subtract(increment=3))
        self.assertEqual('openstack', group.servers[1].endpoint.sval)
        self.assertEqual(9, group.servers[1].endpoint.ival)
        # The other two servers must be left untouched.
        for i in [0, 2]:
            self.assertEqual('', group.servers[i].endpoint.sval)
            self.assertEqual(0, group.servers[i].endpoint.ival)

    def test_server_in_group(self):
        # Calls sent to the group as a whole must be shared between the
        # servers: each one gets at least one and none is lost/duplicated.
        group = self.useFixture(
            utils.RpcServerGroupFixture(self.conf, self.url)
        )

        client = group.client()
        data = list('abcdefghijklmn')
        for char in data:
            client.append(text=char)

        for s in group.servers:
            self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0))
        actual = [list(s.endpoint.sval) for s in group.servers]
        self.assertThat(actual, utils.IsValidDistributionOf(data))

    def test_different_exchanges(self):
        # If the different exchanges are not honoured, then the
        # teardown may hang unless we broadcast all control messages
        # to each server
        group1 = self.useFixture(
            utils.RpcServerGroupFixture(self.conf, self.url,
                                        use_fanout_ctrl=True))
        group2 = self.useFixture(
            utils.RpcServerGroupFixture(self.conf, self.url, exchange="a",
                                        use_fanout_ctrl=True))
        group3 = self.useFixture(
            utils.RpcServerGroupFixture(self.conf, self.url, exchange="b",
                                        use_fanout_ctrl=True))

        # group1: everything goes to server 1 only.
        client1 = group1.client(1)
        data1 = list('abcdefghijklmn')
        for char in data1:
            client1.append(text=char)

        # group2: messages are sent to the group as a whole.
        client2 = group2.client()
        data2 = list('opqrstuvwxyz')
        for char in data2:
            client2.append(text=char)

        actual1 = [list(s.endpoint.sval) for s in group1.servers]
        self.assertThat(actual1, utils.IsValidDistributionOf(data1))
        actual1 = list(group1.servers[1].endpoint.sval)
        self.assertThat([actual1], utils.IsValidDistributionOf(data1))
        for index, s in enumerate(group1.servers):
            expected = len(data1) if index == 1 else 0
            self.assertEqual(expected, len(s.endpoint.sval))
            self.assertEqual(0, s.endpoint.ival)

        actual2 = [list(s.endpoint.sval) for s in group2.servers]
        for s in group2.servers:
            self.assertThat(len(s.endpoint.sval), matchers.GreaterThan(0))
            self.assertEqual(0, s.endpoint.ival)
        self.assertThat(actual2, utils.IsValidDistributionOf(data2))

        # group3 was never sent anything, so it must have seen nothing.
        for s in group3.servers:
            self.assertEqual(0, len(s.endpoint.sval))
            self.assertEqual(0, s.endpoint.ival)

    def test_timeout(self):
        # A call on a topic that no server is started for in this test
        # must raise MessagingTimeout.
        transport = self.useFixture(
            utils.RPCTransportFixture(self.conf, self.url)
        )
        target = oslo_messaging.Target(topic="no_such_topic")
        c = utils.ClientStub(transport.transport, target, timeout=1)
        self.assertThat(c.ping,
                        matchers.raises(oslo_messaging.MessagingTimeout))

    def test_exception(self):
        # After add(2), subtract(3) is expected to surface a ValueError
        # on the client side.
        group = self.useFixture(
            utils.RpcServerGroupFixture(self.conf, self.url)
        )
        client = group.client(1)
        client.add(increment=2)
        self.assertRaises(ValueError, client.subtract, increment=3)

    def test_timeout_with_concurrently_queues(self):
        # A long call that times out must not disturb short calls that
        # are issued concurrently through the same client.
        transport = self.useFixture(
            utils.RPCTransportFixture(self.conf, self.url)
        )
        target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()),
                                       server="server_" + str(uuid.uuid4()))
        server = self.useFixture(
            utils.RpcServerFixture(self.conf, self.url, target,
                                   executor="threading"))
        client = utils.ClientStub(transport.transport, target,
                                  cast=False, timeout=5)

        def short_periodical_tasks():
            for _ in range(10):
                client.add(increment=1)
                time.sleep(1)

        with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
            future = executor.submit(client.long_running_task, seconds=10)
            executor.submit(short_periodical_tasks)
            self.assertRaises(oslo_messaging.MessagingTimeout, future.result)

        # Leaving the "with" block waits for the short tasks to finish,
        # so all ten increments must have been applied by now.
        self.assertEqual(10, server.endpoint.ival)

    def test_mandatory_call(self):
        if not self.url.startswith("rabbit://"):
            self.skipTest("backend does not support call monitoring")

        transport = self.useFixture(utils.RPCTransportFixture(self.conf,
                                                              self.url))
        target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()),
                                       server='server_' + str(uuid.uuid4()))

        # test for mandatory flag using transport-options, see:
        # https://blueprints.launchpad.net/oslo.messaging/+spec/transport-options
        # first test with `at_least_once=False` raises a "MessagingTimeout"
        # error since there is no control if the queue actually exists.
        # (Default behavior)
        options = oslo_messaging.TransportOptions(at_least_once=False)
        client1 = utils.ClientStub(transport.transport, target,
                                   cast=False, timeout=1,
                                   transport_options=options)

        self.assertRaises(oslo_messaging.MessagingTimeout,
                          client1.delay)

        # second test with `at_least_once=True` raises a "MessageUndeliverable"
        # caused by mandatory flag.
        # the MessageUndeliverable error is raised immediately without waiting
        # any timeout
        options2 = oslo_messaging.TransportOptions(at_least_once=True)
        client2 = utils.ClientStub(transport.transport, target,
                                   cast=False, timeout=60,
                                   transport_options=options2)

        self.assertRaises(oslo_messaging.MessageUndeliverable,
                          client2.delay)

    def test_monitor_long_call(self):
        if not self.url.startswith(("rabbit://", "amqp://")):
            self.skipTest("backend does not support call monitoring")

        transport = self.useFixture(utils.RPCTransportFixture(self.conf,
                                                              self.url))
        target = oslo_messaging.Target(topic='topic_' + str(uuid.uuid4()),
                                       server='server_' + str(uuid.uuid4()))

        class _endpoint(object):
            def delay(self, ctxt, seconds):
                time.sleep(seconds)
                return seconds

        self.useFixture(
            utils.RpcServerFixture(self.conf, self.url, target,
                                   executor='threading',
                                   endpoint=_endpoint()))

        # First case, no monitoring, ensure we timeout normally when the
        # server side runs long
        client1 = utils.ClientStub(transport.transport, target,
                                   cast=False, timeout=1)
        self.assertRaises(oslo_messaging.MessagingTimeout,
                          client1.delay, seconds=4)

        # Second case, set a short call monitor timeout and a very
        # long overall timeout. If we didn't honor the call monitor
        # timeout, we would wait an hour, past the test timeout. If
        # the server was not sending message heartbeats, we'd time out
        # after two seconds.
        client2 = utils.ClientStub(transport.transport, target,
                                   cast=False, timeout=3600,
                                   call_monitor_timeout=2)
        self.assertEqual(4, client2.delay(seconds=4))

    def test_endpoint_version_namespace(self):
        # verify endpoint version and namespace are checked
        target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()),
                                       server="server_" + str(uuid.uuid4()),
                                       namespace="Name1",
                                       version="7.5")

        class _endpoint(object):
            def __init__(self, target):
                # Calling the target is used here to obtain a separate
                # Target object for the endpoint.
                self.target = target()

            def test(self, ctxt, echo):
                return echo

        transport = self.useFixture(
            utils.RPCTransportFixture(self.conf, self.url)
        )
        self.useFixture(
            utils.RpcServerFixture(self.conf, self.url, target,
                                   executor="threading",
                                   endpoint=_endpoint(target)))
        client1 = utils.ClientStub(transport.transport, target,
                                   cast=False, timeout=5)
        self.assertEqual("Hi there", client1.test(echo="Hi there"))

        # unsupported version
        target2 = target()
        target2.version = "7.6"
        client2 = utils.ClientStub(transport.transport,
                                   target2,
                                   cast=False, timeout=5)
        self.assertRaises(oslo_messaging.rpc.client.RemoteError,
                          client2.test,
                          echo="Expect failure")

        # no matching namespace
        target3 = oslo_messaging.Target(topic=target.topic,
                                        server=target.server,
                                        version=target.version,
                                        namespace="Name2")
        client3 = utils.ClientStub(transport.transport,
                                   target3,
                                   cast=False, timeout=5)
        self.assertRaises(oslo_messaging.rpc.client.RemoteError,
                          client3.test,
                          echo="Expect failure")

    def test_bad_endpoint(self):
        # 'target' attribute is reserved and should be of type Target

        class _endpoint(object):
            def target(self, ctxt, echo):
                return echo

        target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()),
                                       server="server_" + str(uuid.uuid4()))
        transport = self.useFixture(
            utils.RPCTransportFixture(self.conf, self.url)
        )
        self.assertRaises(TypeError,
                          oslo_messaging.get_rpc_server,
                          transport=transport.transport,
                          target=target,
                          endpoints=[_endpoint()],
                          executor="threading")


class CastTestCase(utils.SkipIfNoTransportURL):
    # Note: casts return immediately, so these tests utilise a special
    # internal sync() cast to ensure prior casts are complete before
    # making the necessary assertions.

    def setUp(self):
        super(CastTestCase, self).setUp()
        if self.url.startswith("kafka://"):
            self.skipTest("kafka does not support RPC API")

    def test_specific_server(self):
        # Casts addressed to server 1 must only change server 1's state.
        group = self.useFixture(
            utils.RpcServerGroupFixture(self.conf, self.url)
        )
        client = group.client(1, cast=True)
        client.append(text='open')
        client.append(text='stack')
        client.add(increment=2)
        client.add(increment=10)
        time.sleep(0.3)
        client.sync()

        group.sync(1)
        # Either arrival order of the two appends is accepted.
        self.assertIn(group.servers[1].endpoint.sval,
                      ["openstack", "stackopen"])
        self.assertEqual(12, group.servers[1].endpoint.ival)
        for i in [0, 2]:
            self.assertEqual('', group.servers[i].endpoint.sval)
            self.assertEqual(0, group.servers[i].endpoint.ival)

    def test_server_in_group(self):
        if self.url.startswith("amqp:"):
            self.skipTest("QPID-6307")
        group = self.useFixture(
            utils.RpcServerGroupFixture(self.conf, self.url)
        )
        client = group.client(cast=True)
        for _ in range(20):
            client.add(increment=1)
        for _ in group.servers:
            # expect each server to get a sync
            client.sync()
        group.sync(server="all")

        # Every server must have handled some, but not all, of the
        # casts, and together they must account for all twenty.
        total = 0
        for s in group.servers:
            ival = s.endpoint.ival
            self.assertThat(ival, matchers.GreaterThan(0))
            self.assertThat(ival, matchers.LessThan(20))
            total += ival
        self.assertEqual(20, total)

    def test_fanout(self):
        # Casts sent with the 'all' client must reach every server.
        group = self.useFixture(
            utils.RpcServerGroupFixture(self.conf, self.url)
        )
        client = group.client('all', cast=True)
        client.append(text='open')
        client.append(text='stack')
        client.add(increment=2)
        client.add(increment=10)
        time.sleep(0.3)
        client.sync()
        group.sync(server='all')
        for s in group.servers:
            # Either arrival order of the two appends is accepted.
            self.assertIn(s.endpoint.sval, ["openstack", "stackopen"])
            self.assertEqual(12, s.endpoint.ival)


class NotifyTestCase(utils.SkipIfNoTransportURL):
    # NOTE(sileht): Each test must not use the same topics
    # to be run in parallel

    # NOTE(ansmith): kafka partition assignment delay requires
    # longer timeouts for test completion

    def test_simple(self):
        # One notification in, one event out, with all fields intact.
        get_timeout = 1
        if self.url.startswith("kafka://"):
            get_timeout = 5
            self.conf.set_override('consumer_group', 'test_simple',
                                   group='oslo_messaging_kafka')

        listener = self.useFixture(
            utils.NotificationFixture(self.conf, self.url, ['test_simple']))
        notifier = listener.notifier('abc')

        notifier.info({}, 'test', 'Hello World!')
        event = listener.events.get(timeout=get_timeout)
        self.assertEqual('info', event[0])
        self.assertEqual('test', event[1])
        self.assertEqual('Hello World!', event[2])
        self.assertEqual('abc', event[3])

    def test_multiple_topics(self):
        # A single listener on two topics must receive the notification
        # published on each of them.
        get_timeout = 1
        if self.url.startswith("kafka://"):
            get_timeout = 5
            self.conf.set_override('consumer_group', 'test_multiple_topics',
                                   group='oslo_messaging_kafka')

        listener = self.useFixture(
            utils.NotificationFixture(self.conf, self.url, ['a', 'b']))
        a = listener.notifier('pub-a', topics=['a'])
        b = listener.notifier('pub-b', topics=['b'])

        # Keyed by publisher id: [notifier, event type, payload].
        sent = {
            'pub-a': [a, 'test-a', 'payload-a'],
            'pub-b': [b, 'test-b', 'payload-b']
        }
        for notifier, event_type, payload in sent.values():
            notifier.info({}, event_type, payload)

        # Index the received events by publisher id (event[3]) because
        # the arrival order across topics is not assumed.
        received = {}
        while len(received) < len(sent):
            e = listener.events.get(timeout=get_timeout)
            received[e[3]] = e

        for key, actual in received.items():
            expected = sent[key]
            self.assertEqual('info', actual[0])
            self.assertEqual(expected[1], actual[1])
            self.assertEqual(expected[2], actual[2])

    def test_multiple_servers(self):
        # Two listeners on the same topic must share the notifications
        # between them, each receiving at least one.
        timeout = 0.5
        if self.url.startswith("amqp:"):
            self.skipTest("QPID-6307")
        if self.url.startswith("kafka://"):
            self.skipTest("Kafka: needs to be fixed")
            # The settings below only take effect once the skip above
            # is removed.
            timeout = 5
            self.conf.set_override('consumer_group',
                                   'test_multiple_servers',
                                   group='oslo_messaging_kafka')

        listener_a = self.useFixture(
            utils.NotificationFixture(self.conf, self.url, ['test-topic']))

        listener_b = self.useFixture(
            utils.NotificationFixture(self.conf, self.url, ['test-topic']))

        n = listener_a.notifier('pub')

        events_out = [('test-%s' % c, 'payload-%s' % c) for c in 'abcdefgh']
        for event_type, payload in events_out:
            n.info({}, event_type, payload)

        events_in = [[(e[1], e[2]) for e in listener_a.get_events(timeout)],
                     [(e[1], e[2]) for e in listener_b.get_events(timeout)]]

        self.assertThat(events_in, utils.IsValidDistributionOf(events_out))
        for stream in events_in:
            self.assertThat(len(stream), matchers.GreaterThan(0))

    def test_independent_topics(self):
        # Listeners on different topics must only see the notifications
        # published on their own topic.
        get_timeout = 0.5
        if self.url.startswith("kafka://"):
            get_timeout = 5
            self.conf.set_override('consumer_group',
                                   'test_independent_topics_a',
                                   group='oslo_messaging_kafka')
        listener_a = self.useFixture(
            utils.NotificationFixture(self.conf, self.url, ['1']))

        if self.url.startswith("kafka://"):
            self.conf.set_override('consumer_group',
                                   'test_independent_topics_b',
                                   group='oslo_messaging_kafka')
        listener_b = self.useFixture(
            utils.NotificationFixture(self.conf, self.url, ['2']))

        a = listener_a.notifier('pub-1', topics=['1'])
        b = listener_b.notifier('pub-2', topics=['2'])

        a_out = [('test-1-%s' % c, 'payload-1-%s' % c) for c in 'abcdefgh']
        for event_type, payload in a_out:
            a.info({}, event_type, payload)

        b_out = [('test-2-%s' % c, 'payload-2-%s' % c) for c in 'ijklmnop']
        for event_type, payload in b_out:
            b.info({}, event_type, payload)

        def check_received(listener, publisher, messages):
            # Drain exactly as many events as were published for this
            # listener; sizing by the messages argument keeps the helper
            # correct even if the two batches differ in length.
            actuals = sorted([listener.events.get(timeout=get_timeout)
                              for _ in range(len(messages))])
            expected = sorted([['info', m[0], m[1], publisher]
                               for m in messages])
            self.assertEqual(expected, actuals)

        check_received(listener_a, "pub-1", a_out)
        check_received(listener_b, "pub-2", b_out)

    def test_all_categories(self):
        # One notification per priority; each must arrive tagged with
        # the priority it was sent at.
        get_timeout = 1
        if self.url.startswith("kafka://"):
            get_timeout = 5
            self.conf.set_override('consumer_group', 'test_all_categories',
                                   group='oslo_messaging_kafka')

        listener = self.useFixture(utils.NotificationFixture(
            self.conf, self.url, ['test_all_categories']))
        n = listener.notifier('abc')

        cats = ['debug', 'audit', 'info', 'warn', 'error', 'critical']
        # (bound notifier method, category, event type, payload)
        events = [(getattr(n, c), c, 'type-' + c, c + '-data') for c in cats]
        for e in events:
            e[0]({}, e[2], e[3])

        # order between events with different categories is not guaranteed
        received = {}
        for _ in events:
            e = listener.events.get(timeout=get_timeout)
            received[e[0]] = e

        for expected in events:
            actual = received[expected[1]]
            self.assertEqual(expected[1], actual[0])
            self.assertEqual(expected[2], actual[1])
            self.assertEqual(expected[3], actual[2])

    def test_simple_batch(self):
        # 205 notifications with batch_size=100 must be delivered as
        # batches of 100, 100 and 5.
        get_timeout = 3
        batch_timeout = 2
        if self.url.startswith("amqp:"):
            backend = os.environ.get("AMQP1_BACKEND")
            if backend == "qdrouterd":
                # end-to-end acknowledgement with router intermediary
                # sender pends until batch_size or timeout reached
                self.skipTest("qdrouterd backend")
        if self.url.startswith("kafka://"):
            get_timeout = 10
            batch_timeout = 5
            self.conf.set_override('consumer_group', 'test_simple_batch',
                                   group='oslo_messaging_kafka')

        listener = self.useFixture(
            utils.BatchNotificationFixture(self.conf, self.url,
                                           ['test_simple_batch'],
                                           batch_size=100,
                                           batch_timeout=batch_timeout))
        notifier = listener.notifier('abc')

        for i in six.moves.range(0, 205):
            notifier.info({}, 'test%s' % i, 'Hello World!')
        events = listener.get_events(timeout=get_timeout)
        self.assertEqual(3, len(events))
        self.assertEqual(100, len(events[0][1]))
        self.assertEqual(100, len(events[1][1]))
        self.assertEqual(5, len(events[2][1]))

    def test_compression(self):
        # A notification must round-trip unchanged with gzip compression
        # configured.
        get_timeout = 1
        # NOTE(review): kombu_compression lives in the oslo_messaging_rabbit
        # group but is only set for "amqp:" URLs here - confirm whether
        # "rabbit:" was intended.
        if self.url.startswith("amqp:"):
            self.conf.set_override('kombu_compression', 'gzip',
                                   group='oslo_messaging_rabbit')
        if self.url.startswith("kafka://"):
            get_timeout = 5
            self.conf.set_override('compression_codec', 'gzip',
                                   group='oslo_messaging_kafka')
            self.conf.set_override('consumer_group', 'test_compression',
                                   group='oslo_messaging_kafka')

        listener = self.useFixture(
            utils.NotificationFixture(self.conf, self.url,
                                      ['test_compression']))
        notifier = listener.notifier('abc')

        notifier.info({}, 'test', 'Hello World!')
        event = listener.events.get(timeout=get_timeout)
        self.assertEqual('info', event[0])
        self.assertEqual('test', event[1])
        self.assertEqual('Hello World!', event[2])
        self.assertEqual('abc', event[3])