Class-level _exchanges in FakeExchangeManager
The FakeExchangeManager uses an instance-level storage for FakeExchanges mapping[1]. When a client--server pair is created, each keeps their own instance of FakeDriver -> FakeExchangeManager -> FakeExchange, each of which has their own (instance-level) copy of e.g _server_queues[2], making it impossible for them to communicate. This patch makes the _exchanges mapping a class-level attribute in order to keep the registered exchanges shared between all Manager instances, allowing client and server communication (within a single process). The test_server unit-tests had to be refactored to explicitly pass an exchange name when building a target. This is required for an exchange name change to have any effect during a test case run time when compared to passing the exchange name through the URL. This issue was revealed with this patch. [1] https://github.com/openstack/oslo.messaging/blob/master/oslo_messaging/_drivers/impl_fake.py#L145,#L148 [2] https://github.com/openstack/oslo.messaging/blob/master/oslo_messaging/_drivers/impl_fake.py#L88,#L92 Change-Id: I8dff66f4cafeb1f4c57dbfbfaba5d49e50f55fee Closes-Bug: #1714055
This commit is contained in:
parent
ba30a3067d
commit
d1dac1c11d
@ -142,10 +142,11 @@ class FakeExchange(object):
|
||||
|
||||
|
||||
class FakeExchangeManager(object):
|
||||
_exchanges_lock = threading.Lock()
|
||||
_exchanges = {}
|
||||
|
||||
def __init__(self, default_exchange):
|
||||
self._default_exchange = default_exchange
|
||||
self._exchanges_lock = threading.Lock()
|
||||
self._exchanges = {}
|
||||
|
||||
def get_exchange(self, name):
|
||||
if name is None:
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import fixtures
|
||||
import threading
|
||||
|
||||
from oslo_config import cfg
|
||||
@ -131,6 +132,9 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
|
||||
def setUp(self):
|
||||
super(TestNotifyListener, self).setUp(conf=cfg.ConfigOpts())
|
||||
ListenerSetupMixin.setUp(self)
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
|
||||
new_value={}))
|
||||
|
||||
@mock.patch('debtcollector.deprecate')
|
||||
def test_constructor(self, deprecate):
|
||||
|
@ -35,9 +35,11 @@ load_tests = testscenarios.load_tests_apply_scenarios
|
||||
class ServerSetupMixin(object):
|
||||
|
||||
class Server(object):
|
||||
def __init__(self, transport, topic, server, endpoint, serializer):
|
||||
def __init__(self, transport, topic, server, endpoint, serializer,
|
||||
exchange):
|
||||
self.controller = ServerSetupMixin.ServerController()
|
||||
target = oslo_messaging.Target(topic=topic, server=server)
|
||||
target = oslo_messaging.Target(topic=topic, server=server,
|
||||
exchange=exchange)
|
||||
self.server = oslo_messaging.get_rpc_server(transport,
|
||||
target,
|
||||
[endpoint,
|
||||
@ -81,25 +83,25 @@ class ServerSetupMixin(object):
|
||||
def __init__(self):
|
||||
self.serializer = self.TestSerializer()
|
||||
|
||||
def _setup_server(self, transport, endpoint, topic=None, server=None):
|
||||
def _setup_server(self, transport, endpoint, topic=None, server=None,
|
||||
exchange=None):
|
||||
server = self.Server(transport,
|
||||
topic=topic or 'testtopic',
|
||||
server=server or 'testserver',
|
||||
endpoint=endpoint,
|
||||
serializer=self.serializer)
|
||||
serializer=self.serializer,
|
||||
exchange=exchange)
|
||||
|
||||
server.start()
|
||||
return server
|
||||
|
||||
def _stop_server(self, client, server, topic=None):
|
||||
if topic is not None:
|
||||
client = client.prepare(topic=topic)
|
||||
def _stop_server(self, client, server, topic=None, exchange=None):
|
||||
client.cast({}, 'stop')
|
||||
server.wait()
|
||||
|
||||
def _setup_client(self, transport, topic='testtopic'):
|
||||
return oslo_messaging.RPCClient(transport,
|
||||
oslo_messaging.Target(topic=topic),
|
||||
def _setup_client(self, transport, topic='testtopic', exchange=None):
|
||||
target = oslo_messaging.Target(topic=topic, exchange=exchange)
|
||||
return oslo_messaging.RPCClient(transport, target=target,
|
||||
serializer=self.serializer)
|
||||
|
||||
|
||||
@ -111,6 +113,11 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
|
||||
# FakeExchangeManager uses a class-level exchanges mapping; "reset" it
|
||||
# before tests assert amount of items stored
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
|
||||
new_value={}))
|
||||
|
||||
@mock.patch('warnings.warn')
|
||||
def test_constructor(self, warn):
|
||||
@ -300,14 +307,20 @@ class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
self.assertEqual(['dsfoo', 'dsbar'], endpoint.pings)
|
||||
|
||||
def test_call(self):
|
||||
transport = oslo_messaging.get_rpc_transport(self.conf, url='fake:')
|
||||
# NOTE(milan): using a separate transport instance for each the client
|
||||
# and the server to be able to check independent transport instances
|
||||
# can communicate over same exchange&topic
|
||||
transport_srv = oslo_messaging.get_rpc_transport(self.conf,
|
||||
url='fake:')
|
||||
transport_cli = oslo_messaging.get_rpc_transport(self.conf,
|
||||
url='fake:')
|
||||
|
||||
class TestEndpoint(object):
|
||||
def ping(self, ctxt, arg):
|
||||
return arg
|
||||
|
||||
server_thread = self._setup_server(transport, TestEndpoint())
|
||||
client = self._setup_client(transport)
|
||||
server_thread = self._setup_server(transport_srv, TestEndpoint())
|
||||
client = self._setup_client(transport_cli)
|
||||
|
||||
self.assertIsNone(client.call({}, 'ping', arg=None))
|
||||
self.assertEqual(0, client.call({}, 'ping', arg=0))
|
||||
@ -498,8 +511,8 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
single_server = params['server1'] == params['server2']
|
||||
return not (single_topic and single_server)
|
||||
|
||||
# fanout to multiple servers on same topic and exchange
|
||||
# each endpoint will receive both messages
|
||||
# fanout to multiple servers on same topic and exchange each endpoint
|
||||
# will receive both messages
|
||||
def fanout_to_servers(scenario):
|
||||
params = scenario[1]
|
||||
fanout = params['fanout1'] or params['fanout2']
|
||||
@ -536,14 +549,16 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
|
||||
def setUp(self):
|
||||
super(TestMultipleServers, self).setUp(conf=cfg.ConfigOpts())
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'oslo_messaging._drivers.impl_fake.FakeExchangeManager._exchanges',
|
||||
new_value={}))
|
||||
|
||||
def test_multiple_servers(self):
|
||||
url1 = 'fake:///' + (self.exchange1 or '')
|
||||
url2 = 'fake:///' + (self.exchange2 or '')
|
||||
|
||||
transport1 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
|
||||
if url1 != url2:
|
||||
transport2 = oslo_messaging.get_rpc_transport(self.conf, url=url1)
|
||||
transport1 = oslo_messaging.get_rpc_transport(self.conf,
|
||||
url='fake:')
|
||||
if self.exchange1 != self.exchange2:
|
||||
transport2 = oslo_messaging.get_rpc_transport(self.conf,
|
||||
url='fake:')
|
||||
else:
|
||||
transport2 = transport1
|
||||
|
||||
@ -563,12 +578,18 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
endpoint1 = endpoint2 = TestEndpoint()
|
||||
|
||||
server1 = self._setup_server(transport1, endpoint1,
|
||||
topic=self.topic1, server=self.server1)
|
||||
topic=self.topic1,
|
||||
exchange=self.exchange1,
|
||||
server=self.server1)
|
||||
server2 = self._setup_server(transport2, endpoint2,
|
||||
topic=self.topic2, server=self.server2)
|
||||
topic=self.topic2,
|
||||
exchange=self.exchange2,
|
||||
server=self.server2)
|
||||
|
||||
client1 = self._setup_client(transport1, topic=self.topic1)
|
||||
client2 = self._setup_client(transport2, topic=self.topic2)
|
||||
client1 = self._setup_client(transport1, topic=self.topic1,
|
||||
exchange=self.exchange1)
|
||||
client2 = self._setup_client(transport2, topic=self.topic2,
|
||||
exchange=self.exchange2)
|
||||
|
||||
client1 = client1.prepare(server=self.server1)
|
||||
client2 = client2.prepare(server=self.server2)
|
||||
@ -584,9 +605,9 @@ class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
(client2.call if self.call2 else client2.cast)({}, 'ping', arg='2')
|
||||
|
||||
self._stop_server(client1.prepare(fanout=None),
|
||||
server1, topic=self.topic1)
|
||||
server1, topic=self.topic1, exchange=self.exchange1)
|
||||
self._stop_server(client2.prepare(fanout=None),
|
||||
server2, topic=self.topic2)
|
||||
server2, topic=self.topic2, exchange=self.exchange2)
|
||||
|
||||
def check(pings, expect):
|
||||
self.assertEqual(len(expect), len(pings))
|
||||
|
Loading…
x
Reference in New Issue
Block a user