From ff78d7f65919d42e4afeaf13f16335a7bcf2a792 Mon Sep 17 00:00:00 2001 From: Istvan Imre Date: Wed, 15 Feb 2017 08:06:36 +0100 Subject: [PATCH] Dynamic workflow name evaluation. Evaluate workflow names dynamically, so yaql or jinja expression is allowed as sub-workflow name. Tasks names are not yet dynamically evaluated. Partially implements: blueprint mistral-dynamic-actions Change-Id: Icfe591e27a4f45c2e3dcfa83512217f3b2122189 --- doc/source/user/dsl_v2.rst | 105 ++++++++++++- mistral/engine/action_handler.py | 3 +- mistral/engine/actions.py | 10 +- mistral/engine/tasks.py | 36 +++-- mistral/expressions/jinja_expression.py | 2 + mistral/lang/base.py | 12 +- mistral/lang/v2/tasks.py | 7 +- .../tests/unit/engine/test_subworkflows.py | 148 ++++++++++++++++++ 8 files changed, 298 insertions(+), 25 deletions(-) diff --git a/doc/source/user/dsl_v2.rst b/doc/source/user/dsl_v2.rst index 5a27c70e9..28d2ef2ae 100644 --- a/doc/source/user/dsl_v2.rst +++ b/doc/source/user/dsl_v2.rst @@ -191,14 +191,16 @@ attributes: - **action** - Name of the action associated with the task. *Mutually exclusive with* **workflow**. If neither action nor workflow are provided then the action 'std.noop' will be used. -- **workflow** - Name of the workflow associated with the task. +- **workflow** - Name of the workflow associated with the task. Can be static + value or an expression (for example, "{{ _.subworkflow_name }}"). *Mutually exclusive with* **action**. - **input** - Actual input parameter values of the task. *Optional*. Value of each parameter is a JSON-compliant type such as number, string etc, dictionary or list. It can also be an expression to retrieve value from task context or any of the mentioned types containing inline expressions (for example, string "<% - $.movie_name %> is a cool movie!") + $.movie_name %> is a cool movie!") Can be an expression that evaluates to + a JSON object. - **publish** - Dictionary of variables to publish to the workflow context. Any JSON-compatible data structure optionally containing expression to select precisely what needs to be published. @@ -226,6 +228,66 @@ attributes: during action execution. If set to 'true' task may be run twice. *Optional*. By default set to 'false'. +Workflow +'''''''' +Synchronously starts a sub-workflow with the given name. + +Example static workflow call: + +.. code-block:: mistral + + my_task: +   workflow: name_of_my_workflow + +Example dynamic workflow selection: + +.. code-block:: mistral + + --- + version: '2.0' + + name: weather_data_processing + + workflows: + framework: + input: + - magic_workflow_name: show_weather + + tasks: + weather_data: + action: std.echo + input: + output: + location: wherever + temperature: "22C" + publish: + weather_data: <% task(weather_data).result %> + on-success: + - do_magic + + do_magic: + # reference workflow by parameter + workflow: <% $.magic_workflow_name %> + # expand dictionary to input parameters + input: <% $.weather_data %> + + show_weather: + input: + - location + - temperature + + tasks: + write_data: + action: std.echo + input: + output: "<% $.location %>: <% $.temperature %>" + + +Note: Typical use for the dynamic workflow selection is when parts of a +workflow can be customized. E.g. collect some weather data and then execute +some custom workflow on it. + + Policies '''''''' @@ -307,8 +369,8 @@ Retry policy can also be configured on a single line as: All parameter values for any policy can be defined as expressions. -Simplified input syntax -''''''''''''''''''''''' +Input syntax +'''''''''''' When describing a workflow task it's possible to specify its input parameters in two ways: @@ -330,6 +392,25 @@ Simplified syntax: my_task:   action: std.http url="http://mywebsite.org" method="GET" +Syntax with dynamic input parameter map: + +.. code-block:: mistral + + --- + version: '2.0' + + example_workflow: + input: + - http_request_parameters: + url: http://mywebsite.org + method: GET + + tasks: + setup_task: + action: std.http + input: <% $.http_request_parameters %> + + The same rules apply to tasks associated with workflows. Full syntax: @@ -349,6 +430,22 @@ Simplified syntax: my_task:   workflow: some_nested_workflow param1='val1' param2='val2' +Syntax with dynamic input parameter map: + +.. code-block:: mistral + + --- + version: '2.0' + + example_workflow: + input: + - nested_params: {"param1": "val1", "param2": "val2"} + + tasks: + setup_task: + workflow: some_nested_workflow + input: <% $.nested_params %> + **NOTE**: It's also possible to merge these two approaches and specify a part of parameters using simplified key-value pairs syntax and using keyword *input*. In this case all the parameters will be effectively merged. If the same diff --git a/mistral/engine/action_handler.py b/mistral/engine/action_handler.py index 0b057f581..5db1172e5 100644 --- a/mistral/engine/action_handler.py +++ b/mistral/engine/action_handler.py @@ -82,7 +82,8 @@ def on_action_update(action_ex, state): @profiler.trace('action-handler-build-action', hide_args=True) def _build_action(action_ex): if isinstance(action_ex, models.WorkflowExecution): - return actions.WorkflowAction(None, action_ex=action_ex) + return actions.WorkflowAction(wf_name=action_ex.name, + action_ex=action_ex) wf_name = None wf_spec_name = None diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index 0ecaf309e..130989db3 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -489,6 +489,10 @@ class AdHocAction(PythonAction): class WorkflowAction(Action): """Workflow action.""" + def __init__(self, wf_name, **kwargs): + super(WorkflowAction, self).__init__(None, **kwargs) + self.wf_name = wf_name + @profiler.trace('workflow-action-complete', hide_args=True) def complete(self, result): # No-op because in case of workflow result is already processed. @@ -503,15 +507,11 @@ class WorkflowAction(Action): parent_wf_ex.id ) - task_spec = spec_parser.get_task_spec(self.task_ex.spec) - - wf_spec_name = task_spec.get_workflow_name() - wf_def = engine_utils.resolve_workflow_definition( parent_wf_ex.workflow_name, parent_wf_spec.get_name(), namespace=parent_wf_ex.params['namespace'], - wf_spec_name=wf_spec_name + wf_spec_name=self.wf_name ) wf_spec = spec_parser.get_workflow_spec_by_definition_id( diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index b4f2ca086..31cb8580a 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -416,18 +416,14 @@ class RegularTask(Task): @profiler.trace('regular-task-get-action-input', hide_args=True) def _get_action_input(self, ctx=None): - ctx = ctx or self.ctx + input_dict = self._evaluate_expression(self.task_spec.get_input(), ctx) - ctx_view = data_flow.ContextView( - ctx, - self.wf_ex.context, - self.wf_ex.input - ) - - input_dict = expr.evaluate_recursively( - self.task_spec.get_input(), - ctx_view - ) + if not isinstance(input_dict, dict): + raise exc.InputException( + "Wrong dynamic input for task: %s. Dict type is expected. " + "Actual type: %s. Actual value: %s" % + (self.task_spec.get_name(), type(input_dict), str(input_dict)) + ) return utils.merge_dicts( input_dict, @@ -435,12 +431,28 @@ class RegularTask(Task): overwrite=False ) + def _evaluate_expression(self, expression, ctx=None): + ctx = ctx or self.ctx + ctx_view = data_flow.ContextView( + ctx, + self.wf_ex.context, + self.wf_ex.input + ) + input_dict = expr.evaluate_recursively( + expression, + ctx_view + ) + return input_dict + def _build_action(self): action_name = self.task_spec.get_action_name() wf_name = self.task_spec.get_workflow_name() if wf_name: - return actions.WorkflowAction(wf_name, task_ex=self.task_ex) + return actions.WorkflowAction( + wf_name=self._evaluate_expression(wf_name), + task_ex=self.task_ex + ) if not action_name: action_name = 'std.noop' diff --git a/mistral/expressions/jinja_expression.py b/mistral/expressions/jinja_expression.py index 20113b6b1..15f3d1d5d 100644 --- a/mistral/expressions/jinja_expression.py +++ b/mistral/expressions/jinja_expression.py @@ -27,6 +27,8 @@ from mistral.utils import expression_utils LOG = logging.getLogger(__name__) +ANY_JINJA_REGEXP = "{{.*}}|{%.*%}" + JINJA_REGEXP = '({{(.*)}})' JINJA_BLOCK_REGEXP = '({%(.*)%})' diff --git a/mistral/lang/base.py b/mistral/lang/base.py index 71a7cd290..4bd4bf49b 100644 --- a/mistral/lang/base.py +++ b/mistral/lang/base.py @@ -21,11 +21,19 @@ import six from mistral import exceptions as exc from mistral import expressions as expr +from mistral.expressions.jinja_expression import ANY_JINJA_REGEXP +from mistral.expressions.yaql_expression import INLINE_YAQL_REGEXP from mistral.lang import types from mistral import utils - -CMD_PTRN = re.compile("^[\w\.]+[^=\(\s\"]*") +ACTION_PATTRENS = { + "command": "[\w\.]+[^=\(\s\"]*", + "yaql_expression": INLINE_YAQL_REGEXP, + "jinja_expression": ANY_JINJA_REGEXP, +} +CMD_PTRN = re.compile( + "^({})".format("|".join(six.itervalues(ACTION_PATTRENS))) +) EXPRESSION = '|'.join([expr.patterns[name] for name in expr.patterns]) _ALL_IN_BRACKETS = "\[.*\]\s*" diff --git a/mistral/lang/v2/tasks.py b/mistral/lang/v2/tasks.py index 258aab069..00b226d85 100644 --- a/mistral/lang/v2/tasks.py +++ b/mistral/lang/v2/tasks.py @@ -50,7 +50,12 @@ class TaskSpec(base.BaseSpec): "type": types.WORKFLOW_TYPE, "action": types.NONEMPTY_STRING, "workflow": types.NONEMPTY_STRING, - "input": types.NONEMPTY_DICT, + "input": { + "oneOf": [ + types.NONEMPTY_DICT, + types.NONEMPTY_STRING + ] + }, "with-items": { "oneOf": [ types.NONEMPTY_STRING, diff --git a/mistral/tests/unit/engine/test_subworkflows.py b/mistral/tests/unit/engine/test_subworkflows.py index 99b31aca3..38691b79e 100644 --- a/mistral/tests/unit/engine/test_subworkflows.py +++ b/mistral/tests/unit/engine/test_subworkflows.py @@ -96,6 +96,100 @@ workflows: action: std.noop """ +WB3 = """ +--- +version: '2.0' + +name: wb3 + +workflows: + wf1: + input: + - wf_name + output: + sub_wf_out: <% $.sub_wf_out %> + + tasks: + task1: + workflow: <% $.wf_name %> + publish: + sub_wf_out: <% task(task1).result.sub_wf_out %> + + wf2: + output: + sub_wf_out: wf2_out + + tasks: + task1: + action: std.noop +""" + +WB4 = """ +--- +version: '2.0' + +name: wb4 + +workflows: + wf1: + input: + - wf_name + - inp + output: + sub_wf_out: <% $.sub_wf_out %> + + tasks: + task1: + workflow: <% $.wf_name %> + input: <% $.inp %> + publish: + sub_wf_out: <% task(task1).result.sub_wf_out %> + + wf2: + input: + - inp + output: + sub_wf_out: <% $.inp %> + + tasks: + task1: + action: std.noop + +""" + +WB5 = """ +--- +version: '2.0' + +name: wb5 + +workflows: + wf1: + input: + - wf_name + - inp + output: + sub_wf_out: '{{ _.sub_wf_out }}' + + tasks: + task1: + workflow: '{{ _.wf_name }}' + input: '{{ _.inp }}' + publish: + sub_wf_out: '{{ task("task1").result.sub_wf_out }}' + + wf2: + input: + - inp + output: + sub_wf_out: '{{ _.inp }}' + + tasks: + task1: + action: std.noop + +""" + class SubworkflowsTest(base.EngineTestCase): def setUp(self): @@ -103,6 +197,9 @@ class SubworkflowsTest(base.EngineTestCase): wb_service.create_workbook_v2(WB1) wb_service.create_workbook_v2(WB2) + wb_service.create_workbook_v2(WB3) + wb_service.create_workbook_v2(WB4) + wb_service.create_workbook_v2(WB5) def test_subworkflow_success(self): wf2_ex = self.engine.start_workflow('wb1.wf2', '', None) @@ -261,3 +358,54 @@ class SubworkflowsTest(base.EngineTestCase): # Wait till workflow 'wf2' is completed. self.await_workflow_success(wf2_ex.id) + + def test_dynamic_subworkflow_wf2(self): + ex = self.engine.start_workflow( + wf_identifier='wb3.wf1', + wf_input={'wf_name': 'wf2'} + ) + + self.await_workflow_success(ex.id) + + with db_api.transaction(): + ex = db_api.get_workflow_execution(ex.id) + self.assertEqual({'sub_wf_out': 'wf2_out'}, ex.output) + + def test_dynamic_subworkflow_call_failure(self): + ex = self.engine.start_workflow( + wf_identifier='wb3.wf1', + wf_input={'wf_name': 'not_existing_wf'} + ) + + self.await_workflow_error(ex.id) + + with db_api.transaction(): + ex = db_api.get_workflow_execution(ex.id) + self.assertIn('not_existing_wf', ex.state_info) + + def test_dynamic_subworkflow_with_generic_input(self): + self._test_dynamic_workflow_with_dict_param('wb4.wf1') + + def test_dynamic_subworkflow_with_jinja(self): + self._test_dynamic_workflow_with_dict_param('wb5.wf1') + + def test_string_workflow_input_failure(self): + ex = self.engine.start_workflow( + wf_identifier='wb4.wf1', + wf_input={'wf_name': 'wf2', 'inp': 'invalid_string_input'} + ) + self.await_workflow_error(ex.id) + + with db_api.transaction(): + ex = db_api.get_workflow_execution(ex.id) + self.assertIn('invalid_string_input', ex.state_info) + + def _test_dynamic_workflow_with_dict_param(self, wf_identifier): + ex = self.engine.start_workflow( + wf_identifier=wf_identifier, + wf_input={'wf_name': 'wf2', 'inp': {'inp': 'abc'}} + ) + self.await_workflow_success(ex.id) + with db_api.transaction(): + ex = db_api.get_workflow_execution(ex.id) + self.assertEqual({'sub_wf_out': 'abc'}, ex.output)