2013-07-26 07:43:13 +01:00
|
|
|
# Copyright 2013 Red Hat, Inc.
|
|
|
|
#
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
|
|
# not use this file except in compliance with the License. You may obtain
|
|
|
|
# a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
|
|
# License for the specific language governing permissions and limitations
|
|
|
|
# under the License.
|
|
|
|
|
2013-07-31 11:36:28 +01:00
|
|
|
import datetime
|
2014-03-07 10:46:17 +01:00
|
|
|
import operator
|
2013-08-01 20:56:01 +01:00
|
|
|
import sys
|
2013-08-01 08:04:57 +01:00
|
|
|
import threading
|
2013-07-31 11:36:28 +01:00
|
|
|
import uuid
|
|
|
|
|
|
|
|
import fixtures
|
|
|
|
import kombu
|
2014-02-21 11:50:45 +01:00
|
|
|
import mock
|
2013-07-31 11:36:28 +01:00
|
|
|
import testscenarios
|
|
|
|
|
2013-07-26 07:43:13 +01:00
|
|
|
from oslo import messaging
|
2013-08-26 11:10:57 +01:00
|
|
|
from oslo.messaging._drivers import amqpdriver
|
2013-08-11 13:42:34 +01:00
|
|
|
from oslo.messaging._drivers import common as driver_common
|
2013-07-26 07:43:13 +01:00
|
|
|
from oslo.messaging._drivers import impl_rabbit as rabbit_driver
|
2013-07-31 11:36:28 +01:00
|
|
|
from oslo.messaging.openstack.common import jsonutils
|
2013-07-26 07:43:13 +01:00
|
|
|
from tests import utils as test_utils
|
|
|
|
|
2013-07-31 11:36:28 +01:00
|
|
|
load_tests = testscenarios.load_tests_apply_scenarios
|
|
|
|
|
2013-07-26 07:43:13 +01:00
|
|
|
|
2013-08-01 20:56:01 +01:00
|
|
|
class TestRabbitDriverLoad(test_utils.BaseTestCase):
|
2013-07-26 07:43:13 +01:00
|
|
|
|
|
|
|
def setUp(self):
|
2013-08-01 20:56:01 +01:00
|
|
|
super(TestRabbitDriverLoad, self).setUp()
|
2013-08-13 12:46:36 +01:00
|
|
|
self.messaging_conf.transport_driver = 'rabbit'
|
|
|
|
self.messaging_conf.in_memory = True
|
2013-07-26 07:43:13 +01:00
|
|
|
|
|
|
|
def test_driver_load(self):
|
|
|
|
transport = messaging.get_transport(self.conf)
|
2013-08-07 06:43:55 +01:00
|
|
|
self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
|
2013-07-26 07:43:13 +01:00
|
|
|
|
2013-08-01 20:56:01 +01:00
|
|
|
|
2013-08-12 07:48:00 +01:00
|
|
|
class TestRabbitTransportURL(test_utils.BaseTestCase):
|
|
|
|
|
|
|
|
scenarios = [
|
2014-03-07 10:46:17 +01:00
|
|
|
('none', dict(url=None,
|
|
|
|
expected=[dict(hostname='localhost',
|
|
|
|
port=5672,
|
|
|
|
userid='guest',
|
|
|
|
password='guest',
|
|
|
|
virtual_host='/')])),
|
2013-10-23 07:25:27 +01:00
|
|
|
('empty',
|
|
|
|
dict(url='rabbit:///',
|
2014-03-07 10:46:17 +01:00
|
|
|
expected=[dict(hostname='localhost',
|
|
|
|
port=5672,
|
|
|
|
userid='guest',
|
|
|
|
password='guest',
|
|
|
|
virtual_host='')])),
|
2013-08-12 07:48:00 +01:00
|
|
|
('localhost',
|
|
|
|
dict(url='rabbit://localhost/',
|
2014-03-07 10:46:17 +01:00
|
|
|
expected=[dict(hostname='localhost',
|
|
|
|
port=5672,
|
|
|
|
userid='',
|
|
|
|
password='',
|
|
|
|
virtual_host='')])),
|
2013-10-23 07:25:27 +01:00
|
|
|
('virtual_host',
|
|
|
|
dict(url='rabbit:///vhost',
|
2014-03-07 10:46:17 +01:00
|
|
|
expected=[dict(hostname='localhost',
|
|
|
|
port=5672,
|
|
|
|
userid='guest',
|
|
|
|
password='guest',
|
|
|
|
virtual_host='vhost')])),
|
2013-08-12 07:48:00 +01:00
|
|
|
('no_creds',
|
|
|
|
dict(url='rabbit://host/virtual_host',
|
2014-03-07 10:46:17 +01:00
|
|
|
expected=[dict(hostname='host',
|
|
|
|
port=5672,
|
|
|
|
userid='',
|
|
|
|
password='',
|
|
|
|
virtual_host='virtual_host')])),
|
2013-08-12 07:48:00 +01:00
|
|
|
('no_port',
|
|
|
|
dict(url='rabbit://user:password@host/virtual_host',
|
2014-03-07 10:46:17 +01:00
|
|
|
expected=[dict(hostname='host',
|
|
|
|
port=5672,
|
|
|
|
userid='user',
|
|
|
|
password='password',
|
|
|
|
virtual_host='virtual_host')])),
|
2013-08-12 07:48:00 +01:00
|
|
|
('full_url',
|
|
|
|
dict(url='rabbit://user:password@host:10/virtual_host',
|
2014-03-07 10:46:17 +01:00
|
|
|
expected=[dict(hostname='host',
|
|
|
|
port=10,
|
|
|
|
userid='user',
|
|
|
|
password='password',
|
|
|
|
virtual_host='virtual_host')])),
|
|
|
|
('full_two_url',
|
|
|
|
dict(url='rabbit://user:password@host:10,'
|
|
|
|
'user2:password2@host2:12/virtual_host',
|
|
|
|
expected=[dict(hostname='host',
|
|
|
|
port=10,
|
|
|
|
userid='user',
|
|
|
|
password='password',
|
|
|
|
virtual_host='virtual_host'),
|
|
|
|
dict(hostname='host2',
|
|
|
|
port=12,
|
|
|
|
userid='user2',
|
|
|
|
password='password2',
|
|
|
|
virtual_host='virtual_host')
|
|
|
|
]
|
|
|
|
)),
|
2013-08-12 07:48:00 +01:00
|
|
|
|
2014-03-07 10:46:17 +01:00
|
|
|
]
|
Properly handle transport URL config on the client
On the client side, in the rabbit and qpid drivers, we use a connection
pool to avoid opening a connection for each message we send. However,
there is only currently one connection pool per process:
def get_connection_pool(conf, connection_cls):
with _pool_create_sem:
# Make sure only one thread tries to create the connection pool.
if not connection_cls.pool:
connection_cls.pool = ConnectionPool(conf, connection_cls)
return connection_cls.pool
This is a nasty artifact of the original RPC having no conectp of a
transport context - everything was a global. We'll fix this soon enough.
In the meantime, we need to make sure we only use this connection pool
where we're not using the default transport configuration from the
config file - i.e. where we supply a transport URL.
The use case here is cells - we send messages to a remote cell by
connecting to it using a transport URL. In our devstack testing, the
two cells are on the same Rabbit broker but under different virtual
hosts. Because we were always using the connection pool on the client
side, we were seeing both cells always send messages to the '/' virtual
host.
Note - avoiding the connection pool in the case of cells is the same
behaviour as the current RPC code:
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
...
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
Change-Id: I2f35b45ef237bb85ab8faf58a408c03fcb1de9d7
2013-10-23 07:28:58 +01:00
|
|
|
|
2014-03-07 10:46:17 +01:00
|
|
|
def test_transport_url(self):
|
2013-08-13 12:46:36 +01:00
|
|
|
self.messaging_conf.in_memory = True
|
2013-08-12 07:48:00 +01:00
|
|
|
|
2014-03-07 10:46:17 +01:00
|
|
|
transport = messaging.get_transport(self.conf, self.url)
|
|
|
|
self.addCleanup(transport.cleanup)
|
|
|
|
driver = transport._driver
|
2013-12-02 09:27:29 +01:00
|
|
|
|
2014-03-07 10:46:17 +01:00
|
|
|
brokers_params = driver._get_connection().brokers_params[:]
|
|
|
|
brokers_params = [dict((k, v) for k, v in broker.items()
|
|
|
|
if k not in ['transport', 'login_method'])
|
|
|
|
for broker in brokers_params]
|
2013-08-12 07:48:00 +01:00
|
|
|
|
2014-03-07 10:46:17 +01:00
|
|
|
self.assertEqual(sorted(self.expected,
|
|
|
|
key=operator.itemgetter('hostname')),
|
|
|
|
sorted(brokers_params,
|
|
|
|
key=operator.itemgetter('hostname')))
|
2013-08-12 07:48:00 +01:00
|
|
|
|
|
|
|
|
2013-08-01 20:56:01 +01:00
|
|
|
class TestSendReceive(test_utils.BaseTestCase):
|
|
|
|
|
|
|
|
_n_senders = [
|
|
|
|
('single_sender', dict(n_senders=1)),
|
|
|
|
('multiple_senders', dict(n_senders=10)),
|
|
|
|
]
|
|
|
|
|
|
|
|
_context = [
|
|
|
|
('empty_context', dict(ctxt={})),
|
|
|
|
('with_context', dict(ctxt={'user': 'mark'})),
|
|
|
|
]
|
|
|
|
|
2013-08-17 21:40:02 +01:00
|
|
|
_reply = [
|
|
|
|
('rx_id', dict(rx_id=True, reply=None)),
|
|
|
|
('none', dict(rx_id=False, reply=None)),
|
|
|
|
('empty_list', dict(rx_id=False, reply=[])),
|
|
|
|
('empty_dict', dict(rx_id=False, reply={})),
|
|
|
|
('false', dict(rx_id=False, reply=False)),
|
|
|
|
('zero', dict(rx_id=False, reply=0)),
|
|
|
|
]
|
|
|
|
|
2013-08-01 20:56:01 +01:00
|
|
|
_failure = [
|
|
|
|
('success', dict(failure=False)),
|
2013-08-07 09:00:44 +01:00
|
|
|
('failure', dict(failure=True, expected=False)),
|
|
|
|
('expected_failure', dict(failure=True, expected=True)),
|
2013-08-01 20:56:01 +01:00
|
|
|
]
|
|
|
|
|
2013-08-01 21:45:32 +01:00
|
|
|
_timeout = [
|
|
|
|
('no_timeout', dict(timeout=None)),
|
|
|
|
('timeout', dict(timeout=0.01)), # FIXME(markmc): timeout=0 is broken?
|
|
|
|
]
|
|
|
|
|
2013-08-01 20:56:01 +01:00
|
|
|
@classmethod
|
|
|
|
def generate_scenarios(cls):
|
|
|
|
cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
|
|
|
|
cls._context,
|
2013-08-17 21:40:02 +01:00
|
|
|
cls._reply,
|
2013-08-01 21:45:32 +01:00
|
|
|
cls._failure,
|
|
|
|
cls._timeout)
|
2013-08-01 20:56:01 +01:00
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
super(TestSendReceive, self).setUp()
|
2013-08-13 12:46:36 +01:00
|
|
|
self.messaging_conf.transport_driver = 'rabbit'
|
|
|
|
self.messaging_conf.in_memory = True
|
2013-08-01 20:56:01 +01:00
|
|
|
|
2013-07-26 07:43:13 +01:00
|
|
|
def test_send_receive(self):
|
|
|
|
transport = messaging.get_transport(self.conf)
|
2013-07-31 07:54:25 +01:00
|
|
|
self.addCleanup(transport.cleanup)
|
|
|
|
|
2013-07-26 07:43:13 +01:00
|
|
|
driver = transport._driver
|
|
|
|
|
|
|
|
target = messaging.Target(topic='testtopic')
|
|
|
|
|
|
|
|
listener = driver.listen(target)
|
|
|
|
|
2013-08-01 15:41:59 +01:00
|
|
|
senders = []
|
2013-08-01 08:04:57 +01:00
|
|
|
replies = []
|
2013-08-01 15:41:59 +01:00
|
|
|
msgs = []
|
2013-08-11 13:42:34 +01:00
|
|
|
errors = []
|
|
|
|
|
|
|
|
def stub_error(msg, *a, **kw):
|
|
|
|
if (a and len(a) == 1 and isinstance(a[0], dict) and a[0]):
|
|
|
|
a = a[0]
|
|
|
|
errors.append(str(msg) % a)
|
|
|
|
|
|
|
|
self.stubs.Set(driver_common.LOG, 'error', stub_error)
|
2013-07-26 07:43:13 +01:00
|
|
|
|
2013-08-01 15:41:59 +01:00
|
|
|
def send_and_wait_for_reply(i):
|
2013-08-01 20:56:01 +01:00
|
|
|
try:
|
|
|
|
replies.append(driver.send(target,
|
|
|
|
self.ctxt,
|
2013-08-17 21:40:02 +01:00
|
|
|
{'tx_id': i},
|
2013-08-01 21:45:32 +01:00
|
|
|
wait_for_reply=True,
|
|
|
|
timeout=self.timeout))
|
2013-08-01 20:56:01 +01:00
|
|
|
self.assertFalse(self.failure)
|
2013-08-01 21:45:32 +01:00
|
|
|
self.assertIsNone(self.timeout)
|
|
|
|
except (ZeroDivisionError, messaging.MessagingTimeout) as e:
|
2013-08-01 20:56:01 +01:00
|
|
|
replies.append(e)
|
2013-08-01 21:45:32 +01:00
|
|
|
self.assertTrue(self.failure or self.timeout is not None)
|
2013-08-01 20:56:01 +01:00
|
|
|
|
|
|
|
while len(senders) < self.n_senders:
|
2013-08-01 15:41:59 +01:00
|
|
|
senders.append(threading.Thread(target=send_and_wait_for_reply,
|
|
|
|
args=(len(senders), )))
|
2013-07-26 07:43:13 +01:00
|
|
|
|
2013-08-01 15:41:59 +01:00
|
|
|
for i in range(len(senders)):
|
|
|
|
senders[i].start()
|
2013-08-01 08:04:57 +01:00
|
|
|
|
2013-08-01 15:41:59 +01:00
|
|
|
received = listener.poll()
|
2013-08-07 06:43:55 +01:00
|
|
|
self.assertIsNotNone(received)
|
2014-04-02 17:09:21 +08:00
|
|
|
self.assertEqual(self.ctxt, received.ctxt)
|
|
|
|
self.assertEqual({'tx_id': i}, received.message)
|
2013-08-01 15:41:59 +01:00
|
|
|
msgs.append(received)
|
2013-08-01 08:04:57 +01:00
|
|
|
|
2013-08-01 15:41:59 +01:00
|
|
|
# reply in reverse, except reply to the first guy second from last
|
2013-12-18 18:11:23 +01:00
|
|
|
order = list(range(len(senders)-1, -1, -1))
|
2013-08-01 20:56:01 +01:00
|
|
|
if len(order) > 1:
|
|
|
|
order[-1], order[-2] = order[-2], order[-1]
|
2013-08-01 08:04:57 +01:00
|
|
|
|
2013-08-01 15:41:59 +01:00
|
|
|
for i in order:
|
2013-08-01 21:45:32 +01:00
|
|
|
if self.timeout is None:
|
|
|
|
if self.failure:
|
|
|
|
try:
|
|
|
|
raise ZeroDivisionError
|
|
|
|
except Exception:
|
|
|
|
failure = sys.exc_info()
|
2013-08-07 09:00:44 +01:00
|
|
|
msgs[i].reply(failure=failure,
|
|
|
|
log_failure=not self.expected)
|
2013-08-17 21:40:02 +01:00
|
|
|
elif self.rx_id:
|
|
|
|
msgs[i].reply({'rx_id': i})
|
2013-08-01 21:45:32 +01:00
|
|
|
else:
|
2013-08-17 21:40:02 +01:00
|
|
|
msgs[i].reply(self.reply)
|
2013-08-01 15:41:59 +01:00
|
|
|
senders[i].join()
|
|
|
|
|
2014-04-02 17:09:21 +08:00
|
|
|
self.assertEqual(len(senders), len(replies))
|
2013-08-01 15:41:59 +01:00
|
|
|
for i, reply in enumerate(replies):
|
2013-08-01 21:45:32 +01:00
|
|
|
if self.timeout is not None:
|
|
|
|
self.assertIsInstance(reply, messaging.MessagingTimeout)
|
|
|
|
elif self.failure:
|
2013-08-07 06:43:55 +01:00
|
|
|
self.assertIsInstance(reply, ZeroDivisionError)
|
2013-08-17 21:40:02 +01:00
|
|
|
elif self.rx_id:
|
2014-04-02 17:09:21 +08:00
|
|
|
self.assertEqual({'rx_id': order[i]}, reply)
|
2013-08-01 21:45:32 +01:00
|
|
|
else:
|
2014-04-02 17:09:21 +08:00
|
|
|
self.assertEqual(self.reply, reply)
|
2013-08-01 20:56:01 +01:00
|
|
|
|
2013-08-11 13:42:34 +01:00
|
|
|
if not self.timeout and self.failure and not self.expected:
|
|
|
|
self.assertTrue(len(errors) > 0, errors)
|
|
|
|
else:
|
2014-04-02 17:09:21 +08:00
|
|
|
self.assertEqual(0, len(errors), errors)
|
2013-08-11 13:42:34 +01:00
|
|
|
|
2013-08-01 20:56:01 +01:00
|
|
|
|
|
|
|
TestSendReceive.generate_scenarios()
|
2013-07-31 11:36:28 +01:00
|
|
|
|
|
|
|
|
2014-07-07 12:25:12 +00:00
|
|
|
class TestPollAsync(test_utils.BaseTestCase):
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
super(TestPollAsync, self).setUp()
|
|
|
|
self.messaging_conf.transport_driver = 'rabbit'
|
|
|
|
self.messaging_conf.in_memory = True
|
|
|
|
|
|
|
|
def test_poll_timeout(self):
|
|
|
|
transport = messaging.get_transport(self.conf)
|
|
|
|
self.addCleanup(transport.cleanup)
|
|
|
|
driver = transport._driver
|
|
|
|
target = messaging.Target(topic='testtopic')
|
|
|
|
listener = driver.listen(target)
|
|
|
|
received = listener.poll(timeout=0.050)
|
|
|
|
self.assertIsNone(received)
|
|
|
|
|
|
|
|
|
2013-08-26 11:10:57 +01:00
|
|
|
class TestRacyWaitForReply(test_utils.BaseTestCase):
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
super(TestRacyWaitForReply, self).setUp()
|
|
|
|
self.messaging_conf.transport_driver = 'rabbit'
|
|
|
|
self.messaging_conf.in_memory = True
|
|
|
|
|
|
|
|
def test_send_receive(self):
|
|
|
|
transport = messaging.get_transport(self.conf)
|
|
|
|
self.addCleanup(transport.cleanup)
|
|
|
|
|
|
|
|
driver = transport._driver
|
|
|
|
|
|
|
|
target = messaging.Target(topic='testtopic')
|
|
|
|
|
|
|
|
listener = driver.listen(target)
|
|
|
|
|
|
|
|
senders = []
|
|
|
|
replies = []
|
|
|
|
msgs = []
|
|
|
|
|
|
|
|
wait_conditions = []
|
|
|
|
orig_reply_waiter = amqpdriver.ReplyWaiter.wait
|
|
|
|
|
|
|
|
def reply_waiter(self, msg_id, timeout):
|
|
|
|
if wait_conditions:
|
2014-02-24 16:29:35 +05:30
|
|
|
cond = wait_conditions.pop()
|
|
|
|
with cond:
|
|
|
|
cond.notify()
|
|
|
|
with cond:
|
|
|
|
cond.wait()
|
2013-08-26 11:10:57 +01:00
|
|
|
return orig_reply_waiter(self, msg_id, timeout)
|
|
|
|
|
|
|
|
self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter)
|
|
|
|
|
|
|
|
def send_and_wait_for_reply(i):
|
|
|
|
replies.append(driver.send(target,
|
|
|
|
{},
|
|
|
|
{'tx_id': i},
|
|
|
|
wait_for_reply=True,
|
|
|
|
timeout=None))
|
|
|
|
|
|
|
|
while len(senders) < 2:
|
|
|
|
t = threading.Thread(target=send_and_wait_for_reply,
|
|
|
|
args=(len(senders), ))
|
|
|
|
t.daemon = True
|
|
|
|
senders.append(t)
|
|
|
|
|
|
|
|
# Start the first guy, receive his message, but delay his polling
|
|
|
|
notify_condition = threading.Condition()
|
|
|
|
wait_conditions.append(notify_condition)
|
2014-02-24 16:29:35 +05:30
|
|
|
with notify_condition:
|
|
|
|
senders[0].start()
|
|
|
|
notify_condition.wait()
|
2013-08-26 11:10:57 +01:00
|
|
|
|
|
|
|
msgs.append(listener.poll())
|
2014-04-02 17:09:21 +08:00
|
|
|
self.assertEqual({'tx_id': 0}, msgs[-1].message)
|
2013-08-26 11:10:57 +01:00
|
|
|
|
|
|
|
# Start the second guy, receive his message
|
|
|
|
senders[1].start()
|
|
|
|
|
|
|
|
msgs.append(listener.poll())
|
2014-04-02 17:09:21 +08:00
|
|
|
self.assertEqual({'tx_id': 1}, msgs[-1].message)
|
2013-08-26 11:10:57 +01:00
|
|
|
|
|
|
|
# Reply to both in order, making the second thread queue
|
|
|
|
# the reply meant for the first thread
|
|
|
|
msgs[0].reply({'rx_id': 0})
|
|
|
|
msgs[1].reply({'rx_id': 1})
|
|
|
|
|
|
|
|
# Wait for the second thread to finish
|
|
|
|
senders[1].join()
|
|
|
|
|
|
|
|
# Let the first thread continue
|
|
|
|
with notify_condition:
|
|
|
|
notify_condition.notify()
|
|
|
|
|
|
|
|
# Wait for the first thread to finish
|
|
|
|
senders[0].join()
|
|
|
|
|
|
|
|
# Verify replies were received out of order
|
2014-04-02 17:09:21 +08:00
|
|
|
self.assertEqual(len(senders), len(replies))
|
|
|
|
self.assertEqual({'rx_id': 1}, replies[0])
|
|
|
|
self.assertEqual({'rx_id': 0}, replies[1])
|
2013-08-26 11:10:57 +01:00
|
|
|
|
|
|
|
|
2013-07-31 11:36:28 +01:00
|
|
|
def _declare_queue(target):
|
|
|
|
connection = kombu.connection.BrokerConnection(transport='memory')
|
|
|
|
|
|
|
|
# Kludge to speed up tests.
|
|
|
|
connection.transport.polling_interval = 0.0
|
|
|
|
|
|
|
|
connection.connect()
|
|
|
|
channel = connection.channel()
|
|
|
|
|
|
|
|
# work around 'memory' transport bug in 1.1.3
|
|
|
|
channel._new_queue('ae.undeliver')
|
|
|
|
|
|
|
|
if target.fanout:
|
|
|
|
exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
|
|
|
|
type='fanout',
|
|
|
|
durable=False,
|
|
|
|
auto_delete=True)
|
|
|
|
queue = kombu.entity.Queue(name=target.topic + '_fanout_12345',
|
|
|
|
channel=channel,
|
|
|
|
exchange=exchange,
|
|
|
|
routing_key=target.topic)
|
|
|
|
if target.server:
|
|
|
|
exchange = kombu.entity.Exchange(name='openstack',
|
|
|
|
type='topic',
|
|
|
|
durable=False,
|
|
|
|
auto_delete=False)
|
|
|
|
topic = '%s.%s' % (target.topic, target.server)
|
|
|
|
queue = kombu.entity.Queue(name=topic,
|
|
|
|
channel=channel,
|
|
|
|
exchange=exchange,
|
|
|
|
routing_key=topic)
|
|
|
|
else:
|
|
|
|
exchange = kombu.entity.Exchange(name='openstack',
|
|
|
|
type='topic',
|
|
|
|
durable=False,
|
|
|
|
auto_delete=False)
|
|
|
|
queue = kombu.entity.Queue(name=target.topic,
|
|
|
|
channel=channel,
|
|
|
|
exchange=exchange,
|
|
|
|
routing_key=target.topic)
|
|
|
|
|
|
|
|
queue.declare()
|
|
|
|
|
|
|
|
return connection, channel, queue
|
|
|
|
|
|
|
|
|
|
|
|
class TestRequestWireFormat(test_utils.BaseTestCase):
|
|
|
|
|
|
|
|
_target = [
|
|
|
|
('topic_target',
|
|
|
|
dict(topic='testtopic', server=None, fanout=False)),
|
|
|
|
('server_target',
|
|
|
|
dict(topic='testtopic', server='testserver', fanout=False)),
|
|
|
|
# NOTE(markmc): https://github.com/celery/kombu/issues/195
|
|
|
|
('fanout_target',
|
|
|
|
dict(topic='testtopic', server=None, fanout=True,
|
|
|
|
skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
|
|
|
|
]
|
|
|
|
|
|
|
|
_msg = [
|
|
|
|
('empty_msg',
|
|
|
|
dict(msg={}, expected={})),
|
|
|
|
('primitive_msg',
|
|
|
|
dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
|
|
|
|
('complex_msg',
|
|
|
|
dict(msg={'a': {'b': datetime.datetime(1920, 2, 3, 4, 5, 6, 7)}},
|
|
|
|
expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
|
|
|
|
]
|
|
|
|
|
|
|
|
_context = [
|
|
|
|
('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
|
|
|
|
('user_project_ctxt',
|
|
|
|
dict(ctxt={'user': 'mark', 'project': 'snarkybunch'},
|
|
|
|
expected_ctxt={'_context_user': 'mark',
|
|
|
|
'_context_project': 'snarkybunch'})),
|
|
|
|
]
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def generate_scenarios(cls):
|
|
|
|
cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
|
|
|
|
cls._context,
|
|
|
|
cls._target)
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
super(TestRequestWireFormat, self).setUp()
|
2013-08-13 12:46:36 +01:00
|
|
|
self.messaging_conf.transport_driver = 'rabbit'
|
|
|
|
self.messaging_conf.in_memory = True
|
2013-07-31 11:36:28 +01:00
|
|
|
|
|
|
|
self.uuids = []
|
|
|
|
self.orig_uuid4 = uuid.uuid4
|
|
|
|
self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
|
|
|
|
|
|
|
|
def mock_uuid4(self):
|
|
|
|
self.uuids.append(self.orig_uuid4())
|
|
|
|
return self.uuids[-1]
|
|
|
|
|
|
|
|
def test_request_wire_format(self):
|
|
|
|
if hasattr(self, 'skip_msg'):
|
|
|
|
self.skipTest(self.skip_msg)
|
|
|
|
|
|
|
|
transport = messaging.get_transport(self.conf)
|
|
|
|
self.addCleanup(transport.cleanup)
|
|
|
|
|
|
|
|
driver = transport._driver
|
|
|
|
|
|
|
|
target = messaging.Target(topic=self.topic,
|
|
|
|
server=self.server,
|
|
|
|
fanout=self.fanout)
|
|
|
|
|
|
|
|
connection, channel, queue = _declare_queue(target)
|
|
|
|
self.addCleanup(connection.release)
|
|
|
|
|
|
|
|
driver.send(target, self.ctxt, self.msg)
|
|
|
|
|
|
|
|
msgs = []
|
|
|
|
|
|
|
|
def callback(msg):
|
|
|
|
msg = channel.message_to_python(msg)
|
|
|
|
msg.ack()
|
|
|
|
msgs.append(msg.payload)
|
|
|
|
|
|
|
|
queue.consume(callback=callback,
|
|
|
|
consumer_tag='1',
|
|
|
|
nowait=False)
|
|
|
|
|
|
|
|
connection.drain_events()
|
|
|
|
|
|
|
|
self.assertEqual(1, len(msgs))
|
2013-08-07 06:43:55 +01:00
|
|
|
self.assertIn('oslo.message', msgs[0])
|
2013-07-31 11:36:28 +01:00
|
|
|
|
|
|
|
received = msgs[0]
|
|
|
|
received['oslo.message'] = jsonutils.loads(received['oslo.message'])
|
|
|
|
|
2013-08-26 10:09:16 +01:00
|
|
|
# FIXME(markmc): add _msg_id and _reply_q check
|
2013-07-31 11:36:28 +01:00
|
|
|
expected_msg = {
|
2013-08-26 10:09:16 +01:00
|
|
|
'_unique_id': self.uuids[0].hex,
|
2013-07-31 11:36:28 +01:00
|
|
|
}
|
|
|
|
expected_msg.update(self.expected)
|
|
|
|
expected_msg.update(self.expected_ctxt)
|
|
|
|
|
|
|
|
expected = {
|
|
|
|
'oslo.version': '2.0',
|
|
|
|
'oslo.message': expected_msg,
|
|
|
|
}
|
|
|
|
|
|
|
|
self.assertEqual(expected, received)
|
|
|
|
|
|
|
|
|
|
|
|
TestRequestWireFormat.generate_scenarios()
|
|
|
|
|
|
|
|
|
|
|
|
def _create_producer(target):
|
|
|
|
connection = kombu.connection.BrokerConnection(transport='memory')
|
|
|
|
|
|
|
|
# Kludge to speed up tests.
|
|
|
|
connection.transport.polling_interval = 0.0
|
|
|
|
|
|
|
|
connection.connect()
|
|
|
|
channel = connection.channel()
|
|
|
|
|
|
|
|
# work around 'memory' transport bug in 1.1.3
|
|
|
|
channel._new_queue('ae.undeliver')
|
|
|
|
|
|
|
|
if target.fanout:
|
|
|
|
exchange = kombu.entity.Exchange(name=target.topic + '_fanout',
|
|
|
|
type='fanout',
|
|
|
|
durable=False,
|
|
|
|
auto_delete=True)
|
|
|
|
producer = kombu.messaging.Producer(exchange=exchange,
|
|
|
|
channel=channel,
|
|
|
|
routing_key=target.topic)
|
|
|
|
elif target.server:
|
|
|
|
exchange = kombu.entity.Exchange(name='openstack',
|
|
|
|
type='topic',
|
|
|
|
durable=False,
|
|
|
|
auto_delete=False)
|
|
|
|
topic = '%s.%s' % (target.topic, target.server)
|
|
|
|
producer = kombu.messaging.Producer(exchange=exchange,
|
|
|
|
channel=channel,
|
|
|
|
routing_key=topic)
|
|
|
|
else:
|
|
|
|
exchange = kombu.entity.Exchange(name='openstack',
|
|
|
|
type='topic',
|
|
|
|
durable=False,
|
|
|
|
auto_delete=False)
|
|
|
|
producer = kombu.messaging.Producer(exchange=exchange,
|
|
|
|
channel=channel,
|
|
|
|
routing_key=target.topic)
|
|
|
|
|
|
|
|
return connection, producer
|
|
|
|
|
|
|
|
|
|
|
|
class TestReplyWireFormat(test_utils.BaseTestCase):
|
|
|
|
|
|
|
|
_target = [
|
|
|
|
('topic_target',
|
|
|
|
dict(topic='testtopic', server=None, fanout=False)),
|
|
|
|
('server_target',
|
|
|
|
dict(topic='testtopic', server='testserver', fanout=False)),
|
|
|
|
# NOTE(markmc): https://github.com/celery/kombu/issues/195
|
|
|
|
('fanout_target',
|
|
|
|
dict(topic='testtopic', server=None, fanout=True,
|
|
|
|
skip_msg='Requires kombu>2.5.12 to fix kombu issue #195')),
|
|
|
|
]
|
|
|
|
|
|
|
|
_msg = [
|
|
|
|
('empty_msg',
|
|
|
|
dict(msg={}, expected={})),
|
|
|
|
('primitive_msg',
|
|
|
|
dict(msg={'foo': 'bar'}, expected={'foo': 'bar'})),
|
|
|
|
('complex_msg',
|
|
|
|
dict(msg={'a': {'b': '1920-02-03T04:05:06.000007'}},
|
|
|
|
expected={'a': {'b': '1920-02-03T04:05:06.000007'}})),
|
|
|
|
]
|
|
|
|
|
|
|
|
_context = [
|
|
|
|
('empty_ctxt', dict(ctxt={}, expected_ctxt={})),
|
|
|
|
('user_project_ctxt',
|
|
|
|
dict(ctxt={'_context_user': 'mark',
|
|
|
|
'_context_project': 'snarkybunch'},
|
|
|
|
expected_ctxt={'user': 'mark', 'project': 'snarkybunch'})),
|
|
|
|
]
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def generate_scenarios(cls):
|
|
|
|
cls.scenarios = testscenarios.multiply_scenarios(cls._msg,
|
|
|
|
cls._context,
|
|
|
|
cls._target)
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
super(TestReplyWireFormat, self).setUp()
|
2013-08-13 12:46:36 +01:00
|
|
|
self.messaging_conf.transport_driver = 'rabbit'
|
|
|
|
self.messaging_conf.in_memory = True
|
2013-07-31 11:36:28 +01:00
|
|
|
|
|
|
|
def test_reply_wire_format(self):
|
|
|
|
if hasattr(self, 'skip_msg'):
|
|
|
|
self.skipTest(self.skip_msg)
|
|
|
|
|
|
|
|
transport = messaging.get_transport(self.conf)
|
|
|
|
self.addCleanup(transport.cleanup)
|
|
|
|
|
|
|
|
driver = transport._driver
|
|
|
|
|
|
|
|
target = messaging.Target(topic=self.topic,
|
|
|
|
server=self.server,
|
|
|
|
fanout=self.fanout)
|
|
|
|
|
|
|
|
listener = driver.listen(target)
|
|
|
|
|
|
|
|
connection, producer = _create_producer(target)
|
|
|
|
self.addCleanup(connection.release)
|
|
|
|
|
|
|
|
msg = {
|
|
|
|
'oslo.version': '2.0',
|
|
|
|
'oslo.message': {}
|
|
|
|
}
|
|
|
|
|
|
|
|
msg['oslo.message'].update(self.msg)
|
|
|
|
msg['oslo.message'].update(self.ctxt)
|
|
|
|
|
|
|
|
msg['oslo.message'].update({
|
|
|
|
'_msg_id': uuid.uuid4().hex,
|
|
|
|
'_unique_id': uuid.uuid4().hex,
|
|
|
|
'_reply_q': 'reply_' + uuid.uuid4().hex,
|
|
|
|
})
|
|
|
|
|
|
|
|
msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])
|
|
|
|
|
|
|
|
producer.publish(msg)
|
|
|
|
|
|
|
|
received = listener.poll()
|
2013-08-07 06:43:55 +01:00
|
|
|
self.assertIsNotNone(received)
|
2013-07-31 11:36:28 +01:00
|
|
|
self.assertEqual(self.expected_ctxt, received.ctxt)
|
|
|
|
self.assertEqual(self.expected, received.message)
|
|
|
|
|
|
|
|
|
|
|
|
TestReplyWireFormat.generate_scenarios()
|
2014-01-24 17:31:46 +01:00
|
|
|
|
|
|
|
|
|
|
|
class RpcKombuHATestCase(test_utils.BaseTestCase):
|
|
|
|
|
2014-02-21 11:50:45 +01:00
|
|
|
def setUp(self):
|
|
|
|
super(RpcKombuHATestCase, self).setUp()
|
|
|
|
self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
|
|
|
|
self.config(rabbit_hosts=self.brokers)
|
2014-01-24 17:31:46 +01:00
|
|
|
|
2014-03-21 09:49:32 +08:00
|
|
|
hostname_sets = set()
|
2014-02-21 11:50:45 +01:00
|
|
|
self.info = {'attempt': 0,
|
|
|
|
'fail': False}
|
2014-01-24 17:31:46 +01:00
|
|
|
|
|
|
|
def _connect(myself, params):
|
|
|
|
# do as little work that is enough to pass connection attempt
|
|
|
|
myself.connection = kombu.connection.BrokerConnection(**params)
|
|
|
|
myself.connection_errors = myself.connection.connection_errors
|
|
|
|
|
2014-03-21 09:49:32 +08:00
|
|
|
hostname = params['hostname']
|
|
|
|
self.assertNotIn(hostname, hostname_sets)
|
|
|
|
hostname_sets.add(hostname)
|
2014-01-24 17:31:46 +01:00
|
|
|
|
2014-02-21 11:50:45 +01:00
|
|
|
self.info['attempt'] += 1
|
|
|
|
if self.info['fail']:
|
|
|
|
raise IOError('fake fail')
|
|
|
|
|
2014-01-24 17:31:46 +01:00
|
|
|
# just make sure connection instantiation does not fail with an
|
|
|
|
# exception
|
|
|
|
self.stubs.Set(rabbit_driver.Connection, '_connect', _connect)
|
|
|
|
|
|
|
|
# starting from the first broker in the list
|
2014-03-07 10:46:17 +01:00
|
|
|
url = messaging.TransportURL.parse(self.conf, None)
|
2014-02-21 11:50:45 +01:00
|
|
|
self.connection = rabbit_driver.Connection(self.conf, url)
|
|
|
|
self.addCleanup(self.connection.close)
|
2014-01-24 17:31:46 +01:00
|
|
|
|
2014-02-21 11:50:45 +01:00
|
|
|
self.info.update({'attempt': 0,
|
|
|
|
'fail': True})
|
|
|
|
hostname_sets.clear()
|
2014-01-24 17:31:46 +01:00
|
|
|
|
2014-02-21 11:50:45 +01:00
|
|
|
def test_reconnect_order(self):
|
|
|
|
self.assertRaises(messaging.MessageDeliveryFailure,
|
|
|
|
self.connection.reconnect,
|
|
|
|
retry=len(self.brokers) - 1)
|
|
|
|
self.assertEqual(len(self.brokers), self.info['attempt'])
|
|
|
|
|
|
|
|
def test_ensure_four_retry(self):
|
|
|
|
mock_callback = mock.Mock(side_effect=IOError)
|
|
|
|
self.assertRaises(messaging.MessageDeliveryFailure,
|
|
|
|
self.connection.ensure, None, mock_callback,
|
|
|
|
retry=4)
|
|
|
|
self.assertEqual(5, self.info['attempt'])
|
|
|
|
self.assertEqual(1, mock_callback.call_count)
|
|
|
|
|
|
|
|
def test_ensure_one_retry(self):
|
|
|
|
mock_callback = mock.Mock(side_effect=IOError)
|
|
|
|
self.assertRaises(messaging.MessageDeliveryFailure,
|
|
|
|
self.connection.ensure, None, mock_callback,
|
|
|
|
retry=1)
|
|
|
|
self.assertEqual(2, self.info['attempt'])
|
|
|
|
self.assertEqual(1, mock_callback.call_count)
|
|
|
|
|
|
|
|
def test_ensure_no_retry(self):
|
|
|
|
mock_callback = mock.Mock(side_effect=IOError)
|
|
|
|
self.assertRaises(messaging.MessageDeliveryFailure,
|
|
|
|
self.connection.ensure, None, mock_callback,
|
|
|
|
retry=0)
|
|
|
|
self.assertEqual(1, self.info['attempt'])
|
|
|
|
self.assertEqual(1, mock_callback.call_count)
|