Implementing 'start_action' on engine side
* new engine method (symmetrically to start_workflow) - start_action; * possibility to run action without saving the result to the DB; * adjusted model_base: updated_at indeed can be None in set of cases; * improved engine.utils.validate_input for checking action_input (also adhoc action input); for this new util method for getting input dict from input string is introduced; * executor client can call rpc method synchronously for immediately returning result from action and without callback to engine; * fixed uploading actions from workbook; * improved action_handler; * improved inspect_utils for input validation needs. TODO (next commit): - Implementing run action API side Partially implements blueprint mistral-run-individual-action Change-Id: I2fc1f3bb4382b72d6de7d7508c82d64e64ca656c
This commit is contained in:
parent
599fe503c1
commit
9d5f197869
@ -84,7 +84,12 @@ class _MistralModelBase(oslo_models.ModelBase, oslo_models.TimestampMixin):
|
||||
setattr(m, col.name, getattr(self, col.name))
|
||||
|
||||
setattr(m, 'created_at', getattr(self, 'created_at').isoformat(' '))
|
||||
setattr(m, 'updated_at', getattr(self, 'updated_at').isoformat(' '))
|
||||
|
||||
updated_at = getattr(self, 'updated_at')
|
||||
# NOTE(nmakhotkin): 'updated_at' field is empty for just created
|
||||
# object since it has not updated yet.
|
||||
if updated_at:
|
||||
setattr(m, 'updated_at', updated_at.isoformat(' '))
|
||||
|
||||
return m
|
||||
|
||||
|
@ -15,17 +15,20 @@
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import rpc
|
||||
from mistral.engine import utils as e_utils
|
||||
from mistral import exceptions as exc
|
||||
from mistral import expressions as expr
|
||||
from mistral.services import action_manager as a_m
|
||||
from mistral.services import security
|
||||
from mistral import utils
|
||||
from mistral.utils import wf_trace
|
||||
from mistral.workbook import parser as spec_parser
|
||||
from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
|
||||
def create_action_execution(action_def, action_input, task_ex=None, index=0):
|
||||
def create_action_execution(action_def, action_input, task_ex=None,
|
||||
index=0, description=''):
|
||||
# TODO(rakhmerov): We can avoid hitting DB at all when calling something
|
||||
# create_action_execution(), these operations can be just done using
|
||||
# SQLAlchemy session (1-level cache) and session flush (on TX commit) would
|
||||
@ -51,7 +54,8 @@ def create_action_execution(action_def, action_input, task_ex=None, index=0):
|
||||
'spec': action_def.spec,
|
||||
'state': states.RUNNING,
|
||||
'input': action_input,
|
||||
'runtime_context': {'with_items_index': index}
|
||||
'runtime_context': {'with_items_index': index},
|
||||
'description': description
|
||||
}
|
||||
|
||||
if task_ex:
|
||||
@ -75,13 +79,102 @@ def create_action_execution(action_def, action_input, task_ex=None, index=0):
|
||||
return action_ex
|
||||
|
||||
|
||||
def run_action(action_def, action_input, action_ex_id=None, target=None):
|
||||
rpc.get_executor_client().run_action(
|
||||
def _inject_action_ctx_for_validating(action_def, input_dict):
|
||||
if a_m.has_action_context(action_def.action_class, action_def.attributes):
|
||||
input_dict.update(a_m.get_empty_action_context())
|
||||
|
||||
|
||||
def get_action_input(action_name, input_dict, wf_name=None, wf_spec=None):
|
||||
action_def = resolve_action_definition(
|
||||
action_name,
|
||||
wf_name,
|
||||
wf_spec.get_name() if wf_spec else None
|
||||
)
|
||||
|
||||
if action_def.action_class:
|
||||
_inject_action_ctx_for_validating(action_def, input_dict)
|
||||
e_utils.validate_input(action_def, input_dict)
|
||||
|
||||
if action_def.spec:
|
||||
# Ad-hoc action.
|
||||
return _get_adhoc_action_input(
|
||||
action_def,
|
||||
input_dict,
|
||||
wf_name,
|
||||
wf_spec
|
||||
)
|
||||
|
||||
return input_dict
|
||||
|
||||
|
||||
def _get_adhoc_action_input(action_def, input_dict,
|
||||
wf_name=None, wf_spec=None):
|
||||
action_spec = spec_parser.get_action_spec(action_def.spec)
|
||||
|
||||
base_name = action_spec.get_base()
|
||||
|
||||
action_def = resolve_action_definition(
|
||||
base_name,
|
||||
wf_name if wf_name else None,
|
||||
wf_spec.get_name() if wf_spec else None
|
||||
)
|
||||
|
||||
_inject_action_ctx_for_validating(action_def, input_dict)
|
||||
e_utils.validate_input(action_def, input_dict, action_spec)
|
||||
|
||||
base_input = action_spec.get_base_input()
|
||||
|
||||
if base_input:
|
||||
input_dict = expr.evaluate_recursively(
|
||||
base_input,
|
||||
input_dict
|
||||
)
|
||||
else:
|
||||
input_dict = {}
|
||||
|
||||
return input_dict
|
||||
|
||||
|
||||
def run_action(action_def, action_input,
|
||||
action_ex_id=None, target=None, async=True):
|
||||
return rpc.get_executor_client().run_action(
|
||||
action_ex_id,
|
||||
action_def.action_class,
|
||||
action_def.attributes or {},
|
||||
action_input,
|
||||
target
|
||||
target,
|
||||
async
|
||||
)
|
||||
|
||||
|
||||
def store_action_result(action_ex, result):
|
||||
prev_state = action_ex.state
|
||||
|
||||
if result.is_success():
|
||||
action_ex.state = states.SUCCESS
|
||||
action_ex.output = {'result': result.data}
|
||||
action_ex.accepted = True
|
||||
else:
|
||||
action_ex.state = states.ERROR
|
||||
action_ex.output = {'result': result.error}
|
||||
action_ex.accepted = False
|
||||
|
||||
_log_action_result(action_ex, prev_state, action_ex.state, result)
|
||||
|
||||
return action_ex
|
||||
|
||||
|
||||
def _log_action_result(action_ex, from_state, to_state, result):
|
||||
def _result_msg():
|
||||
if action_ex.state == states.ERROR:
|
||||
return "error = %s" % utils.cut(result.error)
|
||||
|
||||
return "result = %s" % utils.cut(result.data)
|
||||
|
||||
wf_trace.info(
|
||||
None,
|
||||
"Action execution '%s' [%s -> %s, %s]" %
|
||||
(action_ex.name, from_state, to_state, _result_msg())
|
||||
)
|
||||
|
||||
|
||||
@ -117,8 +210,8 @@ def resolve_definition(action_name, task_ex=None, wf_spec=None):
|
||||
|
||||
action_def = resolve_action_definition(
|
||||
base_name,
|
||||
task_ex.workflow_name,
|
||||
wf_spec.get_name()
|
||||
task_ex.workflow_name if task_ex else None,
|
||||
wf_spec.get_name() if wf_spec else None
|
||||
)
|
||||
|
||||
return action_def
|
||||
|
@ -40,6 +40,19 @@ class Engine(object):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def start_action(self, action_name, action_input,
|
||||
description=None, **params):
|
||||
"""Starts the specific action.
|
||||
|
||||
:param action_name: Action name.
|
||||
:param action_input: Action input data as a dictionary.
|
||||
:param description: Execution description.
|
||||
:param params: Additional options for action running.
|
||||
:return: Action execution object.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def on_action_complete(self, action_ex_id, result):
|
||||
"""Accepts action result and continues the workflow.
|
||||
|
@ -20,10 +20,12 @@ from oslo_log import log as logging
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models as db_models
|
||||
from mistral.engine import action_handler
|
||||
from mistral.engine import base
|
||||
from mistral.engine import task_handler
|
||||
from mistral.engine import utils as eng_utils
|
||||
from mistral.engine import workflow_handler as wf_handler
|
||||
from mistral.services import action_manager as a_m
|
||||
from mistral import utils as u
|
||||
from mistral.utils import wf_trace
|
||||
from mistral.workbook import parser as spec_parser
|
||||
@ -56,7 +58,7 @@ class DefaultEngine(base.Engine):
|
||||
wf_def = db_api.get_workflow_definition(wf_name)
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_def.spec)
|
||||
|
||||
eng_utils.validate_input(wf_def, wf_spec, wf_input)
|
||||
eng_utils.validate_input(wf_def, wf_input, wf_spec)
|
||||
|
||||
wf_ex = self._create_workflow_execution(
|
||||
wf_def,
|
||||
@ -88,6 +90,50 @@ class DefaultEngine(base.Engine):
|
||||
self._fail_workflow(wf_exec_id, e)
|
||||
raise e
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def start_action(self, action_name, action_input,
|
||||
description=None, **params):
|
||||
with db_api.transaction():
|
||||
action_def = action_handler.resolve_definition(action_name)
|
||||
resolved_action_input = action_handler.get_action_input(
|
||||
action_name,
|
||||
action_input
|
||||
)
|
||||
action = a_m.get_action_class(action_def.name)(
|
||||
**resolved_action_input
|
||||
)
|
||||
|
||||
# If we see action is asynchronous, then we enforce 'save_result'.
|
||||
if params.get('save_result') or not action.is_sync():
|
||||
action_ex = action_handler.create_action_execution(
|
||||
action_def,
|
||||
resolved_action_input,
|
||||
description=description
|
||||
)
|
||||
|
||||
action_handler.run_action(
|
||||
action_def,
|
||||
resolved_action_input,
|
||||
action_ex.id,
|
||||
params.get('target')
|
||||
)
|
||||
|
||||
return action_ex
|
||||
else:
|
||||
result = action_handler.run_action(
|
||||
action_def,
|
||||
resolved_action_input,
|
||||
target=params.get('target'),
|
||||
async=False
|
||||
)
|
||||
|
||||
return db_models.ActionExecution(
|
||||
name=action_name,
|
||||
description=description,
|
||||
input=action_input,
|
||||
output={'result': result}
|
||||
)
|
||||
|
||||
def on_task_state_change(self, task_ex_id, state):
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex_id)
|
||||
@ -167,6 +213,13 @@ class DefaultEngine(base.Engine):
|
||||
with db_api.transaction():
|
||||
action_ex = db_api.get_action_execution(action_ex_id)
|
||||
|
||||
# In case of single action execution there is no
|
||||
# assigned task execution.
|
||||
if not action_ex.task_execution:
|
||||
return action_handler.store_action_result(
|
||||
action_ex, result
|
||||
).get_clone()
|
||||
|
||||
wf_ex_id = action_ex.task_execution.workflow_execution_id
|
||||
|
||||
# Must be before loading the object itself (see method doc).
|
||||
|
@ -40,10 +40,11 @@ class DefaultExecutor(base.Executor):
|
||||
"""
|
||||
|
||||
def send_error_to_engine(error_msg):
|
||||
self._engine_client.on_action_complete(
|
||||
action_ex_id,
|
||||
wf_utils.Result(error=error_msg)
|
||||
)
|
||||
if action_ex_id:
|
||||
self._engine_client.on_action_complete(
|
||||
action_ex_id,
|
||||
wf_utils.Result(error=error_msg)
|
||||
)
|
||||
|
||||
action_cls = a_f.construct_action_class(action_class_str, attributes)
|
||||
|
||||
@ -51,12 +52,13 @@ class DefaultExecutor(base.Executor):
|
||||
action = action_cls(**action_params)
|
||||
result = action.run()
|
||||
|
||||
if action.is_sync():
|
||||
if action.is_sync() and action_ex_id:
|
||||
self._engine_client.on_action_complete(
|
||||
action_ex_id,
|
||||
wf_utils.Result(data=result)
|
||||
)
|
||||
return
|
||||
|
||||
return result
|
||||
except TypeError as e:
|
||||
msg = ("Failed to initialize action %s. Action init params = %s."
|
||||
" Actual init params = %s. More info: %s"
|
||||
|
@ -98,6 +98,30 @@ class EngineServer(object):
|
||||
**params
|
||||
)
|
||||
|
||||
def start_action(self, rpc_ctx, action_name,
|
||||
action_input, description, params):
|
||||
"""Receives calls over RPC to start actions on engine.
|
||||
|
||||
:param rpc_ctx: RPC request context.
|
||||
:param action_name: name of the Action.
|
||||
:param action_input: input dictionary for Action.
|
||||
:param description: description of new Action execution.
|
||||
:param params: extra parameters to run Action.
|
||||
:return: Action execution.
|
||||
"""
|
||||
LOG.info(
|
||||
"Received RPC request 'start_action'[rpc_ctx=%s,"
|
||||
" name=%s, input=%s, description=%s, params=%s]"
|
||||
% (rpc_ctx, action_name, action_input, description, params)
|
||||
)
|
||||
|
||||
return self._engine.start_action(
|
||||
action_name,
|
||||
action_input,
|
||||
description,
|
||||
**params
|
||||
)
|
||||
|
||||
def on_task_state_change(self, rpc_ctx, task_ex_id, state):
|
||||
return self._engine.on_task_state_change(task_ex_id, state)
|
||||
|
||||
@ -239,6 +263,22 @@ class EngineClient(base.Engine):
|
||||
params=params
|
||||
)
|
||||
|
||||
@wrap_messaging_exception
|
||||
def start_action(self, action_name, action_input,
|
||||
description=None, **params):
|
||||
"""Starts action sending a request to engine over RPC.
|
||||
|
||||
:return: Action execution.
|
||||
"""
|
||||
return self._client.call(
|
||||
auth_ctx.ctx(),
|
||||
'start_action',
|
||||
action_name=action_name,
|
||||
action_input=action_input or {},
|
||||
description=description,
|
||||
params=params
|
||||
)
|
||||
|
||||
def on_task_state_change(self, task_ex_id, state):
|
||||
return self._client.call(
|
||||
auth_ctx.ctx(),
|
||||
@ -352,7 +392,7 @@ class ExecutorServer(object):
|
||||
% (rpc_ctx, action_ex_id, action_class_str, attributes, params)
|
||||
)
|
||||
|
||||
self._executor.run_action(
|
||||
return self._executor.run_action(
|
||||
action_ex_id,
|
||||
action_class_str,
|
||||
attributes,
|
||||
@ -381,7 +421,7 @@ class ExecutorClient(base.Executor):
|
||||
)
|
||||
|
||||
def run_action(self, action_ex_id, action_class_str, attributes,
|
||||
action_params, target=None):
|
||||
action_params, target=None, async=True):
|
||||
"""Sends a request to run action to executor."""
|
||||
|
||||
kwargs = {
|
||||
@ -391,7 +431,11 @@ class ExecutorClient(base.Executor):
|
||||
'params': action_params
|
||||
}
|
||||
|
||||
self._client.prepare(topic=self.topic, server=target).cast(
|
||||
call_ctx = self._client.prepare(topic=self.topic, server=target)
|
||||
|
||||
rpc_client_method = call_ctx.cast if async else call_ctx.call
|
||||
|
||||
return rpc_client_method(
|
||||
auth_ctx.ctx(),
|
||||
'run_action',
|
||||
**kwargs
|
||||
|
@ -126,7 +126,7 @@ def on_action_complete(action_ex, result):
|
||||
# Ignore workflow executions because they're handled during
|
||||
# workflow completion
|
||||
if not isinstance(action_ex, models.WorkflowExecution):
|
||||
_store_action_result(wf_ex, action_ex, result)
|
||||
action_handler.store_action_result(action_ex, result)
|
||||
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
task_spec = wf_spec.get_tasks()[task_ex.name]
|
||||
@ -270,43 +270,18 @@ def _get_action_input(wf_spec, task_ex, task_spec, ctx):
|
||||
|
||||
action_spec_name = task_spec.get_action_name()
|
||||
|
||||
action_def = action_handler.resolve_action_definition(
|
||||
action_spec_name,
|
||||
task_ex.workflow_name,
|
||||
wf_spec.get_name()
|
||||
)
|
||||
|
||||
input_dict = utils.merge_dicts(
|
||||
input_dict,
|
||||
_get_action_defaults(task_ex, task_spec),
|
||||
overwrite=False
|
||||
)
|
||||
|
||||
if action_def.spec:
|
||||
# Ad-hoc action.
|
||||
action_spec = spec_parser.get_action_spec(action_def.spec)
|
||||
|
||||
base_name = action_spec.get_base()
|
||||
|
||||
action_def = action_handler.resolve_action_definition(
|
||||
base_name,
|
||||
task_ex.workflow_name,
|
||||
wf_spec.get_name()
|
||||
)
|
||||
|
||||
e_utils.validate_input(action_def, action_spec, input_dict)
|
||||
|
||||
base_input = action_spec.get_base_input()
|
||||
|
||||
if base_input:
|
||||
input_dict = expr.evaluate_recursively(
|
||||
base_input,
|
||||
input_dict
|
||||
)
|
||||
else:
|
||||
input_dict = {}
|
||||
|
||||
return input_dict
|
||||
return action_handler.get_action_input(
|
||||
action_spec_name,
|
||||
input_dict,
|
||||
task_ex.workflow_name,
|
||||
wf_spec
|
||||
)
|
||||
|
||||
|
||||
def _get_workflow_input(task_spec, ctx):
|
||||
@ -444,23 +419,6 @@ def run_workflow(wf_name, wf_input, wf_params):
|
||||
)
|
||||
|
||||
|
||||
def _store_action_result(wf_ex, action_ex, result):
|
||||
prev_state = action_ex.state
|
||||
|
||||
if result.is_success():
|
||||
action_ex.state = states.SUCCESS
|
||||
action_ex.output = {'result': result.data}
|
||||
action_ex.accepted = True
|
||||
else:
|
||||
action_ex.state = states.ERROR
|
||||
action_ex.output = {'result': result.error}
|
||||
action_ex.accepted = False
|
||||
|
||||
_log_action_result(wf_ex, action_ex, prev_state, action_ex.state, result)
|
||||
|
||||
return action_ex.state
|
||||
|
||||
|
||||
def _complete_task(task_ex, task_spec, state):
|
||||
# Ignore if task already completed.
|
||||
if states.is_completed(task_ex.state):
|
||||
@ -487,17 +445,3 @@ def _set_task_state(task_ex, state):
|
||||
)
|
||||
|
||||
task_ex.state = state
|
||||
|
||||
|
||||
def _log_action_result(wf_ex, action_ex, from_state, to_state, result):
|
||||
def _result_msg():
|
||||
if action_ex.state == states.ERROR:
|
||||
return "error = %s" % utils.cut(result.error)
|
||||
|
||||
return "result = %s" % utils.cut(result.data)
|
||||
|
||||
wf_trace.info(
|
||||
wf_ex,
|
||||
"Action execution '%s' [%s -> %s, %s]" %
|
||||
(action_ex.name, from_state, to_state, _result_msg())
|
||||
)
|
||||
|
@ -25,11 +25,14 @@ from mistral import utils
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def validate_input(definition, spec, input):
|
||||
def validate_input(definition, input, spec=None):
|
||||
input_param_names = copy.copy((input or {}).keys())
|
||||
missing_param_names = []
|
||||
|
||||
for p_name, p_value in six.iteritems(spec.get_input()):
|
||||
spec_input = (spec.get_input() if spec else
|
||||
utils.get_input_dict_from_input_string(definition.input))
|
||||
|
||||
for p_name, p_value in six.iteritems(spec_input):
|
||||
if p_value is utils.NotDefined and p_name not in input_param_names:
|
||||
missing_param_names.append(p_name)
|
||||
if p_name in input_param_names:
|
||||
@ -53,7 +56,7 @@ def validate_input(definition, spec, input):
|
||||
msg % tuple(msg_props)
|
||||
)
|
||||
else:
|
||||
utils.merge_dicts(input, spec.get_input(), overwrite=False)
|
||||
utils.merge_dicts(input, spec_input, overwrite=False)
|
||||
|
||||
|
||||
def resolve_workflow_definition(parent_wf_name, parent_wf_spec_name,
|
||||
|
@ -159,6 +159,12 @@ def get_action_context(task_ex, action_ex_id):
|
||||
}
|
||||
|
||||
|
||||
def get_empty_action_context():
|
||||
return {
|
||||
_ACTION_CTX_PARAM: {}
|
||||
}
|
||||
|
||||
|
||||
def _has_argument(action, attributes, argument_name):
|
||||
action_cls = action_factory.construct_action_class(action, attributes)
|
||||
arg_spec = inspect.getargspec(action_cls.__init__)
|
||||
|
@ -12,6 +12,8 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral.workbook import parser as spec_parser
|
||||
@ -67,16 +69,23 @@ def create_or_update_action(action_spec, definition, scope):
|
||||
return db_api.create_or_update_action_definition(values['name'], values)
|
||||
|
||||
|
||||
def _get_action_values(action_spec, definition, scope):
|
||||
action_input = action_spec.to_dict().get('input', [])
|
||||
def get_input_list(action_input):
|
||||
input_list = []
|
||||
|
||||
for param in action_input:
|
||||
if isinstance(param, dict):
|
||||
for k, v in param.items():
|
||||
input_list.append("%s=%s" % (k, v))
|
||||
input_list.append("%s=%s" % (k, json.dumps(v)))
|
||||
else:
|
||||
input_list.append(param)
|
||||
|
||||
return input_list
|
||||
|
||||
|
||||
def _get_action_values(action_spec, definition, scope):
|
||||
action_input = action_spec.to_dict().get('input', [])
|
||||
input_list = get_input_list(action_input)
|
||||
|
||||
values = {
|
||||
'name': action_spec.get_name(),
|
||||
'description': action_spec.get_description(),
|
||||
|
@ -80,8 +80,8 @@ def create_cron_trigger(name, workflow_name, workflow_input,
|
||||
|
||||
eng_utils.validate_input(
|
||||
wf_def,
|
||||
parser.get_workflow_spec(wf_def.spec),
|
||||
workflow_input or {}
|
||||
workflow_input or {},
|
||||
parser.get_workflow_spec(wf_def.spec)
|
||||
)
|
||||
|
||||
values = {
|
||||
|
@ -15,6 +15,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
from mistral.db.v2 import api as db_api_v2
|
||||
from mistral.services import actions
|
||||
from mistral.services import security
|
||||
from mistral.workbook import parser as spec_parser
|
||||
|
||||
@ -61,10 +62,16 @@ def _create_or_update_actions(wb_db, actions_spec):
|
||||
for action_spec in actions_spec:
|
||||
action_name = '%s.%s' % (wb_db.name, action_spec.get_name())
|
||||
|
||||
input_list = actions.get_input_list(
|
||||
action_spec.to_dict().get('input', [])
|
||||
)
|
||||
values = {
|
||||
'name': action_name,
|
||||
'spec': action_spec.to_dict(),
|
||||
'tags': action_spec.get_tags(),
|
||||
'description': action_spec.get_description(),
|
||||
'is_system': False,
|
||||
'input': ', '.join(input_list) if input_list else None,
|
||||
'scope': wb_db.scope,
|
||||
'project_id': wb_db.project_id
|
||||
}
|
||||
|
@ -144,22 +144,12 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
|
||||
wf_ex = self._run_workflow(wf_text)
|
||||
|
||||
task_ex = self._assert_single_item(wf_ex.task_executions, name='task2')
|
||||
action_ex = db_api.get_action_executions(
|
||||
task_execution_id=task_ex.id
|
||||
)[0]
|
||||
|
||||
self.assertIn(
|
||||
'Failed to initialize action',
|
||||
action_ex.output['result']
|
||||
)
|
||||
self.assertIn(
|
||||
'unexpected keyword argument',
|
||||
action_ex.output['result']
|
||||
'Invalid input',
|
||||
wf_ex.state_info
|
||||
)
|
||||
|
||||
self.assertTrue(wf_ex.state, states.ERROR)
|
||||
self.assertIn(action_ex.output['result'], wf_ex.state_info)
|
||||
|
||||
def test_wrong_first_task_input(self):
|
||||
wf_text = """
|
||||
@ -173,24 +163,11 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
|
||||
action: std.echo wrong_input="Ha-ha"
|
||||
"""
|
||||
|
||||
wf_ex = self._run_workflow(wf_text)
|
||||
|
||||
task_ex = wf_ex.task_executions[0]
|
||||
action_ex = db_api.get_action_executions(
|
||||
task_execution_id=task_ex.id
|
||||
)[0]
|
||||
|
||||
self.assertIn(
|
||||
"Failed to initialize action",
|
||||
action_ex.output['result']
|
||||
self.assertRaises(
|
||||
exc.InputException,
|
||||
self._run_workflow,
|
||||
wf_text
|
||||
)
|
||||
self.assertIn(
|
||||
"unexpected keyword argument",
|
||||
action_ex.output['result']
|
||||
)
|
||||
|
||||
self.assertTrue(wf_ex.state, states.ERROR)
|
||||
self.assertIn(action_ex.output['result'], wf_ex.state_info)
|
||||
|
||||
def test_wrong_action(self):
|
||||
wf_text = """
|
||||
|
@ -79,7 +79,7 @@ workflows:
|
||||
|
||||
|
||||
def _run_at_target(action_ex_id, action_class_str, attributes,
|
||||
action_params, target=None):
|
||||
action_params, target=None, async=True):
|
||||
# We'll just call executor directly for testing purposes.
|
||||
executor = default_executor.DefaultExecutor(rpc.get_engine_client())
|
||||
|
||||
@ -104,7 +104,7 @@ class SubworkflowsTest(base.EngineTestCase):
|
||||
def _test_subworkflow(self, env):
|
||||
wf2_ex = self.engine.start_workflow(
|
||||
'my_wb.wf2',
|
||||
None,
|
||||
{},
|
||||
env=env
|
||||
)
|
||||
|
||||
@ -173,7 +173,8 @@ class SubworkflowsTest(base.EngineTestCase):
|
||||
'mistral.actions.std_actions.EchoAction',
|
||||
{},
|
||||
a_ex.input,
|
||||
TARGET
|
||||
TARGET,
|
||||
True
|
||||
)
|
||||
|
||||
def test_subworkflow_env_task_input(self):
|
||||
|
@ -126,7 +126,8 @@ class LongActionTest(base.EngineTestCase):
|
||||
a_m.register_action_class(
|
||||
'std.block',
|
||||
'%s.%s' % (BlockingAction.__module__, BlockingAction.__name__),
|
||||
None
|
||||
{},
|
||||
input_str=""
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
|
105
mistral/tests/unit/engine/test_run_action.py
Normal file
105
mistral/tests/unit/engine/test_run_action.py
Normal file
@ -0,0 +1,105 @@
|
||||
# Copyright 2015 - Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import actions
|
||||
from mistral.tests.unit.engine import base
|
||||
from mistral.workflow import states
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
# the change in value is not permanent.
|
||||
cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||
|
||||
|
||||
class RunActionEngineTest(base.EngineTestCase):
|
||||
@classmethod
|
||||
def heavy_init(cls):
|
||||
super(RunActionEngineTest, cls).heavy_init()
|
||||
|
||||
action = """---
|
||||
version: '2.0'
|
||||
|
||||
concat:
|
||||
base: std.echo
|
||||
base-input:
|
||||
output: <% $.left %><% $.right %>
|
||||
input:
|
||||
- left
|
||||
- right
|
||||
"""
|
||||
actions.create_actions(action)
|
||||
|
||||
def tearDown(self):
|
||||
super(RunActionEngineTest, self).tearDown()
|
||||
|
||||
def test_run_action_sync(self):
|
||||
# Start action and see the result.
|
||||
action_ex = self.engine.start_action('std.echo', {'output': 'Hello!'})
|
||||
|
||||
self.assertEqual('Hello!', action_ex.output['result'])
|
||||
|
||||
def test_run_action_async(self):
|
||||
# Start action.
|
||||
action_ex = self.engine.start_action(
|
||||
'std.echo',
|
||||
{'output': 'Hello!'},
|
||||
save_result=True
|
||||
)
|
||||
|
||||
is_action_ex_success = (
|
||||
lambda: db_api.get_action_execution(
|
||||
action_ex.id
|
||||
).state == states.SUCCESS
|
||||
)
|
||||
|
||||
self._await(is_action_ex_success)
|
||||
|
||||
action_ex = db_api.get_action_execution(action_ex.id)
|
||||
|
||||
self.assertEqual(states.SUCCESS, action_ex.state)
|
||||
self.assertEqual({'result': 'Hello!'}, action_ex.output)
|
||||
|
||||
def test_run_action_adhoc(self):
|
||||
# Start action and see the result.
|
||||
action_ex = self.engine.start_action(
|
||||
'concat',
|
||||
{'left': 'Hello, ', 'right': 'John Doe!'}
|
||||
)
|
||||
|
||||
self.assertEqual('Hello, John Doe!', action_ex.output['result'])
|
||||
|
||||
def test_run_action_wrong_input(self):
|
||||
# Start action and see the result.
|
||||
exception = self.assertRaises(
|
||||
exc.InputException,
|
||||
self.engine.start_action,
|
||||
'std.http',
|
||||
{'url': 'Hello, ', 'metod': 'John Doe!'}
|
||||
)
|
||||
|
||||
self.assertIn('std.http', exception.message)
|
||||
|
||||
def test_adhoc_action_wrong_input(self):
|
||||
# Start action and see the result.
|
||||
exception = self.assertRaises(
|
||||
exc.InputException,
|
||||
self.engine.start_action,
|
||||
'concat',
|
||||
{'left': 'Hello, ', 'ri': 'John Doe!'}
|
||||
)
|
||||
|
||||
self.assertIn('concat', exception.message)
|
@ -414,7 +414,8 @@ class WithItemsEngineTest(base.EngineTestCase):
|
||||
'%s.%s' % (
|
||||
RandomSleepEchoAction.__module__,
|
||||
RandomSleepEchoAction.__name__
|
||||
), {}
|
||||
), {},
|
||||
input_str="output"
|
||||
)
|
||||
|
||||
wb_service.create_workbook_v2(workbook)
|
||||
|
@ -22,17 +22,17 @@ class ActionManagerTest(base.DbTestCase):
|
||||
std_email = db_api.get_action_definition("std.email")
|
||||
|
||||
http_action_input = (
|
||||
"url, method=GET, params=None, body=None, "
|
||||
"headers=None, cookies=None, auth=None, "
|
||||
"timeout=None, allow_redirects=None, "
|
||||
"proxies=None, verify=None"
|
||||
'url, method="GET", params=null, body=null, '
|
||||
'headers=null, cookies=null, auth=null, '
|
||||
'timeout=null, allow_redirects=null, '
|
||||
'proxies=null, verify=null'
|
||||
)
|
||||
|
||||
self.assertEqual(http_action_input, std_http.input)
|
||||
|
||||
std_email_input = (
|
||||
"from_addr, to_addrs, smtp_server, "
|
||||
"smtp_password, subject=None, body=None"
|
||||
"smtp_password, subject=null, body=null"
|
||||
)
|
||||
|
||||
self.assertEqual(std_email_input, std_email.input)
|
||||
|
@ -23,10 +23,10 @@ class InspectUtilsTest(base.BaseTest):
|
||||
parameters_str = i_u.get_arg_list_as_str(action_class.__init__)
|
||||
|
||||
http_action_params = (
|
||||
"url, method=GET, params=None, body=None, "
|
||||
"headers=None, cookies=None, auth=None, "
|
||||
"timeout=None, allow_redirects=None, "
|
||||
"proxies=None, verify=None"
|
||||
'url, method="GET", params=null, body=null, '
|
||||
'headers=null, cookies=null, auth=null, '
|
||||
'timeout=null, allow_redirects=null, '
|
||||
'proxies=null, verify=null'
|
||||
)
|
||||
|
||||
self.assertEqual(http_action_params, parameters_str)
|
||||
|
@ -106,3 +106,14 @@ class UtilsTest(base.BaseTest):
|
||||
self.assertIn('param2', input_dict)
|
||||
self.assertEqual(2, input_dict.get('param2'))
|
||||
self.assertIs(input_dict.get('param1'), utils.NotDefined)
|
||||
|
||||
def test_get_input_dict_from_input_string(self):
|
||||
input_string = 'param1, param2=2, param3="var3"'
|
||||
input_dict = utils.get_input_dict_from_input_string(input_string)
|
||||
|
||||
self.assertIn('param1', input_dict)
|
||||
self.assertIn('param2', input_dict)
|
||||
self.assertIn('param3', input_dict)
|
||||
self.assertEqual(2, input_dict.get('param2'))
|
||||
self.assertEqual('var3', input_dict.get('param3'))
|
||||
self.assertIs(input_dict.get('param1'), utils.NotDefined)
|
||||
|
@ -15,6 +15,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from os import path
|
||||
@ -203,6 +204,26 @@ class NotDefined(object):
|
||||
pass
|
||||
|
||||
|
||||
def get_input_dict_from_input_string(input_string):
|
||||
if not input_string:
|
||||
return {}
|
||||
|
||||
raw_inputs = input_string.split(',')
|
||||
|
||||
inputs = []
|
||||
|
||||
for raw in raw_inputs:
|
||||
input = raw.strip()
|
||||
name_value = input.split('=')
|
||||
|
||||
if len(name_value) > 1:
|
||||
inputs += [{name_value[0]: json.loads(name_value[1])}]
|
||||
else:
|
||||
inputs += [name_value[0]]
|
||||
|
||||
return get_input_dict(inputs)
|
||||
|
||||
|
||||
def get_input_dict(inputs):
|
||||
"""Transform input list to dictionary.
|
||||
|
||||
|
@ -13,6 +13,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import inspect
|
||||
import json
|
||||
|
||||
|
||||
def get_public_fields(obj):
|
||||
@ -61,8 +62,12 @@ def get_arg_list_as_str(func):
|
||||
|
||||
for index, default in enumerate(args):
|
||||
if index >= diff_args_defs:
|
||||
arg_str_list.append("%s=%s" % (args[index],
|
||||
defs[index - diff_args_defs]))
|
||||
arg_str_list.append(
|
||||
"%s=%s" % (
|
||||
args[index],
|
||||
json.dumps(defs[index - diff_args_defs])
|
||||
)
|
||||
)
|
||||
else:
|
||||
arg_str_list.append("%s" % args[index])
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user