From cc32c82c165df4083f1eed2ce2c18fb9cc38b346 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Fri, 6 May 2016 14:27:59 +0700 Subject: [PATCH] Fixing engine facade hierarchy * Method rerun_workflow() was missing in Engine interface * Renamed parameters in the hierarchy to be more consistent * Added missing docstrings Change-Id: I08b2b552130fd16a20f6647349006939619b6659 --- mistral/engine/base.py | 35 ++++++++++++++++++-------- mistral/engine/default_engine.py | 4 +-- mistral/engine/rpc.py | 43 +++++++++++++++++++++++++------- 3 files changed, 60 insertions(+), 22 deletions(-) diff --git a/mistral/engine/base.py b/mistral/engine/base.py index d3d22195..63e0f857 100644 --- a/mistral/engine/base.py +++ b/mistral/engine/base.py @@ -69,28 +69,41 @@ class Engine(object): raise NotImplementedError @abc.abstractmethod - def pause_workflow(self, execution_id): - """Pauses workflow execution. + def pause_workflow(self, wf_ex_id): + """Pauses workflow. - :param execution_id: Execution id. + :param wf_ex_id: Execution id. :return: Workflow execution object. """ raise NotImplementedError @abc.abstractmethod - def resume_workflow(self, execution_id): - """Resumes workflow execution. + def resume_workflow(self, wf_ex_id): + """Resumes workflow. - :param execution_id: Execution id. + :param wf_ex_id: Execution id. :return: Workflow execution object. """ raise NotImplementedError @abc.abstractmethod - def stop_workflow(self, execution_id, state, message): - """Stops workflow execution. + def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None): + """Rerun workflow from the specified task. - :param execution_id: Workflow execution id. + :param wf_ex_id: Workflow execution id. + :param task_ex_id: Task execution id. + :param reset: If True, reset task state including deleting its action + executions. + :param env: Workflow environment. + :return: Workflow execution object. + """ + raise NotImplementedError + + @abc.abstractmethod + def stop_workflow(self, wf_ex_id, state, message): + """Stops workflow. + + :param wf_ex_id: Workflow execution id. :param state: State assigned to the workflow. Permitted states are SUCCESS or ERROR. :param message: Optional information string. @@ -100,10 +113,10 @@ class Engine(object): raise NotImplementedError @abc.abstractmethod - def rollback_workflow(self, execution_id): + def rollback_workflow(self, wf_ex_id): """Rolls back workflow execution. - :param execution_id: Execution id. + :param wf_ex_id: Execution id. :return: Workflow execution object. """ raise NotImplementedError diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index f71adc1a..944e25e7 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -275,9 +275,9 @@ class DefaultEngine(base.Engine, coordination.Service): raise e @u.log_exec(LOG) - def pause_workflow(self, execution_id): + def pause_workflow(self, wf_ex_id): with db_api.transaction(): - wf_ex = wf_handler.lock_workflow_execution(execution_id) + wf_ex = wf_handler.lock_workflow_execution(wf_ex_id) wf_handler.set_execution_state(wf_ex, states.PAUSED) diff --git a/mistral/engine/rpc.py b/mistral/engine/rpc.py index b9598263..41a35381 100644 --- a/mistral/engine/rpc.py +++ b/mistral/engine/rpc.py @@ -140,6 +140,10 @@ class EngineServer(object): """Receives calls over RPC to start workflows on engine. :param rpc_ctx: RPC request context. + :param workflow_identifier: Workflow definition identifier. + :param workflow_input: Workflow input. + :param description: Workflow execution description. + :param params: Additional workflow type specific parameters. :return: Workflow execution. """ @@ -398,6 +402,8 @@ class EngineClient(base.Engine): it possibly needs to move the workflow on, i.e. run other workflow tasks for which all dependencies are satisfied. + :param action_ex_id: Action execution id. + :param result: Action execution result. :return: Task. """ @@ -410,16 +416,17 @@ class EngineClient(base.Engine): ) @wrap_messaging_exception - def pause_workflow(self, execution_id): + def pause_workflow(self, wf_ex_id): """Stops the workflow with the given execution id. + :param wf_ex_id: Workflow execution id. :return: Workflow execution. """ return self._client.call( auth_ctx.ctx(), 'pause_workflow', - execution_id=execution_id + execution_id=wf_ex_id ) @wrap_messaging_exception @@ -431,7 +438,8 @@ class EngineClient(base.Engine): :param wf_ex_id: Workflow execution id. :param task_ex_id: Task execution id. - :param reset: If true, then purge action execution for the task. + :param reset: If true, then reset task execution state and purge + action execution for the task. :param env: Environment variables to update. :return: Workflow execution. """ @@ -462,13 +470,13 @@ class EngineClient(base.Engine): ) @wrap_messaging_exception - def stop_workflow(self, execution_id, state, message=None): + def stop_workflow(self, wf_ex_id, state, message=None): """Stops workflow execution with given status. Once stopped, the workflow is complete with SUCCESS or ERROR, and can not be resumed. - :param execution_id: Workflow execution id + :param wf_ex_id: Workflow execution id :param state: State assigned to the workflow: SUCCESS or ERROR :param message: Optional information string @@ -478,22 +486,24 @@ class EngineClient(base.Engine): return self._client.call( auth_ctx.ctx(), 'stop_workflow', - execution_id=execution_id, + execution_id=wf_ex_id, state=state, message=message ) @wrap_messaging_exception - def rollback_workflow(self, execution_id): + def rollback_workflow(self, wf_ex_id): """Rolls back the workflow with the given execution id. + :param wf_ex_id: Workflow execution id. + :return: Workflow execution. """ return self._client.call( auth_ctx.ctx(), 'rollback_workflow', - execution_id=execution_id + execution_id=wf_ex_id ) @@ -508,6 +518,11 @@ class ExecutorServer(object): """Receives calls over RPC to run action on executor. :param rpc_ctx: RPC request context dictionary. + :param action_ex_id: Action execution id. + :param action_class_str: Action class name. + :param attributes: Action class attributes. + :param params: Action input parameters. + :return: Action result. """ LOG.info( @@ -546,7 +561,17 @@ class ExecutorClient(base.Executor): def run_action(self, action_ex_id, action_class_str, attributes, action_params, target=None, async=True): - """Sends a request to run action to executor.""" + """Sends a request to run action to executor. + + :param action_ex_id: Action execution id. + :param action_class_str: Action class name. + :param attributes: Action class attributes. + :param action_params: Action input parameters. + :param target: Target (group of action executors). + :param async: If True, run action in asynchronous mode (w/o waiting + for completion). + :return: Action result. + """ kwargs = { 'action_ex_id': action_ex_id,