Remove workflow spec, input and params from workflow context

* The reason to remove workflow spec and input is optimizing
  DB space taken by task execution objects, now they are very
  heavy if case of large workflows because their inbound contexts
  always contain a full workflow specification
* execution() YAQL function now doesn't take values from task
  context, it just does a simple DB lookup

Change-Id: I00b88d4ca75f334ebce764332c21aef96eba5414
This commit is contained in:
Renat Akhmerov 2016-09-14 15:55:42 +03:00
parent 23ff5605a6
commit 097c68f641
4 changed files with 72 additions and 5 deletions

View File

@ -83,6 +83,14 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
task3.published
)
# Make sure that task inbound context doesn't contain workflow
# specification, input and params.
ctx = task1.in_context
self.assertFalse('spec' in ctx['__execution'])
self.assertFalse('input' in ctx['__execution'])
self.assertFalse('params' in ctx['__execution'])
def test_linear_with_branches_dataflow(self):
linear_with_branches_wf = """---
version: '2.0'

View File

@ -163,3 +163,57 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
self.assertIsNotNone(result)
self.assertEqual(36, len(result))
self.assertEqual(4, result.count('-'))
def test_execution_function(self):
wf_text = """---
version: '2.0'
wf:
input:
- k1
- k2: v2_default
tasks:
task1:
action: std.echo output=<% execution() %>
publish:
result: <% task(task1).result %>
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow(
'wf',
{'k1': 'v1'},
param1='blablabla'
)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
task_ex = task_execs[0]
execution = task_ex.published['result']
self.assertIsInstance(execution, dict)
spec = execution['spec']
self.assertEqual('2.0', spec['version'])
self.assertEqual('wf', spec['name'])
self.assertIn('tasks', spec)
self.assertEqual(1, len(spec['tasks']))
self.assertDictEqual(
{
'k1': 'v1',
'k2': 'v2_default'
},
execution['input']
)
self.assertDictEqual({'param1': 'blablabla'}, execution['params'])

View File

@ -72,7 +72,15 @@ def env_(context):
def execution_(context):
return context['__execution']
wf_ex = db_api.get_workflow_execution(context['__execution']['id'])
return {
'id': wf_ex.id,
'name': wf_ex.name,
'spec': wf_ex.spec,
'input': wf_ex.input,
'params': wf_ex.params
}
def json_pp_(data):

View File

@ -163,10 +163,7 @@ def add_execution_to_context(wf_ex):
wf_ex.context = wf_ex.context or {}
wf_ex.context['__execution'] = {
'id': wf_ex.id,
'spec': wf_ex.spec,
'params': wf_ex.params,
'input': wf_ex.input
'id': wf_ex.id
}