diff --git a/mistral/api/controllers/v2/task.py b/mistral/api/controllers/v2/task.py index 793d9edc..3898f6eb 100644 --- a/mistral/api/controllers/v2/task.py +++ b/mistral/api/controllers/v2/task.py @@ -1,6 +1,5 @@ -# -*- coding: utf-8 -*- -# # Copyright 2013 - Mirantis, Inc. +# Copyright 2015 - StackStorm, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,7 +23,10 @@ import wsmeext.pecan as wsme_pecan from mistral.api.controllers import resource from mistral.api.controllers.v2 import action_execution from mistral.db.v2 import api as db_api +from mistral.engine import rpc +from mistral import exceptions as exc from mistral.utils import rest_utils +from mistral.workbook import parser as spec_parser from mistral.workflow import data_flow from mistral.workflow import states @@ -91,22 +93,22 @@ class Tasks(resource.Resource): return cls(tasks=[Task.sample()]) +def _get_task_resource_with_result(task_ex): + task = Task.from_dict(task_ex.to_dict()) + task.result = json.dumps(data_flow.get_task_execution_result(task_ex)) + + return task + + def _get_task_resources_with_results(wf_ex_id=None): filters = {} if wf_ex_id: filters['workflow_execution_id'] = wf_ex_id - tasks = [] with db_api.transaction(): - task_execs = db_api.get_task_executions(**filters) - for task_ex in task_execs: - task = Task.from_dict(task_ex.to_dict()) - task.result = json.dumps( - data_flow.get_task_execution_result(task_ex) - ) - - tasks += [task] + task_exs = db_api.get_task_executions(**filters) + tasks = [_get_task_resource_with_result(t_e) for t_e in task_exs] return Tasks(tasks=tasks) @@ -121,11 +123,8 @@ class TasksController(rest.RestController): LOG.info("Fetch task [id=%s]" % id) task_ex = db_api.get_task_execution(id) - task = Task.from_dict(task_ex.to_dict()) - task.result = json.dumps(data_flow.get_task_execution_result(task_ex)) - - return task + return _get_task_resource_with_result(task_ex) @wsme_pecan.wsexpose(Tasks) def get_all(self): @@ -134,6 +133,52 @@ class TasksController(rest.RestController): return _get_task_resources_with_results() + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose(Task, wtypes.text, bool, body=Task) + def put(self, id, reset, task): + """Update the specified task execution. + + :param id: Task execution ID. + :param task: Task execution object. + """ + LOG.info("Update task execution [id=%s, task=%s]" % (id, task)) + + task_ex = db_api.get_task_execution(id) + task_spec = spec_parser.get_task_spec(task_ex.spec) + task_name = task.name or None + + if task_name and task_name != task_ex.name: + raise exc.WorkflowException('Task name does not match.') + + wf_ex = db_api.get_workflow_execution(task_ex.workflow_execution_id) + wf_name = task.workflow_name or None + + if wf_name and wf_name != wf_ex.name: + raise exc.WorkflowException('Workflow name does not match.') + + if task.state != states.RUNNING: + raise exc.WorkflowException('Invalid task state. Only updating ' + 'task to rerun is supported.') + + if task_ex.state != states.ERROR: + raise exc.WorkflowException('The current task execution must be ' + 'in ERROR for rerun. Only updating ' + 'task to rerun is supported.') + + if not task_spec.get_with_items() and not reset: + raise exc.WorkflowException('Only with-items task has the ' + 'option to not reset.') + + rpc.get_engine_client().rerun_workflow( + wf_ex.id, + task_ex.id, + reset + ) + + task_ex = db_api.get_task_execution(id) + + return _get_task_resource_with_result(task_ex) + class ExecutionTasksController(rest.RestController): @wsme_pecan.wsexpose(Tasks, wtypes.text) diff --git a/mistral/tests/unit/api/v2/test_tasks.py b/mistral/tests/unit/api/v2/test_tasks.py index c510ac21..b81d542b 100644 --- a/mistral/tests/unit/api/v2/test_tasks.py +++ b/mistral/tests/unit/api/v2/test_tasks.py @@ -1,6 +1,5 @@ -# -*- coding: utf-8 -*- -# # Copyright 2013 - Mirantis, Inc. +# Copyright 2015 - StackStorm, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,6 +20,7 @@ import mock from mistral.db.v2 import api as db_api from mistral.db.v2.sqlalchemy import models +from mistral.engine import rpc from mistral import exceptions as exc from mistral.tests.unit.api import base from mistral.workflow import data_flow @@ -30,11 +30,52 @@ from mistral.workflow import states RESULT = {"some": "result"} PUBLISHED = {"var": "val"} -task_ex = models.TaskExecution( + +WF_EX = models.WorkflowExecution( + id='abc', + workflow_name='some', + description='execution description.', + spec={'name': 'some'}, + state=states.RUNNING, + state_info=None, + input={'foo': 'bar'}, + output={}, + params={'env': {'k1': 'abc'}}, + created_at=datetime.datetime(1970, 1, 1), + updated_at=datetime.datetime(1970, 1, 1) +) + +TASK_EX = models.TaskExecution( id='123', name='task', workflow_name='flow', - spec={}, + spec={ + 'type': 'direct', + 'version': '2.0', + 'name': 'task' + }, + action_spec={}, + state=states.RUNNING, + tags=['a', 'b'], + in_context={}, + runtime_context={}, + workflow_execution_id='123', + created_at=datetime.datetime(1970, 1, 1), + updated_at=datetime.datetime(1970, 1, 1), + published=PUBLISHED, + processed=True +) + +WITH_ITEMS_TASK_EX = models.TaskExecution( + id='123', + name='task', + workflow_name='flow', + spec={ + 'type': 'direct', + 'version': '2.0', + 'name': 'task', + 'with-items': 'var in [1, 2, 3]' + }, action_spec={}, state=states.RUNNING, tags=['a', 'b'], @@ -60,22 +101,36 @@ TASK = { 'processed': True } -UPDATED_task_ex = copy.copy(task_ex) -UPDATED_task_ex['state'] = 'SUCCESS' -UPDATED_TASK = copy.copy(TASK) +UPDATED_TASK_EX = copy.deepcopy(TASK_EX) +UPDATED_TASK_EX['state'] = 'SUCCESS' +UPDATED_TASK = copy.deepcopy(TASK) UPDATED_TASK['state'] = 'SUCCESS' -ERROR_task_ex = copy.copy(task_ex) -ERROR_task_ex['state'] = 'ERROR' -ERROR_TASK = copy.copy(TASK) +ERROR_TASK_EX = copy.deepcopy(TASK_EX) +ERROR_TASK_EX['state'] = 'ERROR' +ERROR_ITEMS_TASK_EX = copy.deepcopy(WITH_ITEMS_TASK_EX) +ERROR_ITEMS_TASK_EX['state'] = 'ERROR' +ERROR_TASK = copy.deepcopy(TASK) ERROR_TASK['state'] = 'ERROR' -BROKEN_TASK = copy.copy(TASK) +BROKEN_TASK = copy.deepcopy(TASK) -MOCK_TASK = mock.MagicMock(return_value=task_ex) -MOCK_TASKS = mock.MagicMock(return_value=[task_ex]) +RERUN_TASK = { + 'id': '123', + 'state': 'RUNNING' +} + +MOCK_WF_EX = mock.MagicMock(return_value=WF_EX) +MOCK_TASK = mock.MagicMock(return_value=TASK_EX) +MOCK_TASKS = mock.MagicMock(return_value=[TASK_EX]) MOCK_EMPTY = mock.MagicMock(return_value=[]) MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.NotFoundException()) +MOCK_ERROR_TASK = mock.MagicMock(return_value=ERROR_TASK_EX) +MOCK_ERROR_ITEMS_TASK = mock.MagicMock(return_value=ERROR_ITEMS_TASK_EX) +MOCK_RERUN_TASKS = mock.MagicMock(side_effect=[ERROR_TASK_EX, TASK_EX]) +MOCK_RERUN_ITEMS_TASKS = mock.MagicMock( + side_effect=[ERROR_ITEMS_TASK_EX, WITH_ITEMS_TASK_EX] +) @mock.patch.object( @@ -112,3 +167,125 @@ class TestTasksController(base.FunctionalTest): self.assertEqual(resp.status_int, 200) self.assertEqual(len(resp.json['tasks']), 0) + + @mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX) + @mock.patch.object(db_api, 'get_task_execution', MOCK_RERUN_TASKS) + @mock.patch.object(rpc.EngineClient, 'rerun_workflow', MOCK_WF_EX) + def test_put(self): + params = copy.deepcopy(RERUN_TASK) + params['reset'] = True + + resp = self.app.put_json('/v2/tasks/123', params=params) + + self.assertEqual(resp.status_int, 200) + self.assertDictEqual(TASK, resp.json) + + @mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX) + @mock.patch.object(db_api, 'get_task_execution', MOCK_RERUN_TASKS) + @mock.patch.object(rpc.EngineClient, 'rerun_workflow', MOCK_WF_EX) + def test_put_missing_reset(self): + params = copy.deepcopy(RERUN_TASK) + + resp = self.app.put_json( + '/v2/tasks/123', + params=params, + expect_errors=True) + + self.assertEqual(resp.status_int, 400) + self.assertIn('faultstring', resp.json) + self.assertIn('Missing argument', resp.json['faultstring']) + + @mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX) + @mock.patch.object(db_api, 'get_task_execution', MOCK_RERUN_ITEMS_TASKS) + @mock.patch.object(rpc.EngineClient, 'rerun_workflow', MOCK_WF_EX) + def test_put_with_items(self): + params = copy.deepcopy(RERUN_TASK) + params['reset'] = False + + resp = self.app.put_json('/v2/tasks/123', params=params) + + self.assertEqual(resp.status_int, 200) + self.assertDictEqual(TASK, resp.json) + + @mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX) + @mock.patch.object(db_api, 'get_task_execution', MOCK_TASK) + def test_put_current_task_not_in_error(self): + params = copy.deepcopy(RERUN_TASK) + params['reset'] = True + + resp = self.app.put_json( + '/v2/tasks/123', + params=params, + expect_errors=True + ) + + self.assertEqual(resp.status_int, 400) + self.assertIn('faultstring', resp.json) + self.assertIn('execution must be in ERROR', resp.json['faultstring']) + + @mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX) + @mock.patch.object(db_api, 'get_task_execution', MOCK_ERROR_TASK) + def test_put_invalid_state(self): + params = copy.deepcopy(RERUN_TASK) + params['state'] = states.IDLE + params['reset'] = True + + resp = self.app.put_json( + '/v2/tasks/123', + params=params, + expect_errors=True + ) + + self.assertEqual(resp.status_int, 400) + self.assertIn('faultstring', resp.json) + self.assertIn('Invalid task state', resp.json['faultstring']) + + @mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX) + @mock.patch.object(db_api, 'get_task_execution', MOCK_ERROR_TASK) + def test_put_invalid_reset(self): + params = copy.deepcopy(RERUN_TASK) + params['reset'] = False + + resp = self.app.put_json( + '/v2/tasks/123', + params=params, + expect_errors=True + ) + + self.assertEqual(resp.status_int, 400) + self.assertIn('faultstring', resp.json) + self.assertIn('Only with-items task', resp.json['faultstring']) + + @mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX) + @mock.patch.object(db_api, 'get_task_execution', MOCK_ERROR_TASK) + def test_put_mismatch_task_name(self): + params = copy.deepcopy(RERUN_TASK) + params['name'] = 'abc' + params['reset'] = True + + resp = self.app.put_json( + '/v2/tasks/123', + params=params, + expect_errors=True + ) + + self.assertEqual(resp.status_int, 400) + self.assertIn('faultstring', resp.json) + self.assertIn('Task name does not match', resp.json['faultstring']) + + @mock.patch.object(db_api, 'get_workflow_execution', MOCK_WF_EX) + @mock.patch.object(db_api, 'get_task_execution', MOCK_ERROR_TASK) + def test_put_mismatch_workflow_name(self): + params = copy.deepcopy(RERUN_TASK) + params['workflow_name'] = 'xyz' + params['reset'] = True + + resp = self.app.put_json( + '/v2/tasks/123', + params=params, + expect_errors=True + ) + + self.assertEqual(resp.status_int, 400) + self.assertIn('faultstring', resp.json) + self.assertIn('Workflow name does not match', resp.json['faultstring'])