Optimize action scheduling
* If "target" property of a task is empty, there's no need to
evaluate an expression
* Refactored action input validation logic to avoid preparing
an action input twice for ad-hoc actions. The method
"_prepare_input" was previously called twice for ad-hoc actions
although it may be pretty expensive in some cases.
* Added more profiler decorators to track performance of ad-hoc
actions
Closes-Bug: #1821858
Change-Id: Ia56e2fda57c3af1a86a638018c23dd1c3281debb
(cherry picked from commit a73e3defea
)
This commit is contained in:
parent
35dc4eec23
commit
192af4ee12
@ -211,6 +211,11 @@ class Action(object):
|
||||
class PythonAction(Action):
|
||||
"""Regular Python action."""
|
||||
|
||||
def __init__(self, action_def, action_ex=None, task_ex=None):
|
||||
super(PythonAction, self).__init__(action_def, action_ex, task_ex)
|
||||
|
||||
self._prepared_input = None
|
||||
|
||||
@profiler.trace('action-complete', hide_args=True)
|
||||
def complete(self, result):
|
||||
assert self.action_ex
|
||||
@ -238,6 +243,8 @@ class PythonAction(Action):
|
||||
timeout=None):
|
||||
assert not self.action_ex
|
||||
|
||||
self.validate_input(input_dict)
|
||||
|
||||
# Assign the action execution ID here to minimize database calls.
|
||||
# Otherwise, the input property of the action execution DB object needs
|
||||
# to be updated with the action execution ID after the action execution
|
||||
@ -277,8 +284,9 @@ class PythonAction(Action):
|
||||
safe_rerun=False, timeout=None):
|
||||
assert not self.action_ex
|
||||
|
||||
input_dict = self._prepare_input(input_dict)
|
||||
runtime_ctx = self._prepare_runtime_context(index, safe_rerun)
|
||||
self.validate_input(input_dict)
|
||||
|
||||
prepared_input_dict = self._prepare_input(input_dict)
|
||||
|
||||
# Assign the action execution ID here to minimize database calls.
|
||||
# Otherwise, the input property of the action execution DB object needs
|
||||
@ -288,8 +296,8 @@ class PythonAction(Action):
|
||||
|
||||
if save:
|
||||
self._create_action_execution(
|
||||
input_dict,
|
||||
runtime_ctx,
|
||||
prepared_input_dict,
|
||||
self._prepare_runtime_context(index, safe_rerun),
|
||||
self.is_sync(input_dict),
|
||||
desc=desc,
|
||||
action_ex_id=action_ex_id
|
||||
@ -303,7 +311,7 @@ class PythonAction(Action):
|
||||
self.action_ex.id if self.action_ex else None,
|
||||
self.action_def.action_class,
|
||||
self.action_def.attributes or {},
|
||||
input_dict,
|
||||
prepared_input_dict,
|
||||
safe_rerun,
|
||||
execution_context,
|
||||
target=target,
|
||||
@ -314,14 +322,13 @@ class PythonAction(Action):
|
||||
return self._prepare_output(result)
|
||||
|
||||
def is_sync(self, input_dict):
|
||||
input_dict = self._prepare_input(input_dict)
|
||||
prepared_input_dict = self._prepare_input(input_dict)
|
||||
|
||||
a = a_m.get_action_class(self.action_def.name)(**input_dict)
|
||||
a = a_m.get_action_class(self.action_def.name)(**prepared_input_dict)
|
||||
|
||||
return a.is_sync()
|
||||
|
||||
def validate_input(self, input_dict):
|
||||
|
||||
# NOTE(kong): Don't validate action input if action initialization
|
||||
# method contains ** argument.
|
||||
if '**' in self.action_def.input:
|
||||
@ -337,19 +344,20 @@ class PythonAction(Action):
|
||||
)
|
||||
|
||||
def _prepare_execution_context(self):
|
||||
|
||||
exc_ctx = {}
|
||||
|
||||
if self.task_ex:
|
||||
wf_ex = self.task_ex.workflow_execution
|
||||
|
||||
exc_ctx['workflow_execution_id'] = wf_ex.id
|
||||
exc_ctx['task_execution_id'] = self.task_ex.id
|
||||
exc_ctx['workflow_name'] = wf_ex.name
|
||||
|
||||
if self.action_ex:
|
||||
exc_ctx['action_execution_id'] = self.action_ex.id
|
||||
callback_url = '/v2/action_executions/%s' % self.action_ex.id
|
||||
exc_ctx['callback_url'] = callback_url
|
||||
exc_ctx['callback_url'] = (
|
||||
'/v2/action_executions/%s' % self.action_ex.id
|
||||
)
|
||||
|
||||
return exc_ctx
|
||||
|
||||
@ -379,6 +387,7 @@ class PythonAction(Action):
|
||||
class AdHocAction(PythonAction):
|
||||
"""Ad-hoc action."""
|
||||
|
||||
@profiler.trace('ad-hoc-action-init', hide_args=True)
|
||||
def __init__(self, action_def, action_ex=None, task_ex=None, task_ctx=None,
|
||||
wf_ctx=None):
|
||||
self.action_spec = spec_parser.get_action_spec(action_def.spec)
|
||||
@ -408,6 +417,7 @@ class AdHocAction(PythonAction):
|
||||
self.task_ctx = task_ctx or {}
|
||||
self.wf_ctx = wf_ctx or {}
|
||||
|
||||
@profiler.trace('ad-hoc-action-validate-input', hide_args=True)
|
||||
def validate_input(self, input_dict):
|
||||
expected_input = self.action_spec.get_input()
|
||||
|
||||
@ -422,11 +432,16 @@ class AdHocAction(PythonAction):
|
||||
self._prepare_input(input_dict)
|
||||
)
|
||||
|
||||
@profiler.trace('ad-hoc-action-prepare-input', hide_args=True)
|
||||
def _prepare_input(self, input_dict):
|
||||
if self._prepared_input is not None:
|
||||
return self._prepared_input
|
||||
|
||||
base_input_dict = input_dict
|
||||
|
||||
for action_def in self.adhoc_action_defs:
|
||||
action_spec = spec_parser.get_action_spec(action_def.spec)
|
||||
|
||||
for k, v in action_spec.get_input().items():
|
||||
if (k not in base_input_dict or
|
||||
base_input_dict[k] is utils.NotDefined):
|
||||
@ -453,8 +468,13 @@ class AdHocAction(PythonAction):
|
||||
else:
|
||||
base_input_dict = {}
|
||||
|
||||
return super(AdHocAction, self)._prepare_input(base_input_dict)
|
||||
self._prepared_input = super(AdHocAction, self)._prepare_input(
|
||||
base_input_dict
|
||||
)
|
||||
|
||||
return self._prepared_input
|
||||
|
||||
@profiler.trace('ad-hoc-action-prepare-output', hide_args=True)
|
||||
def _prepare_output(self, result):
|
||||
# In case of error, we don't transform a result.
|
||||
if not result.is_error():
|
||||
@ -476,6 +496,7 @@ class AdHocAction(PythonAction):
|
||||
|
||||
return result
|
||||
|
||||
@profiler.trace('ad-hoc-action-prepare-runtime-context', hide_args=True)
|
||||
def _prepare_runtime_context(self, index, safe_rerun):
|
||||
ctx = super(AdHocAction, self)._prepare_runtime_context(
|
||||
index,
|
||||
@ -489,6 +510,7 @@ class AdHocAction(PythonAction):
|
||||
{'adhoc_action_name': self.adhoc_action_def.name}
|
||||
)
|
||||
|
||||
@profiler.trace('ad-hoc-action-gather-base-actions', hide_args=True)
|
||||
def _gather_base_actions(self, action_def, base_action_def):
|
||||
"""Find all base ad-hoc actions and store them
|
||||
|
||||
@ -537,6 +559,7 @@ class WorkflowAction(Action):
|
||||
|
||||
def __init__(self, wf_name, **kwargs):
|
||||
super(WorkflowAction, self).__init__(None, **kwargs)
|
||||
|
||||
self.wf_name = wf_name
|
||||
|
||||
@profiler.trace('workflow-action-complete', hide_args=True)
|
||||
@ -544,11 +567,13 @@ class WorkflowAction(Action):
|
||||
# No-op because in case of workflow result is already processed.
|
||||
pass
|
||||
|
||||
@profiler.trace('workflkow-action-schedule', hide_args=True)
|
||||
@profiler.trace('workflow-action-schedule', hide_args=True)
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
|
||||
timeout=None):
|
||||
assert not self.action_ex
|
||||
|
||||
self.validate_input(input_dict)
|
||||
|
||||
parent_wf_ex = self.task_ex.workflow_execution
|
||||
parent_wf_spec = spec_parser.get_workflow_spec_by_execution_id(
|
||||
parent_wf_ex.id
|
||||
|
@ -103,8 +103,12 @@ class DefaultEngine(base.Engine):
|
||||
|
||||
return action.action_ex.get_clone()
|
||||
|
||||
output = action.run(action_input, target, save=False,
|
||||
timeout=timeout)
|
||||
output = action.run(
|
||||
action_input,
|
||||
target,
|
||||
save=False,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
state = states.SUCCESS if output.is_success() else states.ERROR
|
||||
|
||||
|
@ -537,6 +537,9 @@ class RegularTask(Task):
|
||||
|
||||
@profiler.trace('regular-task-get-target', hide_args=True)
|
||||
def _get_target(self, input_dict):
|
||||
if not self.task_spec.get_target():
|
||||
return None
|
||||
|
||||
ctx_view = data_flow.ContextView(
|
||||
input_dict,
|
||||
self.ctx,
|
||||
@ -552,7 +555,11 @@ class RegularTask(Task):
|
||||
|
||||
@profiler.trace('regular-task-get-action-input', hide_args=True)
|
||||
def _get_action_input(self, ctx=None):
|
||||
input_dict = self._evaluate_expression(self.task_spec.get_input(), ctx)
|
||||
input_spec = self.task_spec.get_input()
|
||||
|
||||
input_dict = (
|
||||
self._evaluate_expression(input_spec, ctx) if input_spec else {}
|
||||
)
|
||||
|
||||
if not isinstance(input_dict, dict):
|
||||
raise exc.InputException(
|
||||
@ -576,10 +583,7 @@ class RegularTask(Task):
|
||||
self.wf_ex.input
|
||||
)
|
||||
|
||||
return expr.evaluate_recursively(
|
||||
expression,
|
||||
ctx_view
|
||||
)
|
||||
return expr.evaluate_recursively(expression, ctx_view)
|
||||
|
||||
def _build_action(self):
|
||||
action_name = self.task_spec.get_action_name()
|
||||
|
Loading…
Reference in New Issue
Block a user