Merge "Add root_execution_id to sub-workflow executions"
This commit is contained in:
commit
fd127e36ef
mistral
db
engine
tests/unit/engine
36
mistral/db/sqlalchemy/migration/alembic_migrations/versions/023_add_root_execution_id.py
Normal file
36
mistral/db/sqlalchemy/migration/alembic_migrations/versions/023_add_root_execution_id.py
Normal file
@ -0,0 +1,36 @@
|
||||
# Copyright 2015 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Add the root execution ID to the workflow execution model
|
||||
|
||||
Revision ID: 023
|
||||
Revises: 022
|
||||
Create Date: 2017-07-26 14:51:02.384729
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '023'
|
||||
down_revision = '022'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.add_column(
|
||||
'workflow_executions_v2',
|
||||
sa.Column('root_execution_id', sa.String(length=80), nullable=True)
|
||||
)
|
@ -317,6 +317,14 @@ sa.Index(
|
||||
'task_execution_id'
|
||||
)
|
||||
|
||||
# Many-to-one for 'WorkflowExecution' and 'WorkflowExecution'
|
||||
|
||||
WorkflowExecution.root_execution_id = sa.Column(
|
||||
sa.String(36),
|
||||
sa.ForeignKey(WorkflowExecution.id, ondelete='SET NULL'),
|
||||
nullable=True
|
||||
)
|
||||
|
||||
# Many-to-one for 'TaskExecution' and 'WorkflowExecution'.
|
||||
|
||||
TaskExecution.workflow_execution_id = sa.Column(
|
||||
|
@ -534,7 +534,13 @@ class WorkflowAction(Action):
|
||||
wf_def.updated_at
|
||||
)
|
||||
|
||||
# If the parent has a root_execution_id, it must be a sub-workflow. So
|
||||
# we should propogate that ID down. Otherwise the parent must be the
|
||||
# root execution and we should use the parents ID.
|
||||
root_execution_id = parent_wf_ex.root_execution_id or parent_wf_ex.id
|
||||
|
||||
wf_params = {
|
||||
'root_execution_id': root_execution_id,
|
||||
'task_execution_id': self.task_ex.id,
|
||||
'index': index,
|
||||
'namespace': parent_wf_ex.params['namespace']
|
||||
|
@ -270,6 +270,7 @@ class Workflow(object):
|
||||
'state': states.IDLE,
|
||||
'output': {},
|
||||
'task_execution_id': params.get('task_execution_id'),
|
||||
'root_execution_id': params.get('root_execution_id'),
|
||||
'runtime_context': {
|
||||
'index': params.get('index', 0)
|
||||
},
|
||||
|
@ -187,7 +187,29 @@ workflows:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.noop
|
||||
"""
|
||||
|
||||
WB6 = """
|
||||
---
|
||||
version: '2.0'
|
||||
|
||||
name: wb6
|
||||
|
||||
workflows:
|
||||
wf1:
|
||||
tasks:
|
||||
task1:
|
||||
workflow: wf2
|
||||
|
||||
wf2:
|
||||
tasks:
|
||||
task1:
|
||||
workflow: wf3
|
||||
|
||||
wf3:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.noop
|
||||
"""
|
||||
|
||||
|
||||
@ -200,6 +222,7 @@ class SubworkflowsTest(base.EngineTestCase):
|
||||
wb_service.create_workbook_v2(WB3)
|
||||
wb_service.create_workbook_v2(WB4)
|
||||
wb_service.create_workbook_v2(WB5)
|
||||
wb_service.create_workbook_v2(WB6)
|
||||
|
||||
def test_subworkflow_success(self):
|
||||
wf2_ex = self.engine.start_workflow('wb1.wf2', '', None)
|
||||
@ -409,3 +432,28 @@ class SubworkflowsTest(base.EngineTestCase):
|
||||
with db_api.transaction():
|
||||
ex = db_api.get_workflow_execution(ex.id)
|
||||
self.assertEqual({'sub_wf_out': 'abc'}, ex.output)
|
||||
|
||||
def test_subworkflow_root_execution_id(self):
|
||||
wf1_ex = self.engine.start_workflow('wb6.wf1', '', None)
|
||||
|
||||
self._await(lambda: len(db_api.get_workflow_executions()) == 3, 0.5, 5)
|
||||
|
||||
wf_execs = db_api.get_workflow_executions()
|
||||
wf1_ex = self._assert_single_item(wf_execs, name='wb6.wf1')
|
||||
wf2_ex = self._assert_single_item(wf_execs, name='wb6.wf2')
|
||||
wf3_ex = self._assert_single_item(wf_execs, name='wb6.wf3')
|
||||
|
||||
self.assertEqual(3, len(wf_execs))
|
||||
|
||||
# Wait till workflow 'wf1' is completed (and all the sub-workflows
|
||||
# will be completed also).
|
||||
self.await_workflow_success(wf1_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf1_ex = db_api.get_workflow_execution(wf1_ex.id)
|
||||
wf2_ex = db_api.get_workflow_execution(wf2_ex.id)
|
||||
wf3_ex = db_api.get_workflow_execution(wf3_ex.id)
|
||||
|
||||
self.assertIsNone(wf1_ex.root_execution_id, None)
|
||||
self.assertEqual(wf2_ex.root_execution_id, wf1_ex.id)
|
||||
self.assertEqual(wf3_ex.root_execution_id, wf1_ex.id)
|
||||
|
Loading…
x
Reference in New Issue
Block a user