Pass 'safe-rerun' param to RPC layer

Partially implements blueprint mistral-task-delivery-model

Change-Id: I3355af6277ca38049b79067b2535bdbf41f1c31f
This commit is contained in:
Dawid Deja 2016-07-07 16:12:08 +02:00
parent d2dba589dd
commit 6a69d3684b
4 changed files with 46 additions and 21 deletions

View File

@ -75,7 +75,7 @@ class Action(object):
self.action_ex.output = {'result': msg} self.action_ex.output = {'result': msg}
@abc.abstractmethod @abc.abstractmethod
def schedule(self, input_dict, target, index=0, desc=''): def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
"""Schedule action run. """Schedule action run.
This method is needed to schedule action run so its result can This method is needed to schedule action run so its result can
@ -88,11 +88,14 @@ class Action(object):
:param target: Target (group of action executors). :param target: Target (group of action executors).
:param index: Action execution index. Makes sense for some types. :param index: Action execution index. Makes sense for some types.
:param desc: Action execution description. :param desc: Action execution description.
:param safe_rerun: If true, action would be re-run if executor dies
during execution.
""" """
raise NotImplementedError raise NotImplementedError
@abc.abstractmethod @abc.abstractmethod
def run(self, input_dict, target, index=0, desc='', save=True): def run(self, input_dict, target, index=0, desc='', save=True,
safe_rerun=False):
"""Immediately run action. """Immediately run action.
This method runs method w/o scheduling its run for a later time. This method runs method w/o scheduling its run for a later time.
@ -104,6 +107,8 @@ class Action(object):
:param index: Action execution index. Makes sense for some types. :param index: Action execution index. Makes sense for some types.
:param desc: Action execution description. :param desc: Action execution description.
:param save: True if action execution object needs to be saved. :param save: True if action execution object needs to be saved.
:param safe_rerun: If true, action would be re-run if executor dies
during execution.
:return: Action output. :return: Action output.
""" """
raise NotImplementedError raise NotImplementedError
@ -196,7 +201,7 @@ class PythonAction(Action):
self._log_result(prev_state, result) self._log_result(prev_state, result)
@profiler.trace('action-schedule') @profiler.trace('action-schedule')
def schedule(self, input_dict, target, index=0, desc=''): def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
assert not self.action_ex assert not self.action_ex
# Assign the action execution ID here to minimize database calls. # Assign the action execution ID here to minimize database calls.
@ -209,7 +214,7 @@ class PythonAction(Action):
self._create_action_execution( self._create_action_execution(
self._prepare_input(input_dict), self._prepare_input(input_dict),
self._prepare_runtime_context(index), self._prepare_runtime_context(index, safe_rerun),
desc=desc, desc=desc,
action_ex_id=action_ex_id action_ex_id=action_ex_id
) )
@ -223,11 +228,12 @@ class PythonAction(Action):
) )
@profiler.trace('action-run') @profiler.trace('action-run')
def run(self, input_dict, target, index=0, desc='', save=True): def run(self, input_dict, target, index=0, desc='', save=True,
safe_rerun=False):
assert not self.action_ex assert not self.action_ex
input_dict = self._prepare_input(input_dict) input_dict = self._prepare_input(input_dict)
runtime_ctx = self._prepare_runtime_context(index) runtime_ctx = self._prepare_runtime_context(index, safe_rerun)
# Assign the action execution ID here to minimize database calls. # Assign the action execution ID here to minimize database calls.
# Otherwise, the input property of the action execution DB object needs # Otherwise, the input property of the action execution DB object needs
@ -251,7 +257,8 @@ class PythonAction(Action):
self.action_def.attributes or {}, self.action_def.attributes or {},
input_dict, input_dict,
target, target,
async=False async=False,
safe_rerun=safe_rerun
) )
return self._prepare_output(result) return self._prepare_output(result)
@ -287,12 +294,13 @@ class PythonAction(Action):
""" """
return _get_action_output(result) if result else None return _get_action_output(result) if result else None
def _prepare_runtime_context(self, index): def _prepare_runtime_context(self, index, safe_rerun):
"""Template method to prepare action runtime context. """Template method to prepare action runtime context.
Python action inserts index into runtime context. Python action inserts index into runtime context and information if
given action is safe_rerun.
""" """
return {'index': index} return {'index': index, 'safe_rerun': safe_rerun}
def _insert_action_context(self, action_ex_id, input_dict, save=True): def _insert_action_context(self, action_ex_id, input_dict, save=True):
"""Template method to prepare action context. """Template method to prepare action context.
@ -378,8 +386,11 @@ class AdHocAction(PythonAction):
return _get_action_output(result) if result else None return _get_action_output(result) if result else None
def _prepare_runtime_context(self, index): def _prepare_runtime_context(self, index, safe_rerun):
ctx = super(AdHocAction, self)._prepare_runtime_context(index) ctx = super(AdHocAction, self)._prepare_runtime_context(
index,
safe_rerun
)
# Insert special field into runtime context so that we track # Insert special field into runtime context so that we track
# a relationship between python action and adhoc action. # a relationship between python action and adhoc action.
@ -432,7 +443,7 @@ class WorkflowAction(Action):
pass pass
@profiler.trace('action-schedule') @profiler.trace('action-schedule')
def schedule(self, input_dict, target, index=0, desc=''): def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
assert not self.action_ex assert not self.action_ex
parent_wf_ex = self.task_ex.workflow_execution parent_wf_ex = self.task_ex.workflow_execution
@ -471,7 +482,8 @@ class WorkflowAction(Action):
) )
@profiler.trace('action-run') @profiler.trace('action-run')
def run(self, input_dict, target, index=0, desc='', save=True): def run(self, input_dict, target, index=0, desc='', save=True,
safe_rerun=True):
raise NotImplemented('Does not apply to this WorkflowAction.') raise NotImplemented('Does not apply to this WorkflowAction.')
def is_sync(self, input_dict): def is_sync(self, input_dict):
@ -492,7 +504,8 @@ def _run_existing_action(action_ex_id, target):
action_def.action_class, action_def.action_class,
action_def.attributes or {}, action_def.attributes or {},
action_ex.input, action_ex.input,
target target,
safe_rerun=action_ex.runtime_context.get('safe_rerun', False)
) )
return _get_action_output(result) if result else None return _get_action_output(result) if result else None

View File

@ -518,7 +518,7 @@ class ExecutorClient(base.Executor):
self._client = get_rpc_client_driver()(rpc_conf_dict) self._client = get_rpc_client_driver()(rpc_conf_dict)
def run_action(self, action_ex_id, action_class_str, attributes, def run_action(self, action_ex_id, action_class_str, attributes,
action_params, target=None, async=True): action_params, target=None, async=True, safe_rerun=False):
"""Sends a request to run action to executor. """Sends a request to run action to executor.
:param action_ex_id: Action execution id. :param action_ex_id: Action execution id.
@ -528,6 +528,8 @@ class ExecutorClient(base.Executor):
:param target: Target (group of action executors). :param target: Target (group of action executors).
:param async: If True, run action in asynchronous mode (w/o waiting :param async: If True, run action in asynchronous mode (w/o waiting
for completion). for completion).
:param safe_rerun: If true, action would be re-run if executor dies
during execution.
:return: Action result. :return: Action result.
""" """
@ -535,7 +537,7 @@ class ExecutorClient(base.Executor):
'action_ex_id': action_ex_id, 'action_ex_id': action_ex_id,
'action_class_str': action_class_str, 'action_class_str': action_class_str,
'attributes': attributes, 'attributes': attributes,
'params': action_params 'params': action_params,
} }
rpc_client_method = (self._client.async_call rpc_client_method = (self._client.async_call

View File

@ -311,7 +311,11 @@ class RegularTask(Task):
action.validate_input(input_dict) action.validate_input(input_dict)
action.schedule(input_dict, target) action.schedule(
input_dict,
target,
safe_rerun=self.task_spec.get_safe_rerun()
)
def _get_target(self, input_dict): def _get_target(self, input_dict):
return expr.evaluate_recursively( return expr.evaluate_recursively(
@ -399,7 +403,12 @@ class WithItemsTask(RegularTask):
action = self._build_action() action = self._build_action()
action.schedule(input_dict, target, index=idx) action.schedule(
input_dict,
target,
index=idx,
safe_rerun=self.task_spec.get_safe_rerun()
)
def _get_with_items_input(self): def _get_with_items_input(self):
"""Calculate input array for separating each action input. """Calculate input array for separating each action input.

View File

@ -78,7 +78,7 @@ workflows:
def _run_at_target(action_ex_id, action_class_str, attributes, def _run_at_target(action_ex_id, action_class_str, attributes,
action_params, target=None, async=True): action_params, target=None, async=True, safe_rerun=False):
# We'll just call executor directly for testing purposes. # We'll just call executor directly for testing purposes.
executor = default_executor.DefaultExecutor(rpc.get_engine_client()) executor = default_executor.DefaultExecutor(rpc.get_engine_client())
@ -172,7 +172,8 @@ class EnvironmentTest(base.EngineTestCase):
'mistral.actions.std_actions.EchoAction', 'mistral.actions.std_actions.EchoAction',
{}, {},
a_ex.input, a_ex.input,
TARGET TARGET,
safe_rerun=False
) )
def test_subworkflow_env_task_input(self): def test_subworkflow_env_task_input(self):