Yaql Tasks Function

This new function will allow user to get a list of tasks matching certain
filter. For example only task in state ERROR from the current execution.

It is very useful for debugging, but also very expensive, since it might
require multiple DB queries. In addition it is important to remember a lot
of data can return from this function, so it should be used carefully

Change-Id: I452175bfb60636ed8de9b2b1ceab615359765964
Implements: blueprint yaql-tasks-function
Implements: blueprint yaql-errors-function
This commit is contained in:
Michal Gershenzon 2016-11-23 17:09:59 +00:00
parent 444ac82be9
commit eb6c0513c6
7 changed files with 628 additions and 13 deletions

View File

@ -0,0 +1,440 @@
# Copyright 2016 - Nokia, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from mistral.db.v2 import api as db_api
from mistral.services import workbooks as wb_service
from mistral.tests.unit.engine import base
from mistral.workflow import states
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
WORKBOOK_WITH_EXPRESSIONS = """
---
version: '2.0'
name: wb
workflows:
test_tasks_function:
type: direct
input:
- wf1_wx_id
- wf2_wx_id
- wf3_wx_id
- wf4_wx_id
- wf5_wx_id
tasks:
main_task:
action: std.noop
publish:
all_tasks_yaql: <% tasks() %>
all_tasks_jinja: "{{ tasks() }}"
wf1_tasks_yaql: <% tasks($.wf1_wx_id) %>
wf1_tasks_jinja: "{{ tasks(_.wf1_wx_id) }}"
wf1_recursive_tasks_yaql: <% tasks($.wf1_wx_id, true) %>
wf1_recursive_tasks_jinja: "{{ tasks(_.wf1_wx_id, true) }}"
wf1_recursive_error_tasks_yaql: <% tasks($.wf1_wx_id, true, ERROR) %>
wf1_recursive_error_tasks_jinja:
"{{ tasks(_.wf1_wx_id, True, 'ERROR') }}"
wf1_not_recursive_error_tasks_yaql:
<% tasks($.wf1_wx_id, false, ERROR) %>
wf1_not_recursive_error_tasks_jinja:
"{{ tasks(_.wf1_wx_id, False, 'ERROR') }}"
wf1_recursive_success_flat_tasks_yaql:
<% tasks($.wf1_wx_id, true, SUCCESS, true) %>
wf1_recursive_success_flat_tasks_jinja:
"{{ tasks(_.wf1_wx_id, True, 'SUCCESS', True) }}"
wf2_recursive_tasks_yaql: <% tasks($.wf2_wx_id, true) %>
wf2_recursive_tasks_jinja: "{{ tasks(_.wf2_wx_id, true) }}"
wf3_recursive_error_tasks_yaql: <% tasks($.wf3_wx_id, true, ERROR) %>
wf3_recursive_error_tasks_jinja:
"{{ tasks(_.wf3_wx_id, True, 'ERROR') }}"
wf3_recursive_error_flat_tasks_yaql:
<% tasks($.wf3_wx_id, true, ERROR, true) %>
wf3_recursive_error_flat_tasks_jinja:
"{{ tasks(_.wf3_wx_id, True, 'ERROR', True) }}"
wf4_recursive_error_flat_tasks_yaql:
<% tasks($.wf4_wx_id, true, ERROR, true) %>
wf4_recursive_error_flat_tasks_jinja:
"{{ tasks(_.wf4_wx_id, True, 'ERROR', True) }}"
wf5_recursive_error_flat_tasks_yaql:
<% tasks($.wf5_wx_id, true, ERROR, true) %>
wf5_recursive_error_flat_tasks_jinja:
"{{ tasks(_.wf5_wx_id, True, 'ERROR', True) }}"
wf1_top_lvl:
type: direct
tasks:
top_lvl_wf1_task_1:
workflow: wf1_second_lvl
top_lvl_wf1_task_2:
action: std.noop
wf1_second_lvl:
type: direct
tasks:
second_lvl_wf1_task_1:
workflow: wf1_third_lvl_fail
on-error:
- second_lvl_wf1_task_2
second_lvl_wf1_task_2:
action: std.noop
second_lvl_wf1_task_3:
action: std.noop
wf1_third_lvl_fail:
type: direct
tasks:
third_lvl_wf1_task_1:
action: std.noop
on-success:
- third_lvl_wf1_task_2_fail
third_lvl_wf1_task_2_fail:
action: std.fail
third_lvl_wf1_task_3:
action: std.noop
wf2_top_lvl:
type: direct
tasks:
top_lvl_wf2_task_1:
action: std.noop
top_lvl_wf2_task_2:
action: std.noop
wf3_top_lvl:
type: direct
tasks:
top_lvl_wf3_task_1_fail:
workflow: wf3_second_lvl_fail
top_lvl_wf3_task_2_fail:
action: std.fail
wf3_second_lvl_fail:
type: direct
tasks:
second_lvl_wf3_task_1_fail:
workflow: wf3_third_lvl_fail
second_lvl_wf3_task_2:
action: std.noop
second_lvl_wf3_task_3:
action: std.noop
wf3_third_lvl_fail:
type: direct
tasks:
third_lvl_wf3_task_1:
action: std.noop
on-success:
- third_lvl_wf3_task_2
third_lvl_wf3_task_2:
action: std.noop
third_lvl_wf3_task_3_fail:
action: std.fail
wf4_top_lvl:
type: direct
tasks:
top_lvl_wf4_task_1:
workflow: wf4_second_lvl
publish:
raise_error: <% $.invalid_yaql_expression %>
wf4_second_lvl:
type: direct
tasks:
second_lvl_wf4_task_1:
action: std.noop
wf5_top_lvl:
type: direct
tasks:
top_lvl_wf5_task_1:
workflow: wf4_second_lvl
input:
raise_error: <% $.invalid_yaql_expression2 %>
wf5_second_lvl:
type: direct
tasks:
second_lvl_wf5_task_1:
workflow: wf5_third_lvl
wf5_third_lvl:
type: direct
tasks:
third_lvl_wf5_task_1:
action: std.noop
"""
class TasksFunctionTest(base.EngineTestCase):
def setUp(self):
super(TasksFunctionTest, self).setUp()
def _assert_published_tasks(self, task, published_key,
expected_tasks_count=None,
expected_tasks_names=None):
published = task.published[published_key]
self.assertIsNotNone(
published,
"there is a problem with publishing '{}'".format(published_key)
)
published_names = [t['name'] for t in published]
if expected_tasks_names:
for e in expected_tasks_names:
self.assertIn(e, published_names)
if not expected_tasks_count:
expected_tasks_count = len(expected_tasks_names)
if expected_tasks_count:
self.assertEqual(expected_tasks_count, len(published))
def test_tasks_function(self):
wb_service.create_workbook_v2(WORKBOOK_WITH_EXPRESSIONS)
# Start helping workflow executions.
wf1_ex = self.engine.start_workflow('wb.wf1_top_lvl', {})
wf2_ex = self.engine.start_workflow('wb.wf2_top_lvl', {})
wf3_ex = self.engine.start_workflow('wb.wf3_top_lvl', {})
wf4_ex = self.engine.start_workflow('wb.wf4_top_lvl', {})
wf5_ex = self.engine.start_workflow('wb.wf5_top_lvl', {})
self.await_workflow_success(wf1_ex.id)
self.await_workflow_success(wf2_ex.id)
self.await_workflow_error(wf3_ex.id)
self.await_workflow_error(wf4_ex.id)
self.await_workflow_error(wf5_ex.id)
# Start test workflow execution
execution = self.engine.start_workflow(
'wb.test_tasks_function',
{"wf1_wx_id": wf1_ex.id,
"wf2_wx_id": wf2_ex.id,
"wf3_wx_id": wf3_ex.id,
"wf4_wx_id": wf4_ex.id,
"wf5_wx_id": wf5_ex.id}
)
self.await_workflow_success(execution.id)
with db_api.transaction():
execution = db_api.get_workflow_execution(execution.id)
task_executions = execution.task_executions
self.assertEqual(states.SUCCESS, execution.state)
self.assertEqual(1, len(task_executions))
main_task = task_executions[0]
self._assert_published_tasks(
main_task,
"all_tasks_yaql",
22)
self._assert_published_tasks(
main_task,
"all_tasks_jinja",
22)
self._assert_published_tasks(
main_task,
"wf1_tasks_yaql",
2,
["top_lvl_wf1_task_1", "top_lvl_wf1_task_2"]
)
self._assert_published_tasks(
main_task,
"wf1_tasks_jinja",
2,
["top_lvl_wf1_task_1", "top_lvl_wf1_task_2"]
)
self._assert_published_tasks(
main_task,
"wf1_recursive_tasks_yaql",
8,
[
'top_lvl_wf1_task_1',
'top_lvl_wf1_task_2',
'second_lvl_wf1_task_3',
'second_lvl_wf1_task_1',
'second_lvl_wf1_task_2',
'third_lvl_wf1_task_3',
'third_lvl_wf1_task_1',
'third_lvl_wf1_task_2_fail'
]
)
self._assert_published_tasks(
main_task,
"wf1_recursive_tasks_jinja",
8,
[
'top_lvl_wf1_task_1',
'top_lvl_wf1_task_2',
'second_lvl_wf1_task_3',
'second_lvl_wf1_task_1',
'second_lvl_wf1_task_2',
'third_lvl_wf1_task_3',
'third_lvl_wf1_task_1',
'third_lvl_wf1_task_2_fail'
]
)
self._assert_published_tasks(
main_task,
"wf1_recursive_error_tasks_yaql",
2,
['second_lvl_wf1_task_1', 'third_lvl_wf1_task_2_fail']
)
self._assert_published_tasks(
main_task,
"wf1_recursive_error_tasks_jinja",
2,
['second_lvl_wf1_task_1', 'third_lvl_wf1_task_2_fail']
)
self._assert_published_tasks(
main_task,
"wf1_not_recursive_error_tasks_yaql",
0
)
self._assert_published_tasks(
main_task,
"wf1_not_recursive_error_tasks_jinja",
0
)
self._assert_published_tasks(
main_task,
"wf1_recursive_success_flat_tasks_yaql",
5,
[
'top_lvl_wf1_task_2',
'second_lvl_wf1_task_3',
'second_lvl_wf1_task_2',
'third_lvl_wf1_task_3',
'third_lvl_wf1_task_1'
]
)
self._assert_published_tasks(
main_task,
"wf1_recursive_success_flat_tasks_jinja",
5,
[
'top_lvl_wf1_task_2',
'second_lvl_wf1_task_3',
'second_lvl_wf1_task_2',
'third_lvl_wf1_task_3',
'third_lvl_wf1_task_1'
]
)
self._assert_published_tasks(
main_task,
"wf2_recursive_tasks_yaql",
2,
['top_lvl_wf2_task_2', 'top_lvl_wf2_task_1']
)
self._assert_published_tasks(
main_task,
"wf2_recursive_tasks_jinja",
2,
['top_lvl_wf2_task_2', 'top_lvl_wf2_task_1']
)
self._assert_published_tasks(
main_task,
"wf3_recursive_error_tasks_yaql",
4,
[
'top_lvl_wf3_task_1_fail',
'top_lvl_wf3_task_2_fail',
'second_lvl_wf3_task_1_fail',
'third_lvl_wf3_task_3_fail'
]
)
self._assert_published_tasks(
main_task,
"wf3_recursive_error_tasks_jinja",
4,
[
'top_lvl_wf3_task_1_fail',
'top_lvl_wf3_task_2_fail',
'second_lvl_wf3_task_1_fail',
'third_lvl_wf3_task_3_fail'
]
)
self._assert_published_tasks(
main_task,
"wf3_recursive_error_flat_tasks_yaql",
2,
['top_lvl_wf3_task_2_fail', 'third_lvl_wf3_task_3_fail']
)
self._assert_published_tasks(
main_task,
"wf3_recursive_error_flat_tasks_jinja",
2,
['top_lvl_wf3_task_2_fail', 'third_lvl_wf3_task_3_fail']
)
self._assert_published_tasks(
main_task,
"wf4_recursive_error_flat_tasks_yaql",
1,
['top_lvl_wf4_task_1']
)
self._assert_published_tasks(
main_task,
"wf4_recursive_error_flat_tasks_jinja",
1,
['top_lvl_wf4_task_1']
)
self._assert_published_tasks(
main_task,
"wf5_recursive_error_flat_tasks_yaql",
1,
['top_lvl_wf5_task_1']
)
self._assert_published_tasks(
main_task,
"wf5_recursive_error_flat_tasks_jinja",
1,
['top_lvl_wf5_task_1']
)

View File

@ -183,7 +183,7 @@ class JinjaEvaluatorTest(base.BaseTest):
@mock.patch('mistral.db.v2.api.get_task_executions')
@mock.patch('mistral.workflow.data_flow.get_task_execution_result')
def test_filter_task_without_taskexecution(self, task_execution_result,
def test_filter_task_without_task_execution(self, task_execution_result,
task_executions):
task = mock.MagicMock(return_value={})
task_executions.return_value = [task]
@ -203,9 +203,38 @@ class JinjaEvaluatorTest(base.BaseTest):
'result': task_execution_result(),
'spec': task.spec,
'state': task.state,
'state_info': task.state_info
'state_info': task.state_info,
'type': task.type,
'workflow_execution_id': task.workflow_execution_id
}, result)
@mock.patch('mistral.db.v2.api.get_task_executions')
@mock.patch('mistral.workflow.data_flow.get_task_execution_result')
def test_filter_tasks_without_task_execution(self, task_execution_result,
task_executions):
task = mock.MagicMock(return_value={})
task_executions.return_value = [task]
ctx = {
'__task_execution': None,
'__execution': {
'id': 'some'
}
}
result = self._evaluator.evaluate('_|tasks()', ctx)
self.assertEqual([{
'id': task.id,
'name': task.name,
'published': task.published,
'result': task_execution_result(),
'spec': task.spec,
'state': task.state,
'state_info': task.state_info,
'type': task.type,
'workflow_execution_id': task.workflow_execution_id
}], result)
@mock.patch('mistral.db.v2.api.get_task_execution')
@mock.patch('mistral.workflow.data_flow.get_task_execution_result')
def test_filter_task_with_taskexecution(self, task_execution_result,
@ -226,7 +255,9 @@ class JinjaEvaluatorTest(base.BaseTest):
'result': task_execution_result(),
'spec': task_execution().spec,
'state': task_execution().state,
'state_info': task_execution().state_info
'state_info': task_execution().state_info,
'type': task_execution().type,
'workflow_execution_id': task_execution().workflow_execution_id
}, result)
@mock.patch('mistral.db.v2.api.get_task_execution')
@ -248,7 +279,9 @@ class JinjaEvaluatorTest(base.BaseTest):
'result': task_execution_result(),
'spec': task_execution().spec,
'state': task_execution().state,
'state_info': task_execution().state_info
'state_info': task_execution().state_info,
'type': task_execution().type,
'workflow_execution_id': task_execution().workflow_execution_id
}, result)
@mock.patch('mistral.db.v2.api.get_workflow_execution')

View File

@ -18,6 +18,7 @@ from mistral import exceptions as exc
from mistral.expressions import yaql_expression as expr
from mistral.tests.unit import base
from mistral import utils
import mock
DATA = {
"server": {
@ -136,6 +137,48 @@ class YaqlEvaluatorTest(base.BaseTest):
self.assertTrue(utils.is_valid_uuid(uuid))
@mock.patch('mistral.db.v2.api.get_task_executions')
@mock.patch('mistral.workflow.data_flow.get_task_execution_result')
def test_filter_tasks_without_task_execution(self, task_execution_result,
task_executions):
task_execution_result.return_value = 'task_execution_result'
task = type("obj", (object,), {
'id': 'id',
'name': 'name',
'published': 'published',
'result': task_execution_result(),
'spec': 'spec',
'state': 'state',
'state_info': 'state_info',
'type': 'type',
'workflow_execution_id': 'workflow_execution_id'
})()
task_executions.return_value = [task]
ctx = {
'__task_execution': None,
'__execution': {
'id': 'some'
}
}
result = self._evaluator.evaluate('tasks(some)', ctx)
self.assertEqual(1, len(result))
self.assertDictEqual({
'id': task.id,
'name': task.name,
'published': task.published,
'result': task.result,
'spec': task.spec,
'state': task.state,
'state_info': task.state_info,
'type': task.type,
'workflow_execution_id': task.workflow_execution_id
}, result[0])
def test_function_env(self):
ctx = {'__env': 'some'}

View File

@ -41,6 +41,8 @@ from mistral import version
# Thread local storage.
_th_loc_storage = threading.local()
ACTION_TASK_TYPE = 'ACTION'
WORKFLOW_TASK_TYPE = 'WORKFLOW'
def generate_unicode_uuid():

View File

@ -118,8 +118,6 @@ def json_pp_(context, data=None):
def task_(context, task_name):
# Importing data_flow in order to break cycle dependency between modules.
from mistral.workflow import data_flow
# This section may not exist in a context if it's calculated not in
# task scope.
@ -140,6 +138,105 @@ def task_(context, task_name):
if not task_ex:
return None
# We don't use to_dict() db model method because not all fields
# make sense for user.
return _convert_to_user_model(task_ex)
def _should_pass_filter(t, state, flat):
# Start from assuming all is true, check only if needed.
state_match = True
flat_match = True
if state:
state_match = t['state'] == state
if flat:
is_action = t['type'] == utils.ACTION_TASK_TYPE
if not is_action:
nested_execs = db_api.get_workflow_executions(
task_execution_id=t.id
)
for n in nested_execs:
flat_match = flat_match and n.state != t.state
return state_match and flat_match
def _get_tasks_from_db(workflow_execution_id=None, recursive=False, state=None,
flat=False):
task_execs = []
nested_task_exs = []
kwargs = {}
if workflow_execution_id:
kwargs['workflow_execution_id'] = workflow_execution_id
# We can't add state to query if we want to filter by workflow_execution_id
# recursively. There might be a workflow_execution in one state with a
# nested workflow execution that has a task in the desired state until we
# have an optimization for queering all workflow executions under a given
# top level workflow execution, this is the way to go.
if state and not (workflow_execution_id and recursive):
kwargs['state'] = state
task_execs.extend(db_api.get_task_executions(**kwargs))
# If it is not recursive no need to check nested workflows.
# If there is no workflow execution id, we already have all we need, and
# doing more queries will just create duplication in the results.
if recursive and workflow_execution_id:
for t in task_execs:
if t.type == utils.WORKFLOW_TASK_TYPE:
# Get nested workflow execution that matches the task.
nested_workflow_executions = db_api.get_workflow_executions(
task_execution_id=t.id
)
# There might be zero nested executions.
for nested_workflow_execution in nested_workflow_executions:
nested_task_exs.extend(
_get_tasks_from_db(
nested_workflow_execution.id,
recursive,
state,
flat
)
)
if state or flat:
# Filter by state and flat.
task_execs = [
t for t in task_execs if _should_pass_filter(t, state, flat)
]
# The nested tasks were already filtered, since this is a recursion.
task_execs.extend(nested_task_exs)
return task_execs
def tasks_(context, workflow_execution_id=None, recursive=False, state=None,
flat=False):
task_execs = _get_tasks_from_db(
workflow_execution_id,
recursive,
state,
flat
)
# Convert task_execs to user model and return.
return [_convert_to_user_model(t) for t in task_execs]
def _convert_to_user_model(task_ex):
# Importing data_flow in order to break cycle dependency between modules.
from mistral.workflow import data_flow
# We don't use to_dict() db model method because not all fields
# make sense for user.
return {
@ -149,7 +246,9 @@ def task_(context, task_name):
'state': task_ex.state,
'state_info': task_ex.state_info,
'result': data_flow.get_task_execution_result(task_ex),
'published': task_ex.published
'published': task_ex.published,
'type': task_ex.type,
'workflow_execution_id': task_ex.workflow_execution_id
}

View File

@ -36,9 +36,6 @@ RESERVED_TASK_NAMES = [
'pause'
]
ACTION_TASK_TYPE = 'ACTION'
WORKFLOW_TASK_TYPE = 'WORKFLOW'
class TaskSpec(base.BaseSpec):
# See http://json-schema.org
@ -225,8 +222,8 @@ class TaskSpec(base.BaseSpec):
def get_type(self):
if self._workflow:
return WORKFLOW_TASK_TYPE
return ACTION_TASK_TYPE
return utils.WORKFLOW_TASK_TYPE
return utils.ACTION_TASK_TYPE
class DirectWorkflowTaskSpec(TaskSpec):

View File

@ -72,6 +72,7 @@ mistral.actions =
mistral.expression.functions =
json_pp = mistral.utils.expression_utils:json_pp_
task = mistral.utils.expression_utils:task_
tasks = mistral.utils.expression_utils:tasks_
execution = mistral.utils.expression_utils:execution_
env = mistral.utils.expression_utils:env_
uuid = mistral.utils.expression_utils:uuid_