Fully override default json values with user input
Change-Id: I721cca867c8651695ac1a2f34dc6a1c0c90fc8a1 Closes-Bug: #1664274
This commit is contained in:
parent
d9c2d636a9
commit
bea409c88d
@ -53,10 +53,9 @@ def validate_input(definition, input_dict, spec=None):
|
||||
|
||||
msg += ']'
|
||||
|
||||
raise exc.InputException(
|
||||
msg % tuple(msg_props)
|
||||
)
|
||||
raise exc.InputException(msg % tuple(msg_props))
|
||||
else:
|
||||
# TODO(rakhmerov): See the comment above. This is ugly.
|
||||
utils.merge_dicts(input_dict, spec_input, overwrite=False)
|
||||
|
||||
|
||||
|
@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import copy
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
@ -23,7 +24,6 @@ from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models as db_models
|
||||
from mistral.engine import dispatcher
|
||||
from mistral.engine.rpc_backend import rpc
|
||||
from mistral.engine import utils as eng_utils
|
||||
from mistral import exceptions as exc
|
||||
from mistral.lang import parser as spec_parser
|
||||
from mistral.services import scheduler
|
||||
@ -85,23 +85,27 @@ class Workflow(object):
|
||||
|
||||
wf_trace.info(
|
||||
self.wf_ex,
|
||||
"Starting workflow [name=%s, input=%s]" %
|
||||
'Starting workflow [name=%s, input=%s]' %
|
||||
(wf_def.name, utils.cut(input_dict))
|
||||
)
|
||||
|
||||
# TODO(rakhmerov): This call implicitly changes input_dict! Fix it!
|
||||
# After fix we need to move validation after adding risky fields.
|
||||
eng_utils.validate_input(wf_def, input_dict, self.wf_spec)
|
||||
self.validate_input(input_dict)
|
||||
|
||||
self._create_execution(wf_def, input_dict, desc, params)
|
||||
self._create_execution(
|
||||
wf_def,
|
||||
self.prepare_input(input_dict),
|
||||
desc,
|
||||
params
|
||||
)
|
||||
|
||||
self.set_state(states.RUNNING)
|
||||
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex, self.wf_spec)
|
||||
|
||||
cmds = wf_ctrl.continue_workflow()
|
||||
|
||||
dispatcher.dispatch_workflow_commands(self.wf_ex, cmds)
|
||||
dispatcher.dispatch_workflow_commands(
|
||||
self.wf_ex,
|
||||
wf_ctrl.continue_workflow()
|
||||
)
|
||||
|
||||
def stop(self, state, msg=None):
|
||||
"""Stop workflow.
|
||||
@ -139,6 +143,43 @@ class Workflow(object):
|
||||
|
||||
self._continue_workflow(cmds)
|
||||
|
||||
def prepare_input(self, input_dict):
|
||||
for k, v in self.wf_spec.get_input().items():
|
||||
if k not in input_dict or input_dict[k] is utils.NotDefined:
|
||||
input_dict[k] = v
|
||||
|
||||
return input_dict
|
||||
|
||||
def validate_input(self, input_dict):
|
||||
input_param_names = copy.deepcopy(list((input_dict or {}).keys()))
|
||||
missing_param_names = []
|
||||
|
||||
for p_name, p_value in self.wf_spec.get_input().items():
|
||||
if p_value is utils.NotDefined and p_name not in input_param_names:
|
||||
missing_param_names.append(str(p_name))
|
||||
|
||||
if p_name in input_param_names:
|
||||
input_param_names.remove(p_name)
|
||||
|
||||
if missing_param_names or input_param_names:
|
||||
msg = 'Invalid input [name=%s, class=%s'
|
||||
msg_props = [
|
||||
self.wf_spec.get_name(),
|
||||
self.wf_spec.__class__.__name__
|
||||
]
|
||||
|
||||
if missing_param_names:
|
||||
msg += ', missing=%s'
|
||||
msg_props.append(missing_param_names)
|
||||
|
||||
if input_param_names:
|
||||
msg += ', unexpected=%s'
|
||||
msg_props.append(input_param_names)
|
||||
|
||||
msg += ']'
|
||||
|
||||
raise exc.InputException(msg % tuple(msg_props))
|
||||
|
||||
def rerun(self, task_ex, reset=True, env=None):
|
||||
"""Rerun workflow from the given task.
|
||||
|
||||
|
@ -25,6 +25,7 @@ from oslo_log import log as logging
|
||||
from oslotest import base
|
||||
import testtools.matchers as ttm
|
||||
|
||||
from mistral import config
|
||||
from mistral import context as auth_context
|
||||
from mistral.db.sqlalchemy import base as db_sa_base
|
||||
from mistral.db.sqlalchemy import sqlite_lock
|
||||
@ -240,8 +241,14 @@ class DbTestCase(BaseTest):
|
||||
if cfg.CONF.database.connection.startswith('sqlite'):
|
||||
cfg.CONF.set_default('connection', 'sqlite://', group='database')
|
||||
|
||||
cfg.CONF.set_default("openstack_actions_mapping_path",
|
||||
"tests/resources/openstack/test_mapping.json")
|
||||
# This option is normally registered in sync_db.py so we have to
|
||||
# register it here specifically for tests.
|
||||
cfg.CONF.register_opt(config.os_actions_mapping_path)
|
||||
|
||||
cfg.CONF.set_default(
|
||||
'openstack_actions_mapping_path',
|
||||
'tests/resources/openstack/test_mapping.json'
|
||||
)
|
||||
cfg.CONF.set_default('max_overflow', -1, group='database')
|
||||
cfg.CONF.set_default('max_pool_size', 1000, group='database')
|
||||
|
||||
|
@ -674,6 +674,45 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
|
||||
|
||||
self.assertIn('task(task1).result.message', task1.state_info)
|
||||
|
||||
def test_override_json_input(self):
|
||||
wf_text = """---
|
||||
version: 2.0
|
||||
|
||||
wf:
|
||||
input:
|
||||
- a:
|
||||
aa: aa
|
||||
bb: bb
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.noop
|
||||
publish:
|
||||
published_a: <% $.a %>
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
wf_input = {
|
||||
'a': {
|
||||
'cc': 'cc',
|
||||
'dd': 'dd'
|
||||
}
|
||||
}
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wf', wf_input)
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task1 = wf_ex.task_executions[0]
|
||||
|
||||
self.assertDictEqual(wf_input['a'], task1.published['published_a'])
|
||||
|
||||
|
||||
class DataFlowTest(test_base.BaseTest):
|
||||
def test_get_task_execution_result(self):
|
||||
|
@ -332,12 +332,19 @@ def random_sleep(limit=1):
|
||||
|
||||
|
||||
class NotDefined(object):
|
||||
"""This class is just a marker of input params without value."""
|
||||
"""Marker of an empty value.
|
||||
|
||||
In a number of cases None can't be used to express the semantics of
|
||||
a not defined value because None is just a normal value rather than
|
||||
a value set to denote that it's not defined. This class can be used
|
||||
in such cases instead of None.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
def get_dict_from_string(input_string, delimiter=','):
|
||||
# TODO(rakhmerov): Why is it here? This module is too generic.
|
||||
if not input_string:
|
||||
return {}
|
||||
|
||||
@ -364,12 +371,16 @@ def get_dict_from_string(input_string, delimiter=','):
|
||||
|
||||
|
||||
def get_input_dict(inputs):
|
||||
# TODO(rakhmerov): Why is it here? This module is too generic.
|
||||
# TODO(rakhmerov): Move it to the spec.
|
||||
|
||||
"""Transform input list to dictionary.
|
||||
|
||||
Ensure every input param has a default value(it will be a NotDefined
|
||||
object if it's not provided).
|
||||
"""
|
||||
input_dict = {}
|
||||
|
||||
for x in inputs:
|
||||
if isinstance(x, dict):
|
||||
input_dict.update(x)
|
||||
|
Loading…
Reference in New Issue
Block a user