Browse Source

zmq: Refactor test case shared code

A number of the ZMQ test cases shared the same setUp method; refactor
and introduce a base test case class to share this code across all
appropriate ZMQ test cases.

Change-Id: I59300464af001c343efcd4f3f33d34c972da2b87
changes/54/144954/3
James Page 7 years ago
parent
commit
386f5daee6
2 changed files with 64 additions and 208 deletions
  1. +32
    -105
      oslo_messaging/tests/drivers/test_impl_zmq.py
  2. +32
    -103
      tests/drivers/test_impl_zmq.py

+ 32
- 105
oslo_messaging/tests/drivers/test_impl_zmq.py View File

@ -24,8 +24,6 @@ from oslo.utils import importutils
import oslo_messaging
from oslo_messaging.tests import utils as test_utils
# NOTE(jamespage) the zmq driver implementation is currently tied
# to eventlet so we have to monkey_patch to support testing
# eventlet is not yet py3 compatible, so skip if not installed
eventlet = importutils.try_import('eventlet')
@ -44,34 +42,12 @@ def get_unused_port():
return port
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestConfZmqDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
transport = oslo_messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
class stopRpc(object):
def __init__(self, attrs):
self.attrs = attrs
def __call__(self):
if self.attrs['reactor']:
self.attrs['reactor'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
class TestZmqBasics(test_utils.BaseTestCase):
class ZmqBaseTestCase(test_utils.BaseTestCase):
"""Base test case for all ZMQ tests that make use of the ZMQ Proxy"""
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqBasics, self).setUp()
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = oslo_messaging.get_transport(self.conf)
@ -94,6 +70,32 @@ class TestZmqBasics(test_utils.BaseTestCase):
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestConfZmqDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
transport = oslo_messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
class stopRpc(object):
def __init__(self, attrs):
self.attrs = attrs
def __call__(self):
if self.attrs['reactor']:
self.attrs['reactor'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
class TestZmqBasics(ZmqBaseTestCase):
def test_start_stop_listener(self):
target = oslo_messaging.Target(topic='testtopic')
listener = self.driver.listen(target)
@ -277,32 +279,7 @@ class TestZmqIncomingMessage(test_utils.BaseTestCase):
msg.requeue()
class TestZmqConnection(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqConnection, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
class TestZmqConnection(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer(self, mock_reactor):
@ -379,32 +356,7 @@ class TestZmqConnection(test_utils.BaseTestCase):
self.assertTrue(conn.reactor.consume_in_thread.called)
class TestZmqListener(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqListener, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
class TestZmqListener(ZmqBaseTestCase):
def test_zmqlistener_no_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
@ -424,32 +376,7 @@ class TestZmqListener(test_utils.BaseTestCase):
self.assertEqual(resp.message, msg)
class TestZmqDriver(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqDriver, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = oslo_messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
class TestZmqDriver(ZmqBaseTestCase):
@mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True)


+ 32
- 103
tests/drivers/test_impl_zmq.py View File

@ -42,34 +42,12 @@ def get_unused_port():
return port
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestConfZmqDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
transport = messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
class stopRpc(object):
def __init__(self, attrs):
self.attrs = attrs
def __call__(self):
if self.attrs['reactor']:
self.attrs['reactor'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
class TestZmqBasics(test_utils.BaseTestCase):
class ZmqBaseTestCase(test_utils.BaseTestCase):
"""Base test case for all ZMQ tests that make use of the ZMQ Proxy"""
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqBasics, self).setUp()
super(ZmqBaseTestCase, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
@ -92,6 +70,32 @@ class TestZmqBasics(test_utils.BaseTestCase):
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
class TestConfZmqDriverLoad(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestConfZmqDriverLoad, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
def test_driver_load(self):
transport = messaging.get_transport(self.conf)
self.assertIsInstance(transport._driver, impl_zmq.ZmqDriver)
class stopRpc(object):
def __init__(self, attrs):
self.attrs = attrs
def __call__(self):
if self.attrs['reactor']:
self.attrs['reactor'].close()
if self.attrs['driver']:
self.attrs['driver'].cleanup()
class TestZmqBasics(ZmqBaseTestCase):
def test_start_stop_listener(self):
target = messaging.Target(topic='testtopic')
listener = self.driver.listen(target)
@ -275,32 +279,7 @@ class TestZmqIncomingMessage(test_utils.BaseTestCase):
msg.requeue()
class TestZmqConnection(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqConnection, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
class TestZmqConnection(ZmqBaseTestCase):
@mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True)
def test_zmqconnection_create_consumer(self, mock_reactor):
@ -377,32 +356,7 @@ class TestZmqConnection(test_utils.BaseTestCase):
self.assertTrue(conn.reactor.consume_in_thread.called)
class TestZmqListener(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqListener, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
class TestZmqListener(ZmqBaseTestCase):
def test_zmqlistener_no_msg(self):
listener = impl_zmq.ZmqListener(self.driver)
@ -422,32 +376,7 @@ class TestZmqListener(test_utils.BaseTestCase):
self.assertEqual(resp.message, msg)
class TestZmqDriver(test_utils.BaseTestCase):
@testtools.skipIf(impl_zmq is None, "zmq not available")
def setUp(self):
super(TestZmqDriver, self).setUp()
self.messaging_conf.transport_driver = 'zmq'
# Get driver
transport = messaging.get_transport(self.conf)
self.driver = transport._driver
# Set config values
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
kwargs = {'rpc_zmq_bind_address': '127.0.0.1',
'rpc_zmq_host': '127.0.0.1',
'rpc_response_timeout': 5,
'rpc_zmq_port': get_unused_port(),
'rpc_zmq_ipc_dir': self.internal_ipc_dir}
self.config(**kwargs)
# Start RPC
LOG.info("Running internal zmq receiver.")
self.reactor = impl_zmq.ZmqProxy(self.conf)
self.reactor.consume_in_thread()
self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1')
self.addCleanup(stopRpc(self.__dict__))
class TestZmqDriver(ZmqBaseTestCase):
@mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True)
@mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True)


Loading…
Cancel
Save