Small refactoring in engine, task handler and workflow utils

* Renaming methods in task_handler
* Deleted unused method in workflow/utils.py
* Made signature of on_task_state_change more consistent with other
  methods
* Other minor things

Change-Id: Iaa83d3cdbd1836366401cb5558f194181b421511
This commit is contained in:
Renat Akhmerov 2015-03-30 18:32:58 +06:00
parent ea1187825c
commit 22ce7af298
8 changed files with 41 additions and 44 deletions

10
AUTHORS
View File

@ -2,17 +2,25 @@ Abhishek Chanda <abhishek@cloudscaling.com>
Alexander Kuznetsov <akuznetsov@mirantis.com> Alexander Kuznetsov <akuznetsov@mirantis.com>
Anastasia Kuznetsova <akuznetsova@mirantis.com> Anastasia Kuznetsova <akuznetsova@mirantis.com>
Angus Salkeld <angus.salkeld@rackspace.com> Angus Salkeld <angus.salkeld@rackspace.com>
Bryan Havenstein <bryan.havenstein@ericsson.com> Ankita Wagh <ankita_wagh@symmactoolkit-c02lr80ufd57.symc.symantec.com>
Boris Pavlovic <boris@pavlovic.me>
Christian Berendt <berendt@b1-systems.de> Christian Berendt <berendt@b1-systems.de>
David C Kennedy <david.c.kennedy@hp.com>
Dmitri Zimine <dz@stackstorm.com> Dmitri Zimine <dz@stackstorm.com>
Jeremy Stanley <fungi@yuggoth.org> Jeremy Stanley <fungi@yuggoth.org>
Kirill Izotov <enykeev@stackstorm.com> Kirill Izotov <enykeev@stackstorm.com>
Lakshmi Kannan <lakshmi@stackstorm.com> Lakshmi Kannan <lakshmi@stackstorm.com>
Lingxian Kong <anlin.kong@gmail.com>
Manas Kelshikar <manas@stackstorm.com> Manas Kelshikar <manas@stackstorm.com>
Nikolay Mahotkin <nmakhotkin@mirantis.com> Nikolay Mahotkin <nmakhotkin@mirantis.com>
Pierre-Arthur MATHIEU <pierre-arthur.mathieu@hp.com>
Ray Chen <chenrano2002@gmail.com> Ray Chen <chenrano2002@gmail.com>
Renat Akhmerov <rakhmerov@mirantis.com> Renat Akhmerov <rakhmerov@mirantis.com>
Sergey Kolekonov <skolekonov@mirantis.com> Sergey Kolekonov <skolekonov@mirantis.com>
Sergey Murashov <smurashov@mirantis.com> Sergey Murashov <smurashov@mirantis.com>
Timur Nurlygayanov <tnurlygayanov@mirantis.com> Timur Nurlygayanov <tnurlygayanov@mirantis.com>
Winson Chan <wcchan@stackstorm.com> Winson Chan <wcchan@stackstorm.com>
ZhiQiang Fan <zhiqiang.fan@huawei.com>
Bryan Havenstein <bryan.havenstein@ericsson.com>

View File

@ -86,9 +86,13 @@ class DefaultEngine(base.Engine):
self._fail_workflow(wf_exec_id, e) self._fail_workflow(wf_exec_id, e)
raise e raise e
def on_task_state_change(self, state, task_ex_id): def on_task_state_change(self, task_ex_id, state):
with db_api.transaction(): with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex_id) task_ex = db_api.get_task_execution(task_ex_id)
# TODO(rakhmerov): The method is mostly needed for policy and
# we are supposed to get the same action execution as when the
# policy worked. But by the moment this method is called the
# last execution object may have changed. It's a race condition.
execution = task_ex.executions[-1] execution = task_ex.executions[-1]
wf_ex_id = task_ex.workflow_execution_id wf_ex_id = task_ex.workflow_execution_id
@ -277,9 +281,9 @@ class DefaultEngine(base.Engine):
for cmd in wf_cmds: for cmd in wf_cmds:
if isinstance(cmd, commands.RunTask): if isinstance(cmd, commands.RunTask):
task_handler.run_task(cmd) task_handler.run_new_task(cmd)
elif isinstance(cmd, commands.RunExistentTask): elif isinstance(cmd, commands.RunExistingTask):
task_handler.run_existent_task(cmd.task_ex.id) task_handler.run_existing_task(cmd.task_ex.id)
elif isinstance(cmd, commands.SetWorkflowState): elif isinstance(cmd, commands.SetWorkflowState):
# TODO(rakhmerov): Special commands should be persisted too. # TODO(rakhmerov): Special commands should be persisted too.
wf_handler.set_execution_state(wf_ex, cmd.new_state) wf_handler.set_execution_state(wf_ex, cmd.new_state)

View File

@ -22,7 +22,7 @@ from mistral.workflow import states
_ENGINE_CLIENT_PATH = 'mistral.engine1.rpc.get_engine_client' _ENGINE_CLIENT_PATH = 'mistral.engine1.rpc.get_engine_client'
_RUN_EXISTENT_TASK_PATH = 'mistral.engine1.task_handler.run_existent_task' _RUN_EXISTING_TASK_PATH = 'mistral.engine1.task_handler.run_existing_task'
def _log_task_delay(task_ex, delay_sec): def _log_task_delay(task_ex, delay_sec):
@ -186,7 +186,7 @@ class WaitBeforePolicy(base.TaskPolicy):
scheduler.schedule_call( scheduler.schedule_call(
None, None,
_RUN_EXISTENT_TASK_PATH, _RUN_EXISTING_TASK_PATH,
self.delay, self.delay,
task_ex_id=task_ex.id, task_ex_id=task_ex.id,
) )
@ -304,7 +304,7 @@ class RetryPolicy(base.TaskPolicy):
scheduler.schedule_call( scheduler.schedule_call(
None, None,
_RUN_EXISTENT_TASK_PATH, _RUN_EXISTING_TASK_PATH,
self.delay, self.delay,
task_ex_id=task_ex.id, task_ex_id=task_ex.id,
) )
@ -403,6 +403,6 @@ def fail_task_if_incomplete(task_ex_id, timeout):
) )
rpc.get_engine_client().on_task_state_change( rpc.get_engine_client().on_task_state_change(
states.ERROR, task_ex_id,
task_ex_id states.ERROR
) )

View File

@ -82,8 +82,8 @@ class EngineServer(object):
**params **params
) )
def on_task_state_change(self, rpc_ctx, state, task_ex_id): def on_task_state_change(self, rpc_ctx, task_ex_id, state):
return self._engine.on_task_state_change(state, task_ex_id) return self._engine.on_task_state_change(task_ex_id, state)
def on_action_complete(self, rpc_ctx, action_ex_id, result_data, def on_action_complete(self, rpc_ctx, action_ex_id, result_data,
result_error): result_error):
@ -199,12 +199,12 @@ class EngineClient(base.Engine):
params=params params=params
) )
def on_task_state_change(self, state, task_ex_id): def on_task_state_change(self, task_ex_id, state):
return self._client.call( return self._client.call(
auth_ctx.ctx(), auth_ctx.ctx(),
'on_task_state_change', 'on_task_state_change',
state=state,
task_ex_id=task_ex_id, task_ex_id=task_ex_id,
state=state
) )
def on_action_complete(self, action_ex_id, result): def on_action_complete(self, action_ex_id, result):

View File

@ -36,8 +36,8 @@ from mistral.workflow import with_items
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def run_existent_task(task_ex_id): def run_existing_task(task_ex_id):
"""This function runs existent task execution. """This function runs existing task execution.
It is needed mostly by scheduler. It is needed mostly by scheduler.
""" """
@ -50,10 +50,10 @@ def run_existent_task(task_ex_id):
# Explicitly change task state to RUNNING. # Explicitly change task state to RUNNING.
task_ex.state = states.RUNNING task_ex.state = states.RUNNING
_run_existent_task(task_ex, task_spec, wf_spec) _run_existing_task(task_ex, task_spec, wf_spec)
def _run_existent_task(task_ex, task_spec, wf_spec): def _run_existing_task(task_ex, task_spec, wf_spec):
input_dicts = _get_input_dictionaries( input_dicts = _get_input_dictionaries(
wf_spec, task_ex, task_spec, task_ex.in_context wf_spec, task_ex, task_spec, task_ex.in_context
) )
@ -66,7 +66,7 @@ def _run_existent_task(task_ex, task_spec, wf_spec):
_run_action_or_workflow(task_ex, task_spec, input_d) _run_action_or_workflow(task_ex, task_spec, input_d)
def run_task(wf_cmd): def run_new_task(wf_cmd):
"""Runs a task.""" """Runs a task."""
ctx = wf_cmd.ctx ctx = wf_cmd.ctx
wf_ex = wf_cmd.wf_ex wf_ex = wf_cmd.wf_ex
@ -89,7 +89,7 @@ def run_task(wf_cmd):
if task_ex.state != states.RUNNING: if task_ex.state != states.RUNNING:
return return
_run_existent_task(task_ex, task_spec, wf_spec) _run_existing_task(task_ex, task_spec, wf_spec)
def on_action_complete(action_ex, result): def on_action_complete(action_ex, result):
@ -205,24 +205,24 @@ def _get_input_dictionaries(wf_spec, task_ex, task_spec, ctx):
return [input_dict] return [input_dict]
else: else:
return get_with_items_input(wf_spec, task_ex, task_spec, ctx) return _get_with_items_input(wf_spec, task_ex, task_spec, ctx)
def _get_workflow_or_action_input(wf_spec, task_ex, task_spec, ctx): def _get_workflow_or_action_input(wf_spec, task_ex, task_spec, ctx):
if task_spec.get_action_name(): if task_spec.get_action_name():
return get_action_input( return _get_action_input(
wf_spec, wf_spec,
task_ex, task_ex,
task_spec, task_spec,
ctx ctx
) )
elif task_spec.get_workflow_name(): elif task_spec.get_workflow_name():
return get_workflow_input(task_spec, ctx) return _get_workflow_input(task_spec, ctx)
else: else:
raise RuntimeError('Must never happen.') raise RuntimeError('Must never happen.')
def get_with_items_input(wf_spec, task_ex, task_spec, ctx): def _get_with_items_input(wf_spec, task_ex, task_spec, ctx):
"""Calculate input array for separating each action input. """Calculate input array for separating each action input.
Example: Example:
@ -274,7 +274,7 @@ def get_with_items_input(wf_spec, task_ex, task_spec, ctx):
return action_inputs return action_inputs
def get_action_input(wf_spec, task_ex, task_spec, ctx): def _get_action_input(wf_spec, task_ex, task_spec, ctx):
input_dict = expr.evaluate_recursively(task_spec.get_input(), ctx) input_dict = expr.evaluate_recursively(task_spec.get_input(), ctx)
action_spec_name = task_spec.get_action_name() action_spec_name = task_spec.get_action_name()
@ -320,7 +320,7 @@ def get_action_input(wf_spec, task_ex, task_spec, ctx):
return input_dict return input_dict
def get_workflow_input(task_spec, ctx): def _get_workflow_input(task_spec, ctx):
return expr.evaluate_recursively(task_spec.get_input(), ctx) return expr.evaluate_recursively(task_spec.get_input(), ctx)

View File

@ -106,7 +106,7 @@ class WorkflowController(object):
# Add all tasks in IDLE state. # Add all tasks in IDLE state.
idle_tasks = wf_utils.find_tasks_with_state(self.wf_ex, states.IDLE) idle_tasks = wf_utils.find_tasks_with_state(self.wf_ex, states.IDLE)
return [commands.RunExistentTask(t) for t in idle_tasks] return [commands.RunExistingTask(t) for t in idle_tasks]
def _is_paused_or_completed(self): def _is_paused_or_completed(self):
return states.is_paused_or_completed(self.wf_ex.state) return states.is_paused_or_completed(self.wf_ex.state)

View File

@ -48,7 +48,7 @@ class RunTask(WorkflowCommand):
) )
class RunExistentTask(WorkflowCommand): class RunExistingTask(WorkflowCommand):
"""Command for running already existent task.""" """Command for running already existent task."""
def __init__(self, task_ex): def __init__(self, task_ex):
@ -56,7 +56,7 @@ class RunExistentTask(WorkflowCommand):
task_spec = spec_parser.get_task_spec(task_ex.spec) task_spec = spec_parser.get_task_spec(task_ex.spec)
self.task_ex = task_ex self.task_ex = task_ex
super(RunExistentTask, self).__init__( super(RunExistingTask, self).__init__(
wf_ex, task_spec, task_ex.in_context wf_ex, task_spec, task_ex.in_context
) )

View File

@ -13,7 +13,6 @@
# limitations under the License. # limitations under the License.
from mistral.utils import serializers from mistral.utils import serializers
from mistral.workbook.v2 import tasks as v2_tasks_spec
from mistral.workflow import states from mistral.workflow import states
@ -55,20 +54,6 @@ def find_task_execution(wf_ex, task_spec):
return task_execs[0] if len(task_execs) > 0 else None return task_execs[0] if len(task_execs) > 0 else None
def find_upstream_task_executions(wf_ex, task_spec, upstream_task_specs,
cause_task_ex=None):
# For direct workflow, if the current task does not have join, the
# task that caused this task execution is the only upstream task.
if (isinstance(task_spec, v2_tasks_spec.DirectWorkflowTaskSpec) and
not task_spec.get_join() and cause_task_ex):
return [cause_task_ex]
else:
# TODO(m4dcoder): Fix use case where there are parallel branches
# that join on a common task separately. Currently, this returns
# all tasks that list the common task as a transition.
return find_task_executions(wf_ex, upstream_task_specs)
def find_task_executions(wf_ex, task_specs): def find_task_executions(wf_ex, task_specs):
return filter( return filter(
None, None,