From bea409c88d7cf19b59bb46789d3281c0ee932b07 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Mon, 13 Mar 2017 13:49:08 +0700 Subject: [PATCH] Fully override default json values with user input Change-Id: I721cca867c8651695ac1a2f34dc6a1c0c90fc8a1 Closes-Bug: #1664274 --- mistral/engine/utils.py | 5 +- mistral/engine/workflows.py | 59 ++++++++++++++++++---- mistral/tests/unit/base.py | 11 +++- mistral/tests/unit/engine/test_dataflow.py | 39 ++++++++++++++ mistral/utils/__init__.py | 13 ++++- 5 files changed, 112 insertions(+), 15 deletions(-) diff --git a/mistral/engine/utils.py b/mistral/engine/utils.py index df19ba01..4910132c 100644 --- a/mistral/engine/utils.py +++ b/mistral/engine/utils.py @@ -53,10 +53,9 @@ def validate_input(definition, input_dict, spec=None): msg += ']' - raise exc.InputException( - msg % tuple(msg_props) - ) + raise exc.InputException(msg % tuple(msg_props)) else: + # TODO(rakhmerov): See the comment above. This is ugly. utils.merge_dicts(input_dict, spec_input, overwrite=False) diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index ee58d891..9586be40 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -14,6 +14,7 @@ # limitations under the License. import abc +import copy from oslo_config import cfg from oslo_log import log as logging from osprofiler import profiler @@ -23,7 +24,6 @@ from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models as db_models from mistral.engine import dispatcher from mistral.engine.rpc_backend import rpc -from mistral.engine import utils as eng_utils from mistral import exceptions as exc from mistral.lang import parser as spec_parser from mistral.services import scheduler @@ -85,23 +85,27 @@ class Workflow(object): wf_trace.info( self.wf_ex, - "Starting workflow [name=%s, input=%s]" % + 'Starting workflow [name=%s, input=%s]' % (wf_def.name, utils.cut(input_dict)) ) - # TODO(rakhmerov): This call implicitly changes input_dict! Fix it! - # After fix we need to move validation after adding risky fields. - eng_utils.validate_input(wf_def, input_dict, self.wf_spec) + self.validate_input(input_dict) - self._create_execution(wf_def, input_dict, desc, params) + self._create_execution( + wf_def, + self.prepare_input(input_dict), + desc, + params + ) self.set_state(states.RUNNING) wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec) - cmds = wf_ctrl.continue_workflow() - - dispatcher.dispatch_workflow_commands(self.wf_ex, cmds) + dispatcher.dispatch_workflow_commands( + self.wf_ex, + wf_ctrl.continue_workflow() + ) def stop(self, state, msg=None): """Stop workflow. @@ -139,6 +143,43 @@ class Workflow(object): self._continue_workflow(cmds) + def prepare_input(self, input_dict): + for k, v in self.wf_spec.get_input().items(): + if k not in input_dict or input_dict[k] is utils.NotDefined: + input_dict[k] = v + + return input_dict + + def validate_input(self, input_dict): + input_param_names = copy.deepcopy(list((input_dict or {}).keys())) + missing_param_names = [] + + for p_name, p_value in self.wf_spec.get_input().items(): + if p_value is utils.NotDefined and p_name not in input_param_names: + missing_param_names.append(str(p_name)) + + if p_name in input_param_names: + input_param_names.remove(p_name) + + if missing_param_names or input_param_names: + msg = 'Invalid input [name=%s, class=%s' + msg_props = [ + self.wf_spec.get_name(), + self.wf_spec.__class__.__name__ + ] + + if missing_param_names: + msg += ', missing=%s' + msg_props.append(missing_param_names) + + if input_param_names: + msg += ', unexpected=%s' + msg_props.append(input_param_names) + + msg += ']' + + raise exc.InputException(msg % tuple(msg_props)) + def rerun(self, task_ex, reset=True, env=None): """Rerun workflow from the given task. diff --git a/mistral/tests/unit/base.py b/mistral/tests/unit/base.py index 50af4796..12a84782 100644 --- a/mistral/tests/unit/base.py +++ b/mistral/tests/unit/base.py @@ -25,6 +25,7 @@ from oslo_log import log as logging from oslotest import base import testtools.matchers as ttm +from mistral import config from mistral import context as auth_context from mistral.db.sqlalchemy import base as db_sa_base from mistral.db.sqlalchemy import sqlite_lock @@ -240,8 +241,14 @@ class DbTestCase(BaseTest): if cfg.CONF.database.connection.startswith('sqlite'): cfg.CONF.set_default('connection', 'sqlite://', group='database') - cfg.CONF.set_default("openstack_actions_mapping_path", - "tests/resources/openstack/test_mapping.json") + # This option is normally registered in sync_db.py so we have to + # register it here specifically for tests. + cfg.CONF.register_opt(config.os_actions_mapping_path) + + cfg.CONF.set_default( + 'openstack_actions_mapping_path', + 'tests/resources/openstack/test_mapping.json' + ) cfg.CONF.set_default('max_overflow', -1, group='database') cfg.CONF.set_default('max_pool_size', 1000, group='database') diff --git a/mistral/tests/unit/engine/test_dataflow.py b/mistral/tests/unit/engine/test_dataflow.py index 86c24bfa..abac2303 100644 --- a/mistral/tests/unit/engine/test_dataflow.py +++ b/mistral/tests/unit/engine/test_dataflow.py @@ -674,6 +674,45 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase): self.assertIn('task(task1).result.message', task1.state_info) + def test_override_json_input(self): + wf_text = """--- + version: 2.0 + + wf: + input: + - a: + aa: aa + bb: bb + + tasks: + task1: + action: std.noop + publish: + published_a: <% $.a %> + """ + + wf_service.create_workflows(wf_text) + + wf_input = { + 'a': { + 'cc': 'cc', + 'dd': 'dd' + } + } + + # Start workflow. + wf_ex = self.engine.start_workflow('wf', wf_input) + + self.await_workflow_success(wf_ex.id) + + with db_api.transaction(): + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task1 = wf_ex.task_executions[0] + + self.assertDictEqual(wf_input['a'], task1.published['published_a']) + class DataFlowTest(test_base.BaseTest): def test_get_task_execution_result(self): diff --git a/mistral/utils/__init__.py b/mistral/utils/__init__.py index 2f66f771..ad51fd6f 100644 --- a/mistral/utils/__init__.py +++ b/mistral/utils/__init__.py @@ -332,12 +332,19 @@ def random_sleep(limit=1): class NotDefined(object): - """This class is just a marker of input params without value.""" + """Marker of an empty value. + + In a number of cases None can't be used to express the semantics of + a not defined value because None is just a normal value rather than + a value set to denote that it's not defined. This class can be used + in such cases instead of None. + """ pass def get_dict_from_string(input_string, delimiter=','): + # TODO(rakhmerov): Why is it here? This module is too generic. if not input_string: return {} @@ -364,12 +371,16 @@ def get_dict_from_string(input_string, delimiter=','): def get_input_dict(inputs): + # TODO(rakhmerov): Why is it here? This module is too generic. + # TODO(rakhmerov): Move it to the spec. + """Transform input list to dictionary. Ensure every input param has a default value(it will be a NotDefined object if it's not provided). """ input_dict = {} + for x in inputs: if isinstance(x, dict): input_dict.update(x)