diff --git a/mistral/api/controllers/v2/execution.py b/mistral/api/controllers/v2/execution.py index eb9f46b2..83654b99 100644 --- a/mistral/api/controllers/v2/execution.py +++ b/mistral/api/controllers/v2/execution.py @@ -45,6 +45,9 @@ class Execution(resource.Resource): workflow_name = wtypes.text "reference to workflow definition" + workflow_id = wtypes.text + "reference to workflow ID" + description = wtypes.text "description of workflow execution." @@ -74,6 +77,7 @@ class Execution(resource.Resource): def sample(cls): return cls(id='123e4567-e89b-12d3-a456-426655440000', workflow_name='flow', + workflow_id='123e4567-e89b-12d3-a456-426655441111', description='this is the first execution.', state='SUCCESS', input={}, @@ -220,8 +224,15 @@ class ExecutionsController(rest.RestController): engine = rpc.get_engine_client() exec_dict = wf_ex.to_dict() + if not (exec_dict.get('workflow_id') + or exec_dict.get('workflow_name')): + raise exc.WorkflowException( + "Workflow ID or workflow name must be provided. Workflow ID is" + " recommended." + ) + result = engine.start_workflow( - exec_dict['workflow_name'], + exec_dict.get('workflow_id', exec_dict['workflow_name']), exec_dict.get('input'), exec_dict.get('description', ''), **exec_dict.get('params') or {} diff --git a/mistral/api/controllers/v2/task.py b/mistral/api/controllers/v2/task.py index 3a63e5f0..82701340 100644 --- a/mistral/api/controllers/v2/task.py +++ b/mistral/api/controllers/v2/task.py @@ -43,6 +43,7 @@ class Task(resource.Resource): name = wtypes.text workflow_name = wtypes.text + workflow_id = wtypes.text workflow_execution_id = wtypes.text state = wtypes.text @@ -69,6 +70,7 @@ class Task(resource.Resource): return cls( id='123e4567-e89b-12d3-a456-426655440000', workflow_name='flow', + workflow_id='123e4567-e89b-12d3-a456-426655441111', workflow_execution_id='123e4567-e89b-12d3-a456-426655440000', name='task', state=states.SUCCESS, diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index ae66baac..78a42e5c 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -121,8 +121,8 @@ class Execution(mb.MistralSecureModelBase): id = mb.id_column() name = sa.Column(sa.String(80)) description = sa.Column(sa.String(255), nullable=True) - workflow_name = sa.Column(sa.String(80)) + workflow_id = sa.Column(sa.String(80)) spec = sa.Column(st.JsonDictType()) state = sa.Column(sa.String(20)) state_info = sa.Column(sa.Text(), nullable=True) diff --git a/mistral/engine/action_handler.py b/mistral/engine/action_handler.py index 4eb4fb1d..5761564b 100644 --- a/mistral/engine/action_handler.py +++ b/mistral/engine/action_handler.py @@ -62,6 +62,7 @@ def create_action_execution(action_def, action_input, task_ex=None, values.update({ 'task_execution_id': task_ex.id, 'workflow_name': task_ex.workflow_name, + 'workflow_id': task_ex.workflow_id, 'project_id': task_ex.project_id, }) else: diff --git a/mistral/engine/base.py b/mistral/engine/base.py index dc4288d3..d3d22195 100644 --- a/mistral/engine/base.py +++ b/mistral/engine/base.py @@ -29,10 +29,12 @@ class Engine(object): """Engine interface.""" @abc.abstractmethod - def start_workflow(self, wf_name, wf_input, description='', **params): + def start_workflow(self, wf_identifier, wf_input, description='', + **params): """Starts the specified workflow. - :param wf_name: Workflow name. + :param wf_identifier: Workflow ID or name. Workflow ID is recommended, + workflow name will be deprecated since Mitaka. :param wf_input: Workflow input data as a dictionary. :param description: Execution description. :param params: Additional workflow type specific parameters. diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 8f7cb344..c388d379 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -50,7 +50,8 @@ class DefaultEngine(base.Engine, coordination.Service): coordination.Service.__init__(self, 'engine_group') @u.log_exec(LOG) - def start_workflow(self, wf_name, wf_input, description='', **params): + def start_workflow(self, wf_identifier, wf_input, description='', + **params): wf_ex_id = None try: @@ -58,7 +59,7 @@ class DefaultEngine(base.Engine, coordination.Service): # The new workflow execution will be in an IDLE # state on initial record creation. wf_ex_id = wf_ex_service.create_workflow_execution( - wf_name, + wf_identifier, wf_input, description, params @@ -87,7 +88,7 @@ class DefaultEngine(base.Engine, coordination.Service): except Exception as e: LOG.error( "Failed to start workflow '%s' id=%s: %s\n%s", - wf_name, wf_ex_id, e, traceback.format_exc() + wf_identifier, wf_ex_id, e, traceback.format_exc() ) wf_ex = self._fail_workflow(wf_ex_id, e) diff --git a/mistral/engine/rpc.py b/mistral/engine/rpc.py index 0ce69f76..103ed1cf 100644 --- a/mistral/engine/rpc.py +++ b/mistral/engine/rpc.py @@ -77,7 +77,7 @@ class EngineServer(object): def __init__(self, engine): self._engine = engine - def start_workflow(self, rpc_ctx, workflow_name, workflow_input, + def start_workflow(self, rpc_ctx, workflow_identifier, workflow_input, description, params): """Receives calls over RPC to start workflows on engine. @@ -87,12 +87,14 @@ class EngineServer(object): LOG.info( "Received RPC request 'start_workflow'[rpc_ctx=%s," - " workflow_name=%s, workflow_input=%s, description=%s, params=%s]" - % (rpc_ctx, workflow_name, workflow_input, description, params) + " workflow_identifier=%s, workflow_input=%s, description=%s, " + "params=%s]" + % (rpc_ctx, workflow_identifier, workflow_input, description, + params) ) return self._engine.start_workflow( - workflow_name, + workflow_identifier, workflow_input, description, **params @@ -283,7 +285,8 @@ class EngineClient(base.Engine): ) @wrap_messaging_exception - def start_workflow(self, wf_name, wf_input, description='', **params): + def start_workflow(self, wf_identifier, wf_input, description='', + **params): """Starts workflow sending a request to engine over RPC. :return: Workflow execution. @@ -291,7 +294,7 @@ class EngineClient(base.Engine): return self._client.call( auth_ctx.ctx(), 'start_workflow', - workflow_name=wf_name, + workflow_identifier=wf_identifier, workflow_input=wf_input or {}, description=description, params=params diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 854d7bd4..56f86986 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -208,6 +208,7 @@ def _create_task_execution(wf_ex, task_spec, ctx, state=states.RUNNING): 'name': task_spec.get_name(), 'workflow_execution_id': wf_ex.id, 'workflow_name': wf_ex.workflow_name, + 'workflow_id': wf_ex.workflow_id, 'state': state, 'spec': task_spec.to_dict(), 'in_context': ctx, diff --git a/mistral/services/executions.py b/mistral/services/executions.py index 9a15863f..9b8e26d9 100644 --- a/mistral/services/executions.py +++ b/mistral/services/executions.py @@ -50,6 +50,7 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params): 'name': wf_def.name, 'description': desc, 'workflow_name': wf_def.name, + 'workflow_id': wf_def.id, 'spec': wf_spec.to_dict(), 'params': params or {}, 'state': states.IDLE, @@ -70,10 +71,10 @@ def _create_workflow_execution(wf_def, wf_spec, wf_input, desc, params): return wf_ex -def create_workflow_execution(wf_name, wf_input, description, params): +def create_workflow_execution(wf_identifier, wf_input, description, params): params = canonize_workflow_params(params) - wf_def = db_api.get_workflow_definition(wf_name) + wf_def = db_api.get_workflow_definition(wf_identifier) wf_spec = spec_parser.get_workflow_spec(wf_def.spec) eng_utils.validate_input(wf_def, wf_input, wf_spec) @@ -86,6 +87,6 @@ def create_workflow_execution(wf_name, wf_input, description, params): params ) - wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_name) + wf_trace.info(wf_ex, "Starting workflow: '%s'" % wf_identifier) return wf_ex.id diff --git a/mistral/tests/unit/api/v2/test_executions.py b/mistral/tests/unit/api/v2/test_executions.py index 4e306a67..53c4338e 100644 --- a/mistral/tests/unit/api/v2/test_executions.py +++ b/mistral/tests/unit/api/v2/test_executions.py @@ -33,6 +33,7 @@ from mistral.workflow import states WF_EX = models.WorkflowExecution( id='123e4567-e89b-12d3-a456-426655440000', workflow_name='some', + workflow_id='123e4567-e89b-12d3-a456-426655441111', description='execution description.', spec={'name': 'some'}, state=states.RUNNING, @@ -54,11 +55,13 @@ WF_EX_JSON = { 'created_at': '1970-01-01 00:00:00', 'updated_at': '1970-01-01 00:00:00', 'workflow_name': 'some', + 'workflow_id': '123e4567-e89b-12d3-a456-426655441111' } SUB_WF_EX = models.WorkflowExecution( id=str(uuid.uuid4()), workflow_name='some', + workflow_id='123e4567-e89b-12d3-a456-426655441111', description='foobar', spec={'name': 'some'}, state=states.RUNNING, @@ -74,6 +77,7 @@ SUB_WF_EX = models.WorkflowExecution( SUB_WF_EX_JSON = { 'id': SUB_WF_EX.id, 'workflow_name': 'some', + 'workflow_id': '123e4567-e89b-12d3-a456-426655441111', 'description': 'foobar', 'input': '{"foo": "bar"}', 'output': '{}', @@ -389,7 +393,7 @@ class TestExecutionsController(base.FunctionalTest): exec_dict = WF_EX_JSON_WITH_DESC f.assert_called_once_with( - exec_dict['workflow_name'], + exec_dict['workflow_id'], json.loads(exec_dict['input']), exec_dict['description'], **json.loads(exec_dict['params']) @@ -406,6 +410,16 @@ class TestExecutionsController(base.FunctionalTest): self.assertIn('Bad response: 400', context.args[0]) + def test_post_without_workflow_id_and_name(self): + context = self.assertRaises( + webtest_app.AppError, + self.app.post_json, + '/v2/executions', + {'description': 'some description here.'} + ) + + self.assertIn('Bad response: 400', context.args[0]) + @mock.patch.object(db_api, 'delete_workflow_execution', MOCK_DELETE) def test_delete(self): resp = self.app.delete('/v2/executions/123') diff --git a/mistral/tests/unit/api/v2/test_tasks.py b/mistral/tests/unit/api/v2/test_tasks.py index fea7c326..f54e9e9d 100644 --- a/mistral/tests/unit/api/v2/test_tasks.py +++ b/mistral/tests/unit/api/v2/test_tasks.py @@ -49,6 +49,7 @@ TASK_EX = models.TaskExecution( id='123', name='task', workflow_name='flow', + workflow_id='123e4567-e89b-12d3-a456-426655441111', spec={ 'type': 'direct', 'version': '2.0', @@ -70,6 +71,7 @@ WITH_ITEMS_TASK_EX = models.TaskExecution( id='123', name='task', workflow_name='flow', + workflow_id='123e4567-e89b-12d3-a456-426655441111', spec={ 'type': 'direct', 'version': '2.0', @@ -92,6 +94,7 @@ TASK = { 'id': '123', 'name': 'task', 'workflow_name': 'flow', + 'workflow_id': '123e4567-e89b-12d3-a456-426655441111', 'state': 'RUNNING', 'workflow_execution_id': WF_EX.id, 'created_at': '1970-01-01 00:00:00',