diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index a4288c1b..31b0f3c7 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -42,6 +42,9 @@ class Execution(resource.Resource): workflow_name = wtypes.text "reference to workflow definition" + description = wtypes.text + "description of workflow execution." + params = wtypes.text "params define workflow type specific parameters. For example, reverse \ workflow takes one parameter 'task_name' that defines a target task." @@ -93,6 +96,7 @@ class Execution(resource.Resource): def sample(cls): return cls(id='123e4567-e89b-12d3-a456-426655440000', workflow_name='flow', + description='this is the first execution.', state='SUCCESS', input='{}', output='{}', @@ -134,17 +138,25 @@ class ExecutionsController(rest.RestController): (id, execution)) db_api.ensure_workflow_execution_exists(id) - # Currently we can change only state. - if not execution.state: - raise exc.DataAccessException( - "Only state of execution can change. " - "Missing 'state' property." - ) - new_state = execution.state + new_description = execution.description msg = execution.state_info - if new_state == states.PAUSED: + # Currently we can change only state or description. + if (not (new_state or new_description) or + (new_state and new_description)): + raise exc.DataAccessException( + "Only state or description of execution can be changed. " + "But they can not be changed at the same time." + ) + + if new_description: + wf_ex = db_api.update_workflow_execution( + id, + description=new_description + ) + + elif new_state == states.PAUSED: wf_ex = rpc.get_engine_client().pause_workflow(id) elif new_state == states.RUNNING: wf_ex = rpc.get_engine_client().resume_workflow(id) @@ -177,6 +189,7 @@ class ExecutionsController(rest.RestController): result = engine.start_workflow( exec_dict['workflow_name'], exec_dict.get('input'), + exec_dict.get('description', ''), **exec_dict.get('params') or {} ) diff --git a/mistral/engine/base.py b/mistral/engine/base.py index 7c05141f..2da1152c 100644 --- a/mistral/engine/base.py +++ b/mistral/engine/base.py @@ -29,11 +29,12 @@ class Engine(object): """Engine interface.""" @abc.abstractmethod - def start_workflow(self, wf_name, wf_input, **params): + def start_workflow(self, wf_name, wf_input, description='', **params): """Starts the specified workflow. :param wf_name: Workflow name. :param wf_input: Workflow input data as a dictionary. + :param description: Execution description. :param params: Additional workflow type specific parameters. :return: Workflow execution object. """ diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 0bf90df8..9d2dfecb 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -45,7 +45,7 @@ class DefaultEngine(base.Engine): self._engine_client = engine_client @u.log_exec(LOG) - def start_workflow(self, wf_name, wf_input, **params): + def start_workflow(self, wf_name, wf_input, description='', **params): wf_exec_id = None try: @@ -61,6 +61,7 @@ class DefaultEngine(base.Engine): wf_def, wf_spec, wf_input, + description, params ) wf_exec_id = wf_ex.id @@ -363,9 +364,11 @@ class DefaultEngine(base.Engine): return params @staticmethod - def _create_workflow_execution(wf_def, wf_spec, wf_input, params): + def _create_workflow_execution(wf_def, wf_spec, wf_input, description, + params): wf_ex = db_api.create_workflow_execution({ 'name': wf_def.name, + 'description': description, 'workflow_name': wf_def.name, 'spec': wf_spec.to_dict(), 'params': params or {}, diff --git a/mistral/engine/rpc.py b/mistral/engine/rpc.py index 0ca2dfcb..2c5f802d 100644 --- a/mistral/engine/rpc.py +++ b/mistral/engine/rpc.py @@ -77,7 +77,8 @@ class EngineServer(object): def __init__(self, engine): self._engine = engine - def start_workflow(self, rpc_ctx, workflow_name, workflow_input, params): + def start_workflow(self, rpc_ctx, workflow_name, workflow_input, + description, params): """Receives calls over RPC to start workflows on engine. :param rpc_ctx: RPC request context. @@ -86,13 +87,14 @@ class EngineServer(object): LOG.info( "Received RPC request 'start_workflow'[rpc_ctx=%s," - " workflow_name=%s, workflow_input=%s, params=%s]" - % (rpc_ctx, workflow_name, workflow_input, params) + " workflow_name=%s, workflow_input=%s, description=%s, params=%s]" + % (rpc_ctx, workflow_name, workflow_input, description, params) ) return self._engine.start_workflow( workflow_name, workflow_input, + description, **params ) @@ -223,7 +225,7 @@ class EngineClient(base.Engine): ) @wrap_messaging_exception - def start_workflow(self, wf_name, wf_input, **params): + def start_workflow(self, wf_name, wf_input, exec_desc, **params): """Starts workflow sending a request to engine over RPC. :return: Workflow execution. @@ -233,6 +235,7 @@ class EngineClient(base.Engine): 'start_workflow', workflow_name=wf_name, workflow_input=wf_input or {}, + description=exec_desc, params=params ) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 333a0afb..8dc90a7c 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -503,6 +503,7 @@ def run_workflow(wf_name, wf_input, wf_params): rpc.get_engine_client().start_workflow( wf_name, wf_input, + "sub-workflow execution", **wf_params ) diff --git a/mistral/services/periodic.py b/mistral/services/periodic.py index 2b028b0c..bd0efa60 100644 --- a/mistral/services/periodic.py +++ b/mistral/services/periodic.py @@ -41,6 +41,7 @@ class MistralPeriodicTasks(periodic_task.PeriodicTasks): rpc.get_engine_client().start_workflow( t.workflow.name, t.workflow_input, + description="workflow execution by cron trigger.", **t.workflow_params ) finally: diff --git a/mistral/tests/unit/api/v2/test_executions.py b/mistral/tests/unit/api/v2/test_executions.py index 1c033ecf..1ac4e1a5 100644 --- a/mistral/tests/unit/api/v2/test_executions.py +++ b/mistral/tests/unit/api/v2/test_executions.py @@ -1,5 +1,6 @@ # Copyright 2013 - Mirantis, Inc. # Copyright 2015 - StackStorm, Inc. +# Copyright 2015 Huawei Technologies Co., Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -29,6 +30,7 @@ from mistral.workflow import states WF_EX = models.WorkflowExecution( id='123', workflow_name='some', + description='execution description.', spec={'name': 'some'}, state=states.RUNNING, state_info=None, @@ -48,7 +50,7 @@ WF_EX_JSON = { 'state_info': None, 'created_at': '1970-01-01 00:00:00', 'updated_at': '1970-01-01 00:00:00', - 'workflow_name': 'some' + 'workflow_name': 'some', } UPDATED_WF_EX = copy.copy(WF_EX) @@ -57,6 +59,9 @@ UPDATED_WF_EX['state'] = states.PAUSED UPDATED_WF_EX_JSON = copy.copy(WF_EX_JSON) UPDATED_WF_EX_JSON['state'] = states.PAUSED +WF_EX_JSON_WITH_DESC = copy.copy(WF_EX_JSON) +WF_EX_JSON_WITH_DESC['description'] = "execution description." + MOCK_WF_EX = mock.MagicMock(return_value=WF_EX) MOCK_WF_EXECUTIONS = mock.MagicMock(return_value=[WF_EX]) MOCK_UPDATED_WF_EX = mock.MagicMock(return_value=UPDATED_WF_EX) @@ -74,7 +79,7 @@ class TestExecutionsController(base.FunctionalTest): self.maxDiff = None self.assertEqual(resp.status_int, 200) - self.assertDictEqual(WF_EX_JSON, resp.json) + self.assertDictEqual(WF_EX_JSON_WITH_DESC, resp.json) @mock.patch.object(db_api, 'get_workflow_execution', MOCK_NOT_FOUND) def test_get_not_found(self): @@ -92,8 +97,11 @@ class TestExecutionsController(base.FunctionalTest): def test_put(self): resp = self.app.put_json('/v2/executions/123', UPDATED_WF_EX_JSON) + UPDATED_WF_EX_WITH_DESC = copy.copy(UPDATED_WF_EX_JSON) + UPDATED_WF_EX_WITH_DESC['description'] = 'execution description.' + self.assertEqual(resp.status_int, 200) - self.assertDictEqual(UPDATED_WF_EX_JSON, resp.json) + self.assertDictEqual(UPDATED_WF_EX_WITH_DESC, resp.json) @mock.patch.object( db_api, @@ -113,6 +121,8 @@ class TestExecutionsController(base.FunctionalTest): resp = self.app.put_json('/v2/executions/123', update_exec) + update_exec['description'] = "execution description." + self.assertEqual(resp.status_int, 200) self.assertDictEqual(update_exec, resp.json) mock_pw.assert_called_once_with('123', 'ERROR', "Force") @@ -127,20 +137,29 @@ class TestExecutionsController(base.FunctionalTest): self.assertEqual(resp.status_int, 404) + def test_put_both_state_and_description(self): + self.assertRaises( + webtest_app.AppError, + self.app.put_json, + '/v2/executions/123', + WF_EX_JSON_WITH_DESC + ) + @mock.patch.object(rpc.EngineClient, 'start_workflow') def test_post(self, f): f.return_value = WF_EX.to_dict() - resp = self.app.post_json('/v2/executions', WF_EX_JSON) + resp = self.app.post_json('/v2/executions', WF_EX_JSON_WITH_DESC) self.assertEqual(resp.status_int, 201) - self.assertDictEqual(WF_EX_JSON, resp.json) + self.assertDictEqual(WF_EX_JSON_WITH_DESC, resp.json) - exec_dict = execution.Execution(**WF_EX_JSON).to_dict() + exec_dict = execution.Execution(**WF_EX_JSON_WITH_DESC).to_dict() f.assert_called_once_with( exec_dict['workflow_name'], exec_dict['input'], + exec_dict['description'], **exec_dict['params'] ) @@ -170,7 +189,7 @@ class TestExecutionsController(base.FunctionalTest): self.assertEqual(resp.status_int, 200) self.assertEqual(len(resp.json['executions']), 1) - self.assertDictEqual(WF_EX_JSON, resp.json['executions'][0]) + self.assertDictEqual(WF_EX_JSON_WITH_DESC, resp.json['executions'][0]) @mock.patch.object(db_api, 'get_workflow_executions', MOCK_EMPTY) def test_get_all_empty(self): diff --git a/mistral/tests/unit/engine/test_default_engine.py b/mistral/tests/unit/engine/test_default_engine.py index e01d1b22..ca8c01fa 100644 --- a/mistral/tests/unit/engine/test_default_engine.py +++ b/mistral/tests/unit/engine/test_default_engine.py @@ -111,11 +111,13 @@ class DefaultEngineTest(base.DbTestCase): wf_ex = self.engine.start_workflow( 'wb.wf', wf_input, + 'my execution', task_name='task2' ) self.assertIsNotNone(wf_ex) self.assertEqual(states.RUNNING, wf_ex.state) + self.assertEqual('my execution', wf_ex.description) self._assert_dict_contains_subset(wf_input, wf_ex.context) self.assertIn('__execution', wf_ex.context) @@ -433,5 +435,8 @@ class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase): self.assertRaises( exc.InputException, - self.engine_client.start_workflow, 'some_wf', {} + self.engine_client.start_workflow, + 'some_wf', + {}, + 'some_description' )