Merge "Integrating new RPC layer with Mistral"

This commit is contained in:
Jenkins 2016-07-06 05:25:06 +00:00 committed by Gerrit Code Review
commit 320dd398ad
10 changed files with 79 additions and 105 deletions

View File

@ -25,9 +25,10 @@ eventlet.monkey_patch(
thread=False if '--use-debugger' in sys.argv else True,
time=True)
import os
import six
import time
# If ../mistral/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
@ -39,7 +40,6 @@ if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'mistral', '__init__.py')):
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
if six.PY3 is True:
import socketserver
@ -51,7 +51,6 @@ from wsgiref.simple_server import WSGIServer
from mistral.api import app
from mistral import config
from mistral import context as ctx
from mistral.db.v2 import api as db_api
from mistral.engine import default_engine as def_eng
from mistral.engine import default_executor as def_executor
@ -59,6 +58,7 @@ from mistral.engine.rpc import rpc
from mistral.services import expiration_policy
from mistral.services import scheduler
from mistral.utils import profiler
from mistral.utils import rpc_utils
from mistral import version
@ -67,51 +67,33 @@ CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def launch_executor(transport):
def launch_executor():
profiler.setup('mistral-executor', cfg.CONF.executor.host)
target = messaging.Target(
topic=cfg.CONF.executor.topic,
server=cfg.CONF.executor.host
)
executor_v2 = def_executor.DefaultExecutor(rpc.get_engine_client())
executor_endpoint = rpc.ExecutorServer(executor_v2)
endpoints = [rpc.ExecutorServer(executor_v2)]
server = messaging.get_rpc_server(
transport,
target,
endpoints,
executor='eventlet',
serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
executor_server = rpc.get_rpc_server_driver()(
rpc_utils.get_rpc_info_from_oslo(CONF.executor)
)
executor_server.register_endpoint(executor_endpoint)
executor_v2.register_membership()
try:
server.start()
while True:
time.sleep(604800)
executor_server.run()
except (KeyboardInterrupt, SystemExit):
pass
finally:
print("Stopping executor service...")
server.stop()
server.wait()
def launch_engine(transport):
def launch_engine():
profiler.setup('mistral-engine', cfg.CONF.engine.host)
target = messaging.Target(
topic=cfg.CONF.engine.topic,
server=cfg.CONF.engine.host
)
engine_v2 = def_eng.DefaultEngine(rpc.get_engine_client())
endpoints = [rpc.EngineServer(engine_v2)]
engine_endpoint = rpc.EngineServer(engine_v2)
# Setup scheduler in engine.
db_api.setup_db()
@ -120,37 +102,30 @@ def launch_engine(transport):
# Setup expiration policy
expiration_policy.setup()
server = messaging.get_rpc_server(
transport,
target,
endpoints,
executor='eventlet',
serializer=ctx.RpcContextSerializer(ctx.JsonPayloadSerializer())
engine_server = rpc.get_rpc_server_driver()(
rpc_utils.get_rpc_info_from_oslo(CONF.engine)
)
engine_server.register_endpoint(engine_endpoint)
engine_v2.register_membership()
try:
server.start()
while True:
time.sleep(604800)
engine_server.run()
except (KeyboardInterrupt, SystemExit):
pass
finally:
print("Stopping engine service...")
server.stop()
server.wait()
class ThreadingWSGIServer(socketserver.ThreadingMixIn, WSGIServer):
pass
def launch_api(transport):
def launch_api():
host = cfg.CONF.api.host
port = cfg.CONF.api.port
server = simple_server.make_server(
api_server = simple_server.make_server(
host,
port,
app.setup_app(),
@ -160,12 +135,12 @@ def launch_api(transport):
LOG.info("Mistral API is serving on http://%s:%s (PID=%s)" %
(host, port, os.getpid()))
server.serve_forever()
api_server.serve_forever()
def launch_any(transport, options):
def launch_any(options):
# Launch the servers on different threads.
threads = [eventlet.spawn(LAUNCH_OPTIONS[option], transport)
threads = [eventlet.spawn(LAUNCH_OPTIONS[option])
for option in options]
print('Server started.')
@ -250,11 +225,11 @@ def main():
# servers are launched on the same process. Otherwise, messages do not
# get delivered if the Mistral servers are launched on different
# processes because the "fake" transport is using an in process queue.
transport = rpc.get_transport()
rpc.get_transport()
if cfg.CONF.server == ['all']:
# Launch all servers.
launch_any(transport, LAUNCH_OPTIONS.keys())
launch_any(LAUNCH_OPTIONS.keys())
else:
# Validate launch option.
if set(cfg.CONF.server) - set(LAUNCH_OPTIONS.keys()):
@ -262,7 +237,7 @@ def main():
'api, engine, and executor.')
# Launch distinct set of server(s).
launch_any(transport, set(cfg.CONF.server))
launch_any(set(cfg.CONF.server))
except RuntimeError as excp:
sys.stderr.write("ERROR: %s\n" % excp)

View File

@ -50,7 +50,8 @@ rpc_impl_opt = cfg.StrOpt(
'rpc_implementation',
default='oslo',
choices=['oslo', 'kombu'],
help='Specifies RPC implementation for RPC client and server.'
help='Specifies RPC implementation for RPC client and server. Support of '
'kombu driver is experimental.'
)
pecan_opts = [

View File

@ -17,13 +17,12 @@ from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging.rpc import client
from oslo_messaging.rpc import dispatcher
from oslo_messaging.rpc import server
from stevedore import driver
from mistral import context as auth_ctx
from mistral.engine import base
from mistral import exceptions as exc
from mistral.utils import rpc_utils
from mistral.workflow import utils as wf_utils
@ -38,16 +37,6 @@ _ENGINE_CLIENT = None
_EXECUTOR_CLIENT = None
def get_rpc_server(transport, target, endpoints, executor='blocking',
serializer=None):
return server.RPCServer(
transport,
target,
dispatcher.RPCDispatcher(endpoints, serializer),
executor
)
def cleanup():
"""Intended to be used by tests to recreate all RPC related objects."""
@ -73,7 +62,9 @@ def get_engine_client():
global _ENGINE_CLIENT
if not _ENGINE_CLIENT:
_ENGINE_CLIENT = EngineClient(get_transport())
_ENGINE_CLIENT = EngineClient(
rpc_utils.get_rpc_info_from_oslo(cfg.CONF.engine)
)
return _ENGINE_CLIENT
@ -82,7 +73,9 @@ def get_executor_client():
global _EXECUTOR_CLIENT
if not _EXECUTOR_CLIENT:
_EXECUTOR_CLIENT = ExecutorClient(get_transport())
_EXECUTOR_CLIENT = ExecutorClient(
rpc_utils.get_rpc_info_from_oslo(cfg.CONF.executor)
)
return _EXECUTOR_CLIENT
@ -316,19 +309,12 @@ def wrap_messaging_exception(method):
class EngineClient(base.Engine):
"""RPC Engine client."""
def __init__(self, transport):
def __init__(self, rpc_conf_dict):
"""Constructs an RPC client for engine.
:param transport: Messaging transport.
:param rpc_conf_dict: Dict containing RPC configuration.
"""
serializer = auth_ctx.RpcContextSerializer(
auth_ctx.JsonPayloadSerializer())
self._client = messaging.RPCClient(
transport,
messaging.Target(topic=cfg.CONF.engine.topic),
serializer=serializer
)
self._client = get_rpc_client_driver()(rpc_conf_dict)
@wrap_messaging_exception
def start_workflow(self, wf_identifier, wf_input, description='',
@ -337,7 +323,7 @@ class EngineClient(base.Engine):
:return: Workflow execution.
"""
return self._client.call(
return self._client.sync_call(
auth_ctx.ctx(),
'start_workflow',
workflow_identifier=wf_identifier,
@ -353,7 +339,7 @@ class EngineClient(base.Engine):
:return: Action execution.
"""
return self._client.call(
return self._client.sync_call(
auth_ctx.ctx(),
'start_action',
action_name=action_name,
@ -363,7 +349,7 @@ class EngineClient(base.Engine):
)
def on_task_state_change(self, task_ex_id, state, state_info=None):
return self._client.call(
return self._client.sync_call(
auth_ctx.ctx(),
'on_task_state_change',
task_ex_id=task_ex_id,
@ -389,7 +375,7 @@ class EngineClient(base.Engine):
:return: Task.
"""
return self._client.call(
return self._client.sync_call(
auth_ctx.ctx(),
'on_action_complete',
action_ex_id=action_ex_id,
@ -405,7 +391,7 @@ class EngineClient(base.Engine):
:return: Workflow execution.
"""
return self._client.call(
return self._client.sync_call(
auth_ctx.ctx(),
'pause_workflow',
execution_id=wf_ex_id
@ -442,7 +428,7 @@ class EngineClient(base.Engine):
:return: Workflow execution.
"""
return self._client.call(
return self._client.sync_call(
auth_ctx.ctx(),
'resume_workflow',
wf_ex_id=wf_ex_id,
@ -463,7 +449,7 @@ class EngineClient(base.Engine):
:return: Workflow execution, model.Execution
"""
return self._client.call(
return self._client.sync_call(
auth_ctx.ctx(),
'stop_workflow',
execution_id=wf_ex_id,
@ -480,7 +466,7 @@ class EngineClient(base.Engine):
:return: Workflow execution.
"""
return self._client.call(
return self._client.sync_call(
auth_ctx.ctx(),
'rollback_workflow',
execution_id=wf_ex_id
@ -522,23 +508,14 @@ class ExecutorServer(object):
class ExecutorClient(base.Executor):
"""RPC Executor client."""
def __init__(self, transport):
def __init__(self, rpc_conf_dict):
"""Constructs an RPC client for the Executor.
:param transport: Messaging transport.
:type transport: Transport.
:param rpc_conf_dict: Dict containing RPC configuration.
"""
self.topic = cfg.CONF.executor.topic
serializer = auth_ctx.RpcContextSerializer(
auth_ctx.JsonPayloadSerializer()
)
self._client = messaging.RPCClient(
transport,
messaging.Target(),
serializer=serializer
)
self._client = get_rpc_client_driver()(rpc_conf_dict)
def run_action(self, action_ex_id, action_class_str, attributes,
action_params, target=None, async=True):
@ -561,9 +538,8 @@ class ExecutorClient(base.Executor):
'params': action_params
}
call_ctx = self._client.prepare(topic=self.topic, server=target)
rpc_client_method = call_ctx.cast if async else call_ctx.call
rpc_client_method = (self._client.async_call
if async else self._client.sync_call)
res = rpc_client_method(auth_ctx.ctx(), 'run_action', **kwargs)

View File

@ -19,7 +19,9 @@ import datetime
import json
import mock
from oslo_config import cfg
import oslo_messaging
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
@ -29,6 +31,9 @@ from mistral.tests.unit.api import base
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
# This line is needed for correct initialization of messaging config.
oslo_messaging.get_transport(cfg.CONF)
ACTION_EX_DB = models.ActionExecution(
id='123',
@ -146,6 +151,7 @@ MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_DELETE = mock.MagicMock(return_value=None)
@mock.patch.object(rpc, '_IMPL_CLIENT', mock.Mock())
class TestActionExecutionsController(base.APITest):
def setUp(self):
super(TestActionExecutionsController, self).setUp()

View File

@ -19,6 +19,8 @@ import datetime
import json
import mock
from oslo_config import cfg
import oslo_messaging
import uuid
from webtest import app as webtest_app
@ -31,6 +33,9 @@ from mistral.tests.unit.api import base
from mistral import utils
from mistral.workflow import states
# This line is needed for correct initialization of messaging config.
oslo_messaging.get_transport(cfg.CONF)
WF_EX = models.WorkflowExecution(
id='123e4567-e89b-12d3-a456-426655440000',
@ -117,6 +122,7 @@ MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_ACTION_EXC = mock.MagicMock(side_effect=exc.ActionException())
@mock.patch.object(rpc, '_IMPL_CLIENT', mock.Mock())
class TestExecutionsController(base.APITest):
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
def test_get(self):

View File

@ -99,11 +99,10 @@ class EngineTestCase(base.DbTestCase):
# Drop all RPC objects (transport, clients).
rpc.cleanup()
transport = rpc.get_transport()
self.engine_client = rpc.EngineClient(transport)
self.executor_client = rpc.ExecutorClient(transport)
self.engine_client = rpc.get_engine_client()
self.executor_client = rpc.get_executor_client()
self.engine = def_eng.DefaultEngine(self.engine_client)
self.executor = def_exec.DefaultExecutor(self.engine_client)

View File

@ -19,10 +19,13 @@ from mistral.utils import rpc_utils
class RPCTest(base.EngineTestCase):
def setUp(self):
super(RPCTest, self).setUp()
def test_get_rabbit_config(self):
conf = cfg.CONF
rpc_info = rpc_utils.get_rabbit_info_from_oslo(conf.engine)
rpc_info = rpc_utils._get_rabbit_info_from_oslo(conf.engine)
self.assertDictEqual(
{
@ -34,8 +37,8 @@ class RPCTest(base.EngineTestCase):
'host': 'localhost',
'exchange': 'openstack',
'password': 'guest',
'auto_delete': False,
'durable_queues': False,
'auto_delete': False
},
rpc_info
)

View File

@ -452,7 +452,7 @@ class DefaultEngineTest(base.DbTestCase):
class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase):
def test_engine_client_remote_error(self):
mocked = mock.Mock()
mocked.call.side_effect = rpc_client.RemoteError(
mocked.sync_call.side_effect = rpc_client.RemoteError(
'InputException',
'Input is wrong'
)
@ -468,7 +468,7 @@ class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase):
def test_engine_client_remote_error_arbitrary(self):
mocked = mock.Mock()
mocked.call.side_effect = KeyError('wrong key')
mocked.sync_call.side_effect = KeyError('wrong key')
self.engine_client._client = mocked
exception = self.assertRaises(

View File

@ -19,7 +19,15 @@ from oslo_config import cfg
CONF = cfg.CONF
def get_rabbit_info_from_oslo(additional_conf):
def get_rpc_info_from_oslo(additional_conf=None):
if CONF.rpc_backend in ['rabbit', 'fake']:
return _get_rabbit_info_from_oslo(additional_conf)
else:
# TODO(nmakhotkin) Implement.
raise NotImplementedError
def _get_rabbit_info_from_oslo(additional_conf):
return {
'user_id': CONF.oslo_messaging_rabbit.rabbit_userid,
'password': CONF.oslo_messaging_rabbit.rabbit_password,