Add API for rerunning failed task execution
Add put method to the TasksController that allows a failed task execution to be re-run. Change-Id: I185a9a5c594bb6d215c1c5b34efed9389bad0faa Partial-Implements: blueprint mistral-workflow-pause-resume-with-intervention
This commit is contained in:
parent
490c56b33f
commit
46ee48df8b
@ -1,6 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright 2013 - Mirantis, Inc.
|
||||
# Copyright 2015 - StackStorm, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -24,7 +23,10 @@ import wsmeext.pecan as wsme_pecan
|
||||
from mistral.api.controllers import resource
|
||||
from mistral.api.controllers.v2 import action_execution
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import rpc
|
||||
from mistral import exceptions as exc
|
||||
from mistral.utils import rest_utils
|
||||
from mistral.workbook import parser as spec_parser
|
||||
from mistral.workflow import data_flow
|
||||
from mistral.workflow import states
|
||||
|
||||
@ -91,22 +93,22 @@ class Tasks(resource.Resource):
|
||||
return cls(tasks=[Task.sample()])
|
||||
|
||||
|
||||
def _get_task_resource_with_result(task_ex):
|
||||
task = Task.from_dict(task_ex.to_dict())
|
||||
task.result = json.dumps(data_flow.get_task_execution_result(task_ex))
|
||||
|
||||
return task
|
||||
|
||||
|
||||
def _get_task_resources_with_results(wf_ex_id=None):
|
||||
filters = {}
|
||||
|
||||
if wf_ex_id:
|
||||
filters['workflow_execution_id'] = wf_ex_id
|
||||
|
||||
tasks = []
|
||||
with db_api.transaction():
|
||||
task_execs = db_api.get_task_executions(**filters)
|
||||
for task_ex in task_execs:
|
||||
task = Task.from_dict(task_ex.to_dict())
|
||||
task.result = json.dumps(
|
||||
data_flow.get_task_execution_result(task_ex)
|
||||
)
|
||||
|
||||
tasks += [task]
|
||||
task_exs = db_api.get_task_executions(**filters)
|
||||
tasks = [_get_task_resource_with_result(t_e) for t_e in task_exs]
|
||||
|
||||
return Tasks(tasks=tasks)
|
||||
|
||||
@ -121,11 +123,8 @@ class TasksController(rest.RestController):
|
||||
LOG.info("Fetch task [id=%s]" % id)
|
||||
|
||||
task_ex = db_api.get_task_execution(id)
|
||||
task = Task.from_dict(task_ex.to_dict())
|
||||
|
||||
task.result = json.dumps(data_flow.get_task_execution_result(task_ex))
|
||||
|
||||
return task
|
||||
return _get_task_resource_with_result(task_ex)
|
||||
|
||||
@wsme_pecan.wsexpose(Tasks)
|
||||
def get_all(self):
|
||||
@ -134,6 +133,52 @@ class TasksController(rest.RestController):
|
||||
|
||||
return _get_task_resources_with_results()
|
||||
|
||||
@rest_utils.wrap_wsme_controller_exception
|
||||
@wsme_pecan.wsexpose(Task, wtypes.text, bool, body=Task)
|
||||
def put(self, id, reset, task):
|
||||
"""Update the specified task execution.
|
||||
|
||||
:param id: Task execution ID.
|
||||
:param task: Task execution object.
|
||||
"""
|
||||
LOG.info("Update task execution [id=%s, task=%s]" % (id, task))
|
||||
|
||||
task_ex = db_api.get_task_execution(id)
|
||||
task_spec = spec_parser.get_task_spec(task_ex.spec)
|
||||
task_name = task.name or None
|
||||
|
||||
if task_name and task_name != task_ex.name:
|
||||
raise exc.WorkflowException('Task name does not match.')
|
||||
|
||||
wf_ex = db_api.get_workflow_execution(task_ex.workflow_execution_id)
|
||||
wf_name = task.workflow_name or None
|
||||
|
||||
if wf_name and wf_name != wf_ex.name:
|
||||
raise exc.WorkflowException('Workflow name does not match.')
|
||||
|
||||
if task.state != states.RUNNING:
|
||||
raise exc.WorkflowException('Invalid task state. Only updating '
|
||||
'task to rerun is supported.')
|
||||
|
||||
if task_ex.state != states.ERROR:
|
||||
raise exc.WorkflowException('The current task execution must be '
|
||||
'in ERROR for rerun. Only updating '
|
||||
'task to rerun is supported.')
|
||||
|
||||
if not task_spec.get_with_items() and not reset:
|
||||
raise exc.WorkflowException('Only with-items task has the '
|
||||
'option to not reset.')
|
||||
|
||||
rpc.get_engine_client().rerun_workflow(
|
||||
wf_ex.id,
|
||||
task_ex.id,
|
||||
reset
|
||||
)
|
||||
|
||||
task_ex = db_api.get_task_execution(id)
|
||||
|
||||
return _get_task_resource_with_result(task_ex)
|
||||
|
||||
|
||||
class ExecutionTasksController(rest.RestController):
|
||||
@wsme_pecan.wsexpose(Tasks, wtypes.text)
|
||||
|
@ -1,6 +1,5 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Copyright 2013 - Mirantis, Inc.
|
||||
# Copyright 2015 - StackStorm, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
@ -21,6 +20,7 @@ import mock
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models
|
||||
from mistral.engine import rpc
|
||||
from mistral import exceptions as exc
|
||||
from mistral.tests.unit.api import base
|
||||
from mistral.workflow import data_flow
|
||||
@ -30,11 +30,52 @@ from mistral.workflow import states
|
||||
|
||||
RESULT = {"some": "result"}
|
||||
PUBLISHED = {"var": "val"}
|
||||
task_ex = models.TaskExecution(
|
||||
|
||||
WF_EX = models.WorkflowExecution(
|
||||
id='abc',
|
||||
workflow_name='some',
|
||||
description='execution description.',
|
||||
spec={'name': 'some'},
|
||||
state=states.RUNNING,
|
||||
state_info=None,
|
||||
input={'foo': 'bar'},
|
||||
output={},
|
||||
params={'env': {'k1': 'abc'}},
|
||||
created_at=datetime.datetime(1970, 1, 1),
|
||||
updated_at=datetime.datetime(1970, 1, 1)
|
||||
)
|
||||
|
||||
TASK_EX = models.TaskExecution(
|
||||
id='123',
|
||||
name='task',
|
||||
workflow_name='flow',
|
||||
spec={},
|
||||
spec={
|
||||
'type': 'direct',
|
||||
'version': '2.0',
|
||||
'name': 'task'
|
||||
},
|
||||
action_spec={},
|
||||
state=states.RUNNING,
|
||||
tags=['a', 'b'],
|
||||
in_context={},
|
||||
runtime_context={},
|
||||
workflow_execution_id='123',
|
||||
created_at=datetime.datetime(1970, 1, 1),
|
||||
updated_at=datetime.datetime(1970, 1, 1),
|
||||
published=PUBLISHED,
|
||||
processed=True
|
||||
)
|
||||
|
||||
WITH_ITEMS_TASK_EX = models.TaskExecution(
|
||||
id='123',
|
||||
name='task',
|
||||
workflow_name='flow',
|
||||
spec={
|
||||
'type': 'direct',
|
||||
'version': '2.0',
|
||||
'name': 'task',
|
||||
'with-items': 'var in [1, 2, 3]'
|
||||
},
|
||||
action_spec={},
|
||||
state=states.RUNNING,
|
||||
tags=['a', 'b'],
|
||||
@ -60,22 +101,36 @@ TASK = {
|
||||
'processed': True
|
||||
}
|
||||
|
||||
UPDATED_task_ex = copy.copy(task_ex)
|
||||
UPDATED_task_ex['state'] = 'SUCCESS'
|
||||
UPDATED_TASK = copy.copy(TASK)
|
||||
UPDATED_TASK_EX = copy.deepcopy(TASK_EX)
|
||||
UPDATED_TASK_EX['state'] = 'SUCCESS'
|
||||
UPDATED_TASK = copy.deepcopy(TASK)
|
||||
UPDATED_TASK['state'] = 'SUCCESS'
|
||||
|
||||
ERROR_task_ex = copy.copy(task_ex)
|
||||
ERROR_task_ex['state'] = 'ERROR'
|
||||
ERROR_TASK = copy.copy(TASK)
|
||||
ERROR_TASK_EX = copy.deepcopy(TASK_EX)
|
||||
ERROR_TASK_EX['state'] = 'ERROR'
|
||||
ERROR_ITEMS_TASK_EX = copy.deepcopy(WITH_ITEMS_TASK_EX)
|
||||
ERROR_ITEMS_TASK_EX['state'] = 'ERROR'
|
||||
ERROR_TASK = copy.deepcopy(TASK)
|
||||
ERROR_TASK['state'] = 'ERROR'
|
||||
|
||||
BROKEN_TASK = copy.copy(TASK)
|
||||
BROKEN_TASK = copy.deepcopy(TASK)
|
||||
|
||||
MOCK_TASK = mock.MagicMock(return_value=task_ex)
|
||||
MOCK_TASKS = mock.MagicMock(return_value=[task_ex])
|
||||
RERUN_TASK = {
|
||||
'id': '123',
|
||||
'state': 'RUNNING'
|
||||
}
|
||||
|
||||
MOCK_WF_EX = mock.MagicMock(return_value=WF_EX)
|
||||
MOCK_TASK = mock.MagicMock(return_value=TASK_EX)
|
||||
MOCK_TASKS = mock.MagicMock(return_value=[TASK_EX])
|
||||
MOCK_EMPTY = mock.MagicMock(return_value=[])
|
||||
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException())
|
||||
MOCK_ERROR_TASK = mock.MagicMock(return_value=ERROR_TASK_EX)
|
||||
MOCK_ERROR_ITEMS_TASK = mock.MagicMock(return_value=ERROR_ITEMS_TASK_EX)
|
||||
MOCK_RERUN_TASKS = mock.MagicMock(side_effect=[ERROR_TASK_EX, TASK_EX])
|
||||
MOCK_RERUN_ITEMS_TASKS = mock.MagicMock(
|
||||
side_effect=[ERROR_ITEMS_TASK_EX, WITH_ITEMS_TASK_EX]
|
||||
)
|
||||
|
||||
|
||||
@mock.patch.object(
|
||||
@ -112,3 +167,125 @@ class TestTasksController(base.FunctionalTest):
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
|
||||
self.assertEqual(len(resp.json['tasks']), 0)
|
||||
|
||||
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
|
||||
@mock.patch.object(db_api, 'get_task_execution', MOCK_RERUN_TASKS)
|
||||
@mock.patch.object(rpc.EngineClient, 'rerun_workflow', MOCK_WF_EX)
|
||||
def test_put(self):
|
||||
params = copy.deepcopy(RERUN_TASK)
|
||||
params['reset'] = True
|
||||
|
||||
resp = self.app.put_json('/v2/tasks/123', params=params)
|
||||
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertDictEqual(TASK, resp.json)
|
||||
|
||||
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
|
||||
@mock.patch.object(db_api, 'get_task_execution', MOCK_RERUN_TASKS)
|
||||
@mock.patch.object(rpc.EngineClient, 'rerun_workflow', MOCK_WF_EX)
|
||||
def test_put_missing_reset(self):
|
||||
params = copy.deepcopy(RERUN_TASK)
|
||||
|
||||
resp = self.app.put_json(
|
||||
'/v2/tasks/123',
|
||||
params=params,
|
||||
expect_errors=True)
|
||||
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertIn('faultstring', resp.json)
|
||||
self.assertIn('Missing argument', resp.json['faultstring'])
|
||||
|
||||
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
|
||||
@mock.patch.object(db_api, 'get_task_execution', MOCK_RERUN_ITEMS_TASKS)
|
||||
@mock.patch.object(rpc.EngineClient, 'rerun_workflow', MOCK_WF_EX)
|
||||
def test_put_with_items(self):
|
||||
params = copy.deepcopy(RERUN_TASK)
|
||||
params['reset'] = False
|
||||
|
||||
resp = self.app.put_json('/v2/tasks/123', params=params)
|
||||
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertDictEqual(TASK, resp.json)
|
||||
|
||||
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
|
||||
@mock.patch.object(db_api, 'get_task_execution', MOCK_TASK)
|
||||
def test_put_current_task_not_in_error(self):
|
||||
params = copy.deepcopy(RERUN_TASK)
|
||||
params['reset'] = True
|
||||
|
||||
resp = self.app.put_json(
|
||||
'/v2/tasks/123',
|
||||
params=params,
|
||||
expect_errors=True
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertIn('faultstring', resp.json)
|
||||
self.assertIn('execution must be in ERROR', resp.json['faultstring'])
|
||||
|
||||
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
|
||||
@mock.patch.object(db_api, 'get_task_execution', MOCK_ERROR_TASK)
|
||||
def test_put_invalid_state(self):
|
||||
params = copy.deepcopy(RERUN_TASK)
|
||||
params['state'] = states.IDLE
|
||||
params['reset'] = True
|
||||
|
||||
resp = self.app.put_json(
|
||||
'/v2/tasks/123',
|
||||
params=params,
|
||||
expect_errors=True
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertIn('faultstring', resp.json)
|
||||
self.assertIn('Invalid task state', resp.json['faultstring'])
|
||||
|
||||
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
|
||||
@mock.patch.object(db_api, 'get_task_execution', MOCK_ERROR_TASK)
|
||||
def test_put_invalid_reset(self):
|
||||
params = copy.deepcopy(RERUN_TASK)
|
||||
params['reset'] = False
|
||||
|
||||
resp = self.app.put_json(
|
||||
'/v2/tasks/123',
|
||||
params=params,
|
||||
expect_errors=True
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertIn('faultstring', resp.json)
|
||||
self.assertIn('Only with-items task', resp.json['faultstring'])
|
||||
|
||||
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
|
||||
@mock.patch.object(db_api, 'get_task_execution', MOCK_ERROR_TASK)
|
||||
def test_put_mismatch_task_name(self):
|
||||
params = copy.deepcopy(RERUN_TASK)
|
||||
params['name'] = 'abc'
|
||||
params['reset'] = True
|
||||
|
||||
resp = self.app.put_json(
|
||||
'/v2/tasks/123',
|
||||
params=params,
|
||||
expect_errors=True
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertIn('faultstring', resp.json)
|
||||
self.assertIn('Task name does not match', resp.json['faultstring'])
|
||||
|
||||
@mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX)
|
||||
@mock.patch.object(db_api, 'get_task_execution', MOCK_ERROR_TASK)
|
||||
def test_put_mismatch_workflow_name(self):
|
||||
params = copy.deepcopy(RERUN_TASK)
|
||||
params['workflow_name'] = 'xyz'
|
||||
params['reset'] = True
|
||||
|
||||
resp = self.app.put_json(
|
||||
'/v2/tasks/123',
|
||||
params=params,
|
||||
expect_errors=True
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertIn('faultstring', resp.json)
|
||||
self.assertIn('Workflow name does not match', resp.json['faultstring'])
|
||||
|
Loading…
Reference in New Issue
Block a user