Creating action_handler to separate action functionality
* Now it is possible to use action executions without task and workflow executions at all. TODO (next commit): - implemeting possibility to run single action on engine Partially implements blueprint mistral-run-individual-action Change-Id: Ib6c20fec57ff1fd02bef5b1f9e782262309b6cd2
This commit is contained in:
parent
e1d2cf3ff8
commit
599fe503c1
212
mistral/engine/action_handler.py
Normal file
212
mistral/engine/action_handler.py
Normal file
@ -0,0 +1,212 @@
|
||||
# Copyright 2015 - Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import rpc
|
||||
from mistral import exceptions as exc
|
||||
from mistral import expressions as expr
|
||||
from mistral.services import action_manager as a_m
|
||||
from mistral.services import security
|
||||
from mistral import utils
|
||||
from mistral.workbook import parser as spec_parser
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
|
||||
def create_action_execution(action_def, action_input, task_ex=None, index=0):
|
||||
# TODO(rakhmerov): We can avoid hitting DB at all when calling something
|
||||
# create_action_execution(), these operations can be just done using
|
||||
# SQLAlchemy session (1-level cache) and session flush (on TX commit) would
|
||||
# send necessary SQL queries to DB. Currently, session flush happens
|
||||
# on every operation which may not be optimal. The problem with using just
|
||||
# session level cache is in generating ids. Ids are generated only on
|
||||
# session flush. And now we have a lot places where we need to have ids
|
||||
# before TX completion.
|
||||
|
||||
# Assign the action execution ID here to minimize database calls.
|
||||
# Otherwise, the input property of the action execution DB object needs
|
||||
# to be updated with the action execution ID after the action execution
|
||||
# DB object is created.
|
||||
action_ex_id = utils.generate_unicode_uuid()
|
||||
|
||||
if a_m.has_action_context(
|
||||
action_def.action_class, action_def.attributes or {}) and task_ex:
|
||||
action_input.update(a_m.get_action_context(task_ex, action_ex_id))
|
||||
|
||||
values = {
|
||||
'id': action_ex_id,
|
||||
'name': action_def.name,
|
||||
'spec': action_def.spec,
|
||||
'state': states.RUNNING,
|
||||
'input': action_input,
|
||||
'runtime_context': {'with_items_index': index}
|
||||
}
|
||||
|
||||
if task_ex:
|
||||
values.update({
|
||||
'task_execution_id': task_ex.id,
|
||||
'workflow_name': task_ex.workflow_name,
|
||||
'project_id': task_ex.project_id,
|
||||
})
|
||||
else:
|
||||
values.update({
|
||||
'project_id': security.get_project_id(),
|
||||
})
|
||||
|
||||
action_ex = db_api.create_action_execution(values)
|
||||
|
||||
if task_ex:
|
||||
# Add to collection explicitly so that it's in a proper
|
||||
# state within the current session.
|
||||
task_ex.executions.append(action_ex)
|
||||
|
||||
return action_ex
|
||||
|
||||
|
||||
def run_action(action_def, action_input, action_ex_id=None, target=None):
|
||||
rpc.get_executor_client().run_action(
|
||||
action_ex_id,
|
||||
action_def.action_class,
|
||||
action_def.attributes or {},
|
||||
action_input,
|
||||
target
|
||||
)
|
||||
|
||||
|
||||
def run_existing_action(action_ex_id, target):
|
||||
action_ex = db_api.get_action_execution(action_ex_id)
|
||||
action_def = db_api.get_action_definition(action_ex.name)
|
||||
|
||||
return run_action(
|
||||
action_def,
|
||||
action_ex.input,
|
||||
action_ex_id,
|
||||
target
|
||||
)
|
||||
|
||||
|
||||
def resolve_definition(action_name, task_ex=None, wf_spec=None):
|
||||
if task_ex and wf_spec:
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
action_def = resolve_action_definition(
|
||||
action_name,
|
||||
wf_ex.workflow_name,
|
||||
wf_spec.get_name()
|
||||
)
|
||||
else:
|
||||
action_def = resolve_action_definition(action_name)
|
||||
|
||||
if action_def.spec:
|
||||
# Ad-hoc action.
|
||||
action_spec = spec_parser.get_action_spec(action_def.spec)
|
||||
|
||||
base_name = action_spec.get_base()
|
||||
|
||||
action_def = resolve_action_definition(
|
||||
base_name,
|
||||
task_ex.workflow_name,
|
||||
wf_spec.get_name()
|
||||
)
|
||||
|
||||
return action_def
|
||||
|
||||
|
||||
def resolve_action_definition(action_spec_name, wf_name=None,
|
||||
wf_spec_name=None):
|
||||
action_db = None
|
||||
|
||||
if wf_name and wf_name != wf_spec_name:
|
||||
# If workflow belongs to a workbook then check
|
||||
# action within the same workbook (to be able to
|
||||
# use short names within workbooks).
|
||||
# If it doesn't exist then use a name from spec
|
||||
# to find an action in DB.
|
||||
wb_name = wf_name.rstrip(wf_spec_name)[:-1]
|
||||
|
||||
action_full_name = "%s.%s" % (wb_name, action_spec_name)
|
||||
|
||||
action_db = db_api.load_action_definition(action_full_name)
|
||||
|
||||
if not action_db:
|
||||
action_db = db_api.load_action_definition(action_spec_name)
|
||||
|
||||
if not action_db:
|
||||
raise exc.InvalidActionException(
|
||||
"Failed to find action [action_name=%s]" % action_spec_name
|
||||
)
|
||||
|
||||
return action_db
|
||||
|
||||
|
||||
def transform_result(result, task_ex=None, action_ex=None):
|
||||
"""Transforms task result accounting for ad-hoc actions.
|
||||
|
||||
In case if the given result is an action result and action is
|
||||
an ad-hoc action the method transforms the result according to
|
||||
ad-hoc action configuration.
|
||||
|
||||
:param task_ex: Task DB model.
|
||||
:param result: Result of task action/workflow.
|
||||
"""
|
||||
if result.is_error():
|
||||
return result
|
||||
|
||||
action_spec_name = None
|
||||
|
||||
if task_ex:
|
||||
action_spec_name = spec_parser.get_task_spec(
|
||||
task_ex.spec).get_action_name()
|
||||
elif action_ex:
|
||||
if action_ex.spec:
|
||||
action_spec_name = spec_parser.get_action_spec(action_ex.spec)
|
||||
else:
|
||||
action_spec_name = action_ex.name
|
||||
|
||||
if action_spec_name:
|
||||
wf_ex = task_ex.workflow_execution if task_ex else None
|
||||
wf_spec_name = (spec_parser.get_workflow_spec(
|
||||
wf_ex.spec).get_name() if task_ex else None)
|
||||
|
||||
return transform_action_result(
|
||||
action_spec_name,
|
||||
result,
|
||||
wf_ex.workflow_name if wf_ex else None,
|
||||
wf_spec_name if wf_ex else None,
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def transform_action_result(action_spec_name, result,
|
||||
wf_name=None, wf_spec_name=None):
|
||||
action_def = resolve_action_definition(
|
||||
action_spec_name,
|
||||
wf_name,
|
||||
wf_spec_name
|
||||
)
|
||||
|
||||
if not action_def.spec:
|
||||
return result
|
||||
|
||||
transformer = spec_parser.get_action_spec(action_def.spec).get_output()
|
||||
|
||||
if transformer is None:
|
||||
return result
|
||||
|
||||
return wf_utils.Result(
|
||||
data=expr.evaluate_recursively(transformer, result.data),
|
||||
error=result.error
|
||||
)
|
@ -18,11 +18,11 @@ from oslo_log import log as logging
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models
|
||||
from mistral.engine import action_handler
|
||||
from mistral.engine import policies
|
||||
from mistral.engine import rpc
|
||||
from mistral.engine import utils as e_utils
|
||||
from mistral import expressions as expr
|
||||
from mistral.services import action_manager as a_m
|
||||
from mistral.services import scheduler
|
||||
from mistral import utils
|
||||
from mistral.utils import wf_trace
|
||||
@ -119,7 +119,7 @@ def on_action_complete(action_ex, result):
|
||||
isinstance(action_ex, models.WorkflowExecution)):
|
||||
return task_ex
|
||||
|
||||
result = e_utils.transform_result(task_ex, result)
|
||||
result = action_handler.transform_result(result, task_ex)
|
||||
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
@ -164,45 +164,6 @@ def _create_task_execution(wf_ex, task_spec, ctx):
|
||||
return task_ex
|
||||
|
||||
|
||||
def _create_action_execution(task_ex, action_def, action_input, index=0):
|
||||
# TODO(rakhmerov): We can avoid hitting DB at all when calling something
|
||||
# create_action_execution(), these operations can be just done using
|
||||
# SQLAlchemy session (1-level cache) and session flush (on TX commit) would
|
||||
# send necessary SQL queries to DB. Currently, session flush happens
|
||||
# on every operation which may not be optimal. The problem with using just
|
||||
# session level cache is in generating ids. Ids are generated only on
|
||||
# session flush. And now we have a lot places where we need to have ids
|
||||
# before TX completion.
|
||||
|
||||
# Assign the action execution ID here to minimize database calls.
|
||||
# Otherwise, the input property of the action execution DB object needs
|
||||
# to be updated with the action execution ID after the action execution
|
||||
# DB object is created.
|
||||
action_ex_id = utils.generate_unicode_uuid()
|
||||
|
||||
if a_m.has_action_context(
|
||||
action_def.action_class, action_def.attributes or {}):
|
||||
action_input.update(a_m.get_action_context(task_ex, action_ex_id))
|
||||
|
||||
action_ex = db_api.create_action_execution({
|
||||
'id': action_ex_id,
|
||||
'name': action_def.name,
|
||||
'task_execution_id': task_ex.id,
|
||||
'workflow_name': task_ex.workflow_name,
|
||||
'spec': action_def.spec,
|
||||
'project_id': task_ex.project_id,
|
||||
'state': states.RUNNING,
|
||||
'input': action_input,
|
||||
'runtime_context': {'with_items_index': index}}
|
||||
)
|
||||
|
||||
# Add to collection explicitly so that it's in a proper
|
||||
# state within the current session.
|
||||
task_ex.executions.append(action_ex)
|
||||
|
||||
return action_ex
|
||||
|
||||
|
||||
def before_task_start(task_ex, task_spec, wf_spec):
|
||||
for p in policies.build_policies(task_spec.get_policies(), wf_spec):
|
||||
p.before_task_start(task_ex, task_spec)
|
||||
@ -309,10 +270,10 @@ def _get_action_input(wf_spec, task_ex, task_spec, ctx):
|
||||
|
||||
action_spec_name = task_spec.get_action_name()
|
||||
|
||||
action_def = e_utils.resolve_action_definition(
|
||||
action_def = action_handler.resolve_action_definition(
|
||||
action_spec_name,
|
||||
task_ex.workflow_name,
|
||||
wf_spec.get_name(),
|
||||
action_spec_name
|
||||
wf_spec.get_name()
|
||||
)
|
||||
|
||||
input_dict = utils.merge_dicts(
|
||||
@ -327,10 +288,10 @@ def _get_action_input(wf_spec, task_ex, task_spec, ctx):
|
||||
|
||||
base_name = action_spec.get_base()
|
||||
|
||||
action_def = e_utils.resolve_action_definition(
|
||||
action_def = action_handler.resolve_action_definition(
|
||||
base_name,
|
||||
task_ex.workflow_name,
|
||||
wf_spec.get_name(),
|
||||
base_name
|
||||
wf_spec.get_name()
|
||||
)
|
||||
|
||||
e_utils.validate_input(action_def, action_spec, input_dict)
|
||||
@ -384,27 +345,14 @@ def _schedule_run_action(task_ex, task_spec, action_input, index):
|
||||
|
||||
action_spec_name = task_spec.get_action_name()
|
||||
|
||||
# TODO(rakhmerov): Refactor ad-hoc actions and isolate them.
|
||||
action_def = e_utils.resolve_action_definition(
|
||||
wf_ex.workflow_name,
|
||||
wf_spec.get_name(),
|
||||
action_spec_name
|
||||
action_def = action_handler.resolve_definition(
|
||||
action_spec_name,
|
||||
task_ex,
|
||||
wf_spec
|
||||
)
|
||||
|
||||
if action_def.spec:
|
||||
# Ad-hoc action.
|
||||
action_spec = spec_parser.get_action_spec(action_def.spec)
|
||||
|
||||
base_name = action_spec.get_base()
|
||||
|
||||
action_def = e_utils.resolve_action_definition(
|
||||
task_ex.workflow_name,
|
||||
wf_spec.get_name(),
|
||||
base_name
|
||||
)
|
||||
|
||||
action_ex = _create_action_execution(
|
||||
task_ex, action_def, action_input, index
|
||||
action_ex = action_handler.create_action_execution(
|
||||
action_def, action_input, task_ex, index
|
||||
)
|
||||
|
||||
target = expr.evaluate_recursively(
|
||||
@ -417,7 +365,7 @@ def _schedule_run_action(task_ex, task_spec, action_input, index):
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
'mistral.engine.task_handler.run_action',
|
||||
'mistral.engine.action_handler.run_existing_action',
|
||||
0,
|
||||
action_ex_id=action_ex.id,
|
||||
target=target
|
||||
@ -428,13 +376,13 @@ def _schedule_noop_action(task_ex, task_spec):
|
||||
wf_ex = task_ex.workflow_execution
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
action_def = e_utils.resolve_action_definition(
|
||||
action_def = action_handler.resolve_action_definition(
|
||||
'std.noop',
|
||||
wf_ex.workflow_name,
|
||||
wf_spec.get_name(),
|
||||
'std.noop'
|
||||
wf_spec.get_name()
|
||||
)
|
||||
|
||||
action_ex = _create_action_execution(task_ex, action_def, {})
|
||||
action_ex = action_handler.create_action_execution(action_def, {}, task_ex)
|
||||
|
||||
target = expr.evaluate_recursively(
|
||||
task_spec.get_target(),
|
||||
@ -443,26 +391,13 @@ def _schedule_noop_action(task_ex, task_spec):
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
'mistral.engine.task_handler.run_action',
|
||||
'mistral.engine.action_handler.run_existing_action',
|
||||
0,
|
||||
action_ex_id=action_ex.id,
|
||||
target=target
|
||||
)
|
||||
|
||||
|
||||
def run_action(action_ex_id, target):
|
||||
action_ex = db_api.get_action_execution(action_ex_id)
|
||||
action_def = db_api.get_action_definition(action_ex.name)
|
||||
|
||||
rpc.get_executor_client().run_action(
|
||||
action_ex.id,
|
||||
action_def.action_class,
|
||||
action_def.attributes or {},
|
||||
action_ex.input,
|
||||
target
|
||||
)
|
||||
|
||||
|
||||
def _schedule_run_workflow(task_ex, task_spec, wf_input, index):
|
||||
parent_wf_ex = task_ex.workflow_execution
|
||||
parent_wf_spec = spec_parser.get_workflow_spec(parent_wf_ex.spec)
|
||||
|
@ -20,10 +20,7 @@ from oslo_log import log as logging
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral import expressions as expr
|
||||
from mistral import utils
|
||||
from mistral.workbook import parser as spec_parser
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -59,32 +56,6 @@ def validate_input(definition, spec, input):
|
||||
utils.merge_dicts(input, spec.get_input(), overwrite=False)
|
||||
|
||||
|
||||
def resolve_action_definition(wf_name, wf_spec_name, action_spec_name):
|
||||
action_db = None
|
||||
|
||||
if wf_name != wf_spec_name:
|
||||
# If workflow belongs to a workbook then check
|
||||
# action within the same workbook (to be able to
|
||||
# use short names within workbooks).
|
||||
# If it doesn't exist then use a name from spec
|
||||
# to find an action in DB.
|
||||
wb_name = wf_name.rstrip(wf_spec_name)[:-1]
|
||||
|
||||
action_full_name = "%s.%s" % (wb_name, action_spec_name)
|
||||
|
||||
action_db = db_api.load_action_definition(action_full_name)
|
||||
|
||||
if not action_db:
|
||||
action_db = db_api.load_action_definition(action_spec_name)
|
||||
|
||||
if not action_db:
|
||||
raise exc.InvalidActionException(
|
||||
"Failed to find action [action_name=%s]" % action_spec_name
|
||||
)
|
||||
|
||||
return action_db
|
||||
|
||||
|
||||
def resolve_workflow_definition(parent_wf_name, parent_wf_spec_name,
|
||||
wf_spec_name):
|
||||
wf_def = None
|
||||
@ -110,57 +81,3 @@ def resolve_workflow_definition(parent_wf_name, parent_wf_spec_name,
|
||||
)
|
||||
|
||||
return wf_def
|
||||
|
||||
|
||||
# TODO(rakhmerov): Think of a better home for this method.
|
||||
# Looks like we need a special module for ad-hoc actions.
|
||||
def transform_result(task_ex, result):
|
||||
"""Transforms task result accounting for ad-hoc actions.
|
||||
|
||||
In case if the given result is an action result and action is
|
||||
an ad-hoc action the method transforms the result according to
|
||||
ad-hoc action configuration.
|
||||
|
||||
:param task_ex: Task DB model.
|
||||
:param result: Result of task action/workflow.
|
||||
"""
|
||||
if result.is_error():
|
||||
return result
|
||||
|
||||
action_spec_name = spec_parser.get_task_spec(
|
||||
task_ex.spec).get_action_name()
|
||||
|
||||
if action_spec_name:
|
||||
wf_ex = task_ex.workflow_execution
|
||||
wf_spec_name = spec_parser.get_workflow_spec(wf_ex.spec).get_name()
|
||||
|
||||
return transform_action_result(
|
||||
wf_ex.workflow_name,
|
||||
wf_spec_name,
|
||||
action_spec_name,
|
||||
result
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# TODO(rakhmerov): Should probably go into task handler.
|
||||
def transform_action_result(wf_name, wf_spec_name, action_spec_name, result):
|
||||
action_def = resolve_action_definition(
|
||||
wf_name,
|
||||
wf_spec_name,
|
||||
action_spec_name
|
||||
)
|
||||
|
||||
if not action_def.spec:
|
||||
return result
|
||||
|
||||
transformer = spec_parser.get_action_spec(action_def.spec).get_output()
|
||||
|
||||
if transformer is None:
|
||||
return result
|
||||
|
||||
return wf_utils.Result(
|
||||
data=expr.evaluate_recursively(transformer, result.data),
|
||||
error=result.error
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user