mistral/mistral/tests/unit/engine/test_dataflow.py

1475 lines
43 KiB
Python

# Copyright 2014 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.services import workbooks as wb_service
from mistral.services import workflows as wf_service
from mistral.tests.unit import base as test_base
from mistral.tests.unit.engine import base as engine_test_base
from mistral import utils
from mistral.workflow import data_flow
from mistral.workflow import states
import sys
# Set the default of 'auth_enable' in the 'pecan' group to False.
# Use the set_default method to set the value; otherwise, in certain test
# cases, the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
class DataFlowEngineTest(engine_test_base.EngineTestCase):
    """Engine-level tests for publishing and passing data in workflows.

    Each test registers a workflow (or workbook) from YAML text, starts it
    through ``self.engine``, waits for the expected final state and then
    re-reads the workflow execution to inspect its task executions.
    """

    def test_linear_dataflow(self):
        """Check data passing along a linear chain of three tasks."""
        wf_text = """---
        version: '2.0'
        wf:
          type: direct
          tasks:
            task1:
              action: std.echo output="Hi"
              publish:
                hi: <% task(task1).result %>
              on-success:
                - task2
            task2:
              action: std.echo output="Morpheus"
              publish:
                to: <% task(task2).result %>
              on-success:
                - task3
            task3:
              publish:
                result: "<% $.hi %>, <% $.to %>! Your <% env().from %>."
        """

        wf_service.create_workflows(wf_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf', env={'from': 'Neo'})

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            self.assertEqual(states.SUCCESS, wf_ex.state)

            task1 = self._assert_single_item(tasks, name='task1')
            task2 = self._assert_single_item(tasks, name='task2')
            task3 = self._assert_single_item(tasks, name='task3')

            self.assertEqual(states.SUCCESS, task3.state)
            self.assertDictEqual({'hi': 'Hi'}, task1.published)
            self.assertDictEqual({'to': 'Morpheus'}, task2.published)
            self.assertDictEqual(
                {'result': 'Hi, Morpheus! Your Neo.'},
                task3.published
            )

            # Make sure that task inbound context doesn't contain workflow
            # execution info.
            self.assertNotIn('__execution', task1.in_context)

    def test_linear_with_branches_dataflow(self):
        """Check publishing when each task also triggers 'notify'."""
        wf_text = """---
        version: '2.0'
        wf:
          type: direct
          tasks:
            task1:
              action: std.echo output="Hi"
              publish:
                hi: <% task(task1).result %>
                progress: "completed task1"
              on-success:
                - notify
                - task2
            task2:
              action: std.echo output="Morpheus"
              publish:
                to: <% task(task2).result %>
                progress: "completed task2"
              on-success:
                - notify
                - task3
            task3:
              publish:
                result: "<% $.hi %>, <% $.to %>! Your <% env().from %>."
                progress: "completed task3"
              on-success:
                - notify
            notify:
              action: std.echo output=<% $.progress %>
              publish:
                progress: <% task(notify).result %>
        """

        wf_service.create_workflows(wf_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf', env={'from': 'Neo'})

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            self.assertEqual(states.SUCCESS, wf_ex.state)

            task1 = self._assert_single_item(tasks, name='task1')
            task2 = self._assert_single_item(tasks, name='task2')
            task3 = self._assert_single_item(tasks, name='task3')

            # 'notify' is expected to run once per regular task.
            notify_tasks = self._assert_multiple_items(
                tasks,
                3,
                name='notify'
            )
            notify_published_arr = [
                t.published['progress'] for t in notify_tasks
            ]

            self.assertEqual(states.SUCCESS, task3.state)

            exp_published_arr = [
                {
                    'hi': 'Hi',
                    'progress': 'completed task1'
                },
                {
                    'to': 'Morpheus',
                    'progress': 'completed task2'
                },
                {
                    'result': 'Hi, Morpheus! Your Neo.',
                    'progress': 'completed task3'
                }
            ]

            self.assertDictEqual(exp_published_arr[0], task1.published)
            self.assertDictEqual(exp_published_arr[1], task2.published)
            self.assertDictEqual(exp_published_arr[2], task3.published)

            self.assertIn(
                exp_published_arr[0]['progress'],
                notify_published_arr
            )
            self.assertIn(
                exp_published_arr[1]['progress'],
                notify_published_arr
            )
            self.assertIn(
                exp_published_arr[2]['progress'],
                notify_published_arr
            )

    def test_parallel_tasks(self):
        """Check that two independent tasks both publish to output."""
        wf_text = """---
        version: '2.0'
        wf:
          type: direct
          tasks:
            task1:
              action: std.echo output=1
              publish:
                var1: <% task(task1).result %>
            task2:
              action: std.echo output=2
              publish:
                var2: <% task(task2).result %>
        """

        wf_service.create_workflows(wf_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            wf_output = wf_ex.output
            tasks = wf_ex.task_executions

            self.assertEqual(states.SUCCESS, wf_ex.state)
            self.assertEqual(2, len(tasks))

            task1 = self._assert_single_item(tasks, name='task1')
            task2 = self._assert_single_item(tasks, name='task2')

            self.assertEqual(states.SUCCESS, task1.state)
            self.assertEqual(states.SUCCESS, task2.state)

            self.assertDictEqual({'var1': 1}, task1.published)
            self.assertDictEqual({'var2': 2}, task2.published)

            self.assertEqual(1, wf_output['var1'])
            self.assertEqual(2, wf_output['var2'])

    def test_parallel_tasks_complex(self):
        """Check published vars across parallel branches with a failure."""
        wf_text = """---
        version: '2.0'
        wf:
          type: direct
          tasks:
            task1:
              action: std.noop
              publish:
                var1: 1
              on-complete:
                - task12
            task12:
              action: std.noop
              publish:
                var12: 12
              on-complete:
                - task13
                - task14
            task13:
              action: std.fail
              description: |
                Since this task fails we expect that 'var13' won't go into
                context. Only 'var14'.
              publish:
                var13: 13
              on-error:
                - noop
            task14:
              publish:
                var14: 14
            task2:
              publish:
                var2: 2
              on-complete:
                - task21
            task21:
              publish:
                var21: 21
        """

        wf_service.create_workflows(wf_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            wf_output = wf_ex.output
            tasks = wf_ex.task_executions

            self.assertEqual(states.SUCCESS, wf_ex.state)
            self.assertEqual(6, len(tasks))

            task1 = self._assert_single_item(tasks, name='task1')
            task12 = self._assert_single_item(tasks, name='task12')
            task13 = self._assert_single_item(tasks, name='task13')
            task14 = self._assert_single_item(tasks, name='task14')
            task2 = self._assert_single_item(tasks, name='task2')
            task21 = self._assert_single_item(tasks, name='task21')

            self.assertEqual(states.SUCCESS, task1.state)
            self.assertEqual(states.SUCCESS, task12.state)
            self.assertEqual(states.ERROR, task13.state)
            self.assertEqual(states.SUCCESS, task14.state)
            self.assertEqual(states.SUCCESS, task2.state)
            self.assertEqual(states.SUCCESS, task21.state)

            self.assertDictEqual({'var1': 1}, task1.published)
            self.assertDictEqual({'var12': 12}, task12.published)
            self.assertDictEqual({'var14': 14}, task14.published)
            self.assertDictEqual({'var2': 2}, task2.published)
            self.assertDictEqual({'var21': 21}, task21.published)

            self.assertEqual(1, wf_output['var1'])
            self.assertEqual(12, wf_output['var12'])
            self.assertNotIn('var13', wf_output)
            self.assertEqual(14, wf_output['var14'])
            self.assertEqual(2, wf_output['var2'])
            self.assertEqual(21, wf_output['var21'])

    def test_sequential_tasks_publishing_same_var(self):
        """Check that a later publish of the same variable wins."""
        wf_text = """---
        version: '2.0'
        wf:
          type: direct
          tasks:
            task1:
              action: std.echo output="Hi"
              publish:
                greeting: <% task(task1).result %>
              on-success:
                - task2
            task2:
              action: std.echo output="Yo"
              publish:
                greeting: <% task(task2).result %>
              on-success:
                - task3
            task3:
              action: std.echo output="Morpheus"
              publish:
                to: <% task(task3).result %>
              on-success:
                - task4
            task4:
              publish:
                result: "<% $.greeting %>, <% $.to %>! <% env().from %>."
        """

        wf_service.create_workflows(wf_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf', env={'from': 'Neo'})

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            self.assertEqual(states.SUCCESS, wf_ex.state)

            task1 = self._assert_single_item(tasks, name='task1')
            task2 = self._assert_single_item(tasks, name='task2')
            task3 = self._assert_single_item(tasks, name='task3')
            task4 = self._assert_single_item(tasks, name='task4')

            self.assertEqual(states.SUCCESS, task4.state)
            self.assertDictEqual({'greeting': 'Hi'}, task1.published)
            self.assertDictEqual({'greeting': 'Yo'}, task2.published)
            self.assertDictEqual({'to': 'Morpheus'}, task3.published)
            self.assertDictEqual(
                {'result': 'Yo, Morpheus! Neo.'},
                task4.published
            )

    def test_sequential_tasks_publishing_same_structured(self):
        """Check that an empty dict overrides a published dict."""
        wf_text = """---
        version: '2.0'
        wf:
          type: direct
          tasks:
            task1:
              publish:
                greeting: {"a": "b"}
              on-success:
                - task2
            task2:
              publish:
                greeting: {}
              on-success:
                - task3
            task3:
              publish:
                result: <% $.greeting %>
        """

        wf_service.create_workflows(wf_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf', env={'from': 'Neo'})

        self.await_workflow_success(wf_ex.id)

        # Note: We need to reread execution to access related tasks.
        with db_api.transaction():
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            self.assertEqual(states.SUCCESS, wf_ex.state)

            task1 = self._assert_single_item(tasks, name='task1')
            task2 = self._assert_single_item(tasks, name='task2')
            task3 = self._assert_single_item(tasks, name='task3')

            self.assertEqual(states.SUCCESS, task3.state)
            self.assertDictEqual({'greeting': {'a': 'b'}}, task1.published)
            self.assertDictEqual({'greeting': {}}, task2.published)
            self.assertDictEqual({'result': {}}, task3.published)

    def test_linear_dataflow_implicit_publish(self):
        """Check reading task results directly after a join."""
        wf_text = """---
        version: '2.0'
        wf:
          type: direct
          tasks:
            task1:
              action: std.echo output="Hi"
              on-success:
                - task21
                - task22
            task21:
              action: std.echo output="Morpheus"
              on-success:
                - task4
            task22:
              action: std.echo output="Neo"
              on-success:
                - task4
            task4:
              join: all
              publish:
                result: >
                  <% task(task1).result %>, <% task(task21).result %>!
                  Your <% task(task22).result %>.
        """

        wf_service.create_workflows(wf_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            task4 = self._assert_single_item(tasks, name='task4')

            # The folded YAML scalar ('>') joins the two lines with a
            # space and keeps the trailing newline.
            self.assertDictEqual(
                {'result': 'Hi, Morpheus! Your Neo.\n'},
                task4.published
            )

    def test_destroy_result(self):
        """Check 'keep-result: false' keeps published vars only."""
        wf_text = """---
        version: '2.0'
        wf:
          type: direct
          tasks:
            task1:
              action: std.echo output=["Hi", "John Doe!"]
              publish:
                hi: <% task(task1).result %>
              keep-result: false
        """

        wf_service.create_workflows(wf_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            task1 = self._assert_single_item(tasks, name='task1')

            result = data_flow.get_task_execution_result(task1)

            # Published vars are saved.
            self.assertDictEqual(
                {'hi': ["Hi", "John Doe!"]},
                task1.published
            )

            # But all result is cleared.
            self.assertIsNone(result)

    def test_empty_with_items(self):
        """Check that with-items over an empty list yields []."""
        wf_text = """---
        version: "2.0"
        wf1_with_items:
          type: direct
          tasks:
            task1:
              with-items: i in <% list() %>
              action: std.echo output= "Task 1.<% $.i %>"
              publish:
                result: <% task(task1).result %>
        """

        wf_service.create_workflows(wf_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf1_with_items')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            task1 = self._assert_single_item(
                wf_ex.task_executions,
                name='task1'
            )

            result = data_flow.get_task_execution_result(task1)

            self.assertListEqual([], result)

    def test_publish_on_error(self):
        """Check 'publish-on-error' and 'output-on-error' on failure."""
        wf_text = """---
        version: '2.0'
        wf:
          type: direct
          output-on-error:
            out: <% $.hi %>
          tasks:
            task1:
              action: std.fail
              publish-on-error:
                hi: hello_from_error
                err: <% task(task1).result %>
        """

        wf_service.create_workflows(wf_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_error(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            wf_output = wf_ex.output
            tasks = wf_ex.task_executions

            self.assertEqual(states.ERROR, wf_ex.state)

            task1 = self._assert_single_item(tasks, name='task1')

            self.assertEqual(states.ERROR, task1.state)
            self.assertEqual('hello_from_error', task1.published['hi'])
            self.assertIn(
                'Fail action expected exception',
                task1.published['err']
            )
            self.assertEqual('hello_from_error', wf_output['out'])
            self.assertIn(
                'Fail action expected exception',
                wf_output['result']
            )

    def test_publish_with_all(self):
        """Check task, on-success and on-complete publishing together."""
        wf_text = """---
        version: '2.0'
        wf:
          tasks:
            main-task:
              publish:
                res_x1: 111
              on-complete:
                next: complete-task
                publish:
                  branch:
                    res_x3: 222
              on-success:
                next: success-task
                publish:
                  branch:
                    res_x2: 222
            success-task:
              action: std.noop
              publish:
                success_x2: <% $.res_x2 %>
                success_x1: <% $.res_x1 %>
            complete-task:
              action: std.noop
              publish:
                complete_x2: <% $.res_x3 %>
                complete_x1: <% $.res_x1 %>
        """

        wf_service.create_workflows(wf_text)

        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            wf_output = wf_ex.output
            tasks = wf_ex.task_executions

            main_task = self._assert_single_item(tasks, name='main-task')
            main_task_published_vars = main_task.get("published")
            expected_main_variables = {'res_x3', 'res_x2', 'res_x1'}

            self.assertEqual(
                expected_main_variables,
                set(main_task_published_vars.keys())
            )

            complete_task = self._assert_single_item(
                tasks,
                name='complete-task'
            )
            complete_task_published_vars = complete_task.get("published")
            expected_complete_variables = {'complete_x2', 'complete_x1'}

            self.assertEqual(
                expected_complete_variables,
                set(complete_task_published_vars.keys())
            )

            success_task = self._assert_single_item(
                tasks,
                name='success-task'
            )
            success_task_published_vars = success_task.get("published")
            expected_success_variables = {'success_x2', 'success_x1'}

            self.assertEqual(
                expected_success_variables,
                set(success_task_published_vars.keys())
            )

            all_expected_published_variables = (
                expected_main_variables.union(
                    expected_success_variables,
                    expected_complete_variables
                )
            )

            self.assertEqual(
                all_expected_published_variables,
                set(wf_output)
            )

    def test_publish_no_success(self):
        """Check task-level plus on-complete publishing."""
        wf_text = """---
        version: '2.0'
        wf:
          tasks:
            main-task:
              publish:
                res_x1: 111
              on-complete:
                next: complete-task
                publish:
                  branch:
                    res_x3: 222
            complete-task:
              action: std.noop
              publish:
                complete_x2: <% $.res_x3 %>
                complete_x1: <% $.res_x1 %>
        """

        wf_service.create_workflows(wf_text)

        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            wf_output = wf_ex.output
            tasks = wf_ex.task_executions

            main_task = self._assert_single_item(tasks, name='main-task')
            main_task_published_vars = main_task.get("published")
            expected_main_variables = {'res_x3', 'res_x1'}

            self.assertEqual(
                expected_main_variables,
                set(main_task_published_vars.keys())
            )

            complete_task = self._assert_single_item(
                tasks,
                name='complete-task'
            )
            complete_task_published_vars = complete_task.get("published")
            expected_complete_variables = {'complete_x2', 'complete_x1'}

            self.assertEqual(
                expected_complete_variables,
                set(complete_task_published_vars.keys())
            )

            all_expected_published_variables = (
                expected_main_variables.union(expected_complete_variables)
            )

            self.assertEqual(
                all_expected_published_variables,
                set(wf_output)
            )

    def test_publish_no_complete(self):
        """Check task-level plus on-success publishing."""
        wf_text = """---
        version: '2.0'
        wf:
          tasks:
            main-task:
              publish:
                res_x1: 111
              on-success:
                next: success-task
                publish:
                  branch:
                    res_x2: 222
            success-task:
              action: std.noop
              publish:
                success_x2: <% $.res_x2 %>
                success_x1: <% $.res_x1 %>
        """

        wf_service.create_workflows(wf_text)

        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions
            wf_output = wf_ex.output

            main_task = self._assert_single_item(tasks, name='main-task')
            main_task_published_vars = main_task.get("published")
            expected_main_variables = {'res_x2', 'res_x1'}

            self.assertEqual(
                expected_main_variables,
                set(main_task_published_vars.keys())
            )

            success_task = self._assert_single_item(
                tasks,
                name='success-task'
            )
            success_task_published_vars = success_task.get("published")
            expected_success_variables = {'success_x2', 'success_x1'}

            self.assertEqual(
                expected_success_variables,
                set(success_task_published_vars.keys())
            )

            all_expected_published_variables = (
                expected_main_variables.union(expected_success_variables)
            )

            self.assertEqual(
                all_expected_published_variables,
                set(wf_output)
            )

    def test_publish_no_regular_publish(self):
        """Check on-success publishing without a task-level publish."""
        wf_text = """---
        version: '2.0'
        wf2:
          tasks:
            main-task:
              on-success:
                next: success-task
                publish:
                  branch:
                    res_x2: 222
            success-task:
              action: std.noop
              publish:
                success_x2: <% $.res_x2 %>
        """

        wf_service.create_workflows(wf_text)

        wf_ex = self.engine.start_workflow('wf2')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions
            wf_output = wf_ex.output

            main_task = self._assert_single_item(tasks, name='main-task')
            main_task_published_vars = main_task.get("published")
            expected_main_variables = {'res_x2'}

            self.assertEqual(
                expected_main_variables,
                set(main_task_published_vars.keys())
            )

            success_task = self._assert_single_item(
                tasks,
                name='success-task'
            )
            success_task_published_vars = success_task.get("published")
            expected_success_variables = {'success_x2'}

            self.assertEqual(
                expected_success_variables,
                set(success_task_published_vars.keys())
            )

            all_expected_published_variables = (
                expected_main_variables.union(expected_success_variables)
            )

            self.assertEqual(
                all_expected_published_variables,
                set(wf_output)
            )

    def test_output_on_error_wb_yaql_failed(self):
        """Check the error info when 'output-on-error' fails to evaluate."""
        wb_text = """---
        version: '2.0'
        name: wb
        workflows:
          wf1:
            type: direct
            output-on-error:
              message: <% $.message %>
            tasks:
              task1:
                workflow: wf2
                publish-on-error:
                  message: <% task(task1).result.message %>
          wf2:
            type: direct
            output-on-error:
              message: <% $.not_existing_variable %>
            tasks:
              task1:
                action: std.fail
                publish-on-error:
                  message: <% task(task1).result %>
        """

        wb_service.create_workbook_v2(wb_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wb.wf1')

        self.await_workflow_error(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            self.assertEqual(states.ERROR, wf_ex.state)
            self.assertIn(
                'Failed to evaluate expression in output-on-error!',
                wf_ex.state_info
            )
            self.assertIn('$.message', wf_ex.state_info)

            task1 = self._assert_single_item(tasks, name='task1')

            self.assertIn('task(task1).result.message', task1.state_info)

    def test_size_of_output_by_execution_field_size_limit_kb(self):
        """Check error output size against the configured field limit."""
        wf_text = """
        version: '2.0'
        wf:
          type: direct
          output-on-error:
            custom_error: The action in the task does not exists
          tasks:
            task1:
              action: wrong.task
        """

        # Note: The number 1121 below added as value for field size
        # limit is because the output of workflow error comes as
        # workflow error string + custom error message and total length
        # might be greater than 1121. It varies depending on the length
        # of the custom message. This is a random number value used for
        # test case only.
        cfg.CONF.set_default(
            'execution_field_size_limit_kb',
            1121,
            group='engine'
        )

        kilobytes = cfg.CONF.engine.execution_field_size_limit_kb

        # Size difference between a one-character and an empty string.
        bytes_per_char = sys.getsizeof('s') - sys.getsizeof('')

        total_output_length = int(kilobytes * 1024 / bytes_per_char)

        wf_service.create_workflows(wf_text)

        wf_ex = self.engine.start_workflow('wf', '', None)

        self.await_workflow_error(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            wf_output = wf_ex.output

            self.assertLess(
                len(str(wf_output.get('custom_error'))),
                total_output_length
            )

    def test_override_json_input(self):
        """Check that a dict input fully replaces the dict default."""
        wf_text = """---
        version: 2.0
        wf:
          input:
            - a:
                aa: aa
                bb: bb
          tasks:
            task1:
              action: std.noop
              publish:
                published_a: <% $.a %>
        """

        wf_service.create_workflows(wf_text)

        wf_input = {
            'a': {
                'cc': 'cc',
                'dd': 'dd'
            }
        }

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf', wf_input=wf_input)

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            task1 = wf_ex.task_executions[0]

            self.assertDictEqual(
                wf_input['a'],
                task1.published['published_a']
            )

    def test_branch_publishing_success(self):
        """Check branch publishing in 'on-success'."""
        wf_text = """---
        version: 2.0
        wf:
          tasks:
            task1:
              action: std.noop
              on-success:
                publish:
                  branch:
                    my_var: my branch value
                next: task2
            task2:
              action: std.echo output=<% $.my_var %>
        """

        wf_service.create_workflows(wf_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            task1 = self._assert_single_item(tasks, name='task1')
            self._assert_single_item(tasks, name='task2')

            self.assertDictEqual(
                {"my_var": "my branch value"},
                task1.published
            )

    def test_global_publishing_success_access_via_root_context_(self):
        """Check a global var published on success is readable via '$'."""
        wf_text = """---
        version: '2.0'
        wf:
          tasks:
            task1:
              action: std.echo output="Hi"
              on-success:
                publish:
                  global:
                    my_var: <% task().result %>
                next:
                  - task2
            task2:
              action: std.echo output=<% $.my_var %>
              publish:
                result: <% task().result %>
        """

        wf_service.create_workflows(wf_text)

        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            self._assert_single_item(tasks, name='task1')
            task2 = self._assert_single_item(tasks, name='task2')

            self.assertDictEqual({'result': 'Hi'}, task2.published)

    def test_global_publishing_error_access_via_root_context(self):
        """Check a global var published on error is readable via '$'."""
        wf_text = """---
        version: '2.0'
        wf:
          tasks:
            task1:
              action: std.fail
              on-success:
                publish:
                  global:
                    my_var: "We got success"
                next:
                  - task2
              on-error:
                publish:
                  global:
                    my_var: "We got an error"
                next:
                  - task2
            task2:
              action: std.echo output=<% $.my_var %>
              publish:
                result: <% task().result %>
        """

        wf_service.create_workflows(wf_text)

        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            self._assert_single_item(tasks, name='task1')
            task2 = self._assert_single_item(tasks, name='task2')

            self.assertDictEqual(
                {'result': 'We got an error'},
                task2.published
            )

    def test_global_publishing_success_access_via_function(self):
        """Check 'global()' vs '$' when both scopes define a var."""
        wf_text = """---
        version: '2.0'
        wf:
          tasks:
            task1:
              action: std.noop
              on-success:
                publish:
                  branch:
                    my_var: Branch local value
                  global:
                    my_var: Global value
                next:
                  - task2
            task2:
              action: std.noop
              publish:
                local: <% $.my_var %>
                global: <% global(my_var) %>
        """

        wf_service.create_workflows(wf_text)

        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            self._assert_single_item(tasks, name='task1')
            task2 = self._assert_single_item(tasks, name='task2')

            self.assertDictEqual(
                {
                    'local': 'Branch local value',
                    'global': 'Global value'
                },
                task2.published
            )

    def test_global_publishing_error_access_via_function(self):
        """Check 'global()' vs '$' for vars published in 'on-error'."""
        wf_text = """---
        version: '2.0'
        wf:
          tasks:
            task1:
              action: std.fail
              on-error:
                publish:
                  branch:
                    my_var: Branch local value
                  global:
                    my_var: Global value
                next:
                  - task2
            task2:
              action: std.noop
              publish:
                local: <% $.my_var %>
                global: <% global(my_var) %>
        """

        wf_service.create_workflows(wf_text)

        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            self._assert_single_item(tasks, name='task1')
            task2 = self._assert_single_item(tasks, name='task2')

            self.assertDictEqual(
                {
                    'local': 'Branch local value',
                    'global': 'Global value'
                },
                task2.published
            )

    def test_get_published_global(self):
        """Check that only globally published vars are returned."""
        wf_text = """---
        version: '2.0'
        wf:
          vars:
            var1: 1
            var2: 2
          tasks:
            task1:
              action: std.noop
              on-success:
                publish:
                  global:
                    global_var1: Global value 1
                    global_var2: Global value 2
        """

        wf_service.create_workflows(wf_text)

        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            published_global = (
                data_flow.get_workflow_execution_published_global(wf_ex)
            )

            self.assertDictEqual(
                {
                    'global_var1': 'Global value 1',
                    'global_var2': 'Global value 2'
                },
                published_global
            )

    def test_linear_data_with_input_expressions(self):
        """Check a structured result can be navigated by a later task."""
        wf_text = """---
        version: '2.0'
        wf:
          tasks:
            task1:
              action: std.echo
              input:
                output:
                  key1: value1
                  key2: value2
              publish:
                res1: <% task(task1).result %>
              on-success:
                - task2
            task2:
              action: std.echo output=<% $.res1.key2 %>
              publish:
                res2: <% task().result %>
        """

        wf_service.create_workflows(wf_text)

        # Start workflow.
        wf_ex = self.engine.start_workflow('wf')

        self.await_workflow_success(wf_ex.id)

        with db_api.transaction():
            # Note: We need to reread execution to access related tasks.
            wf_ex = db_api.get_workflow_execution(wf_ex.id)

            tasks = wf_ex.task_executions

            task1 = self._assert_single_item(
                tasks,
                name='task1',
                state=states.SUCCESS
            )
            task2 = self._assert_single_item(
                tasks,
                name='task2',
                state=states.SUCCESS
            )

            self.assertDictEqual(
                {
                    'key1': 'value1',
                    'key2': 'value2'
                },
                task1.published['res1']
            )
            self.assertEqual('value2', task2.published['res2'])
class DataFlowTest(test_base.BaseTest):
    """Unit tests for ``data_flow`` helpers that do not run the engine."""

    def test_get_task_execution_result(self):
        """Check that only accepted action executions form the result."""
        task_ex = models.TaskExecution(
            name='task1',
            spec={
                "version": '2.0',
                'name': 'task1',
                'with-items': 'var in [1]',
                'type': 'direct',
                'action': 'my_action'
            },
            runtime_context={
                'with_items': {'count': 1}
            }
        )

        task_ex.action_executions = [models.ActionExecution(
            name='my_action',
            output={'result': 1},
            accepted=True,
            runtime_context={'index': 0}
        )]

        self.assertEqual([1], data_flow.get_task_execution_result(task_ex))

        # Add one more accepted and one non-accepted action execution.
        task_ex.action_executions.append(models.ActionExecution(
            name='my_action',
            output={'result': 1},
            accepted=True,
            runtime_context={'index': 0}
        ))
        task_ex.action_executions.append(models.ActionExecution(
            name='my_action',
            output={'result': 1},
            accepted=False,
            runtime_context={'index': 0}
        ))

        self.assertEqual(
            [1, 1],
            data_flow.get_task_execution_result(task_ex)
        )

    def test_context_view(self):
        """Check lookup, precedence and immutability of ContextView."""
        ctx = data_flow.ContextView(
            {
                'k1': 'v1',
                'k11': 'v11',
                'k3': 'v3'
            },
            {
                'k2': 'v2',
                'k21': 'v21',
                'k3': 'v32'
            }
        )

        self.assertIsInstance(ctx, dict)

        # 'k3' is present in both dictionaries but counted once.
        self.assertEqual(5, len(ctx))

        self.assertIn('k1', ctx)
        self.assertIn('k11', ctx)
        self.assertIn('k3', ctx)
        self.assertIn('k2', ctx)
        self.assertIn('k21', ctx)

        self.assertEqual('v1', ctx['k1'])
        self.assertEqual('v1', ctx.get('k1'))
        self.assertEqual('v11', ctx['k11'])
        self.assertEqual('v11', ctx.get('k11'))
        self.assertEqual('v3', ctx['k3'])
        self.assertEqual('v2', ctx['k2'])
        self.assertEqual('v2', ctx.get('k2'))
        self.assertEqual('v21', ctx['k21'])
        self.assertEqual('v21', ctx.get('k21'))

        self.assertIsNone(ctx.get('Not existing key'))

        # All mutating operations are expected to be rejected.
        self.assertRaises(exc.MistralError, ctx.update)
        self.assertRaises(exc.MistralError, ctx.clear)
        self.assertRaises(exc.MistralError, ctx.pop, 'k1')
        self.assertRaises(exc.MistralError, ctx.popitem)
        self.assertRaises(exc.MistralError, ctx.__setitem__, 'k5', 'v5')
        self.assertRaises(exc.MistralError, ctx.__delitem__, 'k2')

        self.assertEqual('v1', expr.evaluate('<% $.k1 %>', ctx))
        self.assertEqual('v2', expr.evaluate('<% $.k2 %>', ctx))
        self.assertEqual('v3', expr.evaluate('<% $.k3 %>', ctx))

        # Now change the order of dictionaries and make sure to have
        # a different value for key 'k3'.
        ctx = data_flow.ContextView(
            {
                'k2': 'v2',
                'k21': 'v21',
                'k3': 'v32'
            },
            {
                'k1': 'v1',
                'k11': 'v11',
                'k3': 'v3'
            }
        )

        self.assertEqual('v32', expr.evaluate('<% $.k3 %>', ctx))

    def test_context_view_eval_root_with_yaql(self):
        """Check that evaluating '$' returns a dict with all keys."""
        ctx = data_flow.ContextView(
            {'k1': 'v1'},
            {'k2': 'v2'}
        )

        res = expr.evaluate('<% $ %>', ctx)

        self.assertIsNotNone(res)
        self.assertIsInstance(res, dict)
        self.assertEqual(2, len(res))

    def test_context_view_eval_keys(self):
        """Check that '$.keys()' lists keys from every dictionary."""
        ctx = data_flow.ContextView(
            {'k1': 'v1'},
            {'k2': 'v2'}
        )

        res = expr.evaluate('<% $.keys() %>', ctx)

        self.assertIsNotNone(res)
        self.assertIsInstance(res, list)
        self.assertEqual(2, len(res))
        self.assertIn('k1', res)
        self.assertIn('k2', res)

    def test_context_view_eval_values(self):
        """Check that '$.values()' lists values from every dictionary."""
        ctx = data_flow.ContextView(
            {'k1': 'v1'},
            {'k2': 'v2'}
        )

        res = expr.evaluate('<% $.values() %>', ctx)

        self.assertIsNotNone(res)
        self.assertIsInstance(res, list)
        self.assertEqual(2, len(res))
        self.assertIn('v1', res)
        self.assertIn('v2', res)

    def test_context_view_repr(self):
        """Check the string form of a populated and an empty view."""
        ctx = data_flow.ContextView(
            {'k1': 'v1'},
            {'k2': 'v2'},
            {3: 3}
        )

        str_repr = str(ctx)

        self.assertIsNotNone(str_repr)
        self.assertNotEqual("{}", str_repr)
        self.assertEqual("{'k1': 'v1', 'k2': 'v2', 3: 3}", str_repr)

        ctx = data_flow.ContextView()

        self.assertEqual('{}', str(ctx))

    def test_context_view_as_root_json(self):
        """Check JSON serialization of a view used as the root object."""
        ctx = data_flow.ContextView(
            {'k1': 'v1'},
            {'k2': 'v2'},
        )

        json_str = utils.to_json_str(ctx)

        self.assertIsNotNone(json_str)
        self.assertNotEqual('{}', json_str)

        # We can't use regular dict comparison because key order
        # is not defined.
        self.assertIn('"k1": "v1"', json_str)
        self.assertIn('"k2": "v2"', json_str)

    def test_context_view_as_nested_json(self):
        """Check JSON serialization of a view nested in a plain dict."""
        ctx = data_flow.ContextView(
            {'k1': 'v1'},
            {'k2': 'v2'},
        )

        d = {'root': ctx}

        json_str = utils.to_json_str(d)

        self.assertIsNotNone(json_str)
        self.assertNotEqual('{"root": {}}', json_str)

        # We can't use regular dict comparison because key order
        # is not defined.
        # Both entries must be present, not just the first one.
        self.assertIn('"k1": "v1"', json_str)
        self.assertIn('"k2": "v2"', json_str)
        self.assertIn('"root"', json_str)