Merge "Add transport URL support to rabbit driver"
This commit is contained in:
@@ -24,6 +24,7 @@ from oslo import messaging
|
|||||||
from oslo.messaging._drivers import amqp as rpc_amqp
|
from oslo.messaging._drivers import amqp as rpc_amqp
|
||||||
from oslo.messaging._drivers import base
|
from oslo.messaging._drivers import base
|
||||||
from oslo.messaging._drivers import common as rpc_common
|
from oslo.messaging._drivers import common as rpc_common
|
||||||
|
from oslo.messaging import _urls as urls
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -245,6 +246,8 @@ class AMQPDriverBase(base.BaseDriver):
|
|||||||
super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
|
super(AMQPDriverBase, self).__init__(conf, url, default_exchange,
|
||||||
allowed_remote_exmods)
|
allowed_remote_exmods)
|
||||||
|
|
||||||
|
self._server_params = self._parse_url(self._url)
|
||||||
|
|
||||||
self._default_exchange = default_exchange
|
self._default_exchange = default_exchange
|
||||||
|
|
||||||
# FIXME(markmc): temp hack
|
# FIXME(markmc): temp hack
|
||||||
@@ -258,10 +261,46 @@ class AMQPDriverBase(base.BaseDriver):
|
|||||||
self._reply_q_conn = None
|
self._reply_q_conn = None
|
||||||
self._waiter = None
|
self._waiter = None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse_url(url):
|
||||||
|
if url is None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
parsed = urls.parse_url(url)
|
||||||
|
|
||||||
|
# Make sure there's not a query string; that could identify
|
||||||
|
# requirements we can't comply with (e.g., ssl), so reject it if
|
||||||
|
# it's present
|
||||||
|
if parsed['parameters']:
|
||||||
|
raise messaging.InvalidTransportURL(
|
||||||
|
url, "Cannot comply with query string in transport URL")
|
||||||
|
|
||||||
|
if not parsed['hosts']:
|
||||||
|
return None
|
||||||
|
|
||||||
|
sp = {
|
||||||
|
'virtual_host': parsed['virtual_host'],
|
||||||
|
}
|
||||||
|
|
||||||
|
# FIXME(markmc): support multiple hosts
|
||||||
|
host = parsed['hosts'][0]
|
||||||
|
|
||||||
|
if ':' in host['host']:
|
||||||
|
(sp['hostname'], sp['port']) = host['host'].split(':', 1)
|
||||||
|
sp['port'] = int(sp['port'])
|
||||||
|
else:
|
||||||
|
sp['hostname'] = host['host']
|
||||||
|
|
||||||
|
sp['username'] = host['username']
|
||||||
|
sp['password'] = host['password']
|
||||||
|
|
||||||
|
return sp
|
||||||
|
|
||||||
def _get_connection(self, pooled=True):
|
def _get_connection(self, pooled=True):
|
||||||
return rpc_amqp.ConnectionContext(self.conf,
|
return rpc_amqp.ConnectionContext(self.conf,
|
||||||
self._connection_pool,
|
self._connection_pool,
|
||||||
pooled=pooled)
|
pooled=pooled,
|
||||||
|
server_params=self._server_params)
|
||||||
|
|
||||||
def _get_reply_q(self):
|
def _get_reply_q(self):
|
||||||
with self._reply_q_lock:
|
with self._reply_q_lock:
|
||||||
|
|||||||
@@ -47,6 +47,66 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
|
|||||||
self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
|
self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
|
||||||
|
|
||||||
|
|
||||||
|
class TestRabbitTransportURL(test_utils.BaseTestCase):
|
||||||
|
|
||||||
|
scenarios = [
|
||||||
|
('none', dict(url=None, expected=None)),
|
||||||
|
('empty', dict(url='rabbit:///', expected=None)),
|
||||||
|
('localhost',
|
||||||
|
dict(url='rabbit://localhost/',
|
||||||
|
expected=dict(hostname='localhost',
|
||||||
|
username='',
|
||||||
|
password='',
|
||||||
|
virtual_host=''))),
|
||||||
|
('no_creds',
|
||||||
|
dict(url='rabbit://host/virtual_host',
|
||||||
|
expected=dict(hostname='host',
|
||||||
|
username='',
|
||||||
|
password='',
|
||||||
|
virtual_host='virtual_host'))),
|
||||||
|
('no_port',
|
||||||
|
dict(url='rabbit://user:password@host/virtual_host',
|
||||||
|
expected=dict(hostname='host',
|
||||||
|
username='user',
|
||||||
|
password='password',
|
||||||
|
virtual_host='virtual_host'))),
|
||||||
|
('full_url',
|
||||||
|
dict(url='rabbit://user:password@host:10/virtual_host',
|
||||||
|
expected=dict(hostname='host',
|
||||||
|
port=10,
|
||||||
|
username='user',
|
||||||
|
password='password',
|
||||||
|
virtual_host='virtual_host'))),
|
||||||
|
]
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestRabbitTransportURL, self).setUp()
|
||||||
|
self.conf.register_opts(msg_transport._transport_opts)
|
||||||
|
self.conf.register_opts(rabbit_driver.rabbit_opts)
|
||||||
|
self.config(rpc_backend='rabbit')
|
||||||
|
self.config(fake_rabbit=True)
|
||||||
|
|
||||||
|
def test_transport_url(self):
|
||||||
|
cnx_init = rabbit_driver.Connection.__init__
|
||||||
|
passed_params = []
|
||||||
|
|
||||||
|
def record_params(self, conf, server_params=None):
|
||||||
|
passed_params.append(server_params)
|
||||||
|
return cnx_init(self, conf, server_params)
|
||||||
|
|
||||||
|
self.stubs.Set(rabbit_driver.Connection, '__init__', record_params)
|
||||||
|
|
||||||
|
transport = messaging.get_transport(self.conf, self.url)
|
||||||
|
|
||||||
|
driver = transport._driver
|
||||||
|
|
||||||
|
target = messaging.Target(topic='testtopic')
|
||||||
|
|
||||||
|
driver.send(target, {}, {})
|
||||||
|
|
||||||
|
self.assertEquals(passed_params[0], self.expected)
|
||||||
|
|
||||||
|
|
||||||
class TestSendReceive(test_utils.BaseTestCase):
|
class TestSendReceive(test_utils.BaseTestCase):
|
||||||
|
|
||||||
_n_senders = [
|
_n_senders = [
|
||||||
|
|||||||
Reference in New Issue
Block a user