Merge "Correction in workflow state change handling"

This commit is contained in:
Jenkins 2017-03-02 12:25:51 +00:00 committed by Gerrit Code Review
commit 9571c5489b
3 changed files with 57 additions and 43 deletions

View File

@ -34,11 +34,14 @@ _CHECK_AND_COMPLETE_PATH = (
@profiler.trace('workflow-handler-start-workflow') @profiler.trace('workflow-handler-start-workflow')
def start_workflow(wf_identifier, wf_input, desc, params): def start_workflow(wf_identifier, wf_input, desc, params):
wf = workflows.Workflow( wf = workflows.Workflow()
db_api.get_workflow_definition(wf_identifier)
)
wf.start(wf_input, desc=desc, params=params) wf.start(
wf_def=db_api.get_workflow_definition(wf_identifier),
input_dict=wf_input,
desc=desc,
params=params
)
_schedule_check_and_complete(wf.wf_ex) _schedule_check_and_complete(wf.wf_ex)
@ -46,10 +49,7 @@ def start_workflow(wf_identifier, wf_input, desc, params):
def stop_workflow(wf_ex, state, msg=None): def stop_workflow(wf_ex, state, msg=None):
wf = workflows.Workflow( wf = workflows.Workflow(wf_ex=wf_ex)
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
# In this case we should not try to handle possible errors. Instead, # In this case we should not try to handle possible errors. Instead,
# we need to let them pop up since the typical way of failing objects # we need to let them pop up since the typical way of failing objects
@ -86,10 +86,7 @@ def _check_and_complete(wf_ex_id):
if not wf_ex or states.is_completed(wf_ex.state): if not wf_ex or states.is_completed(wf_ex.state):
return return
wf = workflows.Workflow( wf = workflows.Workflow(wf_ex=wf_ex)
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
try: try:
incomplete_tasks_count = wf.check_and_complete() incomplete_tasks_count = wf.check_and_complete()
@ -121,10 +118,7 @@ def _check_and_complete(wf_ex_id):
def pause_workflow(wf_ex, msg=None): def pause_workflow(wf_ex, msg=None):
wf = workflows.Workflow( wf = workflows.Workflow(wf_ex=wf_ex)
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
wf.set_state(states.PAUSED, msg) wf.set_state(states.PAUSED, msg)
@ -133,10 +127,7 @@ def rerun_workflow(wf_ex, task_ex, reset=True, env=None):
if wf_ex.state == states.PAUSED: if wf_ex.state == states.PAUSED:
return wf_ex.get_clone() return wf_ex.get_clone()
wf = workflows.Workflow( wf = workflows.Workflow(wf_ex=wf_ex)
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
wf.rerun(task_ex, reset=reset, env=env) wf.rerun(task_ex, reset=reset, env=env)
@ -150,10 +141,7 @@ def resume_workflow(wf_ex, env=None):
if not states.is_paused_or_idle(wf_ex.state): if not states.is_paused_or_idle(wf_ex.state):
return wf_ex.get_clone() return wf_ex.get_clone()
wf = workflows.Workflow( wf = workflows.Workflow(wf_ex=wf_ex)
db_api.get_workflow_definition(wf_ex.workflow_id),
wf_ex=wf_ex
)
wf.resume(env=env) wf.resume(env=env)

View File

@ -54,8 +54,7 @@ class Workflow(object):
Mistral engine or its components in order to manipulate with workflows. Mistral engine or its components in order to manipulate with workflows.
""" """
def __init__(self, wf_def, wf_ex=None): def __init__(self, wf_ex=None):
self.wf_def = wf_def
self.wf_ex = wf_ex self.wf_ex = wf_ex
if wf_ex: if wf_ex:
@ -64,16 +63,13 @@ class Workflow(object):
wf_ex.id wf_ex.id
) )
else: else:
# New workflow execution. self.wf_spec = None
self.wf_spec = spec_parser.get_workflow_spec_by_definition_id(
wf_def.id,
wf_def.updated_at
)
@profiler.trace('workflow-start') @profiler.trace('workflow-start')
def start(self, input_dict, desc='', params=None): def start(self, wf_def, input_dict, desc='', params=None):
"""Start workflow. """Start workflow.
:param wf_def: Workflow definition.
:param input_dict: Workflow input. :param input_dict: Workflow input.
:param desc: Workflow execution description. :param desc: Workflow execution description.
:param params: Workflow type specific parameters. :param params: Workflow type specific parameters.
@ -81,17 +77,23 @@ class Workflow(object):
assert not self.wf_ex assert not self.wf_ex
# New workflow execution.
self.wf_spec = spec_parser.get_workflow_spec_by_definition_id(
wf_def.id,
wf_def.updated_at
)
wf_trace.info( wf_trace.info(
self.wf_ex, self.wf_ex,
"Starting workflow [name=%s, input=%s]" % "Starting workflow [name=%s, input=%s]" %
(self.wf_def.name, utils.cut(input_dict)) (wf_def.name, utils.cut(input_dict))
) )
# TODO(rakhmerov): This call implicitly changes input_dict! Fix it! # TODO(rakhmerov): This call implicitly changes input_dict! Fix it!
# After fix we need to move validation after adding risky fields. # After fix we need to move validation after adding risky fields.
eng_utils.validate_input(self.wf_def, input_dict, self.wf_spec) eng_utils.validate_input(wf_def, input_dict, self.wf_spec)
self._create_execution(input_dict, desc, params) self._create_execution(wf_def, input_dict, desc, params)
self.set_state(states.RUNNING) self.set_state(states.RUNNING)
@ -202,12 +204,12 @@ class Workflow(object):
) )
return final_context return final_context
def _create_execution(self, input_dict, desc, params): def _create_execution(self, wf_def, input_dict, desc, params):
self.wf_ex = db_api.create_workflow_execution({ self.wf_ex = db_api.create_workflow_execution({
'name': self.wf_def.name, 'name': wf_def.name,
'description': desc, 'description': desc,
'workflow_name': self.wf_def.name, 'workflow_name': wf_def.name,
'workflow_id': self.wf_def.id, 'workflow_id': wf_def.id,
'spec': self.wf_spec.to_dict(), 'spec': self.wf_spec.to_dict(),
'state': states.IDLE, 'state': states.IDLE,
'output': {}, 'output': {},
@ -272,10 +274,7 @@ class Workflow(object):
self.wf_ex.task_execution_id self.wf_ex.task_execution_id
) )
parent_wf = Workflow( parent_wf = Workflow(wf_ex=parent_task_ex.workflow_execution)
db_api.get_workflow_definition(parent_task_ex.workflow_id),
parent_task_ex.workflow_execution
)
parent_wf.lock() parent_wf.lock()

View File

@ -71,6 +71,33 @@ class WorkflowCancelTest(base.EngineTestCase):
self.assertEqual(1, len(task_execs)) self.assertEqual(1, len(task_execs))
self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.SUCCESS, task_1_ex.state)
def test_cancel_workflow_if_definition_deleted(self):
workflow = """
version: '2.0'
wf:
type: direct
tasks:
task1:
action: std.echo output="foo"
wait-before: 5
"""
wf = wf_service.create_workflows(workflow)[0]
wf_ex = self.engine.start_workflow('wf', {})
with db_api.transaction():
db_api.delete_workflow_definition(wf.id)
self.engine.stop_workflow(
wf_ex.id,
states.CANCELLED,
"Cancelled by user."
)
self.await_workflow_cancelled(wf_ex.id)
def test_cancel_paused_workflow(self): def test_cancel_paused_workflow(self):
workflow = """ workflow = """
version: '2.0' version: '2.0'