Merge "Fix Kombu RPC threading and use within multiprocess environment"
This commit is contained in:
commit
d3d4efdca6
|
@ -18,6 +18,7 @@ from oslo_service import service
|
|||
from oslo_service import wsgi
|
||||
|
||||
from mistral.api import app
|
||||
from mistral.rpc import clients as rpc_clients
|
||||
|
||||
|
||||
class WSGIService(service.ServiceBase):
|
||||
|
@ -40,6 +41,15 @@ class WSGIService(service.ServiceBase):
|
|||
)
|
||||
|
||||
def start(self):
|
||||
# NOTE: When oslo.service creates an API worker it forks a new child
|
||||
# system process. The child process is created as a precise copy of the
|
||||
# parent process (see how os.fork() works) and within the child process
|
||||
# oslo.service calls service's start() method again to reinitialize
|
||||
# what's needed. So we must clean up all RPC clients so that RPC works
|
||||
# properly (e.g. message routing for synchronous calls may be based on
|
||||
# generated queue names).
|
||||
rpc_clients.cleanup()
|
||||
|
||||
self.server.start()
|
||||
|
||||
print('API server started.')
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
from oslo_config import cfg
|
||||
from osprofiler import profiler
|
||||
import threading
|
||||
|
||||
from mistral import context as auth_ctx
|
||||
from mistral.engine import base as eng
|
||||
|
@ -25,12 +26,22 @@ from mistral.rpc import base
|
|||
|
||||
|
||||
_ENGINE_CLIENT = None
|
||||
_ENGINE_CLIENT_LOCK = threading.Lock()
|
||||
|
||||
_EXECUTOR_CLIENT = None
|
||||
_EXECUTOR_CLIENT_LOCK = threading.Lock()
|
||||
|
||||
_EVENT_ENGINE_CLIENT = None
|
||||
_EVENT_ENGINE_CLIENT_LOCK = threading.Lock()
|
||||
|
||||
|
||||
def cleanup():
|
||||
"""Intended to be used by tests to recreate all RPC related objects."""
|
||||
"""Clean all the RPC clients.
|
||||
|
||||
Intended to be used by tests to recreate all RPC related objects.
|
||||
Another usage is forking a child API process. In this case we must
|
||||
recreate all RPC objects so that they function properly.
|
||||
"""
|
||||
|
||||
global _ENGINE_CLIENT
|
||||
global _EXECUTOR_CLIENT
|
||||
|
@ -43,27 +54,33 @@ def cleanup():
|
|||
|
||||
def get_engine_client():
|
||||
global _ENGINE_CLIENT
|
||||
global _EVENT_ENGINE_CLIENT_LOCK
|
||||
|
||||
if not _ENGINE_CLIENT:
|
||||
_ENGINE_CLIENT = EngineClient(cfg.CONF.engine)
|
||||
with _ENGINE_CLIENT_LOCK:
|
||||
if not _ENGINE_CLIENT:
|
||||
_ENGINE_CLIENT = EngineClient(cfg.CONF.engine)
|
||||
|
||||
return _ENGINE_CLIENT
|
||||
|
||||
|
||||
def get_executor_client():
|
||||
global _EXECUTOR_CLIENT
|
||||
global _EXECUTOR_CLIENT_LOCK
|
||||
|
||||
if not _EXECUTOR_CLIENT:
|
||||
_EXECUTOR_CLIENT = ExecutorClient(cfg.CONF.executor)
|
||||
with _EXECUTOR_CLIENT_LOCK:
|
||||
if not _EXECUTOR_CLIENT:
|
||||
_EXECUTOR_CLIENT = ExecutorClient(cfg.CONF.executor)
|
||||
|
||||
return _EXECUTOR_CLIENT
|
||||
|
||||
|
||||
def get_event_engine_client():
|
||||
global _EVENT_ENGINE_CLIENT
|
||||
global _EVENT_ENGINE_CLIENT_LOCK
|
||||
|
||||
if not _EVENT_ENGINE_CLIENT:
|
||||
_EVENT_ENGINE_CLIENT = EventEngineClient(cfg.CONF.event_engine)
|
||||
with _EVENT_ENGINE_CLIENT_LOCK:
|
||||
if not _EVENT_ENGINE_CLIENT:
|
||||
_EVENT_ENGINE_CLIENT = EventEngineClient(cfg.CONF.event_engine)
|
||||
|
||||
return _EVENT_ENGINE_CLIENT
|
||||
|
||||
|
|
|
@ -109,7 +109,7 @@ class Base(object):
|
|||
:param exchange: Kombu Exchange object (can be created using
|
||||
_make_exchange).
|
||||
:param routing_key: Routing key for queue. It behaves differently
|
||||
depending the exchange type. See Kombu docs for
|
||||
depending on exchange type. See Kombu docs for
|
||||
further details.
|
||||
:param durable: If set to True, messages on this queue would be
|
||||
stored on disk - therefore can be retrieved after
|
||||
|
|
|
@ -104,7 +104,9 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
|||
try:
|
||||
return self._listener.get_result(correlation_id, self._timeout)
|
||||
except moves.queue.Empty:
|
||||
raise exc.MistralException("RPC Request timeout")
|
||||
raise exc.MistralException(
|
||||
"RPC Request timeout, correlation_id = %s" % correlation_id
|
||||
)
|
||||
|
||||
def _call(self, ctx, method, target, async_=False, **kwargs):
|
||||
"""Performs a remote call for the given method.
|
||||
|
@ -126,7 +128,7 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
|||
'async': async_
|
||||
}
|
||||
|
||||
LOG.debug("Publish request: {0}".format(body))
|
||||
LOG.debug("Publish request: %s", body)
|
||||
|
||||
try:
|
||||
if not async_:
|
||||
|
@ -147,6 +149,11 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
|||
if async_:
|
||||
return
|
||||
|
||||
LOG.debug(
|
||||
"Waiting a reply for sync call [reply_to = %s]",
|
||||
self.queue_name
|
||||
)
|
||||
|
||||
result = self._wait_for_result(correlation_id)
|
||||
res_type = result[kombu_base.TYPE]
|
||||
res_object = result[kombu_base.RESULT]
|
||||
|
|
|
@ -90,6 +90,7 @@ class KombuRPCListener(ConsumerMixin):
|
|||
else None,
|
||||
kombu_base.RESULT: response
|
||||
}
|
||||
|
||||
queue.put(result)
|
||||
else:
|
||||
LOG.debug(
|
||||
|
|
|
@ -34,11 +34,13 @@ LOG = logging.getLogger(__name__)
|
|||
CONF = cfg.CONF
|
||||
|
||||
_pool_opts = [
|
||||
cfg.IntOpt('executor_thread_pool_size',
|
||||
default=64,
|
||||
deprecated_name="rpc_thread_pool_size",
|
||||
help='Size of executor thread pool when'
|
||||
' executor is threading or eventlet.'),
|
||||
cfg.IntOpt(
|
||||
'executor_thread_pool_size',
|
||||
default=64,
|
||||
deprecated_name="rpc_thread_pool_size",
|
||||
help='Size of executor thread pool when'
|
||||
' executor is threading or eventlet.'
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
|
@ -59,6 +61,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
|||
|
||||
self._executor_threads = CONF.executor_thread_pool_size
|
||||
self.exchange = CONF.control_exchange
|
||||
# TODO(rakhmerov): We shouldn't rely on any properties related
|
||||
# to oslo.messaging. Only "transport_url" should matter.
|
||||
self.virtual_host = CONF.oslo_messaging_rabbit.rabbit_virtual_host
|
||||
self.durable_queue = CONF.oslo_messaging_rabbit.amqp_durable_queues
|
||||
self.auto_delete = CONF.oslo_messaging_rabbit.amqp_auto_delete
|
||||
|
@ -69,6 +73,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
|||
self._stopped = threading.Event()
|
||||
self.endpoints = []
|
||||
self._worker = None
|
||||
self._thread = None
|
||||
|
||||
# TODO(ddeja): Those 2 options should be gathered from config.
|
||||
self._sleep_time = 1
|
||||
|
@ -80,6 +85,12 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
|||
return self._running.is_set()
|
||||
|
||||
def run(self, executor='blocking'):
|
||||
if self._thread is None:
|
||||
self._thread = threading.Thread(target=self._run, args=(executor,))
|
||||
self._thread.daemon = True
|
||||
self._thread.start()
|
||||
|
||||
def _run(self, executor):
|
||||
"""Start the server."""
|
||||
self._prepare_worker(executor)
|
||||
|
||||
|
@ -134,20 +145,22 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
|||
except KeyboardInterrupt:
|
||||
self.stop()
|
||||
|
||||
LOG.info("Server with id='{0}' stopped.".format(
|
||||
self.server_id)
|
||||
LOG.info(
|
||||
"Server with id='%s' stopped.",
|
||||
self.server_id
|
||||
)
|
||||
|
||||
return
|
||||
except (socket.error, amqp.exceptions.ConnectionForced) as e:
|
||||
LOG.debug("Broker connection failed: %s", e)
|
||||
|
||||
_retry_connection = True
|
||||
finally:
|
||||
self._stopped.set()
|
||||
|
||||
if _retry_connection:
|
||||
LOG.debug(
|
||||
"Sleeping for %s seconds, than retrying "
|
||||
"Sleeping for %s seconds, then retrying "
|
||||
"connection",
|
||||
self._sleep_time
|
||||
)
|
||||
|
@ -214,7 +227,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
|||
)
|
||||
LOG.debug("Exceptions: %s", str(e))
|
||||
|
||||
# Wrap exception into another exception for compability with oslo.
|
||||
# Wrap exception into another exception for compatibility
|
||||
# with oslo.
|
||||
self.publish_message(
|
||||
exc.KombuException(e),
|
||||
message.properties['reply_to'],
|
||||
|
@ -248,6 +262,12 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
|||
response = rpc_method(rpc_ctx=rpc_context, **arguments)
|
||||
|
||||
if not is_async:
|
||||
LOG.debug(
|
||||
"RPC server sent a reply [reply_to = %s, correlation_id = %s]",
|
||||
reply_to,
|
||||
correlation_id
|
||||
)
|
||||
|
||||
self.publish_message(
|
||||
response,
|
||||
reply_to,
|
||||
|
|
|
@ -78,7 +78,7 @@ class KombuServerTestCase(base.KombuTestCase):
|
|||
acquire_mock.drain_events.side_effect = TestException()
|
||||
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||
|
||||
self.assertRaises(TestException, self.server.run)
|
||||
self.assertRaises(TestException, self.server._run, 'blocking')
|
||||
self.assertTrue(self.server.is_running)
|
||||
|
||||
def test_run_launch_successfully_than_stop(self):
|
||||
|
@ -91,7 +91,7 @@ class KombuServerTestCase(base.KombuTestCase):
|
|||
acquire_mock.drain_events.side_effect = side_effect
|
||||
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||
|
||||
self.server.run()
|
||||
self.server._run('blocking')
|
||||
self.assertFalse(self.server.is_running)
|
||||
self.assertEqual(self.server._sleep_time, 1)
|
||||
|
||||
|
@ -106,7 +106,7 @@ class KombuServerTestCase(base.KombuTestCase):
|
|||
acquire_mock.drain_events.side_effect = side_effect
|
||||
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||
|
||||
self.assertRaises(TestException, self.server.run)
|
||||
self.assertRaises(TestException, self.server._run, 'blocking')
|
||||
self.assertEqual(self.server._sleep_time, 2)
|
||||
|
||||
def test_run_socket_timeout_still_running(self):
|
||||
|
@ -122,7 +122,8 @@ class KombuServerTestCase(base.KombuTestCase):
|
|||
|
||||
self.assertRaises(
|
||||
TestException,
|
||||
self.server.run
|
||||
self.server._run,
|
||||
'blocking'
|
||||
)
|
||||
self.assertTrue(self.server.is_running)
|
||||
|
||||
|
|
Loading…
Reference in New Issue