diff --git a/mistral/api/service.py b/mistral/api/service.py index ce32e675e..3d2aeb906 100644 --- a/mistral/api/service.py +++ b/mistral/api/service.py @@ -18,6 +18,7 @@ from oslo_service import service from oslo_service import wsgi from mistral.api import app +from mistral.rpc import clients as rpc_clients class WSGIService(service.ServiceBase): @@ -40,8 +41,19 @@ class WSGIService(service.ServiceBase): ) def start(self): + # NOTE: When oslo.service creates an API worker it forks a new child + # system process. The child process is created as precise copy of the + # parent process (see how os.fork() works) and within the child process + # oslo.service calls service's start() method again to reinitialize + # what's needed. So we must clean up all RPC clients so that RPC works + # properly (e.g. message routing for synchronous calls may be based on + # generated queue names). + rpc_clients.cleanup() + self.server.start() + print('API server started.') + def stop(self): self.server.stop() diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py index d4b13e0aa..89cf573c0 100644 --- a/mistral/rpc/clients.py +++ b/mistral/rpc/clients.py @@ -16,6 +16,7 @@ from oslo_config import cfg from osprofiler import profiler +import threading from mistral import context as auth_ctx from mistral.engine import base as eng @@ -25,12 +26,22 @@ from mistral.rpc import base _ENGINE_CLIENT = None +_ENGINE_CLIENT_LOCK = threading.Lock() + _EXECUTOR_CLIENT = None +_EXECUTOR_CLIENT_LOCK = threading.Lock() + _EVENT_ENGINE_CLIENT = None +_EVENT_ENGINE_CLIENT_LOCK = threading.Lock() def cleanup(): - """Intended to be used by tests to recreate all RPC related objects.""" + """Clean all the RPC clients. + + Intended to be used by tests to recreate all RPC related objects. + Another usage is forking a child API process. In this case we must + recreate all RPC objects so that they function properly. + """ global _ENGINE_CLIENT global _EXECUTOR_CLIENT @@ -43,27 +54,33 @@ def cleanup(): def get_engine_client(): global _ENGINE_CLIENT + global _EVENT_ENGINE_CLIENT_LOCK - if not _ENGINE_CLIENT: - _ENGINE_CLIENT = EngineClient(cfg.CONF.engine) + with _ENGINE_CLIENT_LOCK: + if not _ENGINE_CLIENT: + _ENGINE_CLIENT = EngineClient(cfg.CONF.engine) return _ENGINE_CLIENT def get_executor_client(): global _EXECUTOR_CLIENT + global _EXECUTOR_CLIENT_LOCK - if not _EXECUTOR_CLIENT: - _EXECUTOR_CLIENT = ExecutorClient(cfg.CONF.executor) + with _EXECUTOR_CLIENT_LOCK: + if not _EXECUTOR_CLIENT: + _EXECUTOR_CLIENT = ExecutorClient(cfg.CONF.executor) return _EXECUTOR_CLIENT def get_event_engine_client(): global _EVENT_ENGINE_CLIENT + global _EVENT_ENGINE_CLIENT_LOCK - if not _EVENT_ENGINE_CLIENT: - _EVENT_ENGINE_CLIENT = EventEngineClient(cfg.CONF.event_engine) + with _EVENT_ENGINE_CLIENT_LOCK: + if not _EVENT_ENGINE_CLIENT: + _EVENT_ENGINE_CLIENT = EventEngineClient(cfg.CONF.event_engine) return _EVENT_ENGINE_CLIENT diff --git a/mistral/rpc/kombu/base.py b/mistral/rpc/kombu/base.py index a8885ee0b..e66c5a724 100644 --- a/mistral/rpc/kombu/base.py +++ b/mistral/rpc/kombu/base.py @@ -109,7 +109,7 @@ class Base(object): :param exchange: Kombu Exchange object (can be created using _make_exchange). :param routing_key: Routing key for queue. It behaves differently - depending the exchange type. See Kombu docs for + depending on exchange type. See Kombu docs for further details. :param durable: If set to True, messages on this queue would be store on disk - therefore can be retrieve after diff --git a/mistral/rpc/kombu/kombu_client.py b/mistral/rpc/kombu/kombu_client.py index a567af054..6240edca1 100644 --- a/mistral/rpc/kombu/kombu_client.py +++ b/mistral/rpc/kombu/kombu_client.py @@ -104,7 +104,9 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base): try: return self._listener.get_result(correlation_id, self._timeout) except moves.queue.Empty: - raise exc.MistralException("RPC Request timeout") + raise exc.MistralException( + "RPC Request timeout, correlation_id = %s" % correlation_id + ) def _call(self, ctx, method, target, async_=False, **kwargs): """Performs a remote call for the given method. @@ -126,7 +128,7 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base): 'async': async_ } - LOG.debug("Publish request: {0}".format(body)) + LOG.debug("Publish request: %s", body) try: if not async_: @@ -147,6 +149,11 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base): if async_: return + LOG.debug( + "Waiting a reply for sync call [reply_to = %s]", + self.queue_name + ) + result = self._wait_for_result(correlation_id) res_type = result[kombu_base.TYPE] res_object = result[kombu_base.RESULT] diff --git a/mistral/rpc/kombu/kombu_listener.py b/mistral/rpc/kombu/kombu_listener.py index d5da58f37..cbc985704 100644 --- a/mistral/rpc/kombu/kombu_listener.py +++ b/mistral/rpc/kombu/kombu_listener.py @@ -90,6 +90,7 @@ class KombuRPCListener(ConsumerMixin): else None, kombu_base.RESULT: response } + queue.put(result) else: LOG.debug( diff --git a/mistral/rpc/kombu/kombu_server.py b/mistral/rpc/kombu/kombu_server.py index 76fb80a73..96b56977b 100644 --- a/mistral/rpc/kombu/kombu_server.py +++ b/mistral/rpc/kombu/kombu_server.py @@ -34,11 +34,13 @@ LOG = logging.getLogger(__name__) CONF = cfg.CONF _pool_opts = [ - cfg.IntOpt('executor_thread_pool_size', - default=64, - deprecated_name="rpc_thread_pool_size", - help='Size of executor thread pool when' - ' executor is threading or eventlet.'), + cfg.IntOpt( + 'executor_thread_pool_size', + default=64, + deprecated_name="rpc_thread_pool_size", + help='Size of executor thread pool when' + ' executor is threading or eventlet.' + ), ] @@ -59,6 +61,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): self._executor_threads = CONF.executor_thread_pool_size self.exchange = CONF.control_exchange + # TODO(rakhmerov): We shouldn't rely on any properties related + # to oslo.messaging. Only "transport_url" should matter. self.virtual_host = CONF.oslo_messaging_rabbit.rabbit_virtual_host self.durable_queue = CONF.oslo_messaging_rabbit.amqp_durable_queues self.auto_delete = CONF.oslo_messaging_rabbit.amqp_auto_delete @@ -69,6 +73,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): self._stopped = threading.Event() self.endpoints = [] self._worker = None + self._thread = None # TODO(ddeja): Those 2 options should be gathered from config. self._sleep_time = 1 @@ -80,6 +85,12 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): return self._running.is_set() def run(self, executor='blocking'): + if self._thread is None: + self._thread = threading.Thread(target=self._run, args=(executor,)) + self._thread.daemon = True + self._thread.start() + + def _run(self, executor): """Start the server.""" self._prepare_worker(executor) @@ -134,20 +145,22 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): except KeyboardInterrupt: self.stop() - LOG.info("Server with id='{0}' stopped.".format( - self.server_id) + LOG.info( + "Server with id='%w' stopped.", + self.server_id ) return except (socket.error, amqp.exceptions.ConnectionForced) as e: LOG.debug("Broker connection failed: %s", e) + _retry_connection = True finally: self._stopped.set() if _retry_connection: LOG.debug( - "Sleeping for %s seconds, than retrying " + "Sleeping for %s seconds, then retrying " "connection", self._sleep_time ) @@ -214,7 +227,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): ) LOG.debug("Exceptions: %s", str(e)) - # Wrap exception into another exception for compability with oslo. + # Wrap exception into another exception for compatibility + # with oslo. self.publish_message( exc.KombuException(e), message.properties['reply_to'], @@ -248,6 +262,12 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): response = rpc_method(rpc_ctx=rpc_context, **arguments) if not is_async: + LOG.debug( + "RPC server sent a reply [reply_to = %s, correlation_id = %s", + reply_to, + correlation_id + ) + self.publish_message( response, reply_to, diff --git a/mistral/tests/unit/rpc/kombu/test_kombu_server.py b/mistral/tests/unit/rpc/kombu/test_kombu_server.py index f2d369333..952be5c1b 100644 --- a/mistral/tests/unit/rpc/kombu/test_kombu_server.py +++ b/mistral/tests/unit/rpc/kombu/test_kombu_server.py @@ -78,7 +78,7 @@ class KombuServerTestCase(base.KombuTestCase): acquire_mock.drain_events.side_effect = TestException() fake_kombu.connection.acquire.return_value = acquire_mock - self.assertRaises(TestException, self.server.run) + self.assertRaises(TestException, self.server._run, 'blocking') self.assertTrue(self.server.is_running) def test_run_launch_successfully_than_stop(self): @@ -91,7 +91,7 @@ class KombuServerTestCase(base.KombuTestCase): acquire_mock.drain_events.side_effect = side_effect fake_kombu.connection.acquire.return_value = acquire_mock - self.server.run() + self.server._run('blocking') self.assertFalse(self.server.is_running) self.assertEqual(self.server._sleep_time, 1) @@ -106,7 +106,7 @@ class KombuServerTestCase(base.KombuTestCase): acquire_mock.drain_events.side_effect = side_effect fake_kombu.connection.acquire.return_value = acquire_mock - self.assertRaises(TestException, self.server.run) + self.assertRaises(TestException, self.server._run, 'blocking') self.assertEqual(self.server._sleep_time, 2) def test_run_socket_timeout_still_running(self): @@ -122,7 +122,8 @@ class KombuServerTestCase(base.KombuTestCase): self.assertRaises( TestException, - self.server.run + self.server._run, + 'blocking' ) self.assertTrue(self.server.is_running)