Don't use oslo.cfg to set kombu in-memory driver

This removes a TODO and also fixes a issue due to the
global state of oslo.config.cfg.CONF.

Closes bug: #1397339

Change-Id: Ib366f35678f899fda93821e6f07897baf8f631b4
This commit is contained in:
Mehdi Abaakouk 2014-12-01 11:28:13 +01:00
parent 42a2df15dd
commit bcb3b23b8f
4 changed files with 24 additions and 60 deletions

View File

@ -100,11 +100,6 @@ rabbit_opts = [
help='Use HA queues in RabbitMQ (x-ha-policy: all). ' help='Use HA queues in RabbitMQ (x-ha-policy: all). '
'If you change this option, you must wipe the ' 'If you change this option, you must wipe the '
'RabbitMQ database.'), 'RabbitMQ database.'),
# FIXME(markmc): this was toplevel in openstack.common.rpc
cfg.BoolOpt('fake_rabbit',
default=False,
help='If passed, use a fake RabbitMQ provider.'),
] ]
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -446,11 +441,7 @@ class Connection(object):
virtual_host = self.conf.rabbit_virtual_host virtual_host = self.conf.rabbit_virtual_host
self._url = '' self._url = ''
if self.conf.fake_rabbit: if url.hosts:
# TODO(sileht): use memory://virtual_host into
# unit tests to remove cfg.CONF.fake_rabbit
self._url = 'memory://%s/' % virtual_host
elif url.hosts:
for host in url.hosts: for host in url.hosts:
transport = url.transport.replace('kombu+', '') transport = url.transport.replace('kombu+', '')
transport = url.transport.replace('rabbit', 'amqp') transport = url.transport.replace('rabbit', 'amqp')
@ -461,6 +452,11 @@ class Connection(object):
parse.quote(host.password or ''), parse.quote(host.password or ''),
host.hostname or '', str(host.port or 5672), host.hostname or '', str(host.port or 5672),
virtual_host) virtual_host)
elif url.transport.startswith('kombu+'):
# NOTE(sileht): url have a + but no hosts
# (like kombu+memory:///), pass it to kombu as-is
transport = url.transport.replace('kombu+', '')
self._url = "%s://%s" % (transport, virtual_host)
else: else:
for adr in self.conf.rabbit_hosts: for adr in self.conf.rabbit_hosts:
hostname, port = netutils.parse_host_port( hostname, port = netutils.parse_host_port(
@ -489,7 +485,7 @@ class Connection(object):
{'hostname': self.connection.hostname, {'hostname': self.connection.hostname,
'port': self.connection.port}) 'port': self.connection.port})
if self.conf.fake_rabbit: if self._url.startswith('memory://'):
# Kludge to speed up tests. # Kludge to speed up tests.
self.connection.transport.polling_interval = 0.0 self.connection.transport.polling_interval = 0.0

View File

@ -68,21 +68,6 @@ class ConfFixture(fixtures.Fixture):
def transport_driver(self, value): def transport_driver(self, value):
self.conf.set_override('rpc_backend', value) self.conf.set_override('rpc_backend', value)
@property
def in_memory(self):
"""Use an in-memory transport; currently supported by rabbit driver."""
if (('rabbit' in self.transport_driver or
'kombu' in self.transport_driver)):
return self.conf.fake_rabbit
else:
return False
@in_memory.setter
def in_memory(self, value):
if (('rabbit' in self.transport_driver or
'kombu' in self.transport_driver)):
self.conf.set_override('fake_rabbit', value)
@property @property
def response_timeout(self): def response_timeout(self):
"""Default number of seconds to wait for a response from a call.""" """Default number of seconds to wait for a response from a call."""

View File

@ -180,7 +180,7 @@ def get_transport(conf, url=None, allowed_remote_exmods=None, aliases=None):
try: try:
mgr = driver.DriverManager('oslo.messaging.drivers', mgr = driver.DriverManager('oslo.messaging.drivers',
url.transport, url.transport.split('+')[0],
invoke_on_load=True, invoke_on_load=True,
invoke_args=[conf, url], invoke_args=[conf, url],
invoke_kwds=kwargs) invoke_kwds=kwargs)

View File

@ -38,10 +38,12 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
def setUp(self): def setUp(self):
super(TestRabbitDriverLoad, self).setUp() super(TestRabbitDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'rabbit' self.messaging_conf.transport_driver = 'rabbit'
self.messaging_conf.in_memory = True
def test_driver_load(self): @mock.patch('oslo.messaging._drivers.impl_rabbit.Connection.ensure')
@mock.patch('oslo.messaging._drivers.impl_rabbit.Connection.reset')
def test_driver_load(self, fake_ensure, fake_reset):
transport = messaging.get_transport(self.conf) transport = messaging.get_transport(self.conf)
self.addCleanup(transport.cleanup)
self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver) self.assertIsInstance(transport._driver, rabbit_driver.RabbitDriver)
@ -50,6 +52,8 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
scenarios = [ scenarios = [
('none', dict(url=None, ('none', dict(url=None,
expected=["amqp://guest:guest@localhost:5672//"])), expected=["amqp://guest:guest@localhost:5672//"])),
('memory', dict(url='kombu+memory:////',
expected=["memory:///"])),
('empty', ('empty',
dict(url='rabbit:///', dict(url='rabbit:///',
expected=['amqp://guest:guest@localhost:5672/'])), expected=['amqp://guest:guest@localhost:5672/'])),
@ -76,11 +80,13 @@ class TestRabbitTransportURL(test_utils.BaseTestCase):
)), )),
] ]
@mock.patch('oslo.messaging._drivers.impl_rabbit.Connection.ensure') def setUp(self):
def test_transport_url(self, fake_ensure): super(TestRabbitTransportURL, self).setUp()
self.messaging_conf.transport_driver = 'rabbit' self.messaging_conf.transport_driver = 'rabbit'
self.messaging_conf.in_memory = False
@mock.patch('oslo.messaging._drivers.impl_rabbit.Connection.ensure')
@mock.patch('oslo.messaging._drivers.impl_rabbit.Connection.reset')
def test_transport_url(self, fake_ensure_connection, fake_reset):
transport = messaging.get_transport(self.conf, self.url) transport = messaging.get_transport(self.conf, self.url)
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
driver = transport._driver driver = transport._driver
@ -129,13 +135,8 @@ class TestSendReceive(test_utils.BaseTestCase):
cls._failure, cls._failure,
cls._timeout) cls._timeout)
def setUp(self):
super(TestSendReceive, self).setUp()
self.messaging_conf.transport_driver = 'rabbit'
self.messaging_conf.in_memory = True
def test_send_receive(self): def test_send_receive(self):
transport = messaging.get_transport(self.conf) transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
driver = transport._driver driver = transport._driver
@ -224,13 +225,8 @@ TestSendReceive.generate_scenarios()
class TestPollAsync(test_utils.BaseTestCase): class TestPollAsync(test_utils.BaseTestCase):
def setUp(self):
super(TestPollAsync, self).setUp()
self.messaging_conf.transport_driver = 'rabbit'
self.messaging_conf.in_memory = True
def test_poll_timeout(self): def test_poll_timeout(self):
transport = messaging.get_transport(self.conf) transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
driver = transport._driver driver = transport._driver
target = messaging.Target(topic='testtopic') target = messaging.Target(topic='testtopic')
@ -241,13 +237,8 @@ class TestPollAsync(test_utils.BaseTestCase):
class TestRacyWaitForReply(test_utils.BaseTestCase): class TestRacyWaitForReply(test_utils.BaseTestCase):
def setUp(self):
super(TestRacyWaitForReply, self).setUp()
self.messaging_conf.transport_driver = 'rabbit'
self.messaging_conf.in_memory = True
def test_send_receive(self): def test_send_receive(self):
transport = messaging.get_transport(self.conf) transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
driver = transport._driver driver = transport._driver
@ -430,9 +421,6 @@ class TestRequestWireFormat(test_utils.BaseTestCase):
def setUp(self): def setUp(self):
super(TestRequestWireFormat, self).setUp() super(TestRequestWireFormat, self).setUp()
self.messaging_conf.transport_driver = 'rabbit'
self.messaging_conf.in_memory = True
self.uuids = [] self.uuids = []
self.orig_uuid4 = uuid.uuid4 self.orig_uuid4 = uuid.uuid4
self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4)) self.useFixture(fixtures.MonkeyPatch('uuid.uuid4', self.mock_uuid4))
@ -445,7 +433,7 @@ class TestRequestWireFormat(test_utils.BaseTestCase):
if hasattr(self, 'skip_msg'): if hasattr(self, 'skip_msg'):
self.skipTest(self.skip_msg) self.skipTest(self.skip_msg)
transport = messaging.get_transport(self.conf) transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
driver = transport._driver driver = transport._driver
@ -574,16 +562,11 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
cls._context, cls._context,
cls._target) cls._target)
def setUp(self):
super(TestReplyWireFormat, self).setUp()
self.messaging_conf.transport_driver = 'rabbit'
self.messaging_conf.in_memory = True
def test_reply_wire_format(self): def test_reply_wire_format(self):
if hasattr(self, 'skip_msg'): if hasattr(self, 'skip_msg'):
self.skipTest(self.skip_msg) self.skipTest(self.skip_msg)
transport = messaging.get_transport(self.conf) transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
driver = transport._driver driver = transport._driver