Merge "Workflow definition updated, when workbook is created"
This commit is contained in:
commit
eee5296393
@ -86,6 +86,7 @@ def _create_or_update_workflows(wb_db, workflows_spec):
|
||||
|
||||
values = {
|
||||
'name': wf_name,
|
||||
'definition': _get_wf_definition(wb_db, wf_spec),
|
||||
'spec': wf_spec.to_dict(),
|
||||
'scope': wb_db.scope,
|
||||
'project_id': wb_db.project_id,
|
||||
@ -107,3 +108,10 @@ def _get_workbook_values(wb_spec, definition, scope):
|
||||
}
|
||||
|
||||
return values
|
||||
|
||||
|
||||
def _get_wf_definition(wb_db, wf_spec):
|
||||
wf_definition = spec_parser.get_workflow_definition(wb_db.definition,
|
||||
wf_spec.get_name())
|
||||
|
||||
return wf_definition
|
||||
|
@ -42,6 +42,7 @@ actions:
|
||||
|
||||
workflows:
|
||||
wf1:
|
||||
#Sample Comment 1
|
||||
type: reverse
|
||||
tags: [wf_test]
|
||||
input:
|
||||
@ -67,6 +68,34 @@ workflows:
|
||||
result: "The result of subworkflow is '{$.final_result}'"
|
||||
"""
|
||||
|
||||
WORKBOOK_WF1_DEFINITION = """wf1:
|
||||
#Sample Comment 1
|
||||
type: reverse
|
||||
tags: [wf_test]
|
||||
input:
|
||||
- param1
|
||||
output:
|
||||
result: "{$.result}"
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.echo output="{$.param1}"
|
||||
publish:
|
||||
result: "{$}"
|
||||
"""
|
||||
|
||||
WORKBOOK_WF2_DEFINITION = """wf2:
|
||||
type: direct
|
||||
output:
|
||||
result: "{$.result}"
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
workflow: my_wb.wf1 param1='Hi' task_name='task1'
|
||||
publish:
|
||||
result: "The result of subworkflow is '{$.final_result}'"
|
||||
"""
|
||||
|
||||
UPDATED_WORKBOOK = """
|
||||
---
|
||||
version: '2.0'
|
||||
@ -106,6 +135,32 @@ workflows:
|
||||
result: "{$}"
|
||||
"""
|
||||
|
||||
UPDATED_WORKBOOK_WF1_DEFINITION = """wf1:
|
||||
type: direct
|
||||
output:
|
||||
result: "{$.result}"
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
workflow: my_wb.wf2 param1='Hi' task_name='task1'
|
||||
publish:
|
||||
result: "The result of subworkflow is '{$.final_result}'"
|
||||
"""
|
||||
|
||||
UPDATED_WORKBOOK_WF2_DEFINITION = """wf2:
|
||||
type: reverse
|
||||
input:
|
||||
- param1
|
||||
output:
|
||||
result: "{$.result}"
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.echo output="{$.param1}"
|
||||
publish:
|
||||
result: "{$}"
|
||||
"""
|
||||
|
||||
|
||||
class WorkbookServiceTest(base.DbTestCase):
|
||||
def test_create_workbook(self):
|
||||
@ -143,6 +198,7 @@ class WorkbookServiceTest(base.DbTestCase):
|
||||
self.assertEqual('reverse', wf1_spec.get_type())
|
||||
self.assertListEqual(['wf_test'], wf1_spec.get_tags())
|
||||
self.assertListEqual(['wf_test'], wf1_db.tags)
|
||||
self.assertEqual(WORKBOOK_WF1_DEFINITION, wf1_db.definition)
|
||||
|
||||
# Workflow 2.
|
||||
wf2_db = self._assert_single_item(db_wfs, name='my_wb.wf2')
|
||||
@ -150,6 +206,7 @@ class WorkbookServiceTest(base.DbTestCase):
|
||||
|
||||
self.assertEqual('wf2', wf2_spec.get_name())
|
||||
self.assertEqual('direct', wf2_spec.get_type())
|
||||
self.assertEqual(WORKBOOK_WF2_DEFINITION, wf2_db.definition)
|
||||
|
||||
def test_update_workbook(self):
|
||||
# Create workbook.
|
||||
@ -176,6 +233,7 @@ class WorkbookServiceTest(base.DbTestCase):
|
||||
|
||||
self.assertEqual('wf1', wf1_spec.get_name())
|
||||
self.assertEqual('direct', wf1_spec.get_type())
|
||||
self.assertEqual(UPDATED_WORKBOOK_WF1_DEFINITION, wf1_db.definition)
|
||||
|
||||
# Workflow 2.
|
||||
wf2_db = self._assert_single_item(db_wfs, name='my_wb.wf2')
|
||||
@ -183,3 +241,4 @@ class WorkbookServiceTest(base.DbTestCase):
|
||||
|
||||
self.assertEqual('wf2', wf2_spec.get_name())
|
||||
self.assertEqual('reverse', wf2_spec.get_type())
|
||||
self.assertEqual(UPDATED_WORKBOOK_WF2_DEFINITION, wf2_db.definition)
|
||||
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import StringIO
|
||||
import yaml
|
||||
from yaml import error
|
||||
|
||||
@ -118,3 +119,34 @@ def get_task_spec(spec_dict):
|
||||
return base.instantiate_spec(tasks_v2.TaskSpec, spec_dict)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_workflow_definition(wb_def, wf_name):
|
||||
wf_def = []
|
||||
wf_name = wf_name + ":"
|
||||
io = StringIO.StringIO(wb_def[wb_def.index("workflows:"):])
|
||||
io.readline()
|
||||
ident = 0
|
||||
|
||||
# Get the indentation of the workflow name tag. (e.g. wf1:)
|
||||
for line in io:
|
||||
if wf_name == line.strip():
|
||||
ident = len(line.expandtabs()) - len(line.expandtabs().lstrip(' '))
|
||||
wf_def.append(line.lstrip())
|
||||
break
|
||||
|
||||
# Add strings to list unless same/less indentation is found.
|
||||
for line in io:
|
||||
if not line.strip() or line.startswith("#"):
|
||||
wf_def.append(line)
|
||||
else:
|
||||
temp = len(line.expandtabs()) - len(line.expandtabs().lstrip(' '))
|
||||
if ident < temp:
|
||||
wf_def.append(line)
|
||||
else:
|
||||
break
|
||||
|
||||
io.close()
|
||||
wf_def = ''.join(wf_def).strip() + '\n'
|
||||
|
||||
return wf_def
|
||||
|
Loading…
Reference in New Issue
Block a user