Fix Kombu RPC threading and use within multiprocess environment
* Added a call to clean up RPC clients into WSGIService start() method to make sure that we reinitialize RPC in every child API process. Otherwise, replies for synchronous RPC calls don't get routed to the right client process (see bug description) * Mistral components engine, executor and event engine didn't start properly because KombuServer.run() was implemented w/o a new thread. This patch fixes it. * The methods to get RPC clients like mistral/rpc/clients.py:get_engine_client() weren't synchronized on the client instances. As a consequence, sometimes Mistral created more than one instance of the same client (violation of Singleton pattern). This patch fixes it by adding required locking. * Other style changes Closes-Bug: #1714186 Closes-Bug: #1714929 Change-Id: I242c48df9811459321d1cd4b1d37ed35af0a013a
This commit is contained in:
parent
a64e3f5fca
commit
2daa3c0ab2
|
@ -18,6 +18,7 @@ from oslo_service import service
|
|||
from oslo_service import wsgi
|
||||
|
||||
from mistral.api import app
|
||||
from mistral.rpc import clients as rpc_clients
|
||||
|
||||
|
||||
class WSGIService(service.ServiceBase):
|
||||
|
@ -40,8 +41,19 @@ class WSGIService(service.ServiceBase):
|
|||
)
|
||||
|
||||
def start(self):
|
||||
# NOTE: When oslo.service creates an API worker it forks a new child
|
||||
# system process. The child process is created as precise copy of the
|
||||
# parent process (see how os.fork() works) and within the child process
|
||||
# oslo.service calls service's start() method again to reinitialize
|
||||
# what's needed. So we must clean up all RPC clients so that RPC works
|
||||
# properly (e.g. message routing for synchronous calls may be based on
|
||||
# generated queue names).
|
||||
rpc_clients.cleanup()
|
||||
|
||||
self.server.start()
|
||||
|
||||
print('API server started.')
|
||||
|
||||
def stop(self):
|
||||
self.server.stop()
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
from oslo_config import cfg
|
||||
from osprofiler import profiler
|
||||
import threading
|
||||
|
||||
from mistral import context as auth_ctx
|
||||
from mistral.engine import base as eng
|
||||
|
@ -25,12 +26,22 @@ from mistral.rpc import base
|
|||
|
||||
|
||||
_ENGINE_CLIENT = None
|
||||
_ENGINE_CLIENT_LOCK = threading.Lock()
|
||||
|
||||
_EXECUTOR_CLIENT = None
|
||||
_EXECUTOR_CLIENT_LOCK = threading.Lock()
|
||||
|
||||
_EVENT_ENGINE_CLIENT = None
|
||||
_EVENT_ENGINE_CLIENT_LOCK = threading.Lock()
|
||||
|
||||
|
||||
def cleanup():
|
||||
"""Intended to be used by tests to recreate all RPC related objects."""
|
||||
"""Clean all the RPC clients.
|
||||
|
||||
Intended to be used by tests to recreate all RPC related objects.
|
||||
Another usage is forking a child API process. In this case we must
|
||||
recreate all RPC objects so that they function properly.
|
||||
"""
|
||||
|
||||
global _ENGINE_CLIENT
|
||||
global _EXECUTOR_CLIENT
|
||||
|
@ -43,27 +54,33 @@ def cleanup():
|
|||
|
||||
def get_engine_client():
|
||||
global _ENGINE_CLIENT
|
||||
global _EVENT_ENGINE_CLIENT_LOCK
|
||||
|
||||
if not _ENGINE_CLIENT:
|
||||
_ENGINE_CLIENT = EngineClient(cfg.CONF.engine)
|
||||
with _ENGINE_CLIENT_LOCK:
|
||||
if not _ENGINE_CLIENT:
|
||||
_ENGINE_CLIENT = EngineClient(cfg.CONF.engine)
|
||||
|
||||
return _ENGINE_CLIENT
|
||||
|
||||
|
||||
def get_executor_client():
|
||||
global _EXECUTOR_CLIENT
|
||||
global _EXECUTOR_CLIENT_LOCK
|
||||
|
||||
if not _EXECUTOR_CLIENT:
|
||||
_EXECUTOR_CLIENT = ExecutorClient(cfg.CONF.executor)
|
||||
with _EXECUTOR_CLIENT_LOCK:
|
||||
if not _EXECUTOR_CLIENT:
|
||||
_EXECUTOR_CLIENT = ExecutorClient(cfg.CONF.executor)
|
||||
|
||||
return _EXECUTOR_CLIENT
|
||||
|
||||
|
||||
def get_event_engine_client():
|
||||
global _EVENT_ENGINE_CLIENT
|
||||
global _EVENT_ENGINE_CLIENT_LOCK
|
||||
|
||||
if not _EVENT_ENGINE_CLIENT:
|
||||
_EVENT_ENGINE_CLIENT = EventEngineClient(cfg.CONF.event_engine)
|
||||
with _EVENT_ENGINE_CLIENT_LOCK:
|
||||
if not _EVENT_ENGINE_CLIENT:
|
||||
_EVENT_ENGINE_CLIENT = EventEngineClient(cfg.CONF.event_engine)
|
||||
|
||||
return _EVENT_ENGINE_CLIENT
|
||||
|
||||
|
|
|
@ -109,7 +109,7 @@ class Base(object):
|
|||
:param exchange: Kombu Exchange object (can be created using
|
||||
_make_exchange).
|
||||
:param routing_key: Routing key for queue. It behaves differently
|
||||
depending the exchange type. See Kombu docs for
|
||||
depending on exchange type. See Kombu docs for
|
||||
further details.
|
||||
:param durable: If set to True, messages on this queue would be
|
||||
stored on disk - therefore can be retrieved after
|
||||
|
|
|
@ -104,7 +104,9 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
|||
try:
|
||||
return self._listener.get_result(correlation_id, self._timeout)
|
||||
except moves.queue.Empty:
|
||||
raise exc.MistralException("RPC Request timeout")
|
||||
raise exc.MistralException(
|
||||
"RPC Request timeout, correlation_id = %s" % correlation_id
|
||||
)
|
||||
|
||||
def _call(self, ctx, method, target, async_=False, **kwargs):
|
||||
"""Performs a remote call for the given method.
|
||||
|
@ -126,7 +128,7 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
|||
'async': async_
|
||||
}
|
||||
|
||||
LOG.debug("Publish request: {0}".format(body))
|
||||
LOG.debug("Publish request: %s", body)
|
||||
|
||||
try:
|
||||
if not async_:
|
||||
|
@ -147,6 +149,11 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
|||
if async_:
|
||||
return
|
||||
|
||||
LOG.debug(
|
||||
"Waiting a reply for sync call [reply_to = %s]",
|
||||
self.queue_name
|
||||
)
|
||||
|
||||
result = self._wait_for_result(correlation_id)
|
||||
res_type = result[kombu_base.TYPE]
|
||||
res_object = result[kombu_base.RESULT]
|
||||
|
|
|
@ -90,6 +90,7 @@ class KombuRPCListener(ConsumerMixin):
|
|||
else None,
|
||||
kombu_base.RESULT: response
|
||||
}
|
||||
|
||||
queue.put(result)
|
||||
else:
|
||||
LOG.debug(
|
||||
|
|
|
@ -34,11 +34,13 @@ LOG = logging.getLogger(__name__)
|
|||
CONF = cfg.CONF
|
||||
|
||||
_pool_opts = [
|
||||
cfg.IntOpt('executor_thread_pool_size',
|
||||
default=64,
|
||||
deprecated_name="rpc_thread_pool_size",
|
||||
help='Size of executor thread pool when'
|
||||
' executor is threading or eventlet.'),
|
||||
cfg.IntOpt(
|
||||
'executor_thread_pool_size',
|
||||
default=64,
|
||||
deprecated_name="rpc_thread_pool_size",
|
||||
help='Size of executor thread pool when'
|
||||
' executor is threading or eventlet.'
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
|
@ -59,6 +61,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
|||
|
||||
self._executor_threads = CONF.executor_thread_pool_size
|
||||
self.exchange = CONF.control_exchange
|
||||
# TODO(rakhmerov): We shouldn't rely on any properties related
|
||||
# to oslo.messaging. Only "transport_url" should matter.
|
||||
self.virtual_host = CONF.oslo_messaging_rabbit.rabbit_virtual_host
|
||||
self.durable_queue = CONF.oslo_messaging_rabbit.amqp_durable_queues
|
||||
self.auto_delete = CONF.oslo_messaging_rabbit.amqp_auto_delete
|
||||
|
@ -69,6 +73,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
|||
self._stopped = threading.Event()
|
||||
self.endpoints = []
|
||||
self._worker = None
|
||||
self._thread = None
|
||||
|
||||
# TODO(ddeja): Those 2 options should be gathered from config.
|
||||
self._sleep_time = 1
|
||||
|
@ -80,6 +85,12 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
|||
return self._running.is_set()
|
||||
|
||||
def run(self, executor='blocking'):
|
||||
if self._thread is None:
|
||||
self._thread = threading.Thread(target=self._run, args=(executor,))
|
||||
self._thread.daemon = True
|
||||
self._thread.start()
|
||||
|
||||
def _run(self, executor):
|
||||
"""Start the server."""
|
||||
self._prepare_worker(executor)
|
||||
|
||||
|
@ -134,20 +145,22 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
|||
except KeyboardInterrupt:
|
||||
self.stop()
|
||||
|
||||
LOG.info("Server with id='{0}' stopped.".format(
|
||||
self.server_id)
|
||||
LOG.info(
|
||||
"Server with id='%s' stopped.",
|
||||
self.server_id
|
||||
)
|
||||
|
||||
return
|
||||
except (socket.error, amqp.exceptions.ConnectionForced) as e:
|
||||
LOG.debug("Broker connection failed: %s", e)
|
||||
|
||||
_retry_connection = True
|
||||
finally:
|
||||
self._stopped.set()
|
||||
|
||||
if _retry_connection:
|
||||
LOG.debug(
|
||||
"Sleeping for %s seconds, than retrying "
|
||||
"Sleeping for %s seconds, then retrying "
|
||||
"connection",
|
||||
self._sleep_time
|
||||
)
|
||||
|
@ -214,7 +227,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
|||
)
|
||||
LOG.debug("Exceptions: %s", str(e))
|
||||
|
||||
# Wrap exception into another exception for compability with oslo.
|
||||
# Wrap exception into another exception for compatibility
|
||||
# with oslo.
|
||||
self.publish_message(
|
||||
exc.KombuException(e),
|
||||
message.properties['reply_to'],
|
||||
|
@ -248,6 +262,12 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
|||
response = rpc_method(rpc_ctx=rpc_context, **arguments)
|
||||
|
||||
if not is_async:
|
||||
LOG.debug(
|
||||
"RPC server sent a reply [reply_to = %s, correlation_id = %s]",
|
||||
reply_to,
|
||||
correlation_id
|
||||
)
|
||||
|
||||
self.publish_message(
|
||||
response,
|
||||
reply_to,
|
||||
|
|
|
@ -78,7 +78,7 @@ class KombuServerTestCase(base.KombuTestCase):
|
|||
acquire_mock.drain_events.side_effect = TestException()
|
||||
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||
|
||||
self.assertRaises(TestException, self.server.run)
|
||||
self.assertRaises(TestException, self.server._run, 'blocking')
|
||||
self.assertTrue(self.server.is_running)
|
||||
|
||||
def test_run_launch_successfully_than_stop(self):
|
||||
|
@ -91,7 +91,7 @@ class KombuServerTestCase(base.KombuTestCase):
|
|||
acquire_mock.drain_events.side_effect = side_effect
|
||||
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||
|
||||
self.server.run()
|
||||
self.server._run('blocking')
|
||||
self.assertFalse(self.server.is_running)
|
||||
self.assertEqual(self.server._sleep_time, 1)
|
||||
|
||||
|
@ -106,7 +106,7 @@ class KombuServerTestCase(base.KombuTestCase):
|
|||
acquire_mock.drain_events.side_effect = side_effect
|
||||
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||
|
||||
self.assertRaises(TestException, self.server.run)
|
||||
self.assertRaises(TestException, self.server._run, 'blocking')
|
||||
self.assertEqual(self.server._sleep_time, 2)
|
||||
|
||||
def test_run_socket_timeout_still_running(self):
|
||||
|
@ -122,7 +122,8 @@ class KombuServerTestCase(base.KombuTestCase):
|
|||
|
||||
self.assertRaises(
|
||||
TestException,
|
||||
self.server.run
|
||||
self.server._run,
|
||||
'blocking'
|
||||
)
|
||||
self.assertTrue(self.server.is_running)
|
||||
|
||||
|
|
Loading…
Reference in New Issue