Add implicit task access in workflow

Implements: blueprint task-result-implicit-access

Change-Id: I7391f0374a9371512c56152ebd68e2e189c63826
This commit is contained in:
Nikolay Mahotkin 2015-04-01 13:28:29 +03:00
parent 6b9ac50c4a
commit f8fd150f9c
7 changed files with 129 additions and 25 deletions

View File

@ -194,6 +194,7 @@ def _get_input_dictionaries(wf_spec, task_ex, task_spec, ctx):
In case of 'with-items' the result list will contain input dictionaries
for all 'with-items' iterations correspondingly.
"""
ctx = data_flow.extract_task_result_proxies_to_context(ctx)
if not task_spec.get_with_items():
input_dict = _get_workflow_or_action_input(

View File

@ -16,12 +16,14 @@
import abc
import copy
import re
import six
import yaql
from mistral.openstack.common import log as logging
from mistral import yaql_utils
LOG = logging.getLogger(__name__)

View File

@ -148,7 +148,7 @@ class BaseTest(base.BaseTestCase):
self.fail(self._formatMessage(msg, standardMsg))
def _await(self, predicate, delay=1, timeout=30):
def _await(self, predicate, delay=1, timeout=60):
"""Awaits for predicate function to evaluate to True.
If within a configured timeout predicate function hasn't evaluated

View File

@ -373,6 +373,53 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
task4.published
)
def test_linear_dataflow_implicit_publish(self):
linear_wf = """---
version: '2.0'
wf:
type: direct
tasks:
task1:
action: std.echo output="Hi"
on-success:
- task21
- task22
task21:
action: std.echo output="Morpheus"
on-success:
- task4
task22:
action: std.echo output="Neo"
on-success:
- task4
task4:
join: all
publish:
result: "<% $.task1 %>, <% $.task21 %>! Your <% $.task22 %>."
"""
wf_service.create_workflows(linear_wf)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {})
self._await(lambda: self.is_execution_success(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task4 = self._assert_single_item(tasks, name='task4')
self.assertDictEqual(
{'result': 'Hi, Morpheus! Your Neo.'},
task4.published
)
class DataFlowTest(test_base.BaseTest):
def test_get_task_execution_result(self):

View File

@ -319,10 +319,7 @@ class DefaultEngineTest(base.DbTestCase):
self.assertEqual(states.SUCCESS, task2_action_ex.state)
# Data Flow properties.
self._assert_dict_contains_subset(
task1_ex.in_context,
task2_ex.in_context
)
self.assertIn('__tasks', task2_ex.in_context)
self.assertIn('__execution', task1_ex.in_context)
self.assertDictEqual({'output': 'Hi'}, task2_action_ex.input)
self.assertDictEqual({}, task2_ex.published)

View File

@ -89,7 +89,6 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
)
self.assertDictEqual({'result1': 'a'}, task_ex.published)
self.assertEqual('a', wf_ex.output['task1'])
def test_start_task2(self):
wf_input = {'param1': 'a', 'param2': 'b'}
@ -128,6 +127,3 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
)
self.assertDictEqual({'result2': 'a & b'}, task2_ex.published)
self.assertEqual('a', wf_ex.output['task1'])
self.assertEqual('a & b', wf_ex.output['task2'])

View File

@ -14,9 +14,11 @@
# limitations under the License.
import copy
from oslo.config import cfg
from mistral import context as auth_ctx
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import expressions as expr
from mistral.openstack.common import log as logging
@ -38,9 +40,24 @@ def evaluate_upstream_context(upstream_task_execs):
task_published_vars,
t_ex.published
)
utils.merge_dicts(ctx, evaluate_task_outbound_context(t_ex))
utils.merge_dicts(
ctx, evaluate_task_outbound_context(t_ex, including_result=False)
)
return utils.merge_dicts(ctx, task_published_vars)
ctx = utils.merge_dicts(ctx, task_published_vars)
return utils.merge_dicts(
ctx, _get_task_identifiers_dict(upstream_task_execs)
)
def _get_task_identifiers_dict(task_execs):
tasks = {}
for task_ex in task_execs:
tasks[task_ex.id] = task_ex.name
return {"__tasks": tasks}
def _extract_execution_result(ex):
@ -63,8 +80,40 @@ def get_task_execution_result(task_ex):
return []
class TaskResultProxy(object):
def __init__(self, task_id):
self.task_id = task_id
def get(self):
task_ex = db_api.get_task_execution(self.task_id)
return get_task_execution_result(task_ex)
def __str__(self):
return "%s [task_id = '%s']" % (self.__class__.__name__, self.task_id)
class ProxyAwareDict(dict):
def __getitem__(self, item):
val = super(ProxyAwareDict, self).__getitem__(item)
if isinstance(val, TaskResultProxy):
return val.get()
return val
def get(self, k, d=None):
try:
return self.__getitem__(k)
except KeyError:
return d
def to_builtin_dict(self):
return {k: self[k] for k, _ in self.iteritems()}
def publish_variables(task_ex, task_spec):
expr_ctx = copy.deepcopy(task_ex.in_context) or {}
expr_ctx = extract_task_result_proxies_to_context(
copy.deepcopy(task_ex.in_context)
)
if task_ex.name in expr_ctx:
LOG.warning(
@ -72,9 +121,8 @@ def publish_variables(task_ex, task_spec):
task_ex.name
)
task_ex_result = get_task_execution_result(task_ex)
expr_ctx[task_ex.name] = copy.deepcopy(task_ex_result) or {}
# Add result of current task to context for variables evaluation.
expr_ctx[task_ex.name] = TaskResultProxy(task_ex.id)
task_ex.published = expr.evaluate_recursively(
task_spec.get_publish(),
@ -82,12 +130,14 @@ def publish_variables(task_ex, task_spec):
)
def evaluate_task_outbound_context(task_ex):
def evaluate_task_outbound_context(task_ex, including_result=True):
"""Evaluates task outbound Data Flow context.
This method assumes that complete task output (after publisher etc.)
has already been evaluated.
:param task_ex: DB task.
:param including_result: boolean argument, if True - include the
TaskResultProxy in outbound context under <task_name> key.
:return: Outbound task Data Flow context.
"""
@ -100,16 +150,15 @@ def evaluate_task_outbound_context(task_ex):
out_ctx = utils.merge_dicts(in_context, task_ex.published)
# Add task output under key 'taskName'.
# TODO(rakhmerov): This must be a different mechanism since
# task result may be huge.
task_ex_result = get_task_execution_result(task_ex)
if including_result:
task_ex_result = TaskResultProxy(task_ex.id)
out_ctx = utils.merge_dicts(
out_ctx,
{task_ex.name: copy.deepcopy(task_ex_result) or None}
{task_ex.name: task_ex_result or None}
)
return out_ctx
return ProxyAwareDict(out_ctx)
def evaluate_workflow_output(wf_spec, context):
@ -118,6 +167,9 @@ def evaluate_workflow_output(wf_spec, context):
:param wf_spec: Workflow specification.
:param context: Final Data Flow context (cause task's outbound context).
"""
# Convert context to ProxyAwareDict for correct output evaluation.
context = ProxyAwareDict(copy.deepcopy(context))
output_dict = wf_spec.get_output()
# Evaluate workflow 'publish' clause using the final workflow context.
@ -125,7 +177,7 @@ def evaluate_workflow_output(wf_spec, context):
# TODO(rakhmerov): Many don't like that we return the whole context
# TODO(rakhmerov): if 'output' is not explicitly defined.
return output or context
return ProxyAwareDict(output or context).to_builtin_dict()
def add_openstack_data_to_context(context):
@ -170,6 +222,15 @@ def add_environment_to_context(wf_ex, context):
return context
def extract_task_result_proxies_to_context(ctx):
ctx = ProxyAwareDict(copy.deepcopy(ctx))
for task_ex_id, task_ex_name in ctx['__tasks'].iteritems():
ctx[task_ex_name] = TaskResultProxy(task_ex_id)
return ctx
def evaluate_object_fields(obj, context):
fields = inspect_utils.get_public_fields(obj)