diff --git a/mistral/engine/action_queue.py b/mistral/engine/action_queue.py index 9241d3a96..c6b586683 100644 --- a/mistral/engine/action_queue.py +++ b/mistral/engine/action_queue.py @@ -13,16 +13,23 @@ # See the License for the specific language governing permissions and # limitations under the License. +import eventlet import functools from oslo_config import cfg +from mistral import context from mistral.executors import base as exe +from mistral.rpc import clients as rpc from mistral import utils _THREAD_LOCAL_NAME = "__action_queue_thread_local" +# Action queue operations. +_RUN_ACTION = "run_action" +_ON_ACTION_COMPLETE = "on_action_complete" + def _prepare(): utils.set_thread_local(_THREAD_LOCAL_NAME, list()) @@ -45,18 +52,29 @@ def _get_queue(): return queue -def _run_actions(): +def _process_queue(queue): executor = exe.get_executor(cfg.CONF.executor.type) - for action_ex, action_def, target in _get_queue(): - executor.run_action( - action_ex.id, - action_def.action_class, - action_def.attributes or {}, - action_ex.input, - action_ex.runtime_context.get('safe_rerun', False), - target=target - ) + for operation, args in queue: + if operation == _RUN_ACTION: + action_ex, action_def, target = args + + executor.run_action( + action_ex.id, + action_def.action_class, + action_def.attributes or {}, + action_ex.input, + action_ex.runtime_context.get('safe_rerun', False), + target=target + ) + elif operation == _ON_ACTION_COMPLETE: + action_ex_id, result, wf_action = args + + rpc.get_engine_client().on_action_complete( + action_ex_id, + result, + wf_action + ) def process(func): @@ -73,7 +91,26 @@ def process(func): try: res = func(*args, **kw) - _run_actions() + queue = _get_queue() + auth_ctx = context.ctx() if context.has_ctx() else None + + # NOTE(rakhmerov): Since we make RPC calls to the engine itself + # we need to process the action queue asynchronously in a new + # thread. 
Otherwise, if we have one engine process the engine + # may send a request to itself while already processing + # another one. In conjunction with blocking RPC it will lead + # to a deadlock (and RPC timeout). + def _within_new_thread(): + old_auth_ctx = context.ctx() if context.has_ctx() else None + + context.set_ctx(auth_ctx) + + try: + _process_queue(queue) + finally: + context.set_ctx(old_auth_ctx) + + eventlet.spawn(_within_new_thread) finally: _clear() @@ -82,5 +119,11 @@ def process(func): return decorate -def schedule(action_ex, action_def, target): - _get_queue().append((action_ex, action_def, target)) +def schedule_run_action(action_ex, action_def, target): + _get_queue().append((_RUN_ACTION, (action_ex, action_def, target))) + + +def schedule_on_action_complete(action_ex_id, result, wf_action=False): + _get_queue().append( + (_ON_ACTION_COMPLETE, (action_ex_id, result, wf_action)) + ) diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index 17a970702..469e66d20 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -248,7 +248,11 @@ class PythonAction(Action): action_ex_id=action_ex_id ) - action_queue.schedule(self.action_ex, self.action_def, target) + action_queue.schedule_run_action( + self.action_ex, + self.action_def, + target + ) @profiler.trace('action-run', hide_args=True) def run(self, input_dict, target, index=0, desc='', save=True, diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 7f7d04ef3..d728669b0 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -133,6 +133,7 @@ class DefaultEngine(base.Engine): return action_ex.get_clone() + @action_queue.process def pause_workflow(self, wf_ex_id): with db_api.transaction(): wf_ex = db_api.get_workflow_execution(wf_ex_id) @@ -161,6 +162,7 @@ class DefaultEngine(base.Engine): return wf_ex.get_clone() + @action_queue.process def stop_workflow(self, wf_ex_id, state, message=None): with 
db_api.transaction(): wf_ex = db_api.get_workflow_execution(wf_ex_id) diff --git a/mistral/engine/engine_server.py b/mistral/engine/engine_server.py index 74fc87787..d5246b2cd 100644 --- a/mistral/engine/engine_server.py +++ b/mistral/engine/engine_server.py @@ -60,7 +60,7 @@ class EngineServer(service_base.MistralService): self._rpc_server.register_endpoint(self) # Note(ddeja): Engine needs to be run in default (blocking) mode - # since using another mode may leads to deadlock. + # since using another mode may lead to a deadlock. # See https://review.openstack.org/#/c/356343 for more info. self._rpc_server.run(executor='blocking') diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index db1b5758f..692890bf8 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -18,12 +18,12 @@ from osprofiler import profiler import traceback as tb from mistral.db.v2 import api as db_api +from mistral.engine import action_queue from mistral.engine import workflows from mistral import exceptions as exc from mistral.services import scheduler from mistral.workflow import states - LOG = logging.getLogger(__name__) @@ -82,6 +82,7 @@ def cancel_workflow(wf_ex, msg=None): stop_workflow(wf_ex, states.CANCELLED, msg) +@action_queue.process @profiler.trace('workflow-handler-check-and-complete', hide_args=True) def _check_and_complete(wf_ex_id): # Note: This method can only be called via scheduler. 
diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 1bcf0b8f0..180afbcef 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -21,12 +21,11 @@ import six from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models +from mistral.engine import action_queue from mistral.engine import dispatcher from mistral.engine import utils as engine_utils from mistral import exceptions as exc from mistral.lang import parser as spec_parser -from mistral.rpc import clients as rpc -from mistral.services import scheduler from mistral.services import triggers from mistral.services import workflows as wf_service from mistral import utils @@ -42,10 +41,6 @@ from mistral_lib import actions as ml_actions LOG = logging.getLogger(__name__) -_SEND_RESULT_TO_PARENT_WORKFLOW_PATH = ( - 'mistral.engine.workflows._send_result_to_parent_workflow' -) - @six.add_metaclass(abc.ABCMeta) class Workflow(object): @@ -118,12 +113,11 @@ class Workflow(object): assert self.wf_ex if state == states.SUCCESS: - return self._succeed_workflow(self._get_final_context(), msg) + self._succeed_workflow(self._get_final_context(), msg) elif state == states.ERROR: - return self._fail_workflow(self._get_final_context(), msg) + self._fail_workflow(self._get_final_context(), msg) elif state == states.CANCELLED: - - return self._cancel_workflow(msg) + self._cancel_workflow(msg) def pause(self, msg=None): """Pause workflow. 
@@ -379,6 +373,7 @@ class Workflow(object): else: msg = _build_fail_info_message(wf_ctrl, self.wf_ex) final_context = wf_ctrl.evaluate_workflow_final_context() + self._fail_workflow(final_context, msg) return 0 @@ -394,13 +389,14 @@ class Workflow(object): self.set_state(states.SUCCESS, msg) if self.wf_ex.task_execution_id: - self._schedule_send_result_to_parent_workflow() + self._send_result_to_parent_workflow() def _fail_workflow(self, final_context, msg): if states.is_paused_or_completed(self.wf_ex.state): return output_on_error = {} + try: output_on_error = data_flow.evaluate_workflow_output( self.wf_ex, @@ -427,7 +423,7 @@ class Workflow(object): self.wf_ex.output = merge_dicts({'result': msg}, output_on_error) if self.wf_ex.task_execution_id: - self._schedule_send_result_to_parent_workflow() + self._send_result_to_parent_workflow() def _cancel_workflow(self, msg): if states.is_completed(self.wf_ex.state): @@ -445,14 +441,35 @@ class Workflow(object): self.wf_ex.output = {'result': msg} if self.wf_ex.task_execution_id: - self._schedule_send_result_to_parent_workflow() + self._send_result_to_parent_workflow() - def _schedule_send_result_to_parent_workflow(self): - scheduler.schedule_call( - None, - _SEND_RESULT_TO_PARENT_WORKFLOW_PATH, - 0, - wf_ex_id=self.wf_ex.id + def _send_result_to_parent_workflow(self): + if self.wf_ex.state == states.SUCCESS: + result = ml_actions.Result(data=self.wf_ex.output) + elif self.wf_ex.state == states.ERROR: + err_msg = ( + self.wf_ex.state_info or + 'Failed subworkflow [execution_id=%s]' % self.wf_ex.id + ) + + result = ml_actions.Result(error=err_msg) + elif self.wf_ex.state == states.CANCELLED: + err_msg = ( + self.wf_ex.state_info or + 'Cancelled subworkflow [execution_id=%s]' % self.wf_ex.id + ) + + result = ml_actions.Result(error=err_msg, cancel=True) + else: + raise RuntimeError( + "Method _send_result_to_parent_workflow() must never be called" + " if a workflow is not in SUCCESS, ERROR or CANCELLED state." 
+ ) + + action_queue.schedule_on_action_complete( + self.wf_ex.id, + result, + wf_action=True ) @@ -478,41 +495,6 @@ def _get_environment(params): ) -def _send_result_to_parent_workflow(wf_ex_id): - with db_api.transaction(): - wf_ex = db_api.get_workflow_execution(wf_ex_id) - - wf_output = wf_ex.output - - if wf_ex.state == states.SUCCESS: - result = ml_actions.Result(data=wf_output) - elif wf_ex.state == states.ERROR: - err_msg = ( - wf_ex.state_info or - 'Failed subworkflow [execution_id=%s]' % wf_ex.id - ) - - result = ml_actions.Result(error=err_msg) - elif wf_ex.state == states.CANCELLED: - err_msg = ( - wf_ex.state_info or - 'Cancelled subworkflow [execution_id=%s]' % wf_ex.id - ) - - result = ml_actions.Result(error=err_msg, cancel=True) - else: - raise RuntimeError( - "Method _send_result_to_parent_workflow() must never be called" - " if a workflow is not in SUCCESS, ERROR or CANCELLED state." - ) - - rpc.get_engine_client().on_action_complete( - wf_ex.id, - result, - wf_action=True - ) - - def _build_fail_info_message(wf_ctrl, wf_ex): # Try to find where error is exactly. 
failed_tasks = sorted( diff --git a/mistral/executors/remote_executor.py b/mistral/executors/remote_executor.py index 6451d5a93..e654f344e 100644 --- a/mistral/executors/remote_executor.py +++ b/mistral/executors/remote_executor.py @@ -15,7 +15,6 @@ from oslo_config import cfg from oslo_log import log as logging -from mistral.rpc import base as rpc_base from mistral.rpc import clients as rpc_clients @@ -26,5 +25,4 @@ class RemoteExecutor(rpc_clients.ExecutorClient): """Executor that passes execution request to a remote executor.""" def __init__(self): - self.topic = cfg.CONF.executor.topic - self._client = rpc_base.get_rpc_client_driver()(cfg.CONF.executor) + super(RemoteExecutor, self).__init__(cfg.CONF.executor) diff --git a/mistral/tests/unit/engine/base.py b/mistral/tests/unit/engine/base.py index 93014c7bd..66d5adcf5 100644 --- a/mistral/tests/unit/engine/base.py +++ b/mistral/tests/unit/engine/base.py @@ -67,16 +67,22 @@ class EngineTestCase(base.DbTestCase): # Start remote executor. if cfg.CONF.executor.type == 'remote': LOG.info("Starting remote executor threads...") + self.executor_client = rpc_clients.get_executor_client() + exe_svc = executor_server.get_oslo_service(setup_profiler=False) + self.executor = exe_svc.executor self.threads.append(eventlet.spawn(launch_service, exe_svc)) self.addCleanup(exe_svc.stop, True) # Start engine. 
LOG.info("Starting engine threads...") + self.engine_client = rpc_clients.get_engine_client() + eng_svc = engine_server.get_oslo_service(setup_profiler=False) + self.engine = eng_svc.engine self.threads.append(eventlet.spawn(launch_service, eng_svc)) self.addCleanup(eng_svc.stop, True) diff --git a/mistral/tests/unit/engine/test_subworkflows.py b/mistral/tests/unit/engine/test_subworkflows.py index 9352ed745..b4f715682 100644 --- a/mistral/tests/unit/engine/test_subworkflows.py +++ b/mistral/tests/unit/engine/test_subworkflows.py @@ -404,6 +404,7 @@ class SubworkflowsTest(base.EngineTestCase): with db_api.transaction(): ex = db_api.get_workflow_execution(ex.id) + self.assertIn('not_existing_wf', ex.state_info) def test_dynamic_subworkflow_with_generic_input(self): @@ -417,10 +418,12 @@ class SubworkflowsTest(base.EngineTestCase): wf_identifier='wb4.wf1', wf_input={'wf_name': 'wf2', 'inp': 'invalid_string_input'} ) + self.await_workflow_error(ex.id) with db_api.transaction(): ex = db_api.get_workflow_execution(ex.id) + self.assertIn('invalid_string_input', ex.state_info) def _test_dynamic_workflow_with_dict_param(self, wf_identifier): @@ -428,17 +431,21 @@ class SubworkflowsTest(base.EngineTestCase): wf_identifier=wf_identifier, wf_input={'wf_name': 'wf2', 'inp': {'inp': 'abc'}} ) + self.await_workflow_success(ex.id) + with db_api.transaction(): ex = db_api.get_workflow_execution(ex.id) + self.assertEqual({'sub_wf_out': 'abc'}, ex.output) def test_subworkflow_root_execution_id(self): - wf1_ex = self.engine.start_workflow('wb6.wf1', '', None) + self.engine.start_workflow('wb6.wf1', '', None) self._await(lambda: len(db_api.get_workflow_executions()) == 3, 0.5, 5) wf_execs = db_api.get_workflow_executions() + wf1_ex = self._assert_single_item(wf_execs, name='wb6.wf1') wf2_ex = self._assert_single_item(wf_execs, name='wb6.wf2') wf3_ex = self._assert_single_item(wf_execs, name='wb6.wf3') diff --git a/mistral/tests/unit/executors/test_local_executor.py 
b/mistral/tests/unit/executors/test_local_executor.py index c10d0ff6f..e73455ad1 100644 --- a/mistral/tests/unit/executors/test_local_executor.py +++ b/mistral/tests/unit/executors/test_local_executor.py @@ -87,7 +87,9 @@ class LocalExecutorTestCase(base.ExecutorTestCase): """ wb_svc.create_workbook_v2(wb_def) + wf_ex = self.engine.start_workflow('wb1.wf1', '', {}) + self.await_workflow_success(wf_ex.id) with db_api.transaction(): @@ -144,7 +146,9 @@ class LocalExecutorTestCase(base.ExecutorTestCase): """ wb_svc.create_workbook_v2(wb_def) + wf_ex = self.engine.start_workflow('wb1.wf1', '', {}) + self.await_workflow_success(wf_ex.id) with db_api.transaction():