Adding workflow parameters validation
Change-Id: I55b4a03296ddbd1468c2a7dd1cc7b7e2d867351d
This commit is contained in:
parent
74ebdf1122
commit
7eb6a0f66d
@ -51,6 +51,8 @@ class DefaultEngine(base.Engine):
|
|||||||
|
|
||||||
wf_spec = spec_parser.get_workflow_spec(wf_db.spec)
|
wf_spec = spec_parser.get_workflow_spec(wf_db.spec)
|
||||||
|
|
||||||
|
utils.validate_workflow_input(wf_db, wf_spec, workflow_input)
|
||||||
|
|
||||||
exec_db = self._create_db_execution(
|
exec_db = self._create_db_execution(
|
||||||
wf_db,
|
wf_db,
|
||||||
wf_spec,
|
wf_spec,
|
||||||
@ -69,7 +71,6 @@ class DefaultEngine(base.Engine):
|
|||||||
|
|
||||||
@u.log_exec(LOG)
|
@u.log_exec(LOG)
|
||||||
def on_task_result(self, task_id, raw_result):
|
def on_task_result(self, task_id, raw_result):
|
||||||
|
|
||||||
with db_api.transaction():
|
with db_api.transaction():
|
||||||
task_db = db_api.get_task(task_id)
|
task_db = db_api.get_task(task_id)
|
||||||
exec_db = db_api.get_execution(task_db.execution_id)
|
exec_db = db_api.get_execution(task_db.execution_id)
|
||||||
|
@ -12,6 +12,8 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import copy
|
||||||
|
|
||||||
from mistral.db.v2 import api as db_api
|
from mistral.db.v2 import api as db_api
|
||||||
from mistral import exceptions as exc
|
from mistral import exceptions as exc
|
||||||
from mistral import expressions as expr
|
from mistral import expressions as expr
|
||||||
@ -22,6 +24,35 @@ from mistral.workflow import utils as wf_utils
|
|||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_workflow_input(wf_db, wf_spec, wf_input):
|
||||||
|
input_param_names = copy.copy((wf_input or {}).keys())
|
||||||
|
missing_param_names = []
|
||||||
|
|
||||||
|
for p_name in wf_spec.get_parameters():
|
||||||
|
if p_name not in input_param_names:
|
||||||
|
missing_param_names.append(p_name)
|
||||||
|
else:
|
||||||
|
input_param_names.remove(p_name)
|
||||||
|
|
||||||
|
if missing_param_names or input_param_names:
|
||||||
|
msg = 'Invalid workflow input [workflow=%s'
|
||||||
|
msg_props = [wf_db.name]
|
||||||
|
|
||||||
|
if missing_param_names:
|
||||||
|
msg += ', missing=%s'
|
||||||
|
msg_props.append(missing_param_names)
|
||||||
|
|
||||||
|
if input_param_names:
|
||||||
|
msg += ', unexpected=%s'
|
||||||
|
msg_props.append(input_param_names)
|
||||||
|
|
||||||
|
msg += ']'
|
||||||
|
|
||||||
|
raise exc.WorkflowInputException(
|
||||||
|
msg % tuple(msg_props)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def resolve_action(wf_name, wf_spec_name, action_spec_name):
|
def resolve_action(wf_name, wf_spec_name, action_spec_name):
|
||||||
action_db = None
|
action_db = None
|
||||||
|
|
||||||
|
@ -85,6 +85,10 @@ class WorkflowException(MistralException):
|
|||||||
http_code = 400
|
http_code = 400
|
||||||
|
|
||||||
|
|
||||||
|
class WorkflowInputException(MistralException):
|
||||||
|
http_code = 400
|
||||||
|
|
||||||
|
|
||||||
class ApplicationContextNotFoundException(MistralException):
|
class ApplicationContextNotFoundException(MistralException):
|
||||||
http_code = 400
|
http_code = 400
|
||||||
message = "Application context not found"
|
message = "Application context not found"
|
||||||
|
@ -19,6 +19,7 @@ from oslo.config import cfg
|
|||||||
from mistral.db.v2 import api as db_api
|
from mistral.db.v2 import api as db_api
|
||||||
from mistral.db.v2.sqlalchemy import models
|
from mistral.db.v2.sqlalchemy import models
|
||||||
from mistral.engine1 import default_engine as d_eng
|
from mistral.engine1 import default_engine as d_eng
|
||||||
|
from mistral import exceptions as exc
|
||||||
from mistral.openstack.common import log as logging
|
from mistral.openstack.common import log as logging
|
||||||
from mistral.services import workbooks as wb_service
|
from mistral.services import workbooks as wb_service
|
||||||
from mistral.tests import base
|
from mistral.tests import base
|
||||||
@ -39,6 +40,10 @@ version: '2.0'
|
|||||||
workflows:
|
workflows:
|
||||||
wf1:
|
wf1:
|
||||||
type: reverse
|
type: reverse
|
||||||
|
parameters:
|
||||||
|
- param1
|
||||||
|
- param2
|
||||||
|
|
||||||
tasks:
|
tasks:
|
||||||
task1:
|
task1:
|
||||||
action: std.echo output="{$.param1}"
|
action: std.echo output="{$.param1}"
|
||||||
@ -48,6 +53,7 @@ workflows:
|
|||||||
task2:
|
task2:
|
||||||
action: std.echo output="{$.param2}"
|
action: std.echo output="{$.param2}"
|
||||||
requires: [task1]
|
requires: [task1]
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# TODO(rakhmerov): Add more advanced tests including various capabilities.
|
# TODO(rakhmerov): Add more advanced tests including various capabilities.
|
||||||
@ -105,6 +111,24 @@ class DefaultEngineTest(base.DbTestCase):
|
|||||||
self.assertIn('__execution', task_db.in_context)
|
self.assertIn('__execution', task_db.in_context)
|
||||||
self.assertDictEqual({'output': 'Hey'}, task_db.parameters)
|
self.assertDictEqual({'output': 'Hey'}, task_db.parameters)
|
||||||
|
|
||||||
|
def test_start_workflow_missing_parameters(self):
|
||||||
|
self.assertRaises(
|
||||||
|
exc.WorkflowInputException,
|
||||||
|
self.engine.start_workflow,
|
||||||
|
'%s.wf1' % self.wb_name,
|
||||||
|
None,
|
||||||
|
task_name='task2'
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_start_workflow_unexpected_parameters(self):
|
||||||
|
self.assertRaises(
|
||||||
|
exc.WorkflowInputException,
|
||||||
|
self.engine.start_workflow,
|
||||||
|
'%s.wf1' % self.wb_name,
|
||||||
|
{'param1': 'Hey', 'param2': 'Hi', 'unexpected_param': 'val'},
|
||||||
|
task_name='task2'
|
||||||
|
)
|
||||||
|
|
||||||
def test_on_task_result(self):
|
def test_on_task_result(self):
|
||||||
wf_input = {
|
wf_input = {
|
||||||
'param1': 'Hey',
|
'param1': 'Hey',
|
||||||
|
Loading…
Reference in New Issue
Block a user