From 95e6b34b17b62674972e1cea47fbd83ed4613ee8 Mon Sep 17 00:00:00 2001 From: Nikolay Mahotkin Date: Fri, 26 Jun 2015 14:12:03 +0300 Subject: [PATCH] Integrating new RPC layer with Mistral At this point all tests pass, but there is no test for kombu driver. Komu driver must be consider as expermiental before new tests are introduced. TODO (next commits): - tests Partially implements blueprint mistral-alternative-rpc Co-Authored-By: Dawid Deja Change-Id: I1f5ca1f1e8c741efcb61480ccbec8e50ad993cba --- mistral/cmd/launch.py | 71 ++++++------------ mistral/config.py | 3 +- mistral/engine/rpc/rpc.py | 72 +++++++------------ .../unit/api/v2/test_action_executions.py | 6 ++ mistral/tests/unit/api/v2/test_executions.py | 6 ++ mistral/tests/unit/engine/base.py | 5 +- .../engine/{rpc_direct => rpc}/__init__.py | 0 .../engine/{rpc_direct => rpc}/test_rpc.py | 7 +- .../tests/unit/engine/test_default_engine.py | 4 +- mistral/utils/rpc_utils.py | 10 ++- 10 files changed, 79 insertions(+), 105 deletions(-) rename mistral/tests/unit/engine/{rpc_direct => rpc}/__init__.py (100%) rename mistral/tests/unit/engine/{rpc_direct => rpc}/test_rpc.py (88%) diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index 204f6c83..e13ab70c 100644 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -25,9 +25,10 @@ eventlet.monkey_patch( thread=False if '--use-debugger' in sys.argv else True, time=True) + import os import six -import time + # If ../mistral/__init__.py exists, add ../ to Python search path, so that # it will override what happens to be installed in /usr/(local/)lib/python... @@ -39,7 +40,6 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'mistral', '__init__.py')): from oslo_config import cfg from oslo_log import log as logging -import oslo_messaging as messaging if six.PY3 is True: import socketserver @@ -51,7 +51,6 @@ from wsgiref.simple_server import WSGIServer from mistral.api import app from mistral import config -from mistral import context as ctx from mistral.db.v2 import api as db_api from mistral.engine import default_engine as def_eng from mistral.engine import default_executor as def_executor @@ -59,6 +58,7 @@ from mistral.engine.rpc import rpc from mistral.services import expiration_policy from mistral.services import scheduler from mistral.utils import profiler +from mistral.utils import rpc_utils from mistral import version @@ -67,51 +67,33 @@ CONF = cfg.CONF LOG = logging.getLogger(__name__) -def launch_executor(transport): +def launch_executor(): profiler.setup('mistral-executor', cfg.CONF.executor.host) - target = messaging.Target( - topic=cfg.CONF.executor.topic, - server=cfg.CONF.executor.host - ) - executor_v2 = def_executor.DefaultExecutor(rpc.get_engine_client()) + executor_endpoint = rpc.ExecutorServer(executor_v2) - endpoints = [rpc.ExecutorServer(executor_v2)] - - server = messaging.get_rpc_server( - transport, - target, - endpoints, - executor='eventlet', - serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) + executor_server = rpc.get_rpc_server_driver()( + rpc_utils.get_rpc_info_from_oslo(CONF.executor) ) + executor_server.register_endpoint(executor_endpoint) executor_v2.register_membership() try: - server.start() - while True: - time.sleep(604800) + executor_server.run() except (KeyboardInterrupt, SystemExit): pass finally: print("Stopping executor service...") - server.stop() - server.wait() -def launch_engine(transport): +def launch_engine(): profiler.setup('mistral-engine', cfg.CONF.engine.host) - target = messaging.Target( - topic=cfg.CONF.engine.topic, - server=cfg.CONF.engine.host - ) - engine_v2 = def_eng.DefaultEngine(rpc.get_engine_client()) - endpoints = [rpc.EngineServer(engine_v2)] + engine_endpoint = rpc.EngineServer(engine_v2) # Setup scheduler in engine. db_api.setup_db() @@ -120,37 +102,30 @@ def launch_engine(transport): # Setup expiration policy expiration_policy.setup() - server = messaging.get_rpc_server( - transport, - target, - endpoints, - executor='eventlet', - serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer()) + engine_server = rpc.get_rpc_server_driver()( + rpc_utils.get_rpc_info_from_oslo(CONF.engine) ) + engine_server.register_endpoint(engine_endpoint) engine_v2.register_membership() try: - server.start() - while True: - time.sleep(604800) + engine_server.run() except (KeyboardInterrupt, SystemExit): pass finally: print("Stopping engine service...") - server.stop() - server.wait() class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer): pass -def launch_api(transport): +def launch_api(): host = cfg.CONF.api.host port = cfg.CONF.api.port - server = simple_server.make_server( + api_server = simple_server.make_server( host, port, app.setup_app(), @@ -160,12 +135,12 @@ def launch_api(transport): LOG.info("Mistral API is serving on http://%s:%s (PID=%s)" % (host, port, os.getpid())) - server.serve_forever() + api_server.serve_forever() -def launch_any(transport, options): +def launch_any(options): # Launch the servers on different threads. - threads = [eventlet.spawn(LAUNCH_OPTIONS[option], transport) + threads = [eventlet.spawn(LAUNCH_OPTIONS[option]) for option in options] print('Server started.') @@ -250,11 +225,11 @@ def main(): # servers are launched on the same process. Otherwise, messages do not # get delivered if the Mistral servers are launched on different # processes because the "fake" transport is using an in process queue. - transport = rpc.get_transport() + rpc.get_transport() if cfg.CONF.server == ['all']: # Launch all servers. - launch_any(transport, LAUNCH_OPTIONS.keys()) + launch_any(LAUNCH_OPTIONS.keys()) else: # Validate launch option. if set(cfg.CONF.server) - set(LAUNCH_OPTIONS.keys()): @@ -262,7 +237,7 @@ def main(): 'api, engine, and executor.') # Launch distinct set of server(s). - launch_any(transport, set(cfg.CONF.server)) + launch_any(set(cfg.CONF.server)) except RuntimeError as excp: sys.stderr.write("ERROR: %s\n" % excp) diff --git a/mistral/config.py b/mistral/config.py index b945d74f..9d70ecd0 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -50,7 +50,8 @@ rpc_impl_opt = cfg.StrOpt( 'rpc_implementation', default='oslo', choices=['oslo', 'kombu'], - help='Specifies RPC implementation for RPC client and server.' + help='Specifies RPC implementation for RPC client and server. Support of ' + 'kombu driver is experimental.' ) pecan_opts = [ diff --git a/mistral/engine/rpc/rpc.py b/mistral/engine/rpc/rpc.py index 1dbf55ad..65869cb8 100644 --- a/mistral/engine/rpc/rpc.py +++ b/mistral/engine/rpc/rpc.py @@ -17,13 +17,12 @@ from oslo_config import cfg from oslo_log import log as logging import oslo_messaging as messaging from oslo_messaging.rpc import client -from oslo_messaging.rpc import dispatcher -from oslo_messaging.rpc import server from stevedore import driver from mistral import context as auth_ctx from mistral.engine import base from mistral import exceptions as exc +from mistral.utils import rpc_utils from mistral.workflow import utils as wf_utils @@ -38,16 +37,6 @@ _ENGINE_CLIENT = None _EXECUTOR_CLIENT = None -def get_rpc_server(transport, target, endpoints, executor='blocking', - serializer=None): - return server.RPCServer( - transport, - target, - dispatcher.RPCDispatcher(endpoints, serializer), - executor - ) - - def cleanup(): """Intended to be used by tests to recreate all RPC related objects.""" @@ -73,7 +62,9 @@ def get_engine_client(): global _ENGINE_CLIENT if not _ENGINE_CLIENT: - _ENGINE_CLIENT = EngineClient(get_transport()) + _ENGINE_CLIENT = EngineClient( + rpc_utils.get_rpc_info_from_oslo(cfg.CONF.engine) + ) return _ENGINE_CLIENT @@ -82,7 +73,9 @@ def get_executor_client(): global _EXECUTOR_CLIENT if not _EXECUTOR_CLIENT: - _EXECUTOR_CLIENT = ExecutorClient(get_transport()) + _EXECUTOR_CLIENT = ExecutorClient( + rpc_utils.get_rpc_info_from_oslo(cfg.CONF.executor) + ) return _EXECUTOR_CLIENT @@ -316,19 +309,12 @@ def wrap_messaging_exception(method): class EngineClient(base.Engine): """RPC Engine client.""" - def __init__(self, transport): + def __init__(self, rpc_conf_dict): """Constructs an RPC client for engine. - :param transport: Messaging transport. + :param rpc_conf_dict: Dict containing RPC configuration. """ - serializer = auth_ctx.RpcContextSerializer( - auth_ctx.JsonPayloadSerializer()) - - self._client = messaging.RPCClient( - transport, - messaging.Target(topic=cfg.CONF.engine.topic), - serializer=serializer - ) + self._client = get_rpc_client_driver()(rpc_conf_dict) @wrap_messaging_exception def start_workflow(self, wf_identifier, wf_input, description='', @@ -337,7 +323,7 @@ class EngineClient(base.Engine): :return: Workflow execution. """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'start_workflow', workflow_identifier=wf_identifier, @@ -353,7 +339,7 @@ class EngineClient(base.Engine): :return: Action execution. """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'start_action', action_name=action_name, @@ -363,7 +349,7 @@ class EngineClient(base.Engine): ) def on_task_state_change(self, task_ex_id, state, state_info=None): - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'on_task_state_change', task_ex_id=task_ex_id, @@ -389,7 +375,7 @@ class EngineClient(base.Engine): :return: Task. """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'on_action_complete', action_ex_id=action_ex_id, @@ -405,7 +391,7 @@ class EngineClient(base.Engine): :return: Workflow execution. """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'pause_workflow', execution_id=wf_ex_id @@ -442,7 +428,7 @@ class EngineClient(base.Engine): :return: Workflow execution. """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'resume_workflow', wf_ex_id=wf_ex_id, @@ -463,7 +449,7 @@ class EngineClient(base.Engine): :return: Workflow execution, model.Execution """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'stop_workflow', execution_id=wf_ex_id, @@ -480,7 +466,7 @@ class EngineClient(base.Engine): :return: Workflow execution. """ - return self._client.call( + return self._client.sync_call( auth_ctx.ctx(), 'rollback_workflow', execution_id=wf_ex_id @@ -522,23 +508,14 @@ class ExecutorServer(object): class ExecutorClient(base.Executor): """RPC Executor client.""" - def __init__(self, transport): + def __init__(self, rpc_conf_dict): """Constructs an RPC client for the Executor. - :param transport: Messaging transport. - :type transport: Transport. + :param rpc_conf_dict: Dict containing RPC configuration. """ + self.topic = cfg.CONF.executor.topic - - serializer = auth_ctx.RpcContextSerializer( - auth_ctx.JsonPayloadSerializer() - ) - - self._client = messaging.RPCClient( - transport, - messaging.Target(), - serializer=serializer - ) + self._client = get_rpc_client_driver()(rpc_conf_dict) def run_action(self, action_ex_id, action_class_str, attributes, action_params, target=None, async=True): @@ -561,9 +538,8 @@ class ExecutorClient(base.Executor): 'params': action_params } - call_ctx = self._client.prepare(topic=self.topic, server=target) - - rpc_client_method = call_ctx.cast if async else call_ctx.call + rpc_client_method = (self._client.async_call + if async else self._client.sync_call) res = rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs) diff --git a/mistral/tests/unit/api/v2/test_action_executions.py b/mistral/tests/unit/api/v2/test_action_executions.py index b3a7276e..851b45b0 100644 --- a/mistral/tests/unit/api/v2/test_action_executions.py +++ b/mistral/tests/unit/api/v2/test_action_executions.py @@ -19,7 +19,9 @@ import datetime import json import mock + from oslo_config import cfg +import oslo_messaging from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models @@ -29,6 +31,9 @@ from mistral.tests.unit.api import base from mistral.workflow import states from mistral.workflow import utils as wf_utils +# This line is needed for correct initialization of messaging config. +oslo_messaging.get_transport(cfg.CONF) + ACTION_EX_DB = models.ActionExecution( id='123', @@ -146,6 +151,7 @@ MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) MOCK_DELETE = mock.MagicMock(return_value=None) +@mock.patch.object(rpc, '_IMPL_CLIENT', mock.Mock()) class TestActionExecutionsController(base.APITest): def setUp(self): super(TestActionExecutionsController, self).setUp() diff --git a/mistral/tests/unit/api/v2/test_executions.py b/mistral/tests/unit/api/v2/test_executions.py index 3b503264..5f2e7687 100644 --- a/mistral/tests/unit/api/v2/test_executions.py +++ b/mistral/tests/unit/api/v2/test_executions.py @@ -19,6 +19,8 @@ import datetime import json import mock +from oslo_config import cfg +import oslo_messaging import uuid from webtest import app as webtest_app @@ -31,6 +33,9 @@ from mistral.tests.unit.api import base from mistral import utils from mistral.workflow import states +# This line is needed for correct initialization of messaging config. +oslo_messaging.get_transport(cfg.CONF) + WF_EX = models.WorkflowExecution( id='123e4567-e89b-12d3-a456-426655440000', @@ -117,6 +122,7 @@ MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError()) MOCK_ACTION_EXC = mock.MagicMock(side_effect=exc.ActionException()) +@mock.patch.object(rpc, '_IMPL_CLIENT', mock.Mock()) class TestExecutionsController(base.APITest): @mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX) def test_get(self): diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index 568ca3b3..89d2401a 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -99,11 +99,10 @@ class EngineTestCase(base.DbTestCase): # Drop all RPC objects (transport, clients). rpc.cleanup() - transport = rpc.get_transport() - self.engine_client = rpc.EngineClient(transport) - self.executor_client = rpc.ExecutorClient(transport) + self.engine_client = rpc.get_engine_client() + self.executor_client = rpc.get_executor_client() self.engine = def_eng.DefaultEngine(self.engine_client) self.executor = def_exec.DefaultExecutor(self.engine_client) diff --git a/mistral/tests/unit/engine/rpc_direct/__init__.py b/mistral/tests/unit/engine/rpc/__init__.py similarity index 100% rename from mistral/tests/unit/engine/rpc_direct/__init__.py rename to mistral/tests/unit/engine/rpc/__init__.py diff --git a/mistral/tests/unit/engine/rpc_direct/test_rpc.py b/mistral/tests/unit/engine/rpc/test_rpc.py similarity index 88% rename from mistral/tests/unit/engine/rpc_direct/test_rpc.py rename to mistral/tests/unit/engine/rpc/test_rpc.py index e5824303..91a94dd6 100644 --- a/mistral/tests/unit/engine/rpc_direct/test_rpc.py +++ b/mistral/tests/unit/engine/rpc/test_rpc.py @@ -19,10 +19,13 @@ from mistral.utils import rpc_utils class RPCTest(base.EngineTestCase): + def setUp(self): + super(RPCTest, self).setUp() + def test_get_rabbit_config(self): conf = cfg.CONF - rpc_info = rpc_utils.get_rabbit_info_from_oslo(conf.engine) + rpc_info = rpc_utils._get_rabbit_info_from_oslo(conf.engine) self.assertDictEqual( { @@ -34,8 +37,8 @@ class RPCTest(base.EngineTestCase): 'host': 'localhost', 'exchange': 'openstack', 'password': 'guest', - 'auto_delete': False, 'durable_queues': False, + 'auto_delete': False }, rpc_info ) diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index 375f55d5..28ff8c30 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -452,7 +452,7 @@ class DefaultEngineTest(base.DbTestCase): class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase): def test_engine_client_remote_error(self): mocked = mock.Mock() - mocked.call.side_effect = rpc_client.RemoteError( + mocked.sync_call.side_effect = rpc_client.RemoteError( 'InputException', 'Input is wrong' ) @@ -468,7 +468,7 @@ class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase): def test_engine_client_remote_error_arbitrary(self): mocked = mock.Mock() - mocked.call.side_effect = KeyError('wrong key') + mocked.sync_call.side_effect = KeyError('wrong key') self.engine_client._client = mocked exception = self.assertRaises( diff --git a/mistral/utils/rpc_utils.py b/mistral/utils/rpc_utils.py index ad006a76..efd03dbf 100644 --- a/mistral/utils/rpc_utils.py +++ b/mistral/utils/rpc_utils.py @@ -19,7 +19,15 @@ from oslo_config import cfg CONF = cfg.CONF -def get_rabbit_info_from_oslo(additional_conf): +def get_rpc_info_from_oslo(additional_conf=None): + if CONF.rpc_backend in ['rabbit', 'fake']: + return _get_rabbit_info_from_oslo(additional_conf) + else: + # TODO(nmakhotkin) Implement. + raise NotImplementedError + + +def _get_rabbit_info_from_oslo(additional_conf): return { 'user_id': CONF.oslo_messaging_rabbit.rabbit_userid, 'password': CONF.oslo_messaging_rabbit.rabbit_password,