Merge "Make kombu driver work in multi-thread manner"

This commit is contained in:
Jenkins 2017-01-22 14:28:52 +00:00 committed by Gerrit Code Review
commit 1b7b969e53
3 changed files with 62 additions and 1 deletions

View File

@ -16,7 +16,9 @@ import socket
import threading
import kombu
from oslo_config import cfg
from oslo_log import log as logging
from stevedore import driver
from mistral import context as auth_ctx
from mistral.engine.rpc_backend import base as rpc_base
@ -25,6 +27,13 @@ from mistral import exceptions as exc
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
_pool_opts = [
cfg.IntOpt('executor_thread_pool_size',
default=64,
deprecated_name="rpc_thread_pool_size",
help='Size of executor thread pool.'),
]
class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
@ -32,7 +41,9 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
super(KombuRPCServer, self).__init__(conf)
self._register_mistral_serialization()
CONF.register_opts(_pool_opts)
self._executor_threads = CONF.executor_thread_pool_size
self.exchange = conf.get('exchange', '')
self.user_id = conf.get('user_id', 'guest')
self.password = conf.get('password', 'guest')
@ -49,6 +60,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
self._running = threading.Event()
self._stopped = threading.Event()
self.endpoints = []
self._worker = None
@property
def is_running(self):
@ -57,6 +69,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
def run(self, executor='blocking'):
"""Start the server."""
self._prepare_worker(executor)
self.conn = self._make_connection(
self.host,
self.port,
@ -83,7 +97,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
)
with conn.Consumer(
queues=queue,
callbacks=[self._on_message_safe],
callbacks=[self._process_message],
) as consumer:
consumer.qos(prefetch_count=1)
@ -115,6 +129,10 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
def wait(self):
self._stopped.wait()
try:
self._worker.shutdown(wait=True)
except AttributeError as e:
LOG.warning("Cannot stop worker in graceful way: %s" % e)
def _get_rpc_method(self, method_name):
for endpoint in self.endpoints:
@ -192,3 +210,15 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
def register_endpoint(self, endpoint):
self.endpoints.append(endpoint)
def _process_message(self, request, message):
self._worker.submit(self._on_message_safe, request, message)
def _prepare_worker(self, executor='blocking'):
mgr = driver.DriverManager('kombu_driver.executors', executor)
executor_opts = {}
if executor == 'threading':
executor_opts['max_workers'] = self._executor_threads
self._worker = mgr.driver(**executor_opts)

View File

@ -19,6 +19,7 @@ from mistral.tests.unit.engine.rpc_backend.kombu import fake_kombu
import mock
import socket
from stevedore import driver
with mock.patch.dict('sys.modules', kombu=fake_kombu):
from mistral.engine.rpc_backend.kombu import kombu_server
@ -260,3 +261,29 @@ class KombuServerTestCase(base.KombuTestCase):
reply_to,
correlation_id
)
@mock.patch('stevedore.driver.DriverManager')
def test__prepare_worker(self, driver_manager_mock):
worker_mock = mock.MagicMock()
mgr_mock = mock.MagicMock()
mgr_mock.driver.return_value = worker_mock
def side_effect(*args, **kwargs):
return mgr_mock
driver_manager_mock.side_effect = side_effect
self.server._prepare_worker('blocking')
self.assertEqual(self.server._worker, worker_mock)
@mock.patch('stevedore.driver.DriverManager')
def test__prepare_worker_no_valid_executor(self, driver_manager_mock):
driver_manager_mock.side_effect = driver.NoMatches()
self.assertRaises(
driver.NoMatches,
self.server._prepare_worker,
'non_valid_executor'
)

View File

@ -86,3 +86,7 @@ mistral.expression.evaluators =
mistral.auth =
keystone = mistral.auth.keystone:KeystoneAuthHandler
keycloak-oidc = mistral.auth.keycloak:KeycloakAuthHandler
kombu_driver.executors =
blocking = futurist:SynchronousExecutor
threading = futurist:ThreadPoolExecutor