diff --git a/README.rst b/README.rst index 8ff7850b9..6d965c28c 100644 --- a/README.rst +++ b/README.rst @@ -21,12 +21,17 @@ To run Mistral API server perform the following command in a shell: Note that an example configuration file can be found in etc/mistral.conf.example. +### Running Mistral Engines +To run Mistral Engine perform the following command in a shell: + +*tox -evenv -- python mistral/cmd/launch.py --server engine --config-file path_to_config* + ### Running Mistral Task Executors To run Mistral Task Executor instance perform the following command in a shell: *tox -evenv -- python mistral/cmd/launch.py --server executor --config-file path_to_config* -Note that at least one Executor instance should be running so that workflow tasks are processed by Mistral. +Note that at least one Engine instance and one Executor instance should be running so that workflow tasks are processed by Mistral. ### Debugging To debug using a local engine and executor without dependencies such as RabbitMQ, create etc/mistral.conf with the following settings:: diff --git a/etc/mistral.conf.example b/etc/mistral.conf.example index 168fd743c..8dcd7f7fc 100644 --- a/etc/mistral.conf.example +++ b/etc/mistral.conf.example @@ -18,6 +18,7 @@ default_log_levels = mistral=INFO,mistral.cmd.api=INFO,mistral.api=DEBUG,wsme=DE #rpc_backend=rabbit # Specifies which mistral server to start by the launch script. (string value) +# Choices are all, api, engine, and executor. #server=all [api] @@ -29,6 +30,16 @@ port = 8989 # Mistral engine class (string value) #engine=mistral.engine.scalable.engine +# Name of the engine node. This can be an opaque identifier. +# It is not necessarily a hostname, FQDN, or IP address. (string value) +#host=0.0.0.0 + +# The message topic that the engine listens on. (string value) +#topic=engine + +# The version of the engine. (string value) +#version=1.0 + [pecan] auth_enable = True diff --git a/mistral/api/app.py b/mistral/api/app.py index 72d85ccf1..3d992593b 100644 --- a/mistral/api/app.py +++ b/mistral/api/app.py @@ -19,10 +19,10 @@ import pecan from oslo.config import cfg from mistral import context as ctx -from mistral.engine import engine from mistral.db import api as db_api from mistral.services import periodic from mistral.api import access_control +from mistral.api.hooks import engine def get_pecan_config(): @@ -48,13 +48,14 @@ def setup_app(config=None, transport=None): app_conf = dict(config.app) db_api.setup_db() - engine.load_engine(transport) + ##TODO(akuznetsov) move this to trigger scheduling to separate process - periodic.setup() + periodic.setup(transport) app = pecan.make_app( app_conf.pop('root'), - hooks=lambda: [ctx.ContextHook()], + hooks=lambda: [ctx.ContextHook(), + engine.EngineHook(transport=transport)], logging=getattr(config, 'logging', {}), **app_conf ) diff --git a/mistral/api/controllers/v1/execution.py b/mistral/api/controllers/v1/execution.py index 2d90eb6fd..56b6c9d44 100644 --- a/mistral/api/controllers/v1/execution.py +++ b/mistral/api/controllers/v1/execution.py @@ -16,6 +16,7 @@ import json +import pecan from pecan import rest from pecan import abort from wsme import types as wtypes @@ -26,7 +27,7 @@ from mistral.api.controllers.v1 import task from mistral.openstack.common import log as logging from mistral.api.controllers import resource from mistral.db import api as db_api -from mistral.engine import engine + LOG = logging.getLogger(__name__) @@ -56,7 +57,8 @@ class Execution(resource.Resource): for key, val in d.items(): if hasattr(e, key): - if key == 'context' and val: + # Nonetype check for dictionary must be explicit + if key == 'context' and val is not None: val = json.dumps(val) setattr(e, key, val) @@ -101,11 +103,13 @@ class ExecutionsController(rest.RestController): def post(self, workbook_name, execution): LOG.debug("Create execution [workbook_name=%s, execution=%s]" % (workbook_name, execution)) + try: context = None if execution.context: context = json.loads(execution.context) + engine = pecan.request.context['engine'] values = engine.start_workflow_execution(execution.workbook_name, execution.task, context) diff --git a/mistral/api/controllers/v1/task.py b/mistral/api/controllers/v1/task.py index eeb541ba6..c026dc9fa 100644 --- a/mistral/api/controllers/v1/task.py +++ b/mistral/api/controllers/v1/task.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import pecan from pecan import rest from pecan import abort from wsme import types as wtypes @@ -22,7 +23,7 @@ import wsmeext.pecan as wsme_pecan from mistral.openstack.common import log as logging from mistral.api.controllers import resource from mistral.db import api as db_api -from mistral.engine import engine + LOG = logging.getLogger(__name__) @@ -67,6 +68,7 @@ class TasksController(rest.RestController): (workbook_name, execution_id, id, task)) # TODO(rakhmerov): pass task result once it's implemented + engine = pecan.request.context['engine'] values = engine.convey_task_result(workbook_name, execution_id, id, diff --git a/mistral/api/hooks/__init__.py b/mistral/api/hooks/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mistral/api/hooks/engine.py b/mistral/api/hooks/engine.py new file mode 100644 index 000000000..cf13980b9 --- /dev/null +++ b/mistral/api/hooks/engine.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pecan.hooks import PecanHook + +from mistral import engine +from mistral.engine import client +from mistral.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class EngineHook(PecanHook): + + def __init__(self, transport=None): + self.transport = engine.get_transport(transport) + self.engine = client.EngineClient(self.transport) + + def before(self, state): + state.request.context['engine'] = self.engine diff --git a/mistral/cmd/launch.py b/mistral/cmd/launch.py index fcef5a35e..7e72ec8ec 100755 --- a/mistral/cmd/launch.py +++ b/mistral/cmd/launch.py @@ -37,12 +37,10 @@ from oslo import messaging from oslo.config import cfg from mistral import config -from mistral.engine import engine +from mistral import engine from mistral.engine.scalable.executor import server - from mistral.api import app from wsgiref import simple_server - from mistral.openstack.common import log as logging @@ -50,18 +48,25 @@ LOG = logging.getLogger(__name__) def launch_executor(transport): - # TODO(rakhmerov): This is a temporary hack. - # We have to initialize engine in executor process because - # executor now calls engine.convey_task_result() directly. - engine.load_engine(transport) target = messaging.Target(topic=cfg.CONF.executor.topic, server=cfg.CONF.executor.host) - endpoints = [server.Executor()] - ex_server = messaging.get_rpc_server(transport, target, endpoints) + endpoints = [server.Executor(transport)] + ex_server = messaging.get_rpc_server( + transport, target, endpoints, executor='eventlet') ex_server.start() ex_server.wait() +def launch_engine(transport): + target = messaging.Target(topic=cfg.CONF.engine.topic, + server=cfg.CONF.engine.host) + endpoints = [engine.Engine(transport)] + en_server = messaging.get_rpc_server( + transport, target, endpoints, executor='eventlet') + en_server.start() + en_server.wait() + + def launch_api(transport): host = cfg.CONF.api.host port = cfg.CONF.api.port @@ -75,9 +80,9 @@ def launch_api(transport): def launch_all(transport): # Launch the servers on different threads. t1 = eventlet.spawn(launch_executor, transport) - t2 = eventlet.spawn(launch_api, transport) - t1.wait() - t2.wait() + t2 = eventlet.spawn(launch_engine, transport) + t3 = eventlet.spawn(launch_api, transport) + t1.wait() and t2.wait() and t3.wait() def main(): @@ -90,6 +95,7 @@ def main(): launch_options = { 'all': launch_all, 'api': launch_api, + 'engine': launch_engine, 'executor': launch_executor } diff --git a/mistral/config.py b/mistral/config.py index b3fdb509b..99ac50a00 100644 --- a/mistral/config.py +++ b/mistral/config.py @@ -31,7 +31,15 @@ api_opts = [ engine_opts = [ cfg.StrOpt('engine', default='mistral.engine.scalable.engine', - help='Mistral engine class') + help='Mistral engine class'), + cfg.StrOpt('host', default='0.0.0.0', + help='Name of the engine node. This can be an opaque ' + 'identifier. It is not necessarily a hostname, ' + 'FQDN, or IP address.'), + cfg.StrOpt('topic', default='engine', + help='The message topic that the engine listens on.'), + cfg.StrOpt('version', default='1.0', + help='The version of the engine.') ] pecan_opts = [ @@ -81,7 +89,7 @@ executor_opts = [ launch_opt = cfg.StrOpt( 'server', default='all', - choices=('all', 'api', 'executor'), + choices=('all', 'api', 'engine', 'executor'), help='Specifies which mistral server to start by the launch script.' ) diff --git a/mistral/engine/__init__.py b/mistral/engine/__init__.py index e69de29bb..07fe7411d 100644 --- a/mistral/engine/__init__.py +++ b/mistral/engine/__init__.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo import messaging +from oslo.config import cfg + +from mistral.openstack.common import importutils +from mistral.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +def get_transport(transport=None): + return (transport if transport else messaging.get_transport(cfg.CONF)) + + +class Engine(object): + + def __init__(self, transport=None): + module_name = cfg.CONF.engine.engine + module = importutils.import_module(module_name) + self.transport = get_transport(transport) + self.backend = module.get_engine() + self.backend.transport = self.transport + + def start_workflow_execution(self, cntx, **kwargs): + """Starts a workflow execution based on the specified workbook name + and target task. + + :param cntx: a request context dict + :type cntx: dict + :param kwargs: a dict of method arguments + :type kwargs: dict + :return: Workflow execution. + """ + workbook_name = kwargs.get('workbook_name') + task_name = kwargs.get('task_name') + context = kwargs.get('context', None) + return self.backend.start_workflow_execution( + workbook_name, task_name, context) + + def stop_workflow_execution(self, cntx, **kwargs): + """Stops the workflow execution with the given id. + + :param cntx: a request context dict + :type cntx: dict + :param kwargs: a dict of method arguments + :type kwargs: dict + :return: Workflow execution. + """ + workbook_name = kwargs.get('workbook_name') + execution_id = kwargs.get('execution_id') + return self.backend.stop_workflow_execution( + workbook_name, execution_id) + + def convey_task_result(self, cntx, **kwargs): + """Conveys task result to Mistral Engine. + + This method should be used by clients of Mistral Engine to update + state of a task once task action has been performed. One of the + clients of this method is Mistral REST API server that receives + task result from the outside action handlers. + + Note: calling this method serves an event notifying Mistral that + it possibly needs to move the workflow on, i.e. run other workflow + tasks for which all dependencies are satisfied. + + :param cntx: a request context dict + :type cntx: dict + :param kwargs: a dict of method arguments + :type kwargs: dict + :return: Task. + """ + workbook_name = kwargs.get('workbook_name') + execution_id = kwargs.get('execution_id') + task_id = kwargs.get('task_id') + state = kwargs.get('state') + result = kwargs.get('result') + return self.backend.convey_task_result( + workbook_name, execution_id, task_id, state, result) + + def get_workflow_execution_state(self, cntx, **kwargs): + """Gets the workflow execution state. + + :param cntx: a request context dict + :type cntx: dict + :param kwargs: a dict of method arguments + :type kwargs: dict + :return: Current workflow state. + """ + workbook_name = kwargs.get('workbook_name') + execution_id = kwargs.get('execution_id') + return self.backend.get_workflow_execution_state( + workbook_name, execution_id) + + def get_task_state(self, cntx, **kwargs): + """Gets task state. + + :param cntx: a request context dict + :type cntx: dict + :param kwargs: a dict of method arguments + :type kwargs: dict + :return: Current task state. + """ + workbook_name = kwargs.get('workbook_name') + execution_id = kwargs.get('execution_id') + task_id = kwargs.get('task_id') + return self.backend.get_task_state( + workbook_name, execution_id, task_id) diff --git a/mistral/engine/client.py b/mistral/engine/client.py new file mode 100644 index 000000000..bd5a29e65 --- /dev/null +++ b/mistral/engine/client.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo import messaging +from oslo.config import cfg + +from mistral.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +class EngineClient(object): + """ + RPC client for the Engine. + """ + + def __init__(self, transport): + """Construct an RPC client for the Engine. + + :param transport: a messaging transport handle + :type transport: Transport + """ + target = messaging.Target(topic=cfg.CONF.engine.topic) + self._client = messaging.RPCClient(transport, target) + + def start_workflow_execution(self, workbook_name, task_name, context=None): + """Starts a workflow execution based on the specified workbook name + and target task. + + :param workbook_name: Workbook name + :param task_name: Target task name + :param context: Execution context which defines a workflow input + :return: Workflow execution. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'task_name': task_name, + 'context': context} + return self._client.call(cntx, 'start_workflow_execution', **kwargs) + + def stop_workflow_execution(self, workbook_name, execution_id): + """Stops the workflow execution with the given id. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :return: Workflow execution. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id} + return self._client.call(cntx, 'stop_workflow_execution', **kwargs) + + def convey_task_result(self, workbook_name, execution_id, + task_id, state, result): + """Conveys task result to Mistral Engine. + + This method should be used by clients of Mistral Engine to update + state of a task once task action has been performed. One of the + clients of this method is Mistral REST API server that receives + task result from the outside action handlers. + + Note: calling this method serves an event notifying Mistral that + it possibly needs to move the workflow on, i.e. run other workflow + tasks for which all dependencies are satisfied. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :param task_id: Task id. + :param state: New task state. + :param result: Task result data. + :return: Task. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id, + 'task_id': task_id, + 'state': state, + 'result': result} + return self._client.call(cntx, 'convey_task_result', **kwargs) + + def get_workflow_execution_state(self, workbook_name, execution_id): + """Gets the workflow execution state. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :return: Current workflow state. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'execution_id': execution_id} + return self._client.call( + cntx, 'get_workflow_execution_state', **kwargs) + + def get_task_state(self, workbook_name, execution_id, task_id): + """Gets task state. + + :param workbook_name: Workbook name. + :param execution_id: Workflow execution id. + :param task_id: Task id. + :return: Current task state. + """ + # TODO(m4dcoder): refactor auth context + cntx = {} + kwargs = {'workbook_name': workbook_name, + 'executioin_id': execution_id, + 'task_id': task_id} + return self._client.call(cntx, 'get_task_state', **kwargs) diff --git a/mistral/engine/engine.py b/mistral/engine/engine.py deleted file mode 100644 index a56fe8414..000000000 --- a/mistral/engine/engine.py +++ /dev/null @@ -1,99 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright 2013 - Mirantis, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Facade interface to Mistral Engine that provides control over lifecycle -of workflow executions. -""" - -from mistral.openstack.common import importutils -from oslo.config import cfg - -_engine = None - - -def load_engine(transport): - global _engine - module_name = cfg.CONF.engine.engine - module = importutils.import_module(module_name) - _engine = module.get_engine() - _engine.transport = transport - - -def start_workflow_execution(workbook_name, task_name, context=None): - """Starts a workflow execution based on the specified workbook name - and target task. - - :param workbook_name: Workbook name - :param task_name: Target task name - :param context: Execution context which defines a workflow input - :return: Workflow execution. - """ - return _engine.start_workflow_execution(workbook_name, task_name, context) - - -def stop_workflow_execution(workbook_name, execution_id): - """Stops the workflow execution with the given id. - - :param workbook_name: Workbook name. - :param execution_id: Workflow execution id. - :return: Workflow execution. - """ - return _engine.stop_workflow_execution(workbook_name, execution_id) - - -def convey_task_result(workbook_name, execution_id, task_id, state, result): - """Conveys task result to Mistral Engine. - - This method should be used by clients of Mistral Engine to update - state of a task once task action has been performed. One of the - clients of this method is Mistral REST API server that receives - task result from the outside action handlers. - - Note: calling this method serves an event notifying Mistral that - it possibly needs to move the workflow on, i.e. run other workflow - tasks for which all dependencies are satisfied. - - :param workbook_name: Workbook name. - :param execution_id: Workflow execution id. - :param task_id: Task id. - :param state: New task state. - :param result: Task result data. - :return: Task. - """ - return _engine.convey_task_result(workbook_name, execution_id, task_id, - state, result) - - -def get_workflow_execution_state(workbook_name, execution_id): - """Gets the workflow execution state. - - :param workbook_name: Workbook name. - :param execution_id: Workflow execution id. - :return: Current workflow state. - """ - return _engine.get_workflow_execution_state(workbook_name, execution_id) - - -def get_task_state(workbook_name, execution_id, task_id): - """Gets task state. - - :param workbook_name: Workbook name. - :param execution_id: Workflow execution id. - :param task_id: Task id. - :return: Current task state. - """ - return _engine.get_task_state(workbook_name, execution_id, task_id) diff --git a/mistral/engine/scalable/executor/server.py b/mistral/engine/scalable/executor/server.py index 564b9cb1b..5d351b472 100644 --- a/mistral/engine/scalable/executor/server.py +++ b/mistral/engine/scalable/executor/server.py @@ -19,7 +19,8 @@ from oslo.config import cfg from mistral.openstack.common import log as logging from mistral.db import api as db_api from mistral import exceptions as exc -from mistral.engine import engine +from mistral import engine +from mistral.engine import client from mistral.engine import states from mistral.actions import action_factory as a_f @@ -30,6 +31,10 @@ WORKFLOW_TRACE = logging.getLogger(cfg.CONF.workflow_trace_log_name) class Executor(object): + def __init__(self, transport=None): + self.transport = engine.get_transport(transport) + self.engine = client.EngineClient(self.transport) + def _do_task_action(self, task): """Executes the action defined by the task and return result. @@ -49,19 +54,19 @@ class Executor(object): except exc.ActionException: state, result = states.ERROR, None - engine.convey_task_result(task['workbook_name'], - task['execution_id'], - task['id'], - state, result) + self.engine.convey_task_result(task['workbook_name'], + task['execution_id'], + task['id'], + state, result) else: try: action.run() except exc.ActionException: - engine.convey_task_result(task['workbook_name'], - task['execution_id'], - task['id'], - states.ERROR, None) + self.engine.convey_task_result(task['workbook_name'], + task['execution_id'], + task['id'], + states.ERROR, None) def _handle_task_error(self, task, exception): """Handle exception from the task execution. diff --git a/mistral/services/periodic.py b/mistral/services/periodic.py index 69b1b208e..2925bf49f 100644 --- a/mistral/services/periodic.py +++ b/mistral/services/periodic.py @@ -14,9 +14,9 @@ # See the License for the specific language governing permissions and # limitations under the License. - from mistral.db import api as db_api -from mistral.engine import engine +from mistral import engine +from mistral.engine import client from mistral.openstack.common import log from mistral.openstack.common import periodic_task from mistral.openstack.common import threadgroup @@ -25,10 +25,17 @@ from mistral import dsl_parser as parser from mistral.services import scheduler as sched from mistral.services import trusts + LOG = log.getLogger(__name__) class MistralPeriodicTasks(periodic_task.PeriodicTasks): + + def __init__(self, transport=None): + super(MistralPeriodicTasks, self).__init__() + self.transport = engine.get_transport(transport) + self.engine = client.EngineClient(self.transport) + @periodic_task.periodic_task(spacing=1, run_immediately=True) def scheduler_triggers(self, ctx): LOG.debug('Processing next Scheduler triggers.') @@ -40,15 +47,15 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks): try: task = parser.get_workbook( wb['definition']).get_trigger_task_name(trigger['name']) - engine.start_workflow_execution(wb['name'], task) + self.engine.start_workflow_execution(wb['name'], task) finally: sched.set_next_execution_time(trigger) context.set_ctx(None) -def setup(): +def setup(transport): tg = threadgroup.ThreadGroup() - pt = MistralPeriodicTasks() + pt = MistralPeriodicTasks(transport=transport) tg.add_dynamic_timer( pt.run_periodic_tasks, diff --git a/mistral/tests/api/v1/controllers/test_executions.py b/mistral/tests/api/v1/controllers/test_executions.py index 85fdee824..fafaf57c8 100644 --- a/mistral/tests/api/v1/controllers/test_executions.py +++ b/mistral/tests/api/v1/controllers/test_executions.py @@ -21,7 +21,7 @@ from mistral import exceptions as ex from webtest.app import AppError from mistral.tests.api import base from mistral.db import api as db_api -from mistral.engine import engine +from mistral.engine import client # TODO: later we need additional tests verifying all the errors etc. @@ -75,7 +75,7 @@ class TestExecutionsController(base.FunctionalTest): self.assertEqual(resp.status_int, 200) self.assertDictEqual(UPDATED_EXEC, canonize(resp.json)) - @mock.patch.object(engine, 'start_workflow_execution', + @mock.patch.object(client.EngineClient, 'start_workflow_execution', mock.MagicMock(return_value=EXECS[0])) def test_post(self): new_exec = EXECS[0].copy() @@ -86,7 +86,7 @@ class TestExecutionsController(base.FunctionalTest): self.assertEqual(resp.status_int, 201) self.assertDictEqual(EXECS[0], canonize(resp.json)) - @mock.patch.object(engine, 'start_workflow_execution', + @mock.patch.object(client.EngineClient, 'start_workflow_execution', mock.MagicMock(side_effect=ex.MistralException)) def test_post_throws_exception(self): with self.assertRaises(AppError) as context: diff --git a/mistral/tests/api/v1/controllers/test_tasks.py b/mistral/tests/api/v1/controllers/test_tasks.py index 9d3d71f40..bd3bb38f0 100644 --- a/mistral/tests/api/v1/controllers/test_tasks.py +++ b/mistral/tests/api/v1/controllers/test_tasks.py @@ -18,7 +18,7 @@ import mock from mistral.tests.api import base from mistral.db import api as db_api -from mistral.engine import engine +from mistral.engine import client # TODO: later we need additional tests verifying all the errors etc. @@ -48,7 +48,7 @@ class TestTasksController(base.FunctionalTest): self.assertEqual(resp.status_int, 200) self.assertDictEqual(TASKS[0], resp.json) - @mock.patch.object(engine, "convey_task_result", + @mock.patch.object(client.EngineClient, "convey_task_result", mock.MagicMock(return_value=UPDATED_TASK)) def test_put(self): resp = self.app.put_json( diff --git a/mistral/tests/base.py b/mistral/tests/base.py index 33f3d01b3..9ade353f6 100644 --- a/mistral/tests/base.py +++ b/mistral/tests/base.py @@ -32,11 +32,11 @@ importutils.import_module("mistral.config") from mistral.db.sqlalchemy import api as db_api from mistral.openstack.common import log as logging -from mistral.engine import engine -from mistral.engine.scalable.executor import server -from mistral.engine.scalable import engine as concrete_engine from mistral.openstack.common.db.sqlalchemy import session from mistral import version +from mistral.engine import client +from mistral.engine.scalable import engine +from mistral.engine.scalable.executor import server RESOURCES_PATH = 'tests/resources/' @@ -120,12 +120,39 @@ class DbTestCase(BaseTest): class EngineTestCase(DbTestCase): + transport = get_fake_transport() + def __init__(self, *args, **kwargs): super(EngineTestCase, self).__init__(*args, **kwargs) - self.transport = get_fake_transport() - engine.load_engine(self.transport) - self.engine = concrete_engine.get_engine() - self.engine.transport = self.transport + self.engine = client.EngineClient(self.transport) + + @classmethod + def mock_task_result(cls, workbook_name, execution_id, + task_id, state, result): + """ + Mock the engine convey_task_results to send request directly + to the engine instead of going through the oslo.messaging transport. + """ + return engine.ScalableEngine.convey_task_result( + workbook_name, execution_id, task_id, state, result) + + @classmethod + def mock_start_workflow(cls, workbook_name, task_name, context=None): + """ + Mock the engine start_workflow_execution to send request directly + to the engine instead of going through the oslo.messaging transport. + """ + return engine.ScalableEngine.start_workflow_execution( + workbook_name, task_name, context) + + @classmethod + def mock_get_workflow_state(cls, workbook_name, execution_id): + """ + Mock the engine get_workflow_execution_state to send request directly + to the engine instead of going through the oslo.messaging transport. + """ + return engine.ScalableEngine.get_workflow_execution_state( + workbook_name, execution_id) @classmethod def mock_run_tasks(cls, tasks): @@ -133,6 +160,15 @@ class EngineTestCase(DbTestCase): Mock the engine _run_tasks to send requests directly to the task executor instead of going through the oslo.messaging transport. """ - executor = server.Executor() + executor = server.Executor(transport=cls.transport) for task in tasks: executor.handle_task({}, task=task) + + @classmethod + def mock_handle_task(cls, cntx, **kwargs): + """ + Mock the executor handle_task to send requests directory to the task + executor instead of going through the oslo.messaging transport. + """ + executor = server.Executor(transport=cls.transport) + return executor.handle_task(cntx, **kwargs) diff --git a/mistral/tests/unit/engine/scalable/test_engine.py b/mistral/tests/unit/engine/scalable/test_engine.py index 12c16fb0b..1d1b0c89e 100644 --- a/mistral/tests/unit/engine/scalable/test_engine.py +++ b/mistral/tests/unit/engine/scalable/test_engine.py @@ -17,13 +17,14 @@ import mock from oslo.config import cfg +from mistral.tests import base from mistral.openstack.common import log as logging from mistral.db import api as db_api from mistral.actions import std_actions from mistral import expressions from mistral.engine.scalable import engine from mistral.engine import states -from mistral.tests import base +from mistral.engine import client LOG = logging.getLogger(__name__) @@ -38,15 +39,23 @@ cfg.CONF.set_default('auth_enable', False, group='pecan') #TODO(rakhmerov): add more tests for errors, execution stop etc. +@mock.patch.object( + client.EngineClient, 'start_workflow_execution', + mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow)) +@mock.patch.object( + client.EngineClient, 'convey_task_result', + mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) +@mock.patch.object( + db_api, 'workbook_get', + mock.MagicMock( + return_value={'definition': base.get_resource('test_rest.yaml')})) +@mock.patch.object( + std_actions.HTTPAction, 'run', + mock.MagicMock(return_value={'state': states.SUCCESS})) class TestScalableEngine(base.EngineTestCase): - @mock.patch.object(engine.ScalableEngine, "_notify_task_executors", - mock.MagicMock(return_value="")) - @mock.patch.object(db_api, "workbook_get", - mock.MagicMock(return_value={ - 'definition': base.get_resource("test_rest.yaml") - })) - @mock.patch.object(std_actions.HTTPAction, "run", - mock.MagicMock(return_value="result")) + @mock.patch.object( + engine.ScalableEngine, "_notify_task_executors", + mock.MagicMock(return_value="")) def test_engine_one_task(self): execution = self.engine.start_workflow_execution(WB_NAME, "create-vms", CONTEXT) @@ -62,14 +71,13 @@ class TestScalableEngine(base.EngineTestCase): self.assertEqual(execution['state'], states.SUCCESS) self.assertEqual(task['state'], states.SUCCESS) - @mock.patch.object(engine.ScalableEngine, "_notify_task_executors", - mock.MagicMock(return_value="")) - @mock.patch.object(db_api, "workbook_get", - mock.MagicMock(return_value={ - 'definition': base.get_resource("test_rest.yaml") - })) - @mock.patch.object(std_actions.HTTPAction, "run", - mock.MagicMock(return_value="result")) + @mock.patch.object( + client.EngineClient, 'get_workflow_execution_state', + mock.MagicMock( + side_effect=base.EngineTestCase.mock_get_workflow_state)) + @mock.patch.object( + engine.ScalableEngine, "_notify_task_executors", + mock.MagicMock(return_value="")) def test_engine_multiple_tasks(self): execution = self.engine.start_workflow_execution(WB_NAME, "backup-vms", CONTEXT) @@ -107,19 +115,14 @@ class TestScalableEngine(base.EngineTestCase): self.engine.get_workflow_execution_state( WB_NAME, execution['id'])) - @mock.patch.object(engine.ScalableEngine, '_run_tasks', - mock.MagicMock( - side_effect=base.EngineTestCase.mock_run_tasks)) - @mock.patch.object(std_actions.HTTPAction, "run", - mock.MagicMock(return_value={'state': states.SUCCESS})) - @mock.patch.object(db_api, "workbook_get", - mock.MagicMock(return_value={ - 'definition': base.get_resource("test_rest.yaml") - })) - @mock.patch.object(states, "get_state_by_http_status_code", - mock.MagicMock(return_value=states.SUCCESS)) - @mock.patch.object(expressions, "evaluate", - mock.MagicMock(side_effect=lambda x, y: x)) + @mock.patch.object( + engine.ScalableEngine, '_run_tasks', + mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) + @mock.patch.object( + states, "get_state_by_http_status_code", + mock.MagicMock(return_value=states.SUCCESS)) + @mock.patch.object( + expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x)) def test_engine_sync_task(self): execution = self.engine.start_workflow_execution(WB_NAME, "create-vm-nova", @@ -131,17 +134,11 @@ class TestScalableEngine(base.EngineTestCase): self.assertEqual(execution['state'], states.SUCCESS) self.assertEqual(task['state'], states.SUCCESS) - @mock.patch.object(engine.ScalableEngine, '_run_tasks', - mock.MagicMock( - side_effect=base.EngineTestCase.mock_run_tasks)) - @mock.patch.object(db_api, "workbook_get", - mock.MagicMock(return_value={ - 'definition': base.get_resource("test_rest.yaml") - })) - @mock.patch.object(std_actions.HTTPAction, "run", - mock.MagicMock(return_value={'state': states.SUCCESS})) - @mock.patch.object(expressions, "evaluate", - mock.MagicMock(side_effect=lambda x, y: x)) + @mock.patch.object( + engine.ScalableEngine, '_run_tasks', + mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) + @mock.patch.object( + expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x)) def test_engine_tasks_on_success_finish(self): # Start workflow. execution = self.engine.start_workflow_execution(WB_NAME, @@ -203,17 +200,11 @@ class TestScalableEngine(base.EngineTestCase): self.assertEqual(execution['state'], states.SUCCESS) self._assert_multiple_items(tasks, 4, state=states.SUCCESS) - @mock.patch.object(engine.ScalableEngine, '_run_tasks', - mock.MagicMock( - side_effect=base.EngineTestCase.mock_run_tasks)) - @mock.patch.object(db_api, "workbook_get", - mock.MagicMock(return_value={ - 'definition': base.get_resource("test_rest.yaml") - })) - @mock.patch.object(std_actions.HTTPAction, "run", - mock.MagicMock(return_value={'state': states.SUCCESS})) - @mock.patch.object(expressions, "evaluate", - mock.MagicMock(side_effect=lambda x, y: x)) + @mock.patch.object( + engine.ScalableEngine, '_run_tasks', + mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) + @mock.patch.object( + expressions, "evaluate", mock.MagicMock(side_effect=lambda x, y: x)) def test_engine_tasks_on_error_finish(self): # Start workflow. execution = self.engine.start_workflow_execution(WB_NAME, diff --git a/mistral/tests/unit/engine/scalable/test_executor.py b/mistral/tests/unit/engine/scalable/test_executor.py index 44501c297..ad044d024 100644 --- a/mistral/tests/unit/engine/scalable/test_executor.py +++ b/mistral/tests/unit/engine/scalable/test_executor.py @@ -18,18 +18,31 @@ import eventlet eventlet.monkey_patch() import uuid -import time import mock +from oslo.config import cfg + from mistral.tests import base -from mistral.cmd import launch +from mistral.openstack.common import log as logging +from mistral.openstack.common import importutils from mistral.engine import states from mistral.db import api as db_api from mistral.actions import std_actions -from mistral.engine.scalable.executor import client +from mistral.engine import client as engine +from mistral.engine.scalable.executor import client as executor + + +# We need to make sure that all configuration properties are registered. +importutils.import_module("mistral.config") +LOG = logging.getLogger(__name__) + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + WORKBOOK_NAME = 'my_workbook' -TASK_NAME = 'my_task' +TASK_NAME = 'create-vms' SAMPLE_WORKBOOK = { 'id': str(uuid.uuid4()), @@ -77,27 +90,19 @@ SAMPLE_CONTEXT = { class TestExecutor(base.DbTestCase): - def mock_action_run(self): - std_actions.HTTPAction.run = mock.MagicMock(return_value={}) - return std_actions.HTTPAction.run - - def setUp(self): - super(TestExecutor, self).setUp() - - # Run the Executor in the background. + def __init__(self, *args, **kwargs): + super(TestExecutor, self).__init__(*args, **kwargs) self.transport = base.get_fake_transport() - self.ex_thread = eventlet.spawn(launch.launch_executor, self.transport) - - def tearDown(self): - # Stop the Executor. - self.ex_thread.kill() - - super(TestExecutor, self).tearDown() + @mock.patch.object( + executor.ExecutorClient, 'handle_task', + mock.MagicMock(side_effect=base.EngineTestCase.mock_handle_task)) + @mock.patch.object( + std_actions.HTTPAction, 'run', mock.MagicMock(return_value={})) + @mock.patch.object( + engine.EngineClient, 'convey_task_result', + mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) def test_handle_task(self): - # Mock HTTP action. - mock_rest_action = self.mock_action_run() - # Create a new workbook. workbook = db_api.workbook_create(SAMPLE_WORKBOOK) self.assertIsInstance(workbook, dict) @@ -116,28 +121,11 @@ class TestExecutor(base.DbTestCase): self.assertIn('id', task) # Send the task request to the Executor. - ex_client = client.ExecutorClient(self.transport) + ex_client = executor.ExecutorClient(self.transport) ex_client.handle_task(SAMPLE_CONTEXT, task=task) - # Check task execution state. There is no timeout mechanism in - # unittest. There is an example to add a custom timeout decorator that - # can wrap this test function in another process and then manage the - # process time. However, it seems more straightforward to keep the - # loop finite. - for i in range(0, 50): - db_task = db_api.task_get(task['workbook_name'], - task['execution_id'], - task['id']) - # Ensure the request reached the executor and the action has ran. - if db_task['state'] != states.IDLE: - # We have to wait sometime due to time interval between set - # task state to RUNNING and invocation action.run() - time.sleep(0.1) - mock_rest_action.assert_called_once_with() - self.assertIn(db_task['state'], - [states.RUNNING, states.SUCCESS, states.ERROR]) - return - time.sleep(0.1) - - # Task is not being processed. Throw an exception here. - raise Exception('Timed out waiting for task to be processed.') + # Check task execution state. + db_task = db_api.task_get(task['workbook_name'], + task['execution_id'], + task['id']) + self.assertEqual(db_task['state'], states.SUCCESS) diff --git a/mistral/tests/unit/engine/test_data_flow.py b/mistral/tests/unit/engine/test_data_flow.py index f0c10ef6e..e83d0f430 100644 --- a/mistral/tests/unit/engine/test_data_flow.py +++ b/mistral/tests/unit/engine/test_data_flow.py @@ -25,6 +25,7 @@ from mistral.db import api as db_api from mistral.engine.scalable import engine from mistral.actions import std_actions from mistral.engine import states +from mistral.engine import client from mistral.utils.openstack import keystone @@ -60,6 +61,15 @@ def create_workbook(definition_path): }) +@mock.patch.object( + client.EngineClient, 'start_workflow_execution', + mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow)) +@mock.patch.object( + client.EngineClient, 'convey_task_result', + mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) +@mock.patch.object( + engine.ScalableEngine, '_run_tasks', + mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) class DataFlowTest(base.EngineTestCase): def _check_in_context_execution(self, task): self.assertIn('__execution', task['in_context']) @@ -70,9 +80,6 @@ class DataFlowTest(base.EngineTestCase): self.assertEqual(task['execution_id'], exec_dict['id']) self.assertIn('task', exec_dict) - @mock.patch.object(engine.ScalableEngine, '_run_tasks', - mock.MagicMock( - side_effect=base.EngineTestCase.mock_run_tasks)) def test_two_dependent_tasks(self): CTX = copy.copy(CONTEXT) @@ -141,9 +148,6 @@ class DataFlowTest(base.EngineTestCase): del build_greeting_task['in_context']['__execution'] self.assertDictEqual(CTX, build_greeting_task['in_context']) - @mock.patch.object(engine.ScalableEngine, '_run_tasks', - mock.MagicMock( - side_effect=base.EngineTestCase.mock_run_tasks)) def test_task_with_two_dependencies(self): CTX = copy.copy(CONTEXT) @@ -240,9 +244,6 @@ class DataFlowTest(base.EngineTestCase): }, send_greeting_task['output']) - @mock.patch.object(engine.ScalableEngine, '_run_tasks', - mock.MagicMock( - side_effect=base.EngineTestCase.mock_run_tasks)) def test_two_subsequent_tasks(self): CTX = copy.copy(CONTEXT) @@ -312,9 +313,6 @@ class DataFlowTest(base.EngineTestCase): del build_greeting_task['in_context']['__execution'] self.assertDictEqual(CTX, build_greeting_task['in_context']) - @mock.patch.object(engine.ScalableEngine, '_run_tasks', - mock.MagicMock( - side_effect=base.EngineTestCase.mock_run_tasks)) def test_three_subsequent_tasks(self): CTX = copy.copy(CONTEXT) @@ -415,15 +413,13 @@ class DataFlowTest(base.EngineTestCase): del send_greeting_task['in_context']['__execution'] self.assertDictEqual(CTX, send_greeting_task['in_context']) - @mock.patch.object(std_actions.HTTPAction, "run", - mock.MagicMock(return_value={'state': states.RUNNING})) - @mock.patch.object(keystone, "client_for_trusts", - mock.Mock( - return_value=mock.MagicMock(user_id=USER_ID, - auth_token=TOKEN))) - @mock.patch.object(engine.ScalableEngine, '_run_tasks', - mock.MagicMock( - side_effect=base.EngineTestCase.mock_run_tasks)) + @mock.patch.object( + std_actions.HTTPAction, 'run', + mock.MagicMock(return_value={'state': states.RUNNING})) + @mock.patch.object( + keystone, 'client_for_trusts', + mock.Mock(return_value=mock.MagicMock(user_id=USER_ID, + auth_token=TOKEN))) def test_add_token_to_context(self): task_name = "create-vms" diff --git a/mistral/tests/unit/engine/test_task_retry.py b/mistral/tests/unit/engine/test_task_retry.py index 6cb45dcc3..8604213ae 100644 --- a/mistral/tests/unit/engine/test_task_retry.py +++ b/mistral/tests/unit/engine/test_task_retry.py @@ -24,6 +24,7 @@ from mistral import exceptions as exc from mistral.openstack.common import log as logging from mistral.tests import base from mistral.db import api as db_api +from mistral.engine import client from mistral.engine.scalable import engine from mistral.actions import std_actions from mistral.engine import states @@ -57,16 +58,23 @@ class FailBeforeSuccessMocker(object): return "result" -@mock.patch.object(engine.ScalableEngine, '_run_tasks', - mock.MagicMock( - side_effect=base.EngineTestCase.mock_run_tasks)) -@mock.patch.object(db_api, "workbook_get", - mock.MagicMock(return_value={ - 'definition': base.get_resource( - "retry_task/retry_task.yaml") - })) -@mock.patch.object(std_actions.HTTPAction, "run", - mock.MagicMock(return_value="result")) +@mock.patch.object( + client.EngineClient, 'start_workflow_execution', + mock.MagicMock(side_effect=base.EngineTestCase.mock_start_workflow)) +@mock.patch.object( + client.EngineClient, 'convey_task_result', + mock.MagicMock(side_effect=base.EngineTestCase.mock_task_result)) +@mock.patch.object( + engine.ScalableEngine, '_run_tasks', + mock.MagicMock(side_effect=base.EngineTestCase.mock_run_tasks)) +@mock.patch.object( + db_api, 'workbook_get', + mock.MagicMock( + return_value={ + 'definition': base.get_resource('retry_task/retry_task.yaml')})) +@mock.patch.object( + std_actions.HTTPAction, 'run', + mock.MagicMock(return_value='result')) class TaskRetryTest(base.EngineTestCase): def test_no_retry(self): diff --git a/mistral/tests/unit/engine/test_transport.py b/mistral/tests/unit/engine/test_transport.py new file mode 100644 index 000000000..2912ce752 --- /dev/null +++ b/mistral/tests/unit/engine/test_transport.py @@ -0,0 +1,95 @@ +# Copyright (c) 2013 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import eventlet +eventlet.monkey_patch() + +import time +import mock + +from oslo.config import cfg + +from mistral.tests import base +from mistral.openstack.common import log as logging +from mistral.openstack.common import importutils +from mistral.cmd import launch +from mistral.engine import states +from mistral.db import api as db_api +from mistral.actions import std_actions + + +# We need to make sure that all configuration properties are registered. +importutils.import_module("mistral.config") +LOG = logging.getLogger(__name__) + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + +WB_NAME = 'my_workbook' +CONTEXT = None # TODO(rakhmerov): Use a meaningful value. + + +class TestTransport(base.EngineTestCase): + def setUp(self): + super(TestTransport, self).setUp() + + # Run the Engine and Executor in the background. + self.en_thread = eventlet.spawn(launch.launch_engine, self.transport) + self.ex_thread = eventlet.spawn(launch.launch_executor, self.transport) + + def tearDown(self): + # Stop the Engine and the Executor. + self.en_thread.kill() + self.ex_thread.kill() + + super(TestTransport, self).tearDown() + + @mock.patch.object( + db_api, 'workbook_get', + mock.MagicMock( + return_value={'definition': base.get_resource('test_rest.yaml')})) + @mock.patch.object( + std_actions.HTTPAction, 'run', mock.MagicMock(return_value={})) + def test_transport(self): + """ + Test if engine request traversed through the oslo.messaging transport. + """ + execution = self.engine.start_workflow_execution( + WB_NAME, 'create-vms', CONTEXT) + + task = db_api.tasks_get(WB_NAME, execution['id'])[0] + + # Check task execution state. There is no timeout mechanism in + # unittest. There is an example to add a custom timeout decorator that + # can wrap this test function in another process and then manage the + # process time. However, it seems more straightforward to keep the + # loop finite. + for i in range(0, 50): + db_task = db_api.task_get(task['workbook_name'], + task['execution_id'], + task['id']) + # Ensure the request reached the executor and the action has ran. + if db_task['state'] != states.IDLE: + # We have to wait sometime due to time interval between set + # task state to RUNNING and invocation action.run() + time.sleep(0.1) + self.assertIn(db_task['state'], + [states.RUNNING, states.SUCCESS, states.ERROR]) + return + time.sleep(0.1) + + # Task is not being processed. Throw an exception here. + raise Exception('Timed out waiting for task to be processed.') diff --git a/scripts/test.yaml b/scripts/test.yaml index 91a26d130..5d52d724d 100644 --- a/scripts/test.yaml +++ b/scripts/test.yaml @@ -1,18 +1,17 @@ -Services: - MyRest: - type: REST_API - parameters: - baseUrl: http://localhost:8989/v1/ - actions: - my-action: - parameters: - url: workbooks - method: GET +Namespaces: + MyRest: + actions: + my-action: + class: std.http + base-parameters: + url: http://localhost:8989/v1/workbooks + method: GET + output: Workflow: - tasks: - my_task: - action: MyRest:my-action + tasks: + my_task: + action: MyRest.my-action # events: # my_event: