From 22ce7af2986e22096f07aed938301e130b1d14f9 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Mon, 30 Mar 2015 18:32:58 +0600 Subject: [PATCH] Small refactoring in engine, task handler and workflow utils * Renaming methods in task_handler * Deleted unused method in workflow/utils.py * Made signature of on_task_state_change more consistent with other methods * Other minor things Change-Id: Iaa83d3cdbd1836366401cb5558f194181b421511 --- AUTHORS | 10 +++++++++- mistral/engine1/default_engine.py | 12 ++++++++---- mistral/engine1/policies.py | 10 +++++----- mistral/engine1/rpc.py | 8 ++++---- mistral/engine1/task_handler.py | 24 ++++++++++++------------ mistral/workflow/base.py | 2 +- mistral/workflow/commands.py | 4 ++-- mistral/workflow/utils.py | 15 --------------- 8 files changed, 41 insertions(+), 44 deletions(-) diff --git a/AUTHORS b/AUTHORS index 59fdc4cd..9ca40fcf 100644 --- a/AUTHORS +++ b/AUTHORS @@ -2,17 +2,25 @@ Abhishek Chanda Alexander Kuznetsov Anastasia Kuznetsova Angus Salkeld -Bryan Havenstein +Ankita Wagh +Boris Pavlovic Christian Berendt +David C Kennedy Dmitri Zimine Jeremy Stanley Kirill Izotov Lakshmi Kannan +Lingxian Kong Manas Kelshikar Nikolay Mahotkin +Pierre-Arthur MATHIEU Ray Chen Renat Akhmerov Sergey Kolekonov Sergey Murashov Timur Nurlygayanov Winson Chan +ZhiQiang Fan +Bryan Havenstein + + diff --git a/mistral/engine1/default_engine.py b/mistral/engine1/default_engine.py index 37a1df70..850d6426 100644 --- a/mistral/engine1/default_engine.py +++ b/mistral/engine1/default_engine.py @@ -86,9 +86,13 @@ class DefaultEngine(base.Engine): self._fail_workflow(wf_exec_id, e) raise e - def on_task_state_change(self, state, task_ex_id): + def on_task_state_change(self, task_ex_id, state): with db_api.transaction(): task_ex = db_api.get_task_execution(task_ex_id) + # TODO(rakhmerov): The method is mostly needed for policy and + # we are supposed to get the same action execution as when the + # policy worked. But by the moment this method is called the + # last execution object may have changed. It's a race condition. execution = task_ex.executions[-1] wf_ex_id = task_ex.workflow_execution_id @@ -277,9 +281,9 @@ class DefaultEngine(base.Engine): for cmd in wf_cmds: if isinstance(cmd, commands.RunTask): - task_handler.run_task(cmd) - elif isinstance(cmd, commands.RunExistentTask): - task_handler.run_existent_task(cmd.task_ex.id) + task_handler.run_new_task(cmd) + elif isinstance(cmd, commands.RunExistingTask): + task_handler.run_existing_task(cmd.task_ex.id) elif isinstance(cmd, commands.SetWorkflowState): # TODO(rakhmerov): Special commands should be persisted too. wf_handler.set_execution_state(wf_ex, cmd.new_state) diff --git a/mistral/engine1/policies.py b/mistral/engine1/policies.py index b90176c3..b492df0c 100644 --- a/mistral/engine1/policies.py +++ b/mistral/engine1/policies.py @@ -22,7 +22,7 @@ from mistral.workflow import states _ENGINE_CLIENT_PATH = 'mistral.engine1.rpc.get_engine_client' -_RUN_EXISTENT_TASK_PATH = 'mistral.engine1.task_handler.run_existent_task' +_RUN_EXISTING_TASK_PATH = 'mistral.engine1.task_handler.run_existing_task' def _log_task_delay(task_ex, delay_sec): @@ -186,7 +186,7 @@ class WaitBeforePolicy(base.TaskPolicy): scheduler.schedule_call( None, - _RUN_EXISTENT_TASK_PATH, + _RUN_EXISTING_TASK_PATH, self.delay, task_ex_id=task_ex.id, ) @@ -304,7 +304,7 @@ class RetryPolicy(base.TaskPolicy): scheduler.schedule_call( None, - _RUN_EXISTENT_TASK_PATH, + _RUN_EXISTING_TASK_PATH, self.delay, task_ex_id=task_ex.id, ) @@ -403,6 +403,6 @@ def fail_task_if_incomplete(task_ex_id, timeout): ) rpc.get_engine_client().on_task_state_change( - states.ERROR, - task_ex_id + task_ex_id, + states.ERROR ) diff --git a/mistral/engine1/rpc.py b/mistral/engine1/rpc.py index 19ad68a6..97f57adc 100644 --- a/mistral/engine1/rpc.py +++ b/mistral/engine1/rpc.py @@ -82,8 +82,8 @@ class EngineServer(object): **params ) - def on_task_state_change(self, rpc_ctx, state, task_ex_id): - return self._engine.on_task_state_change(state, task_ex_id) + def on_task_state_change(self, rpc_ctx, task_ex_id, state): + return self._engine.on_task_state_change(task_ex_id, state) def on_action_complete(self, rpc_ctx, action_ex_id, result_data, result_error): @@ -199,12 +199,12 @@ class EngineClient(base.Engine): params=params ) - def on_task_state_change(self, state, task_ex_id): + def on_task_state_change(self, task_ex_id, state): return self._client.call( auth_ctx.ctx(), 'on_task_state_change', - state=state, task_ex_id=task_ex_id, + state=state ) def on_action_complete(self, action_ex_id, result): diff --git a/mistral/engine1/task_handler.py b/mistral/engine1/task_handler.py index 50ee903c..733ea36d 100644 --- a/mistral/engine1/task_handler.py +++ b/mistral/engine1/task_handler.py @@ -36,8 +36,8 @@ from mistral.workflow import with_items LOG = logging.getLogger(__name__) -def run_existent_task(task_ex_id): - """This function runs existent task execution. +def run_existing_task(task_ex_id): + """This function runs existing task execution. It is needed mostly by scheduler. """ @@ -50,10 +50,10 @@ def run_existent_task(task_ex_id): # Explicitly change task state to RUNNING. task_ex.state = states.RUNNING - _run_existent_task(task_ex, task_spec, wf_spec) + _run_existing_task(task_ex, task_spec, wf_spec) -def _run_existent_task(task_ex, task_spec, wf_spec): +def _run_existing_task(task_ex, task_spec, wf_spec): input_dicts = _get_input_dictionaries( wf_spec, task_ex, task_spec, task_ex.in_context ) @@ -66,7 +66,7 @@ def _run_existent_task(task_ex, task_spec, wf_spec): _run_action_or_workflow(task_ex, task_spec, input_d) -def run_task(wf_cmd): +def run_new_task(wf_cmd): """Runs a task.""" ctx = wf_cmd.ctx wf_ex = wf_cmd.wf_ex @@ -89,7 +89,7 @@ def run_task(wf_cmd): if task_ex.state != states.RUNNING: return - _run_existent_task(task_ex, task_spec, wf_spec) + _run_existing_task(task_ex, task_spec, wf_spec) def on_action_complete(action_ex, result): @@ -205,24 +205,24 @@ def _get_input_dictionaries(wf_spec, task_ex, task_spec, ctx): return [input_dict] else: - return get_with_items_input(wf_spec, task_ex, task_spec, ctx) + return _get_with_items_input(wf_spec, task_ex, task_spec, ctx) def _get_workflow_or_action_input(wf_spec, task_ex, task_spec, ctx): if task_spec.get_action_name(): - return get_action_input( + return _get_action_input( wf_spec, task_ex, task_spec, ctx ) elif task_spec.get_workflow_name(): - return get_workflow_input(task_spec, ctx) + return _get_workflow_input(task_spec, ctx) else: raise RuntimeError('Must never happen.') -def get_with_items_input(wf_spec, task_ex, task_spec, ctx): +def _get_with_items_input(wf_spec, task_ex, task_spec, ctx): """Calculate input array for separating each action input. Example: @@ -274,7 +274,7 @@ def get_with_items_input(wf_spec, task_ex, task_spec, ctx): return action_inputs -def get_action_input(wf_spec, task_ex, task_spec, ctx): +def _get_action_input(wf_spec, task_ex, task_spec, ctx): input_dict = expr.evaluate_recursively(task_spec.get_input(), ctx) action_spec_name = task_spec.get_action_name() @@ -320,7 +320,7 @@ def get_action_input(wf_spec, task_ex, task_spec, ctx): return input_dict -def get_workflow_input(task_spec, ctx): +def _get_workflow_input(task_spec, ctx): return expr.evaluate_recursively(task_spec.get_input(), ctx) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index 2368e49b..bc49dc3c 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -106,7 +106,7 @@ class WorkflowController(object): # Add all tasks in IDLE state. idle_tasks = wf_utils.find_tasks_with_state(self.wf_ex, states.IDLE) - return [commands.RunExistentTask(t) for t in idle_tasks] + return [commands.RunExistingTask(t) for t in idle_tasks] def _is_paused_or_completed(self): return states.is_paused_or_completed(self.wf_ex.state) diff --git a/mistral/workflow/commands.py b/mistral/workflow/commands.py index 07cbe20d..786d2954 100644 --- a/mistral/workflow/commands.py +++ b/mistral/workflow/commands.py @@ -48,7 +48,7 @@ class RunTask(WorkflowCommand): ) -class RunExistentTask(WorkflowCommand): +class RunExistingTask(WorkflowCommand): """Command for running already existent task.""" def __init__(self, task_ex): @@ -56,7 +56,7 @@ class RunExistentTask(WorkflowCommand): task_spec = spec_parser.get_task_spec(task_ex.spec) self.task_ex = task_ex - super(RunExistentTask, self).__init__( + super(RunExistingTask, self).__init__( wf_ex, task_spec, task_ex.in_context ) diff --git a/mistral/workflow/utils.py b/mistral/workflow/utils.py index f4195879..7e070f5c 100644 --- a/mistral/workflow/utils.py +++ b/mistral/workflow/utils.py @@ -13,7 +13,6 @@ # limitations under the License. from mistral.utils import serializers -from mistral.workbook.v2 import tasks as v2_tasks_spec from mistral.workflow import states @@ -55,20 +54,6 @@ def find_task_execution(wf_ex, task_spec): return task_execs[0] if len(task_execs) > 0 else None -def find_upstream_task_executions(wf_ex, task_spec, upstream_task_specs, - cause_task_ex=None): - # For direct workflow, if the current task does not have join, the - # task that caused this task execution is the only upstream task. - if (isinstance(task_spec, v2_tasks_spec.DirectWorkflowTaskSpec) and - not task_spec.get_join() and cause_task_ex): - return [cause_task_ex] - else: - # TODO(m4dcoder): Fix use case where there are parallel branches - # that join on a common task separately. Currently, this returns - # all tasks that list the common task as a transition. - return find_task_executions(wf_ex, upstream_task_specs) - - def find_task_executions(wf_ex, task_specs): return filter( None,