Fix with-items concurrency for sub-workflows

For a with-items task that uses concurrency to execute sub-workflows,
the iterations after the first return the same item when a sub-workflow
completes. This bug does not affect with-items tasks that execute
actions.

The bug is caused by the delay between scheduling the sub-workflow
execution and creating its execution record. For action executions, the
record is created within the same DB transaction, before scheduling.

This patch creates the workflow execution record first, just as for
action executions, and then schedules a call that resumes the workflow.

Change-Id: Iba80068260caead9ae8f2f8f105abc5b9349db52
Closes-Bug: #1536415
Author: Winson Chan, 2016-01-29 02:43:57 +00:00
Commit: 433d8e7e99 (parent: ef4f197804)
7 changed files with 189 additions and 88 deletions
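
To see the failure mode in isolation, here is a minimal, self-contained
sketch with invented names (not Mistral code): the with-items logic derives
the next item from the execution records that already exist for the task, so
a record that only appears after the deferred start runs lets several open
concurrency slots pick the same item, while a record created up front gives
each slot its own item.

    # Invented, self-contained illustration; not Mistral's API.
    items = ['Peter', 'Susan', 'Edmund', 'Lucy', 'Aslan', 'Caspian']
    records = []                    # stands in for execution rows in the DB

    def next_item():
        # with-items picks the next item from how many execution records
        # the task already has.
        return items[len(records)]

    # Old ordering: only a deferred start is scheduled; the record is created
    # later by the engine, so three open concurrency slots all pick item 0.
    picked_old = [next_item() for _ in range(3)]

    # Fixed ordering: create the IDLE record right away, defer only the resume.
    picked_new = []
    for _ in range(3):
        picked_new.append(next_item())
        records.append({'state': 'IDLE'})   # created in the same transaction

    assert picked_old == ['Peter', 'Peter', 'Peter']
    assert picked_new == ['Peter', 'Susan', 'Edmund']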


@@ -56,7 +56,7 @@ class Execution(resource.Resource):
"reference to the parent task execution"
state = wtypes.text
"state can be one of: RUNNING, SUCCESS, ERROR, PAUSED"
"state can be one of: IDLE, RUNNING, SUCCESS, ERROR, PAUSED"
state_info = wtypes.text
"an optional state information string"


@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import six
import traceback
from oslo_log import log as logging
@@ -25,16 +23,15 @@ from mistral.db.v2.sqlalchemy import models as db_models
from mistral.engine import action_handler
from mistral.engine import base
from mistral.engine import task_handler
from mistral.engine import utils as eng_utils
from mistral.engine import workflow_handler as wf_handler
from mistral.services import action_manager as a_m
from mistral.services import executions as wf_ex_service
from mistral.services import workflows as wf_service
from mistral import utils as u
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
from mistral.workflow import commands
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
@@ -54,39 +51,33 @@ class DefaultEngine(base.Engine, coordination.Service):
@u.log_exec(LOG)
def start_workflow(self, wf_name, wf_input, description='', **params):
wf_exec_id = None
params = self._canonize_workflow_params(params)
wf_ex_id = None
try:
with db_api.transaction():
wf_def = db_api.get_workflow_definition(wf_name)
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
eng_utils.validate_input(wf_def, wf_input, wf_spec)
wf_ex = self._create_workflow_execution(
wf_def,
wf_spec,
# The new workflow execution will be in an IDLE
# state on initial record creation.
wf_ex_id = wf_ex_service.create_workflow_execution(
wf_name,
wf_input,
description,
params
)
wf_exec_id = wf_ex.id
wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name)
# Separate workflow execution creation and dispatching command
# transactions in order to be able to return workflow execution
# with corresponding error message in state_info when error occurs
# at dispatching commands.
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_exec_id)
wf_ex = db_api.get_workflow_execution(wf_ex_id)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
wf_handler.set_execution_state(wf_ex, states.RUNNING)
wf_ctrl = wf_base.WorkflowController.get_controller(
wf_ex,
wf_spec
)
self._dispatch_workflow_commands(
wf_ex,
wf_ctrl.continue_workflow()
@@ -96,9 +87,10 @@ class DefaultEngine(base.Engine, coordination.Service):
except Exception as e:
LOG.error(
"Failed to start workflow '%s' id=%s: %s\n%s",
wf_name, wf_exec_id, e, traceback.format_exc()
wf_name, wf_ex_id, e, traceback.format_exc()
)
wf_ex = self._fail_workflow(wf_exec_id, e)
wf_ex = self._fail_workflow(wf_ex_id, e)
if wf_ex:
return wf_ex.get_clone()
@@ -252,16 +244,16 @@ class DefaultEngine(base.Engine, coordination.Service):
return action_ex.get_clone()
prev_task_state = task_ex.state
# Separate the task transition in a separate transaction. The task
# has already completed for better or worst. The task state should
# not be affected by errors during transition on conditions such as
# on-success and on-error.
with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
action_ex = db_api.get_action_execution(action_ex_id)
task_ex = action_ex.task_execution
wf_ex = wf_handler.lock_workflow_execution(
task_ex.workflow_execution_id
)
self._on_task_state_change(
task_ex,
wf_ex,
@@ -364,7 +356,8 @@ class DefaultEngine(base.Engine, coordination.Service):
with db_api.transaction():
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
if wf_ex.state != states.PAUSED:
if (not states.is_paused(wf_ex.state) and
not states.is_idle(wf_ex.state)):
return wf_ex.get_clone()
return self._continue_workflow(wf_ex, env=env)
@@ -467,46 +460,3 @@ class DefaultEngine(base.Engine, coordination.Service):
)
return wf_ex
@staticmethod
def _canonize_workflow_params(params):
# Resolve environment parameter.
env = params.get('env', {})
if not isinstance(env, dict) and not isinstance(env, six.string_types):
raise ValueError(
'Unexpected type for environment [environment=%s]' % str(env)
)
if isinstance(env, six.string_types):
env_db = db_api.get_environment(env)
env = env_db.variables
params['env'] = env
return params
@staticmethod
def _create_workflow_execution(wf_def, wf_spec, wf_input, description,
params):
wf_ex = db_api.create_workflow_execution({
'name': wf_def.name,
'description': description,
'workflow_name': wf_def.name,
'spec': wf_spec.to_dict(),
'params': params or {},
'state': states.RUNNING,
'input': wf_input or {},
'output': {},
'context': copy.deepcopy(wf_input) or {},
'task_execution_id': params.get('task_execution_id'),
'runtime_context': {
'with_items_index': params.get('with_items_index', 0)
},
})
data_flow.add_openstack_data_to_context(wf_ex)
data_flow.add_execution_to_context(wf_ex)
data_flow.add_environment_to_context(wf_ex)
data_flow.add_workflow_variables_to_context(wf_ex, wf_spec)
return wf_ex


@@ -26,6 +26,7 @@ from mistral.engine import rpc
from mistral.engine import utils as e_utils
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.services import executions as wf_ex_service
from mistral.services import scheduler
from mistral import utils
from mistral.utils import wf_trace
@@ -476,24 +477,25 @@ def _schedule_run_workflow(task_ex, task_spec, wf_input, index):
wf_params[k] = v
del wf_input[k]
scheduler.schedule_call(
None,
'mistral.engine.task_handler.run_workflow',
0,
wf_name=wf_def.name,
wf_input=wf_input,
wf_params=wf_params
)
def run_workflow(wf_name, wf_input, wf_params):
rpc.get_engine_client().start_workflow(
wf_name,
wf_ex_id = wf_ex_service.create_workflow_execution(
wf_def.name,
wf_input,
"sub-workflow execution",
**wf_params
wf_params
)
scheduler.schedule_call(
None,
'mistral.engine.task_handler.resume_workflow',
0,
wf_ex_id=wf_ex_id,
env=None
)
def resume_workflow(wf_ex_id, env):
rpc.get_engine_client().resume_workflow(wf_ex_id, env=env)
def _complete_task(task_ex, task_spec, state, state_info=None):
# Ignore if task already completed.


@@ -0,0 +1,91 @@
# Copyright 2016 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import six
from oslo_log import log as logging
from mistral.db.v2 import api as db_api
from mistral.engine import utils as eng_utils
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import data_flow
from mistral.workflow import states
LOG = logging.getLogger(__name__)
def canonize_workflow_params(params):
# Resolve environment parameter.
env = params.get('env', {})
if not isinstance(env, dict) and not isinstance(env, six.string_types):
raise ValueError(
'Unexpected type for environment [environment=%s]' % str(env)
)
if isinstance(env, six.string_types):
env_db = db_api.get_environment(env)
env = env_db.variables
params['env'] = env
return params
def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params):
wf_ex = db_api.create_workflow_execution({
'name': wf_def.name,
'description': desc,
'workflow_name': wf_def.name,
'spec': wf_spec.to_dict(),
'params': params or {},
'state': states.IDLE,
'input': wf_input or {},
'output': {},
'context': copy.deepcopy(wf_input) or {},
'task_execution_id': params.get('task_execution_id'),
'runtime_context': {
'with_items_index': params.get('with_items_index', 0)
},
})
data_flow.add_openstack_data_to_context(wf_ex)
data_flow.add_execution_to_context(wf_ex)
data_flow.add_environment_to_context(wf_ex)
data_flow.add_workflow_variables_to_context(wf_ex, wf_spec)
return wf_ex
def create_workflow_execution(wf_name, wf_input, description, params):
params = canonize_workflow_params(params)
wf_def = db_api.get_workflow_definition(wf_name)
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
eng_utils.validate_input(wf_def, wf_input, wf_spec)
wf_ex = _create_workflow_execution(
wf_def,
wf_spec,
wf_input,
description,
params
)
wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name)
return wf_ex.id
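
For reference, a condensed sketch of how the task handler path consumes this
helper (the engine's start_workflow creates the record the same way and then
moves it to RUNNING in a second transaction); the literal workflow name and
input below are placeholders, not part of the change:

    from mistral.engine import rpc
    from mistral.services import executions as wf_ex_service

    # Create the execution record synchronously; it is stored in state IDLE.
    wf_ex_id = wf_ex_service.create_workflow_execution(
        'wb1.main',                     # workflow name (placeholder)
        {'names': ['Peter', 'Susan']},  # workflow input (placeholder)
        'sub-workflow execution',       # description
        {}                              # params (env, task_execution_id, ...)
    )

    # Only then ask the engine to run it by resuming the IDLE record; in the
    # task handler this call is deferred through scheduler.schedule_call().
    rpc.get_engine_client().resume_workflow(wf_ex_id, env=None)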


@@ -1061,3 +1061,51 @@ class WithItemsEngineTest(base.EngineTestCase):
self.assertIn(2, result_task1)
self.assertIn(3, result_task2)
self.assertIn(4, result_task2)
def test_with_items_subflow_concurrency_gt_list_length(self):
workbook_definition = """---
version: "2.0"
name: wb1
workflows:
main:
type: direct
input:
- names
tasks:
task1:
with-items: name in <% $.names %>
workflow: subflow1 name=<% $.name %>
concurrency: 3
subflow1:
type: direct
input:
- name
output:
result: <% task(task1).result %>
tasks:
task1:
action: std.echo output=<% $.name %>
"""
wb_service.create_workbook_v2(workbook_definition)
# Start workflow.
names = ["Peter", "Susan", "Edmund", "Lucy", "Aslan", "Caspian"]
wf_ex = self.engine.start_workflow('wb1.main', {'names': names})
self._await(
lambda: self.is_execution_success(wf_ex.id),
)
wf_ex = db_api.get_execution(wf_ex.id)
task_ex = self._assert_single_item(wf_ex.task_executions, name='task1')
self.assertEqual(states.SUCCESS, task_ex.state)
result = [
item['result']
for item in data_flow.get_task_execution_result(task_ex)
]
self.assertListEqual(sorted(result), sorted(names))


@@ -49,6 +49,10 @@ def is_completed(state):
return state in [SUCCESS, ERROR]
def is_running(state):
return state in [RUNNING, RUNNING_DELAYED]
def is_waiting(state):
return state == WAITING
@@ -57,8 +61,12 @@ def is_idle(state):
return state == IDLE
def is_paused(state):
return state == PAUSED
def is_paused_or_completed(state):
return state == PAUSED or is_completed(state)
return is_paused(state) or is_completed(state)
def is_valid_transition(from_state, to_state):


@@ -45,12 +45,14 @@ def is_completed(task_ex):
def get_index(task_ex):
return len(
list(filter(
lambda x: x.accepted or states.RUNNING, task_ex.executions
))
f = lambda x: (
x.accepted or
states.is_running(x.state) or
states.is_idle(x.state)
)
return len(list(filter(f, task_ex.executions)))
def get_concurrency(task_ex):
return task_ex.runtime_context.get(_CONCURRENCY)
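
The change to get_index() above is what makes the early record creation
effective: an execution that exists but has not been resumed yet (state IDLE)
now counts toward the with-items index, so concurrent iterations stop reusing
the same item. A small self-contained illustration with stand-in records
(an invented namedtuple, not Mistral's model objects):

    from collections import namedtuple

    FakeExecution = namedtuple('FakeExecution', ['accepted', 'state'])

    def get_index(executions):
        # Mirrors the new filter: accepted, running, or IDLE executions count.
        f = lambda x: x.accepted or x.state in ('RUNNING', 'IDLE')
        return len(list(filter(f, executions)))

    executions = [
        FakeExecution(accepted=True, state='SUCCESS'),   # finished iteration
        FakeExecution(accepted=False, state='RUNNING'),  # iteration in flight
        FakeExecution(accepted=False, state='IDLE'),     # created, resume queued
    ]

    # Three slots are occupied, so the next iteration that starts gets index 3
    # and therefore the fourth item, instead of recomputing index 0.
    assert get_index(executions) == 3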