Advanced publishing: change workflow lang schema
* 'on-success', 'on-error' and 'on-complete' can now look like described in "Advanced Publishing" specification [1] * Refactored all places related to the spec changes * Added unit tests for advanced schema of 'on-xxx' clauses [1] https://github.com/openstack/mistral-specs/blob/master/specs/pike/approved/advanced_publishing.rst Change-Id: I190fcec0a40ca6f97d712168f4be7a418bd1f0e8 Partially implements: blueprint mistral-advanced-publishing-global-vars
This commit is contained in:
parent
a772530af2
commit
6c6e212688
@ -102,7 +102,8 @@ class BaseSpec(object):
|
||||
It represents a DSL entity such as workflow or task as a python object
|
||||
providing more convenient API to analyse DSL than just working with raw
|
||||
data in form of a dictionary. Specification classes also implement
|
||||
all required validation logic by overriding instance method 'validate()'.
|
||||
all required validation logic by overriding instance methods
|
||||
'validate_schema()' and 'validate_semantics()'.
|
||||
|
||||
Note that the specification mechanism allows to have polymorphic entities
|
||||
in DSL. For example, if we find it more convenient to have separate
|
||||
@ -197,7 +198,7 @@ class BaseSpec(object):
|
||||
def validate_expr(self, dsl_part):
|
||||
if isinstance(dsl_part, six.string_types):
|
||||
expr.validate(dsl_part)
|
||||
elif isinstance(dsl_part, list):
|
||||
elif isinstance(dsl_part, (list, tuple)):
|
||||
for expression in dsl_part:
|
||||
if isinstance(expression, six.string_types):
|
||||
expr.validate(expression)
|
||||
@ -250,21 +251,6 @@ class BaseSpec(object):
|
||||
elif isinstance(prop_val, six.string_types):
|
||||
return {prop_val: ''}
|
||||
|
||||
def _as_list_of_tuples(self, prop_name):
|
||||
prop_val = self._data.get(prop_name)
|
||||
|
||||
if not prop_val:
|
||||
return []
|
||||
|
||||
if isinstance(prop_val, six.string_types):
|
||||
return [self._as_tuple(prop_val)]
|
||||
|
||||
return [self._as_tuple(item) for item in prop_val]
|
||||
|
||||
@staticmethod
|
||||
def _as_tuple(val):
|
||||
return list(val.items())[0] if isinstance(val, dict) else (val, '')
|
||||
|
||||
@staticmethod
|
||||
def _parse_cmd_and_input(cmd_str):
|
||||
# TODO(rakhmerov): Try to find a way with one expression.
|
||||
|
87
mistral/lang/v2/on_clause.py
Normal file
87
mistral/lang/v2/on_clause.py
Normal file
@ -0,0 +1,87 @@
|
||||
# Copyright 2014 - Mirantis, Inc.
|
||||
# Copyright 2015 - StackStorm, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import six
|
||||
|
||||
from mistral.lang import types
|
||||
from mistral.lang.v2 import base
|
||||
from mistral.lang.v2 import publish
|
||||
|
||||
|
||||
class OnClauseSpec(base.BaseSpec):
|
||||
_simple_schema = {
|
||||
"oneOf": [
|
||||
types.NONEMPTY_STRING,
|
||||
types.UNIQUE_STRING_OR_EXPRESSION_CONDITION_LIST
|
||||
]
|
||||
}
|
||||
|
||||
_advanced_schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"publish": publish.PublishSpec.get_schema(includes=None),
|
||||
"next": _simple_schema,
|
||||
},
|
||||
"additionalProperties": False
|
||||
}
|
||||
|
||||
_schema = {"oneOf": [_simple_schema, _advanced_schema]}
|
||||
|
||||
def __init__(self, data):
|
||||
super(OnClauseSpec, self).__init__(data)
|
||||
|
||||
if not isinstance(data, dict):
|
||||
# Old simple schema.
|
||||
self._publish = None
|
||||
self._next = prepare_next_clause(data)
|
||||
else:
|
||||
# New advanced schema.
|
||||
self._publish = self._spec_property('publish', publish.PublishSpec)
|
||||
self._next = prepare_next_clause(data.get('next'))
|
||||
|
||||
@classmethod
|
||||
def get_schema(cls, includes=['definitions']):
|
||||
return super(OnClauseSpec, cls).get_schema(includes)
|
||||
|
||||
def get_publish(self):
|
||||
return self._publish
|
||||
|
||||
def get_next(self):
|
||||
return self._next
|
||||
|
||||
|
||||
def _as_list_of_tuples(data):
|
||||
if not data:
|
||||
return []
|
||||
|
||||
if isinstance(data, six.string_types):
|
||||
return [_as_tuple(data)]
|
||||
|
||||
return [_as_tuple(item) for item in data]
|
||||
|
||||
|
||||
def _as_tuple(val):
|
||||
return list(val.items())[0] if isinstance(val, dict) else (val, '')
|
||||
|
||||
|
||||
def prepare_next_clause(next_clause):
|
||||
list_of_tuples = _as_list_of_tuples(next_clause)
|
||||
|
||||
for i, task in enumerate(list_of_tuples):
|
||||
task_name, params = OnClauseSpec._parse_cmd_and_input(task[0])
|
||||
|
||||
list_of_tuples[i] = (task_name, task[1], params)
|
||||
|
||||
return list_of_tuples
|
61
mistral/lang/v2/publish.py
Normal file
61
mistral/lang/v2/publish.py
Normal file
@ -0,0 +1,61 @@
|
||||
# Copyright 2014 - Mirantis, Inc.
|
||||
# Copyright 2015 - StackStorm, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from mistral import exceptions as exc
|
||||
from mistral.lang import types
|
||||
from mistral.lang.v2 import base
|
||||
|
||||
|
||||
class PublishSpec(base.BaseSpec):
|
||||
_schema = {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"branch": types.NONEMPTY_DICT,
|
||||
"global": types.NONEMPTY_DICT,
|
||||
"atomic": types.NONEMPTY_DICT
|
||||
},
|
||||
"additionalProperties": False
|
||||
}
|
||||
|
||||
def __init__(self, data):
|
||||
super(PublishSpec, self).__init__(data)
|
||||
|
||||
self._branch = self._data.get('branch')
|
||||
self._global = self._data.get('global')
|
||||
self._atomic = self._data.get('atomic')
|
||||
|
||||
@classmethod
|
||||
def get_schema(cls, includes=['definitions']):
|
||||
return super(PublishSpec, cls).get_schema(includes)
|
||||
|
||||
def validate_semantics(self):
|
||||
if not self._branch and not self._global and not self._atomic:
|
||||
raise exc.InvalidModelException(
|
||||
"Either 'branch', 'global' or 'atomic' must be specified: "
|
||||
% self._data
|
||||
)
|
||||
|
||||
self.validate_expr(self._branch)
|
||||
self.validate_expr(self._global)
|
||||
self.validate_expr(self._atomic)
|
||||
|
||||
def get_branch(self):
|
||||
return self._branch
|
||||
|
||||
def get_global(self):
|
||||
return self._global
|
||||
|
||||
def get_atomic(self):
|
||||
return self._atomic
|
@ -56,6 +56,7 @@ class RetrySpec(base.BaseSpec):
|
||||
|
||||
def __init__(self, data):
|
||||
data = self._transform_retry_one_line(data)
|
||||
|
||||
super(RetrySpec, self).__init__(data)
|
||||
|
||||
self._break_on = data.get('break-on')
|
||||
|
@ -17,11 +17,13 @@ import six
|
||||
|
||||
from mistral.lang import types
|
||||
from mistral.lang.v2 import base
|
||||
from mistral.lang.v2 import on_clause
|
||||
from mistral.lang.v2 import policies
|
||||
from mistral.lang.v2 import tasks
|
||||
|
||||
|
||||
direct_wf_ts = tasks.DirectWorkflowTaskSpec
|
||||
# TODO(rakhmerov): This specification should be broken into two separate
|
||||
# specs for direct and reverse workflows. It's weird to combine them into
|
||||
# one because they address different use cases.
|
||||
|
||||
|
||||
class TaskDefaultsSpec(base.BaseSpec):
|
||||
@ -71,30 +73,31 @@ class TaskDefaultsSpec(base.BaseSpec):
|
||||
'pause-before',
|
||||
'concurrency'
|
||||
)
|
||||
self._on_complete = direct_wf_ts.prepare_on_clause(
|
||||
self._as_list_of_tuples('on-complete')
|
||||
)
|
||||
self._on_success = direct_wf_ts.prepare_on_clause(
|
||||
self._as_list_of_tuples('on-success')
|
||||
)
|
||||
self._on_error = direct_wf_ts.prepare_on_clause(
|
||||
self._as_list_of_tuples('on-error')
|
||||
)
|
||||
|
||||
on_spec_cls = on_clause.OnClauseSpec
|
||||
|
||||
self._on_complete = self._spec_property('on-complete', on_spec_cls)
|
||||
self._on_success = self._spec_property('on-success', on_spec_cls)
|
||||
self._on_error = self._spec_property('on-error', on_spec_cls)
|
||||
|
||||
# TODO(rakhmerov): 'requires' should reside in a different spec for
|
||||
# reverse workflows.
|
||||
self._requires = data.get('requires', [])
|
||||
|
||||
def validate_schema(self):
|
||||
super(TaskDefaultsSpec, self).validate_schema()
|
||||
|
||||
def validate_semantics(self):
|
||||
# Validate YAQL expressions.
|
||||
self._validate_transitions('on-complete')
|
||||
self._validate_transitions('on-success')
|
||||
self._validate_transitions('on-error')
|
||||
self._validate_transitions(self._on_complete)
|
||||
self._validate_transitions(self._on_success)
|
||||
self._validate_transitions(self._on_error)
|
||||
|
||||
def _validate_transitions(self, on_clause):
|
||||
val = self._data.get(on_clause, [])
|
||||
def _validate_transitions(self, on_clause_spec):
|
||||
val = on_clause_spec.get_next() if on_clause_spec else []
|
||||
|
||||
if not val:
|
||||
return
|
||||
|
||||
[self.validate_expr(t)
|
||||
for t in ([val] if isinstance(val, six.string_types) else val)]
|
||||
for t in ([val] if isinstance(val, six.string_types) else val)]
|
||||
|
||||
def get_policies(self):
|
||||
return self._policies
|
||||
|
@ -22,8 +22,11 @@ from mistral import exceptions as exc
|
||||
from mistral import expressions
|
||||
from mistral.lang import types
|
||||
from mistral.lang.v2 import base
|
||||
from mistral.lang.v2 import on_clause
|
||||
from mistral.lang.v2 import policies
|
||||
from mistral.lang.v2 import publish
|
||||
from mistral import utils
|
||||
from mistral.workflow import states
|
||||
|
||||
_expr_ptrns = [expressions.patterns[name] for name in expressions.patterns]
|
||||
WITH_ITEMS_PTRN = re.compile(
|
||||
@ -141,8 +144,9 @@ class TaskSpec(base.BaseSpec):
|
||||
|
||||
for item in raw:
|
||||
if not isinstance(item, six.string_types):
|
||||
raise exc.InvalidModelException("'with-items' elements should"
|
||||
" be strings: %s" % self._data)
|
||||
raise exc.InvalidModelException(
|
||||
"'with-items' elements should be strings: %s" % self._data
|
||||
)
|
||||
|
||||
match = re.match(WITH_ITEMS_PTRN, item)
|
||||
|
||||
@ -209,11 +213,17 @@ class TaskSpec(base.BaseSpec):
|
||||
def get_target(self):
|
||||
return self._target
|
||||
|
||||
def get_publish(self):
|
||||
return self._publish
|
||||
def get_publish(self, state):
|
||||
spec = None
|
||||
|
||||
def get_publish_on_error(self):
|
||||
return self._publish_on_error
|
||||
if state == states.SUCCESS and self._publish:
|
||||
spec = publish.PublishSpec({'branch': self._publish})
|
||||
elif state == states.ERROR and self._publish_on_error:
|
||||
spec = publish.PublishSpec(
|
||||
{'branch': self._publish_on_error}
|
||||
)
|
||||
|
||||
return spec
|
||||
|
||||
def get_keep_result(self):
|
||||
return self._keep_result
|
||||
@ -222,20 +232,14 @@ class TaskSpec(base.BaseSpec):
|
||||
return self._safe_rerun
|
||||
|
||||
def get_type(self):
|
||||
if self._workflow:
|
||||
return utils.WORKFLOW_TASK_TYPE
|
||||
return utils.ACTION_TASK_TYPE
|
||||
return (utils.WORKFLOW_TASK_TYPE if self._workflow
|
||||
else utils.ACTION_TASK_TYPE)
|
||||
|
||||
|
||||
class DirectWorkflowTaskSpec(TaskSpec):
|
||||
_polymorphic_value = 'direct'
|
||||
|
||||
_on_clause_type = {
|
||||
"oneOf": [
|
||||
types.NONEMPTY_STRING,
|
||||
types.UNIQUE_STRING_OR_EXPRESSION_CONDITION_LIST
|
||||
]
|
||||
}
|
||||
_on_clause_schema = on_clause.OnClauseSpec._schema
|
||||
|
||||
_direct_workflow_schema = {
|
||||
"type": "object",
|
||||
@ -247,52 +251,62 @@ class DirectWorkflowTaskSpec(TaskSpec):
|
||||
types.POSITIVE_INTEGER
|
||||
]
|
||||
},
|
||||
"on-complete": _on_clause_type,
|
||||
"on-success": _on_clause_type,
|
||||
"on-error": _on_clause_type
|
||||
"on-complete": _on_clause_schema,
|
||||
"on-success": _on_clause_schema,
|
||||
"on-error": _on_clause_schema
|
||||
}
|
||||
}
|
||||
|
||||
_schema = utils.merge_dicts(copy.deepcopy(TaskSpec._schema),
|
||||
_direct_workflow_schema)
|
||||
_schema = utils.merge_dicts(
|
||||
copy.deepcopy(TaskSpec._schema),
|
||||
_direct_workflow_schema
|
||||
)
|
||||
|
||||
def __init__(self, data):
|
||||
super(DirectWorkflowTaskSpec, self).__init__(data)
|
||||
|
||||
self._join = data.get('join')
|
||||
self._on_complete = self.prepare_on_clause(
|
||||
self._as_list_of_tuples('on-complete')
|
||||
)
|
||||
self._on_success = self.prepare_on_clause(
|
||||
self._as_list_of_tuples('on-success')
|
||||
)
|
||||
self._on_error = self.prepare_on_clause(
|
||||
self._as_list_of_tuples('on-error')
|
||||
)
|
||||
|
||||
def validate_schema(self):
|
||||
super(DirectWorkflowTaskSpec, self).validate_schema()
|
||||
on_spec_cls = on_clause.OnClauseSpec
|
||||
|
||||
self._on_complete = self._spec_property('on-complete', on_spec_cls)
|
||||
self._on_success = self._spec_property('on-success', on_spec_cls)
|
||||
self._on_error = self._spec_property('on-error', on_spec_cls)
|
||||
|
||||
def validate_semantics(self):
|
||||
# Validate YAQL expressions.
|
||||
self._validate_transitions('on-complete')
|
||||
self._validate_transitions('on-success')
|
||||
self._validate_transitions('on-error')
|
||||
self._validate_transitions(self._on_complete)
|
||||
self._validate_transitions(self._on_success)
|
||||
self._validate_transitions(self._on_error)
|
||||
|
||||
def _validate_transitions(self, on_clause):
|
||||
val = self._data.get(on_clause, [])
|
||||
def _validate_transitions(self, on_clause_spec):
|
||||
val = on_clause_spec.get_next() if on_clause_spec else []
|
||||
|
||||
if not val:
|
||||
return
|
||||
|
||||
[self.validate_expr(t)
|
||||
for t in ([val] if isinstance(val, six.string_types) else val)]
|
||||
for t in ([val] if isinstance(val, six.string_types) else val)]
|
||||
|
||||
@staticmethod
|
||||
def prepare_on_clause(list_of_tuples):
|
||||
for i, task in enumerate(list_of_tuples):
|
||||
task_name, params = DirectWorkflowTaskSpec._parse_cmd_and_input(
|
||||
task[0]
|
||||
)
|
||||
list_of_tuples[i] = (task_name, task[1], params)
|
||||
def get_publish(self, state):
|
||||
spec = super(DirectWorkflowTaskSpec, self).get_publish(state)
|
||||
|
||||
return list_of_tuples
|
||||
# TODO(rakhmerov): How do we need to resolve a possible conflict
|
||||
# between 'on-complete' and 'on-success/on-error' and
|
||||
# 'publish/publish-on-error'? For now we assume that 'on-error'
|
||||
# and 'on-success' take precedence over on-complete.
|
||||
|
||||
on_clause = self._on_complete
|
||||
|
||||
if state == states.SUCCESS:
|
||||
on_clause = self._on_success
|
||||
elif state == states.ERROR:
|
||||
on_clause = self._on_error
|
||||
|
||||
if not on_clause:
|
||||
return spec
|
||||
|
||||
return on_clause.get_publish() or spec
|
||||
|
||||
def get_join(self):
|
||||
return self._join
|
||||
|
@ -292,42 +292,57 @@ class DirectWorkflowSpec(WorkflowSpec):
|
||||
return to_task_name in t_names
|
||||
|
||||
def get_on_error_clause(self, t_name):
|
||||
result = self.get_tasks()[t_name].get_on_error()
|
||||
result = []
|
||||
|
||||
on_clause = self.get_tasks()[t_name].get_on_error()
|
||||
|
||||
if on_clause:
|
||||
result = on_clause.get_next()
|
||||
|
||||
if not result:
|
||||
t_defaults = self.get_task_defaults()
|
||||
|
||||
if t_defaults:
|
||||
if t_defaults and t_defaults.get_on_error():
|
||||
result = self._remove_task_from_clause(
|
||||
t_defaults.get_on_error(),
|
||||
t_defaults.get_on_error().get_next(),
|
||||
t_name
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
def get_on_success_clause(self, t_name):
|
||||
result = self.get_tasks()[t_name].get_on_success()
|
||||
result = []
|
||||
|
||||
on_clause = self.get_tasks()[t_name].get_on_success()
|
||||
|
||||
if on_clause:
|
||||
result = on_clause.get_next()
|
||||
|
||||
if not result:
|
||||
t_defaults = self.get_task_defaults()
|
||||
|
||||
if t_defaults:
|
||||
if t_defaults and t_defaults.get_on_success():
|
||||
result = self._remove_task_from_clause(
|
||||
t_defaults.get_on_success(),
|
||||
t_defaults.get_on_success().get_next(),
|
||||
t_name
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
def get_on_complete_clause(self, t_name):
|
||||
result = self.get_tasks()[t_name].get_on_complete()
|
||||
result = []
|
||||
|
||||
on_clause = self.get_tasks()[t_name].get_on_complete()
|
||||
|
||||
if on_clause:
|
||||
result = on_clause.get_next()
|
||||
|
||||
if not result:
|
||||
t_defaults = self.get_task_defaults()
|
||||
|
||||
if t_defaults:
|
||||
if t_defaults and t_defaults.get_on_complete():
|
||||
result = self._remove_task_from_clause(
|
||||
t_defaults.get_on_complete(),
|
||||
t_defaults.get_on_complete().get_next(),
|
||||
t_name
|
||||
)
|
||||
|
||||
|
@ -713,6 +713,43 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
|
||||
|
||||
self.assertDictEqual(wf_input['a'], task1.published['published_a'])
|
||||
|
||||
def test_advanced_publishing_branch(self):
|
||||
wf_text = """---
|
||||
version: 2.0
|
||||
|
||||
wf:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.noop
|
||||
on-success:
|
||||
publish:
|
||||
branch:
|
||||
my_var: my branch value
|
||||
next: task2
|
||||
|
||||
task2:
|
||||
action: std.echo output=<% $.my_var %>
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wf', {})
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
tasks = wf_ex.task_executions
|
||||
|
||||
task1 = self._assert_single_item(tasks, name='task1')
|
||||
|
||||
self._assert_single_item(tasks, name='task2')
|
||||
|
||||
self.assertDictEqual({"my_var": "my branch value"}, task1.published)
|
||||
|
||||
|
||||
class DataFlowTest(test_base.BaseTest):
|
||||
def test_get_task_execution_result(self):
|
||||
|
@ -334,6 +334,306 @@ class TaskSpecValidation(v2_base.WorkflowSpecValidationTestCase):
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
||||
def test_direct_transition_advanced_schema(self):
|
||||
tests = [
|
||||
({'on-success': {'publish': {'var1': 1234}}}, True),
|
||||
({'on-success': {'publish': {'branch': {'var1': 1234}}}}, False),
|
||||
(
|
||||
{
|
||||
'on-success': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
}
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-success': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': '<% * %>'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
}
|
||||
}
|
||||
},
|
||||
True
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-success': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': 'email'
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-success': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': ['email']
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-success': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': [{'email': '<% 1 %>'}]
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-success': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': [{'email': '<% $.v1 and $.v2 %>'}]
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-success': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': [{'email': '<% * %>'}]
|
||||
}
|
||||
},
|
||||
True
|
||||
),
|
||||
({'on-success': {'next': [{'email': '<% $.v1 %>'}]}}, False),
|
||||
({'on-success': {'next': 'email'}}, False),
|
||||
({'on-success': {'next': ['email']}}, False),
|
||||
({'on-success': {'next': [{'email': 'email'}]}}, True),
|
||||
({'on-error': {'publish': {'var1': 1234}}}, True),
|
||||
({'on-error': {'publish': {'branch': {'var1': 1234}}}}, False),
|
||||
(
|
||||
{
|
||||
'on-error': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
}
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-error': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': '<% * %>'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
}
|
||||
}
|
||||
},
|
||||
True
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-error': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': 'email'
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-error': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': ['email']
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-error': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': [{'email': '<% 1 %>'}]
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-error': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': [{'email': '<% $.v1 and $.v2 %>'}]
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-error': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': [{'email': '<% * %>'}]
|
||||
}
|
||||
},
|
||||
True
|
||||
),
|
||||
({'on-error': {'next': [{'email': '<% $.v1 %>'}]}}, False),
|
||||
({'on-error': {'next': 'email'}}, False),
|
||||
({'on-error': {'next': ['email']}}, False),
|
||||
({'on-error': {'next': [{'email': 'email'}]}}, True),
|
||||
({'on-complete': {'publish': {'var1': 1234}}}, True),
|
||||
({'on-complete': {'publish': {'branch': {'var1': 1234}}}}, False),
|
||||
(
|
||||
{
|
||||
'on-complete': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
}
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-complete': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': '<% * %>'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
}
|
||||
}
|
||||
},
|
||||
True
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-complete': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': 'email'
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-complete': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': ['email']
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-complete': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': [{'email': '<% 1 %>'}]
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-complete': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': [{'email': '<% $.v1 and $.v2 %>'}]
|
||||
}
|
||||
},
|
||||
False
|
||||
),
|
||||
(
|
||||
{
|
||||
'on-complete': {
|
||||
'publish': {
|
||||
'branch': {'var1': 1234},
|
||||
'global': {'global_var1': 'val'},
|
||||
'atomic': {'atomic_var1': '<% my_func() %>'}
|
||||
},
|
||||
'next': [{'email': '<% * %>'}]
|
||||
}
|
||||
},
|
||||
True
|
||||
),
|
||||
({'on-complete': {'next': [{'email': '<% $.v1 %>'}]}}, False),
|
||||
({'on-complete': {'next': 'email'}}, False),
|
||||
({'on-complete': {'next': ['email']}}, False),
|
||||
({'on-complete': {'next': [{'email': 'email'}]}}, True)
|
||||
]
|
||||
|
||||
for transition, expect_error in tests:
|
||||
overlay = {'test': {'tasks': {}}}
|
||||
|
||||
utils.merge_dicts(overlay['test']['tasks'], {'get': transition})
|
||||
|
||||
self._parse_dsl_spec(
|
||||
add_tasks=True,
|
||||
changes=overlay,
|
||||
expect_error=expect_error
|
||||
)
|
||||
|
||||
def test_join(self):
|
||||
tests = [
|
||||
({'join': ''}, True),
|
||||
|
@ -121,15 +121,15 @@ class WorkbookSpecValidation(base.WorkbookSpecValidationTestCase):
|
||||
|
||||
self.assertListEqual(
|
||||
[('fail', '<% $.my_val = 0 %>', {})],
|
||||
task_defaults_spec.get_on_error()
|
||||
task_defaults_spec.get_on_error().get_next()
|
||||
)
|
||||
self.assertListEqual(
|
||||
[('pause', '', {})],
|
||||
task_defaults_spec.get_on_success()
|
||||
task_defaults_spec.get_on_success().get_next()
|
||||
)
|
||||
self.assertListEqual(
|
||||
[('succeed', '', {})],
|
||||
task_defaults_spec.get_on_complete()
|
||||
task_defaults_spec.get_on_complete().get_next()
|
||||
)
|
||||
|
||||
task3_spec = wf2_spec.get_tasks().get('task3')
|
||||
@ -150,15 +150,15 @@ class WorkbookSpecValidation(base.WorkbookSpecValidationTestCase):
|
||||
)
|
||||
self.assertListEqual(
|
||||
[('task4', '<% $.my_val = 1 %>', {})],
|
||||
task3_spec.get_on_error()
|
||||
task3_spec.get_on_error().get_next()
|
||||
)
|
||||
self.assertListEqual(
|
||||
[('task5', '<% $.my_val = 2 %>', {})],
|
||||
task3_spec.get_on_success()
|
||||
task3_spec.get_on_success().get_next()
|
||||
)
|
||||
self.assertListEqual(
|
||||
[('task6', '<% $.my_val = 3 %>', {})],
|
||||
task3_spec.get_on_complete()
|
||||
task3_spec.get_on_complete().get_next()
|
||||
)
|
||||
|
||||
task7_spec = wf2_spec.get_tasks().get('task7')
|
||||
|
@ -190,11 +190,7 @@ def publish_variables(task_ex, task_spec):
|
||||
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
expr_ctx = ContextView(
|
||||
task_ex.in_context,
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
)
|
||||
expr_ctx = ContextView(task_ex.in_context, wf_ex.context, wf_ex.input)
|
||||
|
||||
if task_ex.name in expr_ctx:
|
||||
LOG.warning(
|
||||
@ -202,13 +198,20 @@ def publish_variables(task_ex, task_spec):
|
||||
task_ex.name
|
||||
)
|
||||
|
||||
data = (
|
||||
task_spec.get_publish()
|
||||
if task_ex.state == states.SUCCESS
|
||||
else task_spec.get_publish_on_error()
|
||||
)
|
||||
publish_spec = task_spec.get_publish(task_ex.state)
|
||||
|
||||
task_ex.published = expr.evaluate_recursively(data, expr_ctx)
|
||||
if not publish_spec:
|
||||
return
|
||||
|
||||
branch_vars = publish_spec.get_branch()
|
||||
|
||||
task_ex.published = expr.evaluate_recursively(branch_vars, expr_ctx)
|
||||
|
||||
# TODO(rakhmerov):
|
||||
# 1. Publish global and atomic variables.
|
||||
# 2. Add the field "publish" in TaskExecution model similar to "published"
|
||||
# but containing info as
|
||||
# {'branch': {vars}, 'global': {vars}, 'atomic': {vars}}
|
||||
|
||||
|
||||
def evaluate_task_outbound_context(task_ex):
|
||||
|
Loading…
Reference in New Issue
Block a user