From 50f5f3b925aff18241ce2eed2f5018ca1b2368dc Mon Sep 17 00:00:00 2001 From: Mark McLoughlin Date: Fri, 14 Jun 2013 11:50:03 +0100 Subject: [PATCH] Add some RPC server tests These actually test the blocking executor and fake driver too. --- tests/unit/messaging/test_rpc_server.py | 244 ++++++++++++++++++++++++ 1 file changed, 244 insertions(+) create mode 100644 tests/unit/messaging/test_rpc_server.py diff --git a/tests/unit/messaging/test_rpc_server.py b/tests/unit/messaging/test_rpc_server.py new file mode 100644 index 000000000..15b926706 --- /dev/null +++ b/tests/unit/messaging/test_rpc_server.py @@ -0,0 +1,244 @@ + +# Copyright 2013 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading + +from oslo.config import cfg + +from openstack.common import messaging +from tests import utils as test_utils + + +class TestRPCServer(test_utils.BaseTestCase): + + class Server(object): + def __init__(self, transport, topic, server, endpoint, serializer): + target = messaging.Target(topic=topic, server=server) + self._server = messaging.get_rpc_server(transport, + target, + [endpoint, self], + serializer=serializer) + + def stop(self, ctxt): + self._server.stop() + + def start(self): + self._server.start() + + class TestSerializer(object): + + def serialize_entity(self, ctxt, entity): + return 's' + (entity or '') + + def deserialize_entity(self, ctxt, entity): + return 'd' + (entity or '') + + def setUp(self): + super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts()) + self.serializer = self.TestSerializer() + + def _setup_server(self, transport, endpoint, topic='testtopic'): + server = self.Server(transport, + topic=topic, + server='testserver', + endpoint=endpoint, + serializer=self.serializer) + + thread = threading.Thread(target=server.start) + thread.daemon = True + thread.start() + + return thread + + def _stop_server(self, client, server_thread, topic=None): + if topic is not None: + client = client.prepare(topic=topic) + client.cast({}, 'stop') + server_thread.join(timeout=30) + self.assertFalse(server_thread.isAlive()) + + def _setup_client(self, transport): + return messaging.RPCClient(transport, + messaging.Target(topic='testtopic'), + serializer=self.serializer) + + def test_constructor(self): + transport = messaging.get_transport(self.conf, url='fake:') + target = messaging.Target(topic='foo', server='bar') + endpoints = [object()] + serializer = object() + + server = messaging.get_rpc_server(transport, target, endpoints, + serializer=serializer) + + self.assertTrue(server.conf is self.conf) + self.assertTrue(server.transport is transport) + self.assertTrue(server.target is target) + self.assertTrue(isinstance(server.dispatcher, messaging.RPCDispatcher)) + self.assertTrue(server.dispatcher.endpoints is endpoints) + self.assertTrue(server.dispatcher.serializer is serializer) + self.assertTrue(server.executor is 'blocking') + + def test_no_target_server(self): + transport = messaging.get_transport(self.conf, url='fake:') + + server = messaging.get_rpc_server(transport, + messaging.Target(topic='testtopic'), + []) + try: + server.start() + except Exception as ex: + self.assertTrue(isinstance(ex, messaging.ServerListenError), ex) + self.assertEquals(ex.target.topic, 'testtopic') + else: + self.assertTrue(False) + + def test_no_target_topic(self): + transport = messaging.get_transport(self.conf, url='fake:') + target = messaging.Target(server='testserver') + server = messaging.get_rpc_server(transport, target, []) + try: + server.start() + except Exception as ex: + self.assertTrue(isinstance(ex, messaging.ServerListenError), ex) + self.assertEquals(ex.target.server, 'testserver') + else: + self.assertTrue(False) + + def test_duplicate_target_topic(self): + transport = messaging.get_transport(self.conf, url='fake:') + + server_thread = self._setup_server(transport, None, topic='testtopic') + + server = messaging.get_rpc_server(transport, + messaging.Target(server='testserver', + topic='testtopic'), + []) + try: + server.start() + except Exception as ex: + self.assertTrue(isinstance(ex, messaging.ServerListenError), ex) + self.assertEquals(ex.target.server, 'testserver') + self.assertEquals(ex.target.topic, 'testtopic') + else: + self.assertTrue(False) + finally: + client = self._setup_client(transport) + self._stop_server(client, server_thread) + + def test_unknown_executor(self): + transport = messaging.get_transport(self.conf, url='fake:') + + try: + messaging.get_rpc_server(transport, None, [], executor='foo') + except Exception as ex: + self.assertTrue(isinstance(ex, messaging.ExecutorLoadFailure)) + self.assertEquals(ex.executor, 'foo') + else: + self.assertTrue(False) + + def test_cast(self): + transport = messaging.get_transport(self.conf, url='fake:') + + class TestEndpoint(object): + def __init__(self): + self.pings = [] + + def ping(self, ctxt, arg): + self.pings.append(arg) + + endpoint = TestEndpoint() + server_thread = self._setup_server(transport, endpoint) + client = self._setup_client(transport) + + client.cast({}, 'ping', arg='foo') + client.cast({}, 'ping', arg='bar') + + self._stop_server(client, server_thread) + + self.assertEquals(endpoint.pings, ['dsfoo', 'dsbar']) + + def test_call(self): + transport = messaging.get_transport(self.conf, url='fake:') + + class TestEndpoint(object): + def ping(self, ctxt, arg): + return arg + + server_thread = self._setup_server(transport, TestEndpoint()) + client = self._setup_client(transport) + + self.assertEquals(client.call({}, 'ping', arg='foo'), 'dsdsfoo') + + self._stop_server(client, server_thread) + + def test_direct_call(self): + transport = messaging.get_transport(self.conf, url='fake:') + + class TestEndpoint(object): + def ping(self, ctxt, arg): + return arg + + server_thread = self._setup_server(transport, TestEndpoint()) + client = self._setup_client(transport) + + direct = client.prepare(server='testserver') + self.assertEquals(direct.call({}, 'ping', arg='foo'), 'dsdsfoo') + + self._stop_server(client, server_thread) + + def test_context(self): + transport = messaging.get_transport(self.conf, url='fake:') + + class TestEndpoint(object): + def ctxt_check(self, ctxt, key): + return ctxt[key] + + server_thread = self._setup_server(transport, TestEndpoint()) + client = self._setup_client(transport) + + self.assertEquals(client.call({'dsa': 'b'}, + 'ctxt_check', + key='a'), + 'dsb') + + self._stop_server(client, server_thread) + + def test_multiple_servers(self): + transport = messaging.get_transport(self.conf, url='fake:') + + class TestEndpoint(object): + def __init__(self): + self.pings = [] + + def ping(self, ctxt, arg): + self.pings.append(arg) + + endpoint = TestEndpoint() + + thread1 = self._setup_server(transport, endpoint, topic='topic1') + thread2 = self._setup_server(transport, endpoint, topic='topic2') + + client = self._setup_client(transport) + + client.prepare(topic='topic1').cast({}, 'ping', arg='1') + client.prepare(topic='topic2').cast({}, 'ping', arg='2') + + self.assertTrue(thread1.isAlive()) + self._stop_server(client, thread1, topic='topic1') + self.assertTrue(thread2.isAlive()) + self._stop_server(client, thread2, topic='topic2') + + self.assertEquals(endpoint.pings, ['ds1', 'ds2'])