Refactor Mistral with Action Providers

* This patch refactors Mistral with the action provider concept
  that is responsible for delivering actions to the system. So
  it takes all the burden of managing action definitions w/o
  having to spread that across multiple subsystems like Engine
  and API and w/o having to assume that action definitions are
  always stored in DB.
* Added LegacyActionProvider  that represents the old way of
  delivering action definitions to the system. It pretty much just
  analyses what entries are configured in the entry point
  "mistral.actions" in setup.cfg and build a collection of
  corresponding Python action classes in memory accessible by names.
* The module mistral/services/actions.py is now renamed to
  adhoc_actions.py because it's effectively responsible only for
  ad-hoc actions (those defined in YAML).
* Added the new entry point in setup.cfg "mistral.action.providers"
  to register action provider classes
* Added the module mistral/services/actions.py that will be a facade
  for action providers. Engine and other subsystems will need to
  work with it.
* Other small code changes.

Depends-On: I13033253d5098655a001135c8702d1b1d13e76d4
Depends-On: Ic9108c9293731b3576081c75f2786e1156ba0ccd
Change-Id: I8e826657acb12bbd705668180f7a3305e1e597e2
This commit is contained in:
Renat Akhmerov 2020-06-08 14:46:56 +07:00
parent 4bc6162515
commit 06a0f33476
61 changed files with 2034 additions and 1394 deletions

View File

@ -10,7 +10,7 @@ Jinja2==2.10
jsonschema==3.2.0
keystonemiddleware==4.18.0
kombu==4.6.1
mistral-lib==1.4.0
mistral-lib==2.3.0
networkx==2.3
nose==1.3.7
oslo.concurrency==3.26.0

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO(rakhmerov): the module needs to be deleted after we add action providers
from oslo_utils import importutils

260
mistral/actions/adhoc.py Normal file
View File

@ -0,0 +1,260 @@
# Copyright 2020 Nokia Software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from mistral_lib import actions as ml_actions
from mistral_lib.actions.providers import base as action_providers_base
from mistral_lib import serialization
from mistral_lib import utils
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.lang import parser as spec_parser
from mistral.services import actions as action_service
from mistral.workflow import data_flow
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class AdHocAction(ml_actions.Action):
def __init__(self, base_action):
super(AdHocAction, self).__init__()
assert base_action is not None
self._base_action = base_action
def run(self, context):
# Just run the base action. Note that the result of the base
# action gets converted by the corresponding "post_process_result"
# method of the ad-hoc action descriptor. It allows to implement
# asynchronous execution model when a result of the base
# action is delivered by a 3rd party and then it gets converted
# after it comes to Mistral engine.
result = self._base_action.run(context)
if not self.is_sync():
return None
if not isinstance(result, ml_actions.Result):
result = ml_actions.Result(data=result)
return result
@property
def base_action(self):
return self._base_action
def is_sync(self):
return self._base_action.is_sync()
@classmethod
def get_serialization_key(cls):
return '%s.%s' % (AdHocAction.__module__, AdHocAction.__name__)
class AdHocActionSerializer(serialization.DictBasedSerializer):
def serialize_to_dict(self, entity):
p_serializer = serialization.get_polymorphic_serializer()
return {
'base_action': p_serializer.serialize(entity.base_action)
}
def deserialize_from_dict(self, entity_dict):
p_serializer = serialization.get_polymorphic_serializer()
return AdHocAction(
p_serializer.deserialize(entity_dict['base_action'])
)
serialization.register_serializer(AdHocAction, AdHocActionSerializer())
class AdHocActionDescriptor(action_providers_base.ActionDescriptorBase):
def __init__(self, action_def):
super(AdHocActionDescriptor, self).__init__(
action_def.name,
action_def.description,
action_def.input or '',
action_def.namespace,
action_def.project_id,
action_def.scope
)
self._definition = action_def.definition
self._spec = spec_parser.get_action_spec(action_def.spec)
self._action_def = action_def
@property
def id(self):
return self._action_def.id
@property
def created_at(self):
return self._action_def.created_at
@property
def updated_at(self):
return self._action_def.updated_at
@property
def tags(self):
return self._action_def.tags
@property
def definition(self):
return self._definition
@property
def action_class_name(self):
return "{}.{}".format(AdHocAction.__module__, AdHocAction.__name__)
@property
def spec(self):
return self._spec
def __repr__(self):
return 'AdHoc action [name=%s, definition=%s]' % (
self.name,
self._definition
)
def _visit_hierarchy(self, callback):
callback_result = callback(self, None)
action_spec = self.spec
visited = {self.name}
while action_spec:
base_name = action_spec.get_base()
if base_name in visited:
raise ValueError(
'Found a cycle in an ad-hoc action chain '
'[action_name=%s, duplicate_action_name=%s]'
% (self.name, base_name)
)
visited.add(base_name)
system_provider = action_service.get_system_action_provider()
base_action_desc = system_provider.find(
base_name,
self.namespace
)
if base_action_desc is None:
raise exc.InvalidActionException(
"Failed to find base action [action_name=%s namespace=%s] "
% (base_name, self.namespace)
)
# For every ad-hoc action in the hierarchy invoke the callback.
callback_result = callback(base_action_desc, callback_result)
if isinstance(base_action_desc, AdHocActionDescriptor):
action_spec = base_action_desc.spec
else:
action_spec = None
return callback_result
def instantiate(self, input_dict, wf_ctx):
def _on_visit(action_desc, prev_res):
if action_desc is self:
base_action_desc = None
base_input_dict = input_dict
else:
base_action_desc = action_desc
base_input_dict = prev_res[1]
if not isinstance(action_desc, AdHocActionDescriptor):
return base_action_desc, base_input_dict
for k, v in action_desc.spec.get_input().items():
if (k not in base_input_dict or
base_input_dict[k] is utils.NotDefined):
base_input_dict[k] = v
ctx = data_flow.ContextView(base_input_dict, wf_ctx)
base_input_dict = expr.evaluate_recursively(
action_desc.spec.get_base_input(),
ctx
)
return base_action_desc, base_input_dict
base_desc, base_input = self._visit_hierarchy(_on_visit)
base_action = base_desc.instantiate(base_input, wf_ctx)
return AdHocAction(base_action)
def post_process_result(self, result):
output_transformers = []
def _on_visit(action_desc, prev_res):
if isinstance(action_desc, AdHocActionDescriptor):
output_transformers.append(action_desc.spec.get_output())
self._visit_hierarchy(_on_visit)
for transformer_expr in reversed(output_transformers):
if transformer_expr is not None:
result = ml_actions.Result(
data=expr.evaluate_recursively(
transformer_expr,
result.data
),
error=result.error
)
return result
class AdHocActionProvider(ml_actions.ActionProvider):
"""Provides ad-hoc actions."""
def __init__(self, name='adhoc'):
super().__init__(name)
def find(self, action_name, namespace=None):
action_def = db_api.load_action_definition(
action_name,
namespace=namespace
)
if action_def is None:
return None
return AdHocActionDescriptor(action_def)
def find_all(self, namespace=None, limit=None, sort_fields=None,
sort_dirs=None, **filters):
# TODO(rakhmerov): Apply sort_keys, sort_dirs and filters.
return [
AdHocActionDescriptor(a_d)
for a_d in db_api.get_action_definitions()
]

181
mistral/actions/legacy.py Normal file
View File

@ -0,0 +1,181 @@
# Copyright 2020 Nokia Software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from oslo_config import cfg
from oslo_log import log as logging
from stevedore import extension
from mistral_lib import actions as ml_actions
from mistral_lib.utils import inspect_utils as i_utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class GeneratedPythonActionDescriptor(ml_actions.PythonActionDescriptor):
"""Represents a legacy python action generated by a generator.
It's needed temporarily until we fully refactor OpenStack actions in the
'mistral-extra' project. The difference of this descriptor and the standard
PythonActionDescriptor class is how they initialize a spec of parameters
and description.
"""
def __init__(self, name, action_cls, action_cls_attrs=None, namespace='',
project_id=None, scope=None, desc=None, params_spec=None):
super(GeneratedPythonActionDescriptor, self).__init__(
name,
action_cls,
action_cls_attrs,
namespace,
project_id,
scope
)
if desc:
self._desc = desc
if params_spec:
self._params_spec = params_spec
def __repr__(self):
return 'Generated Python action [name=%s, cls=%s, params_spec=%s]' % (
self.name,
self.action_class,
self.params_spec
)
class LegacyActionProvider(ml_actions.ActionProvider):
"""Represents the old way of configuring actions.
There are two sources where this action provider loads actions
from:
* Action classes configured in the entry point "mistral.actions"
* Action classes generated by generators configured in the
entry point "mistral.generators" as a function returning a
collection of them.
"""
def __init__(self, name='legacy'):
super().__init__(name)
# TODO(rakhmerov): Come up with a convenient structure to keep action
# classes indexed so that we could search and filter easily.
self._action_descs = collections.OrderedDict()
self._load_actions()
def _load_actions(self):
self._load_action_plugins()
self._load_action_generators()
def _load_action_plugins(self):
if not CONF.legacy_action_provider.load_action_plugins:
return
LOG.info(
"Loading actions plugged in with the entry point "
"'mistral.actions'..."
)
ext_mgr = extension.ExtensionManager(
namespace='mistral.actions',
invoke_on_load=False
)
for action_name in ext_mgr.names():
action_cls = ext_mgr[action_name].plugin
if CONF.legacy_action_provider.only_builtin_actions:
if not action_cls.__module__.startswith('mistral.'):
continue
action_desc = ml_actions.PythonActionDescriptor(
action_name,
action_cls,
namespace=''
)
self._action_descs[action_name] = action_desc
LOG.debug('Registered action: %s', action_desc)
def _load_action_generators(self):
if not CONF.legacy_action_provider.load_action_generators:
return
LOG.info(
"Loading actions from the action generators plugged in "
"with the entry point 'mistral.generators'"
)
for gen in self._get_action_generators():
self._register_generator_actions(gen)
@staticmethod
def _get_action_generators():
res = []
ext_mgr = extension.ExtensionManager(
namespace='mistral.generators',
invoke_on_load=True
)
# TODO(rakhmerov): this is all ugly. It turns out that the only
# way to register actions via generators is to register a special
# function in the entry point that returns a list of generators.
# But we can't directly register a generator.
for ext in ext_mgr:
if ext.obj is not None:
for gen in ext.obj:
res.append(gen)
return res
def _register_generator_actions(self, generator):
# TODO(rakhmerov): Here we have an implicit dependency on
# "mistral-extra" because ActionGenerator class is defined
# in "mistral-extra". Of course, it works because of duck
# typing but architecture wise it's just very bad. "mistral"
# must not depend on "mistral-extra" because the latter is
# just a project with mistral extensions. In fact, we can't
# even extend ActionGenerator class within "mistral" for
# testing purposes.
# So it's all done this way for compatibility until all
# OpenStack actions are redesigned with action providers.
for action in generator.create_actions():
action_desc = GeneratedPythonActionDescriptor(
action['name'],
generator.base_action_class,
i_utils.get_public_fields(action['class']),
desc=action['description'],
params_spec=action['arg_list']
)
LOG.debug('Registered action: %s', action_desc)
self._action_descs[action['name']] = action_desc
def find(self, action_name, namespace=None):
return self._action_descs.get(action_name)
def find_all(self, namespace=None, limit=None, sort_fields=None,
sort_dirs=None, **filters):
# TODO(rakhmerov): Apply sort_keys, sort_dirs, and filters.
return self._action_descs.values()

View File

@ -46,6 +46,7 @@ class EchoAction(actions.Action):
super(EchoAction, self).__init__()
self.output = output
try:
self._delay = float(delay)
self._delay = 0 if self._delay < 0 else self._delay
@ -66,6 +67,15 @@ class EchoAction(actions.Action):
def test(self, context):
return 'Echo'
def __eq__(self, other):
if type(other) is not EchoAction:
return False
if self.output != other.output or self._delay != other._delay:
return False
return True
class NoOpAction(actions.Action):
"""No-operation action.
@ -106,6 +116,9 @@ class FailAction(actions.Action):
self.error_data = error_data
def __repr__(self):
return 'Fail action'
def run(self, context):
LOG.info('Running fail action.')

66
mistral/actions/test.py Normal file
View File

@ -0,0 +1,66 @@
# Copyright 2020 Nokia Software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from oslo_config import cfg
from oslo_log import log as logging
from mistral_lib import actions as ml_actions
from mistral_lib.utils import inspect_utils as i_utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
def _build_action_descriptor(name, action_cls):
action_cls_attrs = i_utils.get_public_fields(action_cls)
return ml_actions.PythonActionDescriptor(
name,
action_cls,
action_cls_attrs
)
class TestActionProvider(ml_actions.ActionProvider):
"""Action provider for tests.
It allows to register python actions with a direct call.
"""
def __init__(self, name='test'):
super().__init__(name)
self._action_descs = collections.OrderedDict()
def register_python_action(self, action_name, action_cls):
self._action_descs[action_name] = _build_action_descriptor(
action_name,
action_cls
)
def cleanup(self):
self._action_descs.clear()
def find(self, action_name, namespace=None):
return self._action_descs.get(action_name)
def find_all(self, namespace=None, limit=None, sort_fields=None,
sort_dirs=None, **filters):
# TODO(rakhmerov): Apply sort_keys, sort_dirs, and filters.
return self._action_descs.values()

View File

@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
from oslo_log import log as logging
import pecan
from pecan import hooks
@ -30,13 +31,38 @@ from mistral import context
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.lang import parser as spec_parser
from mistral.services import actions
from mistral.services import actions as action_service
from mistral.services import adhoc_actions
from mistral.utils import filter_utils
from mistral.utils import rest_utils
from mistral_lib import utils
LOG = logging.getLogger(__name__)
def _action_descriptor_to_resource(action_desc):
return resources.Action(
id=getattr(action_desc, 'id', action_desc.name),
name=action_desc.name,
description=action_desc.description,
input=action_desc.params_spec,
namespace=action_desc.namespace,
project_id=action_desc.project_id,
scope=action_desc.scope,
definition=getattr(action_desc, 'definition', ''),
is_system=False,
tags=getattr(action_desc, 'tags', None),
created_at=utils.datetime_to_str(
getattr(action_desc, 'created_at', '')
),
updated_at=utils.datetime_to_str(
getattr(action_desc, 'updated_at', '')
)
)
class ActionsController(rest.RestController, hooks.HookController):
# TODO(nmakhotkin): Have a discussion with pecan/WSME folks in order
# to have requests and response of different content types. Then
@ -59,12 +85,24 @@ class ActionsController(rest.RestController, hooks.HookController):
LOG.debug("Fetch action [identifier=%s]", identifier)
# Use retries to prevent possible failures.
db_model = rest_utils.rest_retry_on_db_error(
db_api.get_action_definition
action_provider = action_service.get_system_action_provider()
# Here we assume that the action search might involve DB operations
# so we need to apply the regular retrying logic as everywhere else.
action_desc = rest_utils.rest_retry_on_db_error(
action_provider.find
)(identifier, namespace=namespace)
return resources.Action.from_db_model(db_model)
if action_desc is None:
# TODO(rakhmerov): We need to change exception class so that
# it's not DB specific. But it should be associated with the
# same HTTP code.
raise exc.DBEntityNotFoundError(
'Action not found [name=%s, namespace=%s]'
% (identifier, namespace)
)
return _action_descriptor_to_resource(action_desc)
@rest_utils.wrap_pecan_controller_exception
@pecan.expose(content_type="text/plain")
@ -86,15 +124,18 @@ class ActionsController(rest.RestController, hooks.HookController):
LOG.debug("Update action(s) [definition=%s]", definition)
namespace = namespace or ''
scope = pecan.request.GET.get('scope', 'private')
resources.Action.validate_scope(scope)
if scope == 'public':
acl.enforce('actions:publicize', context.ctx())
@rest_utils.rest_retry_on_db_error
def _update_actions():
with db_api.transaction():
return actions.update_actions(
return adhoc_actions.update_actions(
definition,
scope=scope,
identifier=identifier,
@ -123,6 +164,7 @@ class ActionsController(rest.RestController, hooks.HookController):
of multiple actions. In this case they all will be created.
"""
acl.enforce('actions:create', context.ctx())
namespace = namespace or ''
definition = pecan.request.text
@ -130,6 +172,7 @@ class ActionsController(rest.RestController, hooks.HookController):
pecan.response.status = 201
resources.Action.validate_scope(scope)
if scope == 'public':
acl.enforce('actions:publicize', context.ctx())
@ -138,9 +181,11 @@ class ActionsController(rest.RestController, hooks.HookController):
@rest_utils.rest_retry_on_db_error
def _create_action_definitions():
with db_api.transaction():
return actions.create_actions(definition,
return adhoc_actions.create_actions(
definition,
scope=scope,
namespace=namespace)
namespace=namespace
)
db_acts = _create_action_definitions()
@ -159,19 +204,26 @@ class ActionsController(rest.RestController, hooks.HookController):
:param namespace: The namespace of which the action is in.
"""
acl.enforce('actions:delete', context.ctx())
LOG.debug("Delete action [identifier=%s]", identifier)
@rest_utils.rest_retry_on_db_error
def _delete_action_definition():
with db_api.transaction():
db_model = db_api.get_action_definition(identifier,
namespace=namespace)
db_model = db_api.get_action_definition(
identifier,
namespace=namespace
)
if db_model.is_system:
msg = "Attempt to delete a system action: %s" % identifier
raise exc.DataAccessException(msg)
db_api.delete_action_definition(identifier,
namespace=namespace)
raise exc.DataAccessException(
"Attempt to delete a system action: %s" % identifier
)
db_api.delete_action_definition(
identifier,
namespace=namespace
)
_delete_action_definition()
@ -232,19 +284,74 @@ class ActionsController(rest.RestController, hooks.HookController):
namespace=namespace
)
LOG.debug("Fetch actions. marker=%s, limit=%s, sort_keys=%s, "
"sort_dirs=%s, filters=%s", marker, limit, sort_keys,
sort_dirs, filters)
LOG.debug(
"Fetch actions. marker=%s, limit=%s, sort_keys=%s, "
"sort_dirs=%s, filters=%s",
marker,
limit,
sort_keys,
sort_dirs,
filters
)
return rest_utils.get_all(
resources.Actions,
resources.Action,
db_api.get_action_definitions,
db_api.get_action_definition_by_id,
marker=marker,
sort_keys = ['name'] if sort_keys is None else sort_keys
sort_dirs = ['asc'] if sort_dirs is None else sort_dirs
fields = [] if fields is None else fields
if fields and 'name' not in fields:
fields.insert(0, 'name')
rest_utils.validate_query_params(limit, sort_keys, sort_dirs)
action_provider = action_service.get_system_action_provider()
# Here we assume that the action search might involve DB operations
# so we need to apply the regular retrying logic as everywhere else.
action_descriptors = rest_utils.rest_retry_on_db_error(
action_provider.find_all
)(
namespace=namespace,
limit=limit,
sort_keys=sort_keys,
sort_fields=sort_keys,
sort_dirs=sort_dirs,
fields=fields,
filters=filters
)
# We can't guarantee that at this point the collection of action
# descriptors is properly filtered and sorted.
# Apply filters.
action_descriptors = filter(
lambda a_d: filter_utils.match_filters(a_d, filters),
action_descriptors
)
# Apply sorting.
def compare_(a_d1, a_d2):
# TODO(rakhmerov): Implement properly
return 0
action_descriptors = sorted(
action_descriptors,
key=functools.cmp_to_key(compare_)
)
if limit and limit > 0:
action_descriptors = action_descriptors[0:limit]
action_resources = [
_action_descriptor_to_resource(a_d)
for a_d in action_descriptors
]
# TODO(rakhmerov): Fix pagination so that it doesn't work with
# the 'id' field as a marker. We can't use IDs anymore. "name"
# seems a good candidate for this.
return resources.Actions.convert_with_links(
action_resources,
limit,
pecan.request.application_url,
sort_keys=','.join(sort_keys),
sort_dirs=','.join(sort_dirs),
**filters
)

View File

@ -388,7 +388,7 @@ class ExecutionsController(rest.RestController):
update time and date.
:param include_output: Optional. Include the output for all executions
in the list.
:param project_id: Optional. Only get exectuions belong to the project.
:param project_id: Optional. Only get executions belong to the project.
Admin required.
:param all_projects: Optional. Get resources of all projects. Admin
required.

View File

@ -121,7 +121,7 @@ class Workflow(resource.Resource, ScopedResource):
"output": []}
)
def _set_attributes_from_spec(self, wf_spec):
def set_attributes_from_spec(self, wf_spec):
# Sets input and interface fields for the Workflow resource.
self._set_input(wf_spec)
self._set_interface(wf_spec)
@ -146,20 +146,23 @@ class Workflow(resource.Resource, ScopedResource):
if wf_spec:
self.interface['input'] = wf_spec.get('input', [])
self.interface['output'] = [output for output
in wf_spec.get('output', {})]
self.interface['output'] = [
output for output in wf_spec.get('output', {})
]
@classmethod
def from_dict(cls, d):
obj = super(Workflow, cls).from_dict(d)
obj._set_attributes_from_spec(d.get('spec'))
obj.set_attributes_from_spec(d.get('spec'))
return obj
@classmethod
def from_db_model(cls, db_model):
obj = super(Workflow, cls).from_db_model(db_model)
obj._set_attributes_from_spec(db_model.get('spec'))
obj.set_attributes_from_spec(db_model.get('spec'))
return obj
@ -177,7 +180,7 @@ class Workflow(resource.Resource, ScopedResource):
spec = col_val
if spec:
obj._set_attributes_from_spec(spec)
obj.set_attributes_from_spec(spec)
return obj

View File

@ -249,8 +249,10 @@ def main():
else:
# Validate launch option.
if set(cfg.CONF.server) - set(LAUNCH_OPTIONS.keys()):
raise Exception('Valid options are all or any combination of '
', '.join(LAUNCH_OPTIONS.keys()))
raise Exception(
"Valid options are 'all' or any combination of [%s]" %
', '.join(LAUNCH_OPTIONS.keys())
)
# Launch distinct set of server(s).
launch_any(set(cfg.CONF.server))

View File

@ -63,6 +63,35 @@ auth_type_opt = cfg.StrOpt(
help=_('Authentication type (valid options: keystone, keycloak-oidc)')
)
legacy_action_provider_opts = [
cfg.BoolOpt(
'load_action_plugins',
default=True,
help=_(
'If True, enables loading actions configured in the '
'entry point "mistral.actions".'
)
),
cfg.BoolOpt(
'load_action_generators',
default=True,
help=_(
'If True, enables loading actions from action generators '
'configured in the entry point "mistral.generators".'
)
),
cfg.BoolOpt(
'only_builtin_actions',
default=False,
help=_(
'If True, then the legacy action provider loads only '
'the actions delivered by the Mistral project out of '
'the box plugged in with the entry point "mistral.actions".'
'This property is needed mostly for testing.'
)
),
]
api_opts = [
cfg.HostAddressOpt(
'host',
@ -645,6 +674,7 @@ yaql_opts = [
CONF = cfg.CONF
LEGACY_ACTION_PROVIDER_GROUP = 'legacy_action_provider'
API_GROUP = 'api'
ENGINE_GROUP = 'engine'
EXECUTOR_GROUP = 'executor'
@ -671,6 +701,10 @@ CONF.register_opt(rpc_response_timeout_opt)
CONF.register_opt(oslo_rpc_executor)
CONF.register_opt(expiration_token_duration)
CONF.register_opts(
legacy_action_provider_opts,
group=LEGACY_ACTION_PROVIDER_GROUP
)
CONF.register_opts(api_opts, group=API_GROUP)
CONF.register_opts(engine_opts, group=ENGINE_GROUP)
CONF.register_opts(executor_opts, group=EXECUTOR_GROUP)

View File

@ -0,0 +1,43 @@
# Copyright 2020 Nokia Software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Add 'workbook_name' column to 'action_definitions_v2'
and 'workflow_definitions_v2'.
Revision ID: 039
Revises: 038
Create Date: 2020-09-07 10:51:20
"""
# revision identifiers, used by Alembic.
from alembic import op
import sqlalchemy as sa
revision = '039'
down_revision = '038'
def upgrade():
op.add_column(
'action_definitions_v2',
sa.Column('workbook_name', sa.String(length=255), nullable=True)
)
op.add_column(
'workflow_definitions_v2',
sa.Column('workbook_name', sa.String(length=255), nullable=True)
)

View File

@ -14,9 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import cachetools
import contextlib
import threading
from oslo_config import cfg
from oslo_db import api as db_api
@ -30,13 +28,6 @@ IMPL = db_api.DBAPI('sqlalchemy', backend_mapping=_BACKEND_MAPPING)
CONF = cfg.CONF
_ACTION_DEF_CACHE = cachetools.TTLCache(
maxsize=1000,
ttl=CONF.engine.action_definition_cache_time # 60 seconds by default
)
_ACTION_DEF_CACHE_LOCK = threading.RLock()
def setup_db():
IMPL.setup_db()
@ -192,29 +183,19 @@ def get_action_definition(name, fields=(), namespace=''):
def load_action_definition(name, fields=(), namespace=''):
"""Unlike get_action_definition this method is allowed to return None."""
key = '{}:{}'.format(name, namespace) if namespace else name
with _ACTION_DEF_CACHE_LOCK:
action_def = _ACTION_DEF_CACHE.get(key)
if action_def:
return action_def
action_def = IMPL.load_action_definition(name, fields=fields,
namespace=namespace,)
action_def = IMPL.load_action_definition(
name,
fields=fields,
namespace=namespace
)
# If action definition was not found in the workflow namespace,
# check in the default namespace
if not action_def:
action_def = IMPL.load_action_definition(name, fields=fields,
namespace='')
with _ACTION_DEF_CACHE_LOCK:
_ACTION_DEF_CACHE[key] = (
action_def.get_clone() if action_def else None
)
if action_def:
return action_def
return IMPL.load_action_definition(name, fields=fields, namespace='')
def get_action_definitions(limit=None, marker=None, sort_keys=None,
sort_dirs=None, **kwargs):

View File

@ -129,6 +129,7 @@ class Definition(mb.MistralSecureModelBase):
id = mb.id_column()
name = sa.Column(sa.String(255))
namespace = sa.Column(sa.String(255), nullable=True)
definition = sa.Column(st.MediumText(), nullable=True)
spec = sa.Column(st.JsonMediumDictType())
tags = sa.Column(st.JsonListType())
@ -140,8 +141,6 @@ class Workbook(Definition):
"""Contains info about workbook (including definition in Mistral DSL)."""
__tablename__ = 'workbooks_v2'
namespace = sa.Column(sa.String(255), nullable=True)
__table_args__ = (
sa.UniqueConstraint(
'name',
@ -157,7 +156,6 @@ class WorkflowDefinition(Definition):
"""Contains info about workflow (including definition in Mistral DSL)."""
__tablename__ = 'workflow_definitions_v2'
namespace = sa.Column(sa.String(255), nullable=True)
__table_args__ = (
sa.UniqueConstraint(
'name',
@ -169,12 +167,13 @@ class WorkflowDefinition(Definition):
sa.Index('%s_scope' % __tablename__, 'scope'),
)
workbook_name = sa.Column(sa.String(255))
class ActionDefinition(Definition):
"""Contains info about registered Actions."""
__tablename__ = 'action_definitions_v2'
namespace = sa.Column(sa.String(255), nullable=True)
__table_args__ = (
sa.UniqueConstraint(
'name',
@ -186,6 +185,8 @@ class ActionDefinition(Definition):
sa.Index('%s_scope' % __tablename__, 'scope'),
)
workbook_name = sa.Column(sa.String(255))
# Main properties.
description = sa.Column(sa.Text())
input = sa.Column(sa.Text())

View File

@ -22,6 +22,8 @@ from mistral.db.v2.sqlalchemy import models
from mistral.engine import actions
from mistral.engine import task_handler
from mistral import exceptions as exc
from mistral.services import actions as action_service
LOG = logging.getLogger(__name__)
@ -31,6 +33,7 @@ def on_action_complete(action_ex, result):
task_ex = action_ex.task_execution
action = _build_action(action_ex)
try:
action.complete(result)
except exc.MistralException as e:
@ -80,27 +83,35 @@ def on_action_update(action_ex, state):
@profiler.trace('action-handler-build-action', hide_args=True)
def _build_action(action_ex):
if isinstance(action_ex, models.WorkflowExecution):
return actions.WorkflowAction(wf_name=action_ex.name,
action_ex=action_ex)
adhoc_action_name = action_ex.runtime_context.get('adhoc_action_name')
if adhoc_action_name:
action_def = actions.resolve_action_definition(
adhoc_action_name,
namespace=action_ex.workflow_namespace
return actions.WorkflowAction(
wf_name=action_ex.name,
action_ex=action_ex
)
return actions.AdHocAction(action_def, action_ex=action_ex)
action_def = actions.resolve_action_definition(action_ex.name)
action_desc = action_service.get_system_action_provider().find(
action_ex.name,
action_ex.workflow_namespace
)
return actions.PythonAction(action_def, action_ex=action_ex)
if action_desc is None:
raise exc.InvalidActionException(
"Failed to find action [action_name=%s]" %
action_ex.name
)
return actions.RegularAction(action_desc, action_ex)
def build_action_by_name(action_name, namespace=''):
action_def = actions.resolve_action_definition(action_name,
namespace=namespace)
action_cls = (actions.PythonAction if not action_def.spec
else actions.AdHocAction)
action_desc = action_service.get_system_action_provider().find(
action_name,
namespace=namespace
)
return action_cls(action_def)
if action_desc is None:
raise exc.InvalidActionException(
"Failed to find action [action_name=%s]" %
action_name
)
return actions.RegularAction(action_desc)

View File

@ -25,17 +25,15 @@ from mistral.engine import utils as engine_utils
from mistral.engine import workflow_handler as wf_handler
from mistral import exceptions as exc
from mistral.executors import base as exe
from mistral import expressions as expr
from mistral.lang import parser as spec_parser
from mistral.rpc import clients as rpc
from mistral.services import action_manager as a_m
from mistral.services import security
from mistral.utils import wf_trace
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral_lib import actions as ml_actions
from mistral_lib import utils
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@ -47,11 +45,13 @@ class Action(object, metaclass=abc.ABCMeta):
Mistral engine or its components in order to manipulate with actions.
"""
def __init__(self, action_def, action_ex=None, task_ex=None):
self.action_def = action_def
def __init__(self, action_desc, action_ex=None, task_ex=None,
task_ctx=None):
self.action_desc = action_desc
self.action_ex = action_ex
self.namespace = action_def.namespace if action_def else None
self.namespace = action_desc.namespace if action_desc else None
self.task_ex = action_ex.task_execution if action_ex else task_ex
self.task_ctx = task_ctx
@abc.abstractmethod
def complete(self, result):
@ -77,7 +77,10 @@ class Action(object, metaclass=abc.ABCMeta):
def update(self, state):
assert self.action_ex
if state == states.PAUSED and self.is_sync(self.action_ex.input):
# TODO(rakhmerov): Not sure we can do it for all actions.
action = self.action_desc.instantiate(self.action_ex.input, {})
if state == states.PAUSED and action.is_sync():
raise exc.InvalidStateTransitionException(
'Transition to the PAUSED state is only supported '
'for asynchronous action execution.'
@ -135,28 +138,33 @@ class Action(object, metaclass=abc.ABCMeta):
"""
raise NotImplementedError
def validate_input(self, input_dict):
"""Validates action input parameters.
def _prepare_execution_context(self):
res = {}
:param input_dict: Dictionary with input parameters.
"""
raise NotImplementedError
if self.task_ex:
wf_ex = self.task_ex.workflow_execution
def is_sync(self, input_dict):
"""Determines if action is synchronous.
res['workflow_execution_id'] = wf_ex.id
res['task_execution_id'] = self.task_ex.id
res['workflow_name'] = wf_ex.name
:param input_dict: Dictionary with input parameters.
"""
return True
if self.action_ex:
res['action_execution_id'] = self.action_ex.id
res['callback_url'] = (
'/v2/action_executions/%s' % self.action_ex.id
)
def _create_action_execution(self, input_dict, runtime_ctx, is_sync,
desc='', action_ex_id=None):
return res
def _create_action_execution(self, input_dict, runtime_ctx,
desc='', action_ex_id=None, is_sync=True):
action_ex_id = action_ex_id or utils.generate_unicode_uuid()
values = {
'id': action_ex_id,
'name': self.action_def.name,
'spec': self.action_def.spec,
'name': self.action_desc.name,
# TODO(rakhmerov): do we really need to keep "spec" in action_ex?
# 'spec': self.action_desc.spec,
'state': states.RUNNING,
'input': input_dict,
'runtime_context': runtime_ctx,
@ -202,21 +210,17 @@ class Action(object, metaclass=abc.ABCMeta):
)
class PythonAction(Action):
class RegularAction(Action):
"""Regular Python action."""
def __init__(self, action_def, action_ex=None, task_ex=None):
super(PythonAction, self).__init__(action_def, action_ex, task_ex)
self._prepared_input = None
@profiler.trace('action-complete', hide_args=True)
@profiler.trace('regular-action-complete', hide_args=True)
def complete(self, result):
assert self.action_ex
if states.is_completed(self.action_ex.state):
raise ValueError(
"Action {} is already completed".format(self.action_ex.id))
"Action {} is already completed".format(self.action_ex.id)
)
prev_state = self.action_ex.state
@ -227,7 +231,10 @@ class PythonAction(Action):
else:
self.action_ex.state = states.ERROR
self.action_ex.output = self._prepare_output(result).to_dict()
# Convert the result, if needed.
converted_result = self.action_desc.post_process_result(result)
self.action_ex.output = converted_result.to_dict()
self.action_ex.accepted = True
self._log_result(prev_state, result)
@ -237,7 +244,24 @@ class PythonAction(Action):
timeout=None):
assert not self.action_ex
self.validate_input(input_dict)
self.action_desc.check_parameters(input_dict)
wf_ex = self.task_ex.workflow_execution if self.task_ex else None
wf_ctx = data_flow.ContextView(
self.task_ctx,
data_flow.get_workflow_environment_dict(wf_ex),
wf_ex.context if wf_ex else {}
)
try:
action = self.action_desc.instantiate(input_dict, wf_ctx)
except Exception:
raise exc.InvalidActionException(
'Failed to instantiate an action'
' [action_desc=%s, input_dict=%s]'
% (self.action_desc, input_dict)
)
# Assign the action execution ID here to minimize database calls.
# Otherwise, the input property of the action execution DB object needs
@ -246,31 +270,27 @@ class PythonAction(Action):
action_ex_id = utils.generate_unicode_uuid()
self._create_action_execution(
self._prepare_input(input_dict),
input_dict,
self._prepare_runtime_context(index, safe_rerun),
self.is_sync(input_dict),
desc=desc,
action_ex_id=action_ex_id
action_ex_id=action_ex_id,
is_sync=action.is_sync()
)
action_ex_ctx = self._prepare_execution_context()
# Register an asynchronous command to send the action to
# run on an executor outside of the main DB transaction.
def _run_action():
executor = exe.get_executor(cfg.CONF.executor.type)
executor.run_action(
self.action_ex.id,
self.action_def.action_class,
self.action_def.attributes or {},
self.action_ex.input,
self.action_ex.runtime_context.get('safe_rerun', False),
action_ex_ctx,
return executor.run_action(
action,
self.action_ex.id if self.action_ex is not None else None,
safe_rerun,
self._prepare_execution_context(),
target=target,
timeout=timeout
)
# Register an asynchronous command to run the action
# on an executor outside of the main DB transaction.
post_tx_queue.register_operation(_run_action)
@profiler.trace('action-run', hide_args=True)
@ -278,9 +298,16 @@ class PythonAction(Action):
safe_rerun=False, timeout=None):
assert not self.action_ex
self.validate_input(input_dict)
self.action_desc.check_parameters(input_dict)
prepared_input_dict = self._prepare_input(input_dict)
try:
action = self.action_desc.instantiate(input_dict, {})
except Exception:
raise exc.InvalidActionException(
'Failed to instantiate an action'
' [action_desc=%s, input_dict=%s]'
% (self.action_desc, input_dict)
)
# Assign the action execution ID here to minimize database calls.
# Otherwise, the input property of the action execution DB object needs
@ -290,278 +317,35 @@ class PythonAction(Action):
if save:
self._create_action_execution(
prepared_input_dict,
input_dict,
self._prepare_runtime_context(index, safe_rerun),
self.is_sync(input_dict),
desc=desc,
action_ex_id=action_ex_id
action_ex_id=action_ex_id,
is_sync=action.is_sync()
)
executor = exe.get_executor(cfg.CONF.executor.type)
execution_context = self._prepare_execution_context()
result = executor.run_action(
self.action_ex.id if self.action_ex else None,
self.action_def.action_class,
self.action_def.attributes or {},
prepared_input_dict,
return executor.run_action(
action,
self.action_ex.id if self.action_ex is not None else None,
safe_rerun,
execution_context,
self._prepare_execution_context(),
target=target,
async_=False,
timeout=timeout
)
return self._prepare_output(result)
def is_sync(self, input_dict):
try:
prepared_input_dict = self._prepare_input(input_dict)
a = a_m.get_action_class(self.action_def.name,
self.action_def.namespace)(
**prepared_input_dict
)
return a.is_sync()
except BaseException as e:
LOG.exception(e)
raise exc.InputException(str(e))
def validate_input(self, input_dict):
# NOTE(kong): Don't validate action input if action initialization
# method contains ** argument.
if '**' in self.action_def.input:
return
expected_input = utils.get_dict_from_string(self.action_def.input)
engine_utils.validate_input(
expected_input,
input_dict,
self.action_def.name,
self.action_def.action_class
)
def _prepare_execution_context(self):
exc_ctx = {}
if self.task_ex:
wf_ex = self.task_ex.workflow_execution
exc_ctx['workflow_execution_id'] = wf_ex.id
exc_ctx['task_execution_id'] = self.task_ex.id
exc_ctx['workflow_name'] = wf_ex.name
if self.action_ex:
exc_ctx['action_execution_id'] = self.action_ex.id
exc_ctx['callback_url'] = (
'/v2/action_executions/%s' % self.action_ex.id
)
return exc_ctx
def _prepare_input(self, input_dict):
"""Template method to do manipulations with input parameters.
Python action doesn't do anything specific with initial input.
"""
return input_dict
def _prepare_output(self, result):
"""Template method to do manipulations with action result.
Python action doesn't do anything specific with result.
"""
return result
def _prepare_runtime_context(self, index, safe_rerun):
"""Template method to prepare action runtime context.
Python action inserts index into runtime context and information if
given action is safe_rerun.
Regular action inserts an index into its runtime context and
the flag showing if the action is safe to rerun (i.e. it's
idempotent).
"""
return {'index': index, 'safe_rerun': safe_rerun}
class AdHocAction(PythonAction):
"""Ad-hoc action."""
@profiler.trace('ad-hoc-action-init', hide_args=True)
def __init__(self, action_def, action_ex=None, task_ex=None, task_ctx=None,
wf_ctx=None):
self.action_spec = spec_parser.get_action_spec(action_def.spec)
base_action_def = db_api.load_action_definition(
self.action_spec.get_base(),
namespace=action_def.namespace
)
if not base_action_def:
raise exc.InvalidActionException(
"Failed to find action [action_name=%s]" %
self.action_spec.get_base()
)
base_action_def = self._gather_base_actions(
action_def,
base_action_def
)
super(AdHocAction, self).__init__(
base_action_def,
action_ex,
task_ex
)
self.adhoc_action_def = action_def
self.namespace = action_def.namespace
self.task_ctx = task_ctx or {}
self.wf_ctx = wf_ctx or {}
@profiler.trace('ad-hoc-action-validate-input', hide_args=True)
def validate_input(self, input_dict):
expected_input = self.action_spec.get_input()
engine_utils.validate_input(
expected_input,
input_dict,
self.adhoc_action_def.name,
self.action_spec.__class__.__name__
)
super(AdHocAction, self).validate_input(
self._prepare_input(input_dict)
)
@profiler.trace('ad-hoc-action-prepare-input', hide_args=True)
def _prepare_input(self, input_dict):
if self._prepared_input is not None:
return self._prepared_input
base_input_dict = input_dict
for action_def in self.adhoc_action_defs:
action_spec = spec_parser.get_action_spec(action_def.spec)
for k, v in action_spec.get_input().items():
if (k not in base_input_dict or
base_input_dict[k] is utils.NotDefined):
base_input_dict[k] = v
base_input_expr = action_spec.get_base_input()
if base_input_expr:
wf_ex = (
self.task_ex.workflow_execution if self.task_ex else None
)
ctx_view = data_flow.ContextView(
base_input_dict,
self.task_ctx,
data_flow.get_workflow_environment_dict(wf_ex),
self.wf_ctx
)
base_input_dict = expr.evaluate_recursively(
base_input_expr,
ctx_view
)
else:
base_input_dict = {}
self._prepared_input = super(AdHocAction, self)._prepare_input(
base_input_dict
)
return self._prepared_input
@profiler.trace('ad-hoc-action-prepare-output', hide_args=True)
def _prepare_output(self, result):
# In case of error, we don't transform a result.
if not result.is_error():
for action_def in reversed(self.adhoc_action_defs):
adhoc_action_spec = spec_parser.get_action_spec(
action_def.spec
)
transformer = adhoc_action_spec.get_output()
if transformer is not None:
result = ml_actions.Result(
data=expr.evaluate_recursively(
transformer,
result.data
),
error=result.error
)
return result
@profiler.trace('ad-hoc-action-prepare-runtime-context', hide_args=True)
def _prepare_runtime_context(self, index, safe_rerun):
ctx = super(AdHocAction, self)._prepare_runtime_context(
index,
safe_rerun
)
# Insert special field into runtime context so that we track
# a relationship between python action and adhoc action.
return utils.merge_dicts(
ctx,
{'adhoc_action_name': self.adhoc_action_def.name}
)
@profiler.trace('ad-hoc-action-gather-base-actions', hide_args=True)
def _gather_base_actions(self, action_def, base_action_def):
"""Find all base ad-hoc actions and store them.
An ad-hoc action may be based on another ad-hoc action and this
works recursively, so that the base action can also be based on an
ad-hoc action. Using the same base action more than once in this
action hierarchy is not allowed to avoid infinite loops.
The method stores the list of ad-hoc actions.
:param action_def: Action definition
:type action_def: ActionDefinition
:param base_action_def: Original base action definition
:type base_action_def: ActionDefinition
:return: The definition of the base system action
:rtype: ActionDefinition
"""
self.adhoc_action_defs = [action_def]
original_base_name = self.action_spec.get_name()
action_names = set([original_base_name])
base = base_action_def
while not base.is_system and base.name not in action_names:
action_names.add(base.name)
self.adhoc_action_defs.append(base)
base_name = base.spec['base']
try:
base = db_api.get_action_definition(base_name,
namespace=base.namespace)
except exc.DBEntityNotFoundError:
raise exc.InvalidActionException(
"Failed to find action [action_name=%s namespace=%s] "
% (base_name, base.namespace)
)
# if the action is repeated
if base.name in action_names:
raise ValueError(
'An ad-hoc action cannot use twice the same action, %s is '
'used at least twice' % base.name
)
return base
class WorkflowAction(Action):
"""Workflow action."""
@ -647,49 +431,6 @@ class WorkflowAction(Action):
safe_rerun=True, timeout=None):
raise NotImplementedError('Does not apply to this WorkflowAction.')
def is_sync(self, input_dict):
# Workflow action is always asynchronous.
return False
def validate_input(self, input_dict):
# TODO(rakhmerov): Implement.
pass
def resolve_action_definition(action_spec_name, wf_name=None,
wf_spec_name=None, namespace=''):
"""Resolve action definition accounting for ad-hoc action namespacing.
:param action_spec_name: Action name according to a spec.
:param wf_name: Workflow name.
:param wf_spec_name: Workflow name according to a spec.
:param namespace: The namespace of the action.
:return: Action definition (python or ad-hoc).
"""
action_db = None
if wf_name and wf_name != wf_spec_name:
# If workflow belongs to a workbook then check
# action within the same workbook (to be able to
# use short names within workbooks).
# If it doesn't exist then use a name from spec
# to find an action in DB.
wb_name = wf_name.rstrip(wf_spec_name)[:-1]
action_full_name = "%s.%s" % (wb_name, action_spec_name)
action_db = db_api.load_action_definition(action_full_name,
namespace=namespace)
if not action_db:
action_db = db_api.load_action_definition(action_spec_name,
namespace=namespace)
if not action_db:
raise exc.InvalidActionException(
"Failed to find action [action_name=%s] in [namespace=%s]" %
(action_spec_name, namespace)
)
return action_db

View File

@ -86,32 +86,42 @@ class DefaultEngine(base.Engine):
def start_action(self, action_name, action_input,
description=None, namespace='', **params):
with db_api.transaction():
action = action_handler.build_action_by_name(action_name,
namespace=namespace)
engine_action = action_handler.build_action_by_name(
action_name,
namespace=namespace
)
action.validate_input(action_input)
action_desc = engine_action.action_desc
action_desc.check_parameters(action_input)
sync = params.get('run_sync')
save = params.get('save_result')
target = params.get('target')
timeout = params.get('timeout')
is_action_sync = action.is_sync(action_input)
# In order to know if it's sync or not we have to instantiate
# the actual runnable action.
action = action_desc.instantiate(action_input, {})
is_action_sync = action.is_sync()
if sync and not is_action_sync:
raise exceptions.InputException(
"Action does not support synchronous execution.")
if not sync and (save or not is_action_sync):
action.schedule(action_input, target, timeout=timeout)
return action.action_ex.get_clone()
engine_action.schedule(action_input, target, timeout=timeout)
output = action.run(
return engine_action.action_ex.get_clone()
output = engine_action.run(
action_input,
target,
save=False,
timeout=timeout
)
state = states.SUCCESS if output.is_success() else states.ERROR
if not save:
@ -125,6 +135,7 @@ class DefaultEngine(base.Engine):
state=state,
workflow_namespace=namespace
)
action_ex_id = u.generate_unicode_uuid()
values = {

View File

@ -32,6 +32,11 @@ from mistral.workflow import base as wf_base
from mistral.workflow import commands as wf_cmds
from mistral.workflow import states
# TODO(rakhmerov): At some point we need to completely switch to
# exceptions from mistral_lib. For the time being though we'll
# have to catch MistralException from both 'mistral' and 'mistral-lib'.
from mistral_lib import exceptions as mistral_lib_exc
"""Responsible for running tasks and handling results."""
@ -66,7 +71,7 @@ def run_task(wf_cmd):
_schedule_refresh_task_state(task.task_ex.id)
task.run()
except exc.MistralException as e:
except (exc.MistralException, mistral_lib_exc.MistralException) as e:
wf_ex = wf_cmd.wf_ex
task_spec = wf_cmd.task_spec
@ -114,7 +119,7 @@ def _on_action_complete(action_ex):
try:
task.on_action_complete(action_ex)
except exc.MistralException as e:
except (exc.MistralException, mistral_lib_exc.MistralException) as e:
wf_ex = task_ex.workflow_execution
msg = ("Failed to handle action completion [error=%s, wf=%s, task=%s,"
@ -169,7 +174,7 @@ def _on_action_update(action_ex):
# then resume the parent workflow execution.
wf_handler.resume_workflow(wf_ex)
except exc.MistralException as e:
except (exc.MistralException, mistral_lib_exc.MistralException) as e:
wf_ex = task_ex.workflow_execution
msg = ("Failed to handle action update [error=%s, wf=%s, task=%s,"
@ -253,7 +258,7 @@ def complete_task(task_ex, state, state_info):
try:
task.complete(state, state_info)
except exc.MistralException as e:
except (exc.MistralException, mistral_lib_exc.MistralException) as e:
wf_ex = task_ex.workflow_execution
msg = (

View File

@ -33,6 +33,7 @@ from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.notifiers import base as notif
from mistral.notifiers import notification_events as events
from mistral.services import actions as action_service
from mistral.utils import wf_trace
from mistral.workflow import base as wf_base
from mistral.workflow import commands
@ -666,8 +667,6 @@ class RegularTask(Task):
action = self._build_action()
action.validate_input(input_dict)
action.schedule(
input_dict,
target,
@ -714,40 +713,66 @@ class RegularTask(Task):
overwrite=False
)
def _get_action_name(self):
result = self.task_spec.get_action_name()
# An action name can be an expression so we reevaluate it.
if result:
result = self.evaluate(result)
if not result:
result = 'std.noop'
return result
def _build_action(self):
action_name = self.task_spec.get_action_name()
wf_name = self.task_spec.get_workflow_name()
# For dynamic workflow evaluation we regenerate the action.
# A workflow name can be an expression so we reevaluate it.
if wf_name:
return actions.WorkflowAction(
wf_name=self.evaluate(wf_name),
task_ex=self.task_ex
)
# For dynamic action evaluation we just regenerate the name.
if action_name:
action_name = self.evaluate(action_name)
action_desc = self._get_action_descriptor()
if not action_name:
action_name = 'std.noop'
action_def = actions.resolve_action_definition(
action_name,
self.wf_ex.name,
self.wf_spec.get_name(),
namespace=self.wf_ex.workflow_namespace
)
if action_def.spec:
return actions.AdHocAction(
action_def,
return actions.RegularAction(
action_desc=action_desc,
task_ex=self.task_ex,
task_ctx=self.ctx,
wf_ctx=self.wf_ex.context
task_ctx=self.ctx
)
return actions.PythonAction(action_def, task_ex=self.task_ex)
def _get_action_descriptor(self):
res = None
action_name = self._get_action_name()
namespace = self.wf_ex.workflow_namespace
provider = action_service.get_system_action_provider()
wb_name = self.wf_ex.runtime_context.get('wb_name')
if wb_name:
# First try to find the action within the same workbook
# that the workflow belongs to.
res = provider.find(
'%s.%s' % (wb_name, action_name),
namespace=namespace
)
if res is None:
# Now try to find by the action name as it appears in the
# workflow text.
res = provider.find(action_name, namespace=namespace)
if res is None:
raise exc.MistralException(
"Failed to find action [action_name=%s, namespace=%s]"
% (action_name, namespace)
)
return res
def _get_timeout(self):
timeout = self.task_spec.get_policies().get_timeout()
@ -843,8 +868,6 @@ class WithItemsTask(RegularTask):
action = self._build_action()
action.validate_input(input_dict)
action.schedule(
input_dict,
target,

View File

@ -41,6 +41,8 @@ def _compare_parameters(expected_input, actual_input):
return missing_params, unexpected_params
# TODO(rakhmerov): this method now should probably belong to workflows.
# It's not related to actions anymore.
def validate_input(expected_input, actual_input, obj_name, obj_class):
actual_input = actual_input or {}

View File

@ -346,7 +346,7 @@ class Workflow(object, metaclass=abc.ABCMeta):
return final_ctx
def _create_execution(self, wf_def, wf_ex_id, input_dict, desc, params):
self.wf_ex = db_api.create_workflow_execution({
values = {
'id': wf_ex_id,
'name': wf_def.name,
'description': desc,
@ -359,10 +359,13 @@ class Workflow(object, metaclass=abc.ABCMeta):
'output': {},
'task_execution_id': params.get('task_execution_id'),
'root_execution_id': params.get('root_execution_id'),
'runtime_context': {
'index': params.get('index', 0)
},
})
'runtime_context': {'index': params.get('index', 0)}
}
if wf_def.workbook_name:
values['runtime_context']['wb_name'] = wf_def.workbook_name
self.wf_ex = db_api.create_workflow_execution(values)
self.wf_ex.input = input_dict or {}

View File

@ -43,26 +43,23 @@ class Executor(object, metaclass=abc.ABCMeta):
"""Action executor interface."""
@abc.abstractmethod
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, execution_context, redelivered=False,
target=None, async_=True, timeout=None):
"""Runs action.
def run_action(self, action, action_ex_id, safe_rerun, exec_ctx,
redelivered=False, target=None, async_=True, timeout=None):
"""Runs the given action.
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:param action: Action to run.
An instance of mistral_lib.actions.Action.
:param action_ex_id: Corresponding action execution id.
:param action_cls_str: Path to action class in dot notation.
:param action_cls_attrs: Attributes of action class which
will be set to.
:param params: Action parameters.
:param safe_rerun: Tells if given action can be safely rerun.
:param execution_context: A dict of values providing information about
:param exec_ctx: A dict of values providing information about
the current execution.
:param redelivered: Tells if given action was run before on another
executor.
:param target: Target (group of action executors).
:param async_: If True, run action in asynchronous mode (w/o waiting
for completion).
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:return: Action result.
"""
raise NotImplementedError()

View File

@ -18,13 +18,11 @@ from mistral_lib import actions as mistral_lib
from oslo_log import log as logging
from osprofiler import profiler
from mistral.actions import action_factory as a_f
from mistral import context
from mistral import exceptions as exc
from mistral.executors import base
from mistral.rpc import clients as rpc
from mistral.services import action_heartbeat_sender
from mistral_lib.utils import inspect_utils as i_u
LOG = logging.getLogger(__name__)
@ -34,18 +32,14 @@ class DefaultExecutor(base.Executor):
self._engine_client = rpc.get_engine_client()
@profiler.trace('default-executor-run-action', hide_args=True)
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, execution_context, redelivered=False,
target=None, async_=True, timeout=None):
def run_action(self, action, action_ex_id, safe_rerun, exec_ctx,
redelivered=False, target=None, async_=True, timeout=None):
"""Runs action.
:param action: Action to run.
:param action_ex_id: Action execution id.
:param action_cls_str: Path to action class in dot notation.
:param action_cls_attrs: Attributes of action class which
will be set to.
:param params: Action parameters.
:param safe_rerun: Tells if given action can be safely rerun.
:param execution_context: A dict of values providing information about
:param exec_ctx: A dict of values providing information about
the current execution.
:param redelivered: Tells if given action was run before on another
executor.
@ -61,11 +55,9 @@ class DefaultExecutor(base.Executor):
action_heartbeat_sender.add_action(action_ex_id)
return self._do_run_action(
action_cls_attrs,
action_cls_str,
action,
action_ex_id,
execution_context,
params,
exec_ctx,
redelivered,
safe_rerun,
timeout
@ -73,8 +65,8 @@ class DefaultExecutor(base.Executor):
finally:
action_heartbeat_sender.remove_action(action_ex_id)
def _do_run_action(self, action_cls_attrs, action_cls_str, action_ex_id,
execution_context, params, redelivered, safe_rerun,
def _do_run_action(self, action, action_ex_id, exec_ctx,
redelivered, safe_rerun,
timeout):
def send_error_back(error_msg):
error_result = mistral_lib.Result(error=error_msg)
@ -88,48 +80,27 @@ class DefaultExecutor(base.Executor):
return None
return error_result
if redelivered and not safe_rerun:
msg = (
"Request to run action %s was redelivered, but action %s "
"cannot be re-run safely. The only safe thing to do is fail "
"action." % (action_cls_str, action_cls_str)
"Request to run an action was redelivered, but it cannot "
"be re-run safely. The only safe thing to do is fail "
"it [action=%s]." % action
)
return send_error_back(msg)
# Load action module.
action_cls = a_f.construct_action_class(
action_cls_str,
action_cls_attrs
)
# Instantiate action.
try:
action = action_cls(**params)
except Exception as e:
msg = (
"Failed to initialize action %s. Action init params = %s. "
"Actual init params = %s. More info: %s" % (
action_cls_str,
i_u.get_arg_list(action_cls.__init__),
params.keys(),
e
)
)
LOG.warning(msg)
return send_error_back(msg)
# Run action.
try:
with ev_timeout.Timeout(seconds=timeout):
# NOTE(d0ugal): If the action is a subclass of mistral-lib we
# know that it expects to be passed the context.
if isinstance(action, mistral_lib.Action):
action_ctx = context.create_action_context(
execution_context)
result = action.run(action_ctx)
print("Action:", action)
result = action.run(
context.create_action_context(exec_ctx)
)
else:
result = action.run()
@ -141,14 +112,8 @@ class DefaultExecutor(base.Executor):
except BaseException as e:
msg = (
"The action raised an exception [action_ex_id=%s, msg='%s', "
"action_cls='%s', attributes='%s', params='%s']" % (
action_ex_id,
e,
action_cls,
action_cls_attrs,
params
)
"The action raised an exception [action=%s, action_ex_id=%s, "
"msg='%s']" % (action, action_ex_id, e)
)
LOG.warning(msg, exc_info=True)
@ -172,14 +137,8 @@ class DefaultExecutor(base.Executor):
# serialized.
msg = (
"Failed to complete action due to a Mistral exception "
"[action_ex_id=%s, action_cls='%s', "
"attributes='%s', params='%s']\n %s" % (
action_ex_id,
action_cls,
action_cls_attrs,
params,
e
)
"[action=%s, action_ex_id=%s]\n %s" %
(action, action_ex_id, e)
)
LOG.exception(msg)
@ -190,14 +149,8 @@ class DefaultExecutor(base.Executor):
# log the error.
msg = (
"Failed to complete action due to an unexpected exception "
"[action_ex_id=%s, action_cls='%s', "
"attributes='%s', params='%s']\n %s" % (
action_ex_id,
action_cls,
action_cls_attrs,
params,
e
)
"[action=%s, action_ex_id=%s]\n %s" %
(action, action_ex_id, e)
)
LOG.exception(msg)

View File

@ -19,8 +19,9 @@ from mistral.executors import default_executor as exe
from mistral.rpc import base as rpc
from mistral.service import base as service_base
from mistral.services import action_heartbeat_sender
from mistral.services import actions as action_service
from mistral.utils import profiler as profiler_utils
from mistral_lib import utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -48,6 +49,10 @@ class ExecutorServer(service_base.MistralService):
if self._setup_profiler:
profiler_utils.setup('mistral-executor', cfg.CONF.executor.host)
# Initialize action providers to make sure all action classes
# are initially imported.
action_service.get_system_action_provider()
# Initialize and start RPC server.
self._rpc_server = rpc.get_rpc_server_driver()(cfg.CONF.executor)
@ -65,53 +70,45 @@ class ExecutorServer(service_base.MistralService):
if self._rpc_server:
self._rpc_server.stop(graceful)
def run_action(self, rpc_ctx, action_ex_id, action_cls_str,
action_cls_attrs, params, safe_rerun, execution_context,
def run_action(self, rpc_ctx, action, action_ex_id, safe_rerun, exec_ctx,
timeout):
"""Receives calls over RPC to run action on executor.
:param rpc_ctx: RPC request context dictionary.
:param action: Action.
:param action_ex_id: Action execution id.
:param safe_rerun: Tells if given action can be safely rerun.
:param exec_ctx: A dict of values providing information about
the current execution.
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:param execution_context: A dict of values providing information about
the current execution.
:param rpc_ctx: RPC request context dictionary.
:param action_ex_id: Action execution id.
:param action_cls_str: Action class name.
:param action_cls_attrs: Action class attributes.
:param params: Action input parameters.
:param safe_rerun: Tells if given action can be safely rerun.
:return: Action result.
"""
LOG.debug(
"Received RPC request 'run_action'[action_ex_id=%s, "
"action_cls_str=%s, action_cls_attrs=%s, params=%s, "
"timeout=%s]",
"Received RPC request 'run_action'"
"[action=%s, action_ex_id=%s, timeout=%s]",
action,
action_ex_id,
action_cls_str,
action_cls_attrs,
utils.cut(params),
timeout
)
redelivered = rpc_ctx.redelivered or False
res = self.executor.run_action(
action,
action_ex_id,
action_cls_str,
action_cls_attrs,
params,
safe_rerun,
execution_context,
exec_ctx,
redelivered,
timeout=timeout
)
LOG.debug(
"Sending action result to engine"
" [action_ex_id=%s, action_cls=%s]",
action_ex_id,
action_cls_str
" [action=%s, action_ex_id=%s]",
action,
action_ex_id
)
return res

View File

@ -353,18 +353,15 @@ class ExecutorClient(exe.Executor):
self._client = base.get_rpc_client_driver()(rpc_conf_dict)
@profiler.trace('executor-client-run-action')
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, execution_context, redelivered=False,
target=None, async_=True, timeout=None):
def run_action(self, action, action_ex_id, safe_rerun, exec_ctx,
redelivered=False, target=None, async_=True, timeout=None):
"""Sends a request to run action to executor.
:param action: Action to run.
:param action_ex_id: Action execution id.
:param action_cls_str: Action class name.
:param action_cls_attrs: Action class attributes.
:param params: Action input parameters.
:param safe_rerun: If true, action would be re-run if executor dies
during execution.
:param execution_context: A dict of values providing information about
:param exec_ctx: A dict of values providing information about
the current execution.
:param redelivered: Tells if given action was run before on another
executor.
@ -376,21 +373,22 @@ class ExecutorClient(exe.Executor):
:return: Action result.
"""
rpc_kwargs = {
'action': action,
'action_ex_id': action_ex_id,
'action_cls_str': action_cls_str,
'action_cls_attrs': action_cls_attrs,
'params': params,
'safe_rerun': safe_rerun,
'execution_context': execution_context,
'exec_ctx': exec_ctx,
'timeout': timeout
}
rpc_client_method = (self._client.async_call
if async_ else self._client.sync_call)
rpc_client_method = (
self._client.async_call if async_
else self._client.sync_call
)
LOG.debug(
"Sending an action to executor [action_ex_id=%s, action_cls=%s]",
action_ex_id, action_cls_str
'Sending an action to executor [action=%s, action_ex_id=%s]',
action,
action_ex_id
)
return rpc_client_method(auth_ctx.ctx(), 'run_action', **rpc_kwargs)

View File

@ -15,138 +15,31 @@
# limitations under the License.
from oslo_log import log as logging
from stevedore import extension
from mistral.actions import action_factory
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.services import actions
from mistral.services import adhoc_actions
from mistral_lib import utils
from mistral_lib.utils import inspect_utils as i_utils
# TODO(rakhmerov): Make methods more consistent and granular.
# TODO(rakhmerov): This module won't be needed after we add action providers
LOG = logging.getLogger(__name__)
ACTIONS_PATH = 'resources/actions'
def register_preinstalled_actions():
def _register_preinstalled_adhoc_actions():
action_paths = utils.get_file_list(ACTIONS_PATH)
for action_path in action_paths:
action_definition = open(action_path).read()
actions.create_or_update_actions(
adhoc_actions.create_or_update_actions(
action_definition,
scope='public'
)
def get_registered_actions(**kwargs):
return db_api.get_action_definitions(**kwargs)
def register_action_class(name, action_class_str, attributes,
description=None, input_str=None, namespace=''):
values = {
'name': name,
'action_class': action_class_str,
'attributes': attributes,
'description': description,
'input': input_str,
'is_system': True,
'scope': 'public',
'namespace': namespace
}
try:
LOG.debug("Registering action in DB: %s", name)
db_api.create_action_definition(values)
except exc.DBDuplicateEntryError:
LOG.debug("Action %s already exists in DB.", name)
def _clear_system_action_db():
db_api.delete_action_definitions(is_system=True)
def sync_db():
with db_api.transaction():
_clear_system_action_db()
register_action_classes()
register_preinstalled_actions()
def _register_dynamic_action_classes(namespace=''):
extensions = extension.ExtensionManager(
namespace='mistral.generators',
invoke_on_load=True
)
for ext in extensions:
for generator in ext.obj:
_register_actions(generator, namespace)
def _register_actions(generator, namespace):
module = generator.base_action_class.__module__
class_name = generator.base_action_class.__name__
action_class_str = "%s.%s" % (module, class_name)
for action in generator.create_actions():
attrs = i_utils.get_public_fields(action['class'])
register_action_class(
action['name'],
action_class_str,
attrs,
action['description'],
action['arg_list'],
namespace=namespace
)
def register_action_classes(namespace=''):
mgr = extension.ExtensionManager(
namespace='mistral.actions',
invoke_on_load=False
)
for name in mgr.names():
action_class_str = mgr[name].entry_point_target.replace(':', '.')
action_class = mgr[name].plugin
description = i_utils.get_docstring(action_class)
input_str = i_utils.get_arg_list_as_str(action_class.__init__)
attrs = i_utils.get_public_fields(mgr[name].plugin)
register_action_class(
name,
action_class_str,
attrs,
description=description,
input_str=input_str,
namespace=namespace
)
_register_dynamic_action_classes(namespace=namespace)
def get_action_db(action_name, namespace=''):
return db_api.load_action_definition(action_name, namespace=namespace)
def get_action_class(action_full_name, namespace=''):
"""Finds action class by full action name (i.e. 'namespace.action_name').
:param action_full_name: Full action name (that includes namespace).
:return: Action class or None if not found.
"""
action_db = get_action_db(action_full_name, namespace)
if action_db:
return action_factory.construct_action_class(
action_db.action_class,
action_db.attributes
)
_register_preinstalled_adhoc_actions()

View File

@ -1,4 +1,3 @@
# Copyright 2015 - Mirantis, Inc.
# Copyright 2020 Nokia Software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@ -13,132 +12,88 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
"""
This module is a facade for all action subsystem. It represents a
collection of functions for accessing information about actions
available in the system.
"""
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.lang import parser as spec_parser
from oslo_log import log as logging
from stevedore import extension
from mistral_lib import actions as ml_actions
from mistral.actions import test
def create_actions(definition, scope='private', namespace=''):
action_list_spec = spec_parser.get_action_list_spec_from_yaml(definition)
LOG = logging.getLogger(__name__)
db_actions = []
for action_spec in action_list_spec.get_actions():
db_actions.append(create_action(
action_spec,
definition,
scope,
namespace))
return db_actions
_SYSTEM_PROVIDER = None
_TEST_PROVIDER = None
def update_actions(definition, scope='private', identifier=None, namespace=''):
action_list_spec = spec_parser.get_action_list_spec_from_yaml(definition)
actions = action_list_spec.get_actions()
def _get_registered_providers():
providers = []
if identifier and len(actions) > 1:
raise exc.InputException(
"More than one actions are not supported for "
"update with identifier. [identifier: %s]" %
identifier
mgr = extension.ExtensionManager(
namespace='mistral.action.providers',
invoke_on_load=False
)
db_actions = []
for provider_name in mgr.names():
provider_cls = mgr[provider_name].plugin
for action_spec in action_list_spec.get_actions():
db_actions.append(update_action(
action_spec,
definition,
scope,
identifier=identifier,
namespace=namespace
))
return db_actions
def create_or_update_actions(definition, scope='private', namespace=''):
action_list_spec = spec_parser.get_action_list_spec_from_yaml(definition)
db_actions = []
for action_spec in action_list_spec.get_actions():
db_actions.append(
create_or_update_action(action_spec, definition, scope, namespace)
try:
providers.append(provider_cls(provider_name))
except Exception:
LOG.exception(
'Failed to instantiate an action provider from the class: %s',
provider_cls
)
return db_actions
raise
if not providers:
LOG.warning("No action providers found in the system.")
return providers
def create_action(action_spec, definition, scope, namespace):
return db_api.create_action_definition(
_get_action_values(action_spec, definition, scope, namespace)
def get_test_action_provider():
"""Returns a singleton for the test action provider."""
global _TEST_PROVIDER
if _TEST_PROVIDER is None:
_TEST_PROVIDER = test.TestActionProvider()
return _TEST_PROVIDER
def get_system_action_provider():
"""Returns a singleton for the system action provider.
In fact, this method serves a facade for the entire action subsystem.
Clients of the acton subsystem must get the system action provider
and work with actions through it. The system action provider created
by this method (on the first call) is nothing but just a composite
on top of the action providers registered in the entry point
"mistral.action.providers".
"""
global _SYSTEM_PROVIDER
if _SYSTEM_PROVIDER is None:
delegates = _get_registered_providers()
# Add an action provider for testing to the end of the list
# so that it has the lowest priority. In production runs it's
# always empty so it won't take any effect.
delegates.append(get_test_action_provider())
_SYSTEM_PROVIDER = ml_actions.CompositeActionProvider(
'system',
delegates
)
def update_action(action_spec, definition, scope, identifier=None,
namespace=''):
action = db_api.load_action_definition(action_spec.get_name())
if action and action.is_system:
raise exc.InvalidActionException(
"Attempt to modify a system action: %s" %
action.name
)
values = _get_action_values(action_spec, definition, scope, namespace)
return db_api.update_action_definition(
identifier if identifier else values['name'],
values
)
def create_or_update_action(action_spec, definition, scope, namespace):
action = db_api.load_action_definition(action_spec.get_name())
if action and action.is_system:
raise exc.InvalidActionException(
"Attempt to modify a system action: %s" %
action.name
)
values = _get_action_values(action_spec, definition, scope, namespace)
return db_api.create_or_update_action_definition(values['name'], values)
def get_input_list(action_input):
input_list = []
for param in action_input:
if isinstance(param, dict):
for k, v in param.items():
input_list.append("%s=%s" % (k, json.dumps(v)))
else:
input_list.append(param)
return input_list
def _get_action_values(action_spec, definition, scope, namespace=''):
action_input = action_spec.to_dict().get('input', [])
input_list = get_input_list(action_input)
values = {
'name': action_spec.get_name(),
'description': action_spec.get_description(),
'tags': action_spec.get_tags(),
'definition': definition,
'spec': action_spec.to_dict(),
'is_system': False,
'input': ", ".join(input_list) if input_list else None,
'scope': scope,
'namespace': namespace
}
return values
return _SYSTEM_PROVIDER

View File

@ -0,0 +1,145 @@
# Copyright 2015 - Mirantis, Inc.
# Copyright 2020 Nokia Software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.lang import parser as spec_parser
def create_actions(definition, scope='private', namespace=''):
action_list_spec = spec_parser.get_action_list_spec_from_yaml(definition)
db_actions = []
for action_spec in action_list_spec.get_actions():
db_actions.append(
create_action(
action_spec,
definition,
scope,
namespace
)
)
return db_actions
def update_actions(definition, scope='private', identifier=None, namespace=''):
action_list_spec = spec_parser.get_action_list_spec_from_yaml(definition)
actions = action_list_spec.get_actions()
if identifier and len(actions) > 1:
raise exc.InputException(
"More than one actions are not supported for "
"update with identifier. [identifier: %s]" %
identifier
)
db_actions = []
for action_spec in action_list_spec.get_actions():
db_actions.append(update_action(
action_spec,
definition,
scope,
identifier=identifier,
namespace=namespace
))
return db_actions
def create_or_update_actions(definition, scope='private', namespace=''):
action_list_spec = spec_parser.get_action_list_spec_from_yaml(definition)
db_actions = []
for action_spec in action_list_spec.get_actions():
db_actions.append(
create_or_update_action(action_spec, definition, scope, namespace)
)
return db_actions
def create_action(action_spec, definition, scope, namespace):
return db_api.create_action_definition(
_get_action_values(action_spec, definition, scope, namespace)
)
def update_action(action_spec, definition, scope, identifier=None,
namespace=''):
action = db_api.load_action_definition(action_spec.get_name())
if action and action.is_system:
raise exc.InvalidActionException(
"Attempt to modify a system action: %s" %
action.name
)
values = _get_action_values(action_spec, definition, scope, namespace)
return db_api.update_action_definition(
identifier if identifier else values['name'],
values
)
def create_or_update_action(action_spec, definition, scope, namespace):
action = db_api.load_action_definition(action_spec.get_name())
if action and action.is_system:
raise exc.InvalidActionException(
"Attempt to modify a system action: %s" %
action.name
)
values = _get_action_values(action_spec, definition, scope, namespace)
return db_api.create_or_update_action_definition(values['name'], values)
def get_input_list(action_input):
input_list = []
for param in action_input:
if isinstance(param, dict):
for k, v in param.items():
input_list.append("%s=%s" % (k, json.dumps(v)))
else:
input_list.append(param)
return input_list
def _get_action_values(action_spec, definition, scope, namespace=''):
action_input = action_spec.to_dict().get('input', [])
input_list = get_input_list(action_input)
return {
'name': action_spec.get_name(),
'description': action_spec.get_description(),
'tags': action_spec.get_tags(),
'definition': definition,
'spec': action_spec.to_dict(),
'is_system': False,
'input': ", ".join(input_list) if input_list else None,
'scope': scope,
'namespace': namespace
}

View File

@ -16,7 +16,7 @@
from mistral.db.v2 import api as db_api_v2
from mistral.lang import parser as spec_parser
from mistral import services
from mistral.services import actions
from mistral.services import adhoc_actions
def create_workbook_v2(definition, namespace='', scope='private',
@ -60,7 +60,8 @@ def update_workbook_v2(definition, namespace='', scope='private',
def _on_workbook_update(wb_db, wb_spec, namespace=''):
db_actions = _create_or_update_actions(
wb_db, wb_spec.get_actions(),
wb_db,
wb_spec.get_actions(),
namespace=namespace
)
@ -80,11 +81,13 @@ def _create_or_update_actions(wb_db, actions_spec, namespace):
for action_spec in actions_spec:
action_name = '%s.%s' % (wb_db.name, action_spec.get_name())
input_list = actions.get_input_list(
input_list = adhoc_actions.get_input_list(
action_spec.to_dict().get('input', [])
)
values = {
'name': action_name,
'workbook_name': wb_db.name,
'spec': action_spec.to_dict(),
'tags': action_spec.get_tags(),
'definition': _get_action_definition(wb_db, action_spec),
@ -115,6 +118,7 @@ def _create_or_update_workflows(wb_db, workflows_spec, namespace):
values = {
'name': wf_name,
'workbook_name': wb_db.name,
'definition': _get_wf_definition(wb_db, wf_spec),
'spec': wf_spec.to_dict(),
'scope': wb_db.scope,

View File

@ -1,54 +0,0 @@
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.actions import std_actions as std
from mistral.services import action_manager as a_m
from mistral.tests.unit import base
class ActionManagerTest(base.DbTestCase):
def test_register_standard_actions(self):
action_list = a_m.get_registered_actions()
self._assert_single_item(action_list, name="std.echo")
self._assert_single_item(action_list, name="std.email")
self._assert_single_item(action_list, name="std.http")
self._assert_single_item(action_list, name="std.mistral_http")
self._assert_single_item(action_list, name="std.ssh")
self._assert_single_item(action_list, name="std.javascript")
def test_get_action_class(self):
self.assertTrue(
issubclass(a_m.get_action_class("std.echo"), std.EchoAction)
)
self.assertTrue(
issubclass(a_m.get_action_class("std.http"), std.HTTPAction)
)
self.assertTrue(
issubclass(
a_m.get_action_class("std.mistral_http"),
std.MistralHTTPAction
)
)
self.assertTrue(
issubclass(a_m.get_action_class("std.email"), std.SendEmailAction)
)
self.assertTrue(
issubclass(
a_m.get_action_class("std.javascript"),
std.JavaScriptAction
)
)

View File

@ -0,0 +1,48 @@
# Copyright 2020 Nokia Software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.services import actions
from mistral.tests.unit import base
class LegacyActionProviderTest(base.DbTestCase):
def test_get_system_action_provider(self):
self.override_config(
'load_action_generators',
False,
'legacy_action_provider'
)
self.override_config(
'only_builtin_actions',
True,
'legacy_action_provider'
)
system_provider = actions.get_system_action_provider()
action_descs = system_provider.find_all()
for a in action_descs:
print(a)
self.assertTrue(len(action_descs) > 0)
self.assertTrue(
all(
[
a_d.action_class.__module__.startswith('mistral.')
for a_d in action_descs
]
)
)

View File

@ -0,0 +1,50 @@
# Copyright 2020 Nokia Software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.actions import adhoc
from mistral.services import adhoc_actions as adhoc_action_service
from mistral.tests.unit import base
class AdHocActionProviderTest(base.DbTestCase):
def test_adhoc_actions(self):
provider = adhoc.AdHocActionProvider()
action_descs = provider.find_all()
self.assertEqual(0, len(action_descs))
action_txt = """
version: '2.0'
my_adhoc_action:
base: std.echo
base-input:
output: "<% $.s1 %>+<% $.s2 %>"
input:
- s1: "a"
- s2
output: "<% $ %> and <% $ %>"
"""
adhoc_action_service.create_actions(action_txt)
action_descs = provider.find_all()
self.assertEqual(1, len(action_descs))
action_desc = action_descs[0]
self.assertEqual('my_adhoc_action', action_desc.name)
self.assertEqual(action_txt, action_desc.definition)

View File

@ -0,0 +1,202 @@
# Copyright 2020 Nokia Software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
from mistral_lib import actions as ml_actions
from mistral_lib.utils import inspect_utils
from mistral.actions import legacy
from mistral.tests.unit import base
class BuildMessageAction(ml_actions.Action):
msg_pattern = '%s'
def __init__(self, name):
super(BuildMessageAction, self).__init__()
self._name = name
def run(self, context):
return self.msg_pattern % self._name
class TestActionGenerator(object):
base_action_class = BuildMessageAction
@classmethod
def create_actions(cls):
action_dicts = []
hello_action_cls = type(
'HelloAction',
(BuildMessageAction,),
{'msg_pattern': 'Hello, %s!'}
)
action_dicts.append(
{
'class': hello_action_cls,
'name': 'hello',
'description': 'The action builds a hello message',
'arg_list': inspect_utils.get_arg_list_as_str(
hello_action_cls.__init__
)
}
)
goodbye_action_cls = type(
'GoodbyeAction',
(BuildMessageAction,),
{'msg_pattern': 'Goodbye, %s!'}
)
action_dicts.append(
{
'class': goodbye_action_cls,
'name': 'goodbye',
'description': 'The action builds a goodbye message',
'arg_list': inspect_utils.get_arg_list_as_str(
goodbye_action_cls.__init__
)
}
)
return action_dicts
class LegacyActionProviderTest(base.BaseTest):
def test_only_builtin_actions(self):
self.override_config(
'load_action_generators',
False,
'legacy_action_provider'
)
self.override_config(
'only_builtin_actions',
True,
'legacy_action_provider'
)
provider = legacy.LegacyActionProvider()
# Test find_all() method.
action_descs = provider.find_all()
self.assertTrue(len(action_descs) > 0)
self.assertTrue(
all(
[
a_d.action_class.__module__.startswith('mistral.')
for a_d in action_descs
]
)
)
self._assert_single_item(action_descs, name='std.echo')
# Test find() method.
action_desc = provider.find('std.echo')
self.assertIsNotNone(action_desc)
self.assertEqual('std.echo', action_desc.name)
self.assertIn('Echo action.', action_desc.description)
self.assertEqual(
'mistral.actions.std_actions.EchoAction',
action_desc.action_class_name
)
self.assertEqual('output, delay=0', action_desc.params_spec)
def test_only_action_plugins(self):
self.override_config(
'load_action_generators',
False,
'legacy_action_provider'
)
provider = legacy.LegacyActionProvider()
# TODO(rakhmerov): Implement loading actions from generators
# and test with a generator.
action_descs = provider.find_all()
prefix = 'mistral.actions.std_actions'
self.assertTrue(
all(
[
a_d.action_class.__module__ == prefix
for a_d in action_descs
]
)
)
self._assert_single_item(action_descs, name='std.echo')
@mock.patch.object(
legacy.LegacyActionProvider,
'_get_action_generators',
mock.MagicMock(return_value=[TestActionGenerator])
)
def test_only_action_generators(self):
self.override_config(
'load_action_generators',
True,
'legacy_action_provider'
)
self.override_config(
'load_action_plugins',
False,
'legacy_action_provider'
)
provider = legacy.LegacyActionProvider()
action_descs = provider.find_all()
self.assertEqual(2, len(action_descs))
hello_action_desc = self._assert_single_item(
action_descs,
name='hello',
params_spec='name',
description='The action builds a hello message',
action_class_name='mistral.tests.unit.actions.'
'test_legacy_action_provider.BuildMessageAction',
action_class_attributes={'msg_pattern': 'Hello, %s!'}
)
hello_action = hello_action_desc.instantiate({'name': 'Forest'}, {})
self.assertEqual('Hello, Forest!', hello_action.run(None))
goodbye_action_desc = self._assert_single_item(
action_descs,
name='goodbye',
params_spec='name',
description='The action builds a goodbye message',
action_class_name='mistral.tests.unit.actions.'
'test_legacy_action_provider.BuildMessageAction',
action_class_attributes={'msg_pattern': 'Goodbye, %s!'}
)
goodbye_action = goodbye_action_desc.instantiate(
{'name': 'Lieutenant Dan'},
{}
)
self.assertEqual('Goodbye, Lieutenant Dan!', goodbye_action.run(None))

View File

@ -17,14 +17,18 @@ from unittest import mock
import sqlalchemy as sa
from mistral.actions import adhoc
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.lang import parser as spec_parser
from mistral.services import adhoc_actions
from mistral.tests.unit.api import base
from mistral_lib.actions.providers import composite
from mistral_lib import utils
ACTION_DEFINITION = """
ADHOC_ACTION_YAML = """
---
version: '2.0'
@ -34,6 +38,9 @@ my_action:
base: std.echo
base-input:
output: "{$.str1}{$.str2}"
input:
- str1
- str2
"""
ACTION_DEFINITION_INVALID_NO_BASE = """
@ -65,42 +72,30 @@ ACTION_DSL_PARSE_EXCEPTION = """
%
"""
SYSTEM_ACTION_DEFINITION = """
---
version: '2.0'
ACTION_SPEC = spec_parser.get_action_list_spec_from_yaml(ADHOC_ACTION_YAML)[0]
std.echo:
base: std.http
base-input:
url: "some.url"
"""
ACTION = {
ACTION_DEF_VALUES = {
'id': '123e4567-e89b-12d3-a456-426655440000',
'name': 'my_action',
'is_system': False,
'description': 'My super cool action.',
'tags': ['test', 'v2'],
'definition': ACTION_DEFINITION
'definition': ADHOC_ACTION_YAML,
'spec': ACTION_SPEC.to_dict(),
'input': '',
'project_id': None,
'scope': 'public',
'namespace': None
}
SYSTEM_ACTION = {
'id': '1234',
'name': 'std.echo',
'is_system': True,
'definition': SYSTEM_ACTION_DEFINITION
}
ACTION_DB = models.ActionDefinition()
ACTION_DB.update(ACTION)
ACTION_DEF = models.ActionDefinition()
ACTION_DEF.update(ACTION_DEF_VALUES)
SYSTEM_ACTION_DB = models.ActionDefinition()
SYSTEM_ACTION_DB.update(SYSTEM_ACTION)
ACTION_DESC = adhoc.AdHocActionDescriptor(ACTION_DEF)
PROJECT_ID_ACTION_DB = ACTION_DB.get_clone()
PROJECT_ID_ACTION_DB.project_id = '<default-project>'
UPDATED_ACTION_DEFINITION = """
UPDATED_ADHOC_ACTION_YAML = """
---
version: '2.0'
@ -111,182 +106,178 @@ my_action:
output: "{$.str1}{$.str2}{$.str3}"
"""
UPDATED_ACTION_DB = copy.copy(ACTION_DB)
UPDATED_ACTION_DB['definition'] = UPDATED_ACTION_DEFINITION
UPDATED_ACTION = copy.deepcopy(ACTION)
UPDATED_ACTION['definition'] = UPDATED_ACTION_DEFINITION
UPDATED_ACTION_DEF = copy.copy(ACTION_DEF)
UPDATED_ACTION_DEF['definition'] = UPDATED_ADHOC_ACTION_YAML
UPDATED_ACTION = copy.deepcopy(ACTION_DEF_VALUES)
UPDATED_ACTION['definition'] = UPDATED_ADHOC_ACTION_YAML
MOCK_ACTION = mock.MagicMock(return_value=ACTION_DB)
MOCK_SYSTEM_ACTION = mock.MagicMock(return_value=SYSTEM_ACTION_DB)
MOCK_ACTIONS = mock.MagicMock(return_value=[ACTION_DB])
MOCK_UPDATED_ACTION = mock.MagicMock(return_value=UPDATED_ACTION_DB)
MOCK_DELETE = mock.MagicMock(return_value=None)
MOCK_EMPTY = mock.MagicMock(return_value=[])
MOCK_NOT_FOUND = mock.MagicMock(side_effect=exc.DBEntityNotFoundError())
MOCK_DUPLICATE = mock.MagicMock(side_effect=exc.DBDuplicateEntryError())
MOCK_ACTIONS = mock.MagicMock(return_value=[ACTION_DEF])
MOCK_UPDATED_ACTION = mock.MagicMock(return_value=UPDATED_ACTION_DEF)
class TestActionsController(base.APITest):
@mock.patch.object(
db_api, "get_action_definition", MOCK_ACTION)
def check_adhoc_action_json(self, action_json):
self.assertIsNotNone(action_json)
self.assertIsInstance(action_json, dict)
action_name = action_json['name']
action_def = db_api.get_action_definition(action_name)
self.assertIsNotNone(
action_def,
'Ad-hoc action definition does not exist [name=%s]' % action_name
)
# Compare action JSON with the state of the corresponding
# persistent object.
for k, v in action_json.items():
self.assertEqual(v, utils.datetime_to_str(getattr(action_def, k)))
def test_get(self):
# Create an adhoc action for the purpose of the test.
adhoc_actions.create_actions(ADHOC_ACTION_YAML)
resp = self.app.get('/v2/actions/my_action')
self.assertEqual(200, resp.status_int)
self.assertDictEqual(ACTION, resp.json)
@mock.patch.object(db_api, 'get_action_definition')
self.check_adhoc_action_json(resp.json)
@mock.patch.object(db_api, 'load_action_definition')
def test_get_operational_error(self, mocked_get):
# Create an adhoc action for the purpose of the test.
adhoc_actions.create_actions(ADHOC_ACTION_YAML)
action_def = db_api.get_action_definition('my_action')
mocked_get.side_effect = [
# Emulating DB OperationalError
sa.exc.OperationalError('Mock', 'mock', 'mock'),
ACTION_DB # Successful run
action_def # Successful run
]
resp = self.app.get('/v2/actions/my_action')
self.assertEqual(200, resp.status_int)
self.assertDictEqual(ACTION, resp.json)
@mock.patch.object(
db_api, "get_action_definition", MOCK_NOT_FOUND)
self.check_adhoc_action_json(resp.json)
def test_get_not_found(self):
# This time we don't create an action in DB upfront.
resp = self.app.get('/v2/actions/my_action', expect_errors=True)
self.assertEqual(404, resp.status_int)
@mock.patch.object(db_api, "update_action_definition", MOCK_UPDATED_ACTION)
@mock.patch.object(
db_api, "get_action_definition", MOCK_ACTION)
def test_get_by_id(self):
url = '/v2/actions/{0}'.format(ACTION['id'])
resp = self.app.get(url)
# NOTE(rakhmerov): We can't support this case anymore because
# action descriptors can now be identified only by names and
# namespaces.
pass
self.assertEqual(200, resp.status_int)
self.assertEqual(ACTION['id'], resp.json['id'])
def test_get_within_project_id(self):
# Create an adhoc action for the purpose of the test.
adhoc_actions.create_actions(ADHOC_ACTION_YAML)
@mock.patch.object(
db_api, "get_action_definition", MOCK_NOT_FOUND)
def test_get_by_id_not_found(self):
url = '/v2/actions/1234'
resp = self.app.get(url, expect_errors=True)
self.assertEqual(404, resp.status_int)
@mock.patch.object(
db_api, "get_action_definition", return_value=PROJECT_ID_ACTION_DB)
def test_get_within_project_id(self, mock_get):
url = '/v2/actions/1234'
resp = self.app.get(url, expect_errors=True)
self.assertEqual(200, resp.status_int)
self.assertTrue('project_id' in resp.json)
@mock.patch.object(
db_api, "get_action_definition", MOCK_ACTION)
@mock.patch.object(
db_api, "update_action_definition", MOCK_UPDATED_ACTION
# We should not be able to change 'project_id' even with a
# direct DB call.
db_api.update_action_definition(
'my_action',
{'project_id': 'foobar'}
)
resp = self.app.get('/v2/actions/my_action')
self.assertEqual(200, resp.status_int)
self.assertEqual('<default-project>', resp.json['project_id'])
def test_put(self):
# Create an adhoc action for the purpose of the test.
adhoc_actions.create_actions(ADHOC_ACTION_YAML)
resp = self.app.put(
'/v2/actions',
UPDATED_ACTION_DEFINITION,
UPDATED_ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'}
)
self.assertEqual(200, resp.status_int)
self.assertEqual({"actions": [UPDATED_ACTION]}, resp.json)
self.check_adhoc_action_json(resp.json['actions'][0])
@mock.patch.object(db_api, "load_action_definition", MOCK_ACTION)
@mock.patch.object(db_api, "update_action_definition")
def test_put_public(self, mock_mtd):
mock_mtd.return_value = UPDATED_ACTION_DB
def test_put_public(self):
# Create an adhoc action for the purpose of the test.
adhoc_actions.create_actions(ADHOC_ACTION_YAML)
resp = self.app.put(
'/v2/actions?scope=public',
UPDATED_ACTION_DEFINITION,
UPDATED_ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'}
)
action_json = resp.json['actions'][0]
self.assertEqual(200, resp.status_int)
self.assertEqual('public', action_json['scope'])
self.assertEqual({"actions": [UPDATED_ACTION]}, resp.json)
self.check_adhoc_action_json(action_json)
self.assertEqual("public", mock_mtd.call_args[0][1]['scope'])
@mock.patch.object(db_api, "update_action_definition", MOCK_NOT_FOUND)
def test_put_not_found(self):
resp = self.app.put(
'/v2/actions',
UPDATED_ACTION_DEFINITION,
UPDATED_ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
self.assertEqual(404, resp.status_int)
@mock.patch.object(
db_api, "get_action_definition", MOCK_SYSTEM_ACTION)
def test_put_system(self):
# Create an adhoc action for the purpose of the test.
adhoc_actions.create_actions(ADHOC_ACTION_YAML)
db_api.update_action_definition('my_action', {'is_system': True})
resp = self.app.put(
'/v2/actions',
SYSTEM_ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
self.assertEqual(400, resp.status_int)
self.assertIn(
'Attempt to modify a system action: std.echo',
'Attempt to modify a system action: my_action',
resp.body.decode()
)
@mock.patch.object(db_api, "create_action_definition")
def test_post(self, mock_mtd):
mock_mtd.return_value = ACTION_DB
def test_post(self):
resp = self.app.post(
'/v2/actions',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'}
)
self.assertEqual(201, resp.status_int)
self.assertEqual({"actions": [ACTION]}, resp.json)
self.assertEqual(1, mock_mtd.call_count)
values = mock_mtd.call_args[0][0]
self.assertEqual('My super cool action.', values['description'])
spec = values['spec']
self.assertIsNotNone(spec)
self.assertEqual(ACTION_DB.name, spec['name'])
@mock.patch.object(db_api, "create_action_definition")
def test_post_public(self, mock_mtd):
mock_mtd.return_value = ACTION_DB
self.check_adhoc_action_json(resp.json['actions'][0])
def test_post_public(self):
resp = self.app.post(
'/v2/actions?scope=public',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'}
)
self.assertEqual(201, resp.status_int)
self.assertEqual({"actions": [ACTION]}, resp.json)
self.assertEqual("public", mock_mtd.call_args[0][0]['scope'])
@mock.patch.object(db_api, "create_action_definition")
def test_post_wrong_scope(self, mock_mtd):
mock_mtd.return_value = ACTION_DB
self.check_adhoc_action_json(resp.json['actions'][0])
self.assertEqual('public', resp.json['actions'][0]['scope'])
def test_post_wrong_scope(self):
resp = self.app.post(
'/v2/actions?scope=unique',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
@ -294,67 +285,111 @@ class TestActionsController(base.APITest):
self.assertEqual(400, resp.status_int)
self.assertIn("Scope must be one of the following", resp.body.decode())
@mock.patch.object(db_api, "create_action_definition", MOCK_DUPLICATE)
def test_post_dup(self):
resp = self.app.post(
'/v2/actions',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
self.assertEqual(201, resp.status_int)
# Try to create it again.
resp = self.app.post(
'/v2/actions',
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
self.assertEqual(409, resp.status_int)
@mock.patch.object(
db_api, "get_action_definition", MOCK_ACTION)
@mock.patch.object(db_api, "delete_action_definition", MOCK_DELETE)
def test_delete(self):
# Create an adhoc action for the purpose of the test.
adhoc_actions.create_actions(ADHOC_ACTION_YAML)
self.assertIsNotNone(db_api.load_action_definition('my_action'))
resp = self.app.delete('/v2/actions/my_action')
self.assertEqual(204, resp.status_int)
@mock.patch.object(db_api, "delete_action_definition", MOCK_NOT_FOUND)
self.assertIsNone(db_api.load_action_definition('my_action'))
def test_delete_not_found(self):
resp = self.app.delete('/v2/actions/my_action', expect_errors=True)
self.assertEqual(404, resp.status_int)
@mock.patch.object(
db_api, "get_action_definition", MOCK_SYSTEM_ACTION)
def test_delete_system(self):
resp = self.app.delete('/v2/actions/std.echo', expect_errors=True)
self.assertEqual(400, resp.status_int)
self.assertIn('Attempt to delete a system action: std.echo',
resp.json['faultstring'])
@mock.patch.object(
db_api, "get_action_definitions", MOCK_ACTIONS)
def test_get_all(self):
# Create an adhoc action for the purpose of the test.
adhoc_actions.create_actions(ADHOC_ACTION_YAML)
resp = self.app.get('/v2/actions')
self.assertEqual(200, resp.status_int)
self.assertEqual(1, len(resp.json['actions']))
self.assertDictEqual(ACTION, resp.json['actions'][0])
actions_json = resp.json['actions']
# There will be 'std.' actions and the one we've just created.
self.assertGreater(len(actions_json), 1)
# Let's check some of the well-known 'std.' actions.
self._assert_single_item(actions_json, name='std.echo')
self._assert_single_item(actions_json, name='std.ssh')
self._assert_single_item(actions_json, name='std.fail')
self._assert_single_item(actions_json, name='std.noop')
self._assert_single_item(actions_json, name='std.async_noop')
# Now let's check the ad-hoc action data.
adhoc_action_json = self._assert_single_item(
actions_json,
name='my_action'
)
self.check_adhoc_action_json(adhoc_action_json)
@mock.patch.object(db_api, 'get_action_definitions')
def test_get_all_operational_error(self, mocked_get_all):
# Create an adhoc action for the purpose of the test.
adhoc_actions.create_actions(ADHOC_ACTION_YAML)
action_def = db_api.get_action_definition('my_action')
mocked_get_all.side_effect = [
# Emulating DB OperationalError
sa.exc.OperationalError('Mock', 'mock', 'mock'),
[ACTION_DB] # Successful run
[action_def] # Successful run
]
resp = self.app.get('/v2/actions')
self.assertEqual(200, resp.status_int)
actions_json = resp.json['actions']
self.assertEqual(1, len(resp.json['actions']))
self.assertDictEqual(ACTION, resp.json['actions'][0])
# There will be 'std.' actions and the one we've just created.
self.assertGreater(len(actions_json), 1)
# Let's check some of the well-known 'std.' actions.
self._assert_single_item(actions_json, name='std.echo')
self._assert_single_item(actions_json, name='std.ssh')
self._assert_single_item(actions_json, name='std.fail')
self._assert_single_item(actions_json, name='std.noop')
self._assert_single_item(actions_json, name='std.async_noop')
# Now let's check the ad-hoc action data.
adhoc_action_json = self._assert_single_item(
actions_json,
name='my_action'
)
self.check_adhoc_action_json(adhoc_action_json)
@mock.patch.object(
db_api, "get_action_definitions", MOCK_EMPTY)
composite.CompositeActionProvider,
'find_all',
mock.MagicMock(return_value=[])
)
def test_get_all_empty(self):
resp = self.app.get('/v2/actions')
@ -362,24 +397,43 @@ class TestActionsController(base.APITest):
self.assertEqual(0, len(resp.json['actions']))
@mock.patch.object(
db_api, "get_action_definitions", MOCK_ACTIONS)
def test_get_all_filtered(self):
# First check that w/o filters the result set
# will contain more than 1 item.
resp = self.app.get('/v2/actions')
self.assertEqual(200, resp.status_int)
self.assertGreater(len(resp.json['actions']), 1)
# Now we'll filter it by action name.
resp = self.app.get('/v2/actions?name=std.echo')
self.assertEqual(200, resp.status_int)
self.assertEqual(1, len(resp.json['actions']))
def test_get_all_pagination(self):
resp = self.app.get(
'/v2/actions?limit=1&sort_keys=id,name')
# Create an adhoc action for the purpose of the test.
adhoc_actions.create_actions(ADHOC_ACTION_YAML)
resp = self.app.get('/v2/actions?limit=1&sort_keys=id,name')
self.assertEqual(200, resp.status_int)
self.assertIn('next', resp.json)
self.assertEqual(1, len(resp.json['actions']))
self.assertDictEqual(ACTION, resp.json['actions'][0])
self.check_adhoc_action_json(resp.json['actions'][0])
param_dict = utils.get_dict_from_string(
resp.json['next'].split('?')[1],
delimiter='&'
)
action_def = db_api.get_action_definition('my_action')
# TODO(rakhmerov): In this case we can't use IDs for marker because
# in general we don't identify action descriptors with IDs.
expected_dict = {
'marker': '123e4567-e89b-12d3-a456-426655440000',
'marker': action_def.id,
'limit': 1,
'sort_keys': 'id,name',
'sort_dirs': 'asc,asc'
@ -435,7 +489,7 @@ class TestActionsController(base.APITest):
def test_validate(self):
resp = self.app.post(
'/v2/actions/validate',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'}
)

View File

@ -877,7 +877,7 @@ class TestExecutionsController(base.APITest):
def test_get_all_pagination_unknown_direction(self):
resp = self.app.get(
'/v2/actions?limit=1&sort_keys=id&sort_dirs=nonexist',
'/v2/executions?limit=1&sort_keys=id&sort_dirs=nonexist',
expect_errors=True
)

View File

@ -30,11 +30,10 @@ from mistral.db.sqlalchemy import base as db_sa_base
from mistral.db.sqlalchemy import sqlite_lock
from mistral.db.v2 import api as db_api
from mistral.lang import parser as spec_parser
from mistral.services import action_manager
from mistral.services import actions as action_service
from mistral.services import security
from mistral.tests.unit import config as test_config
from mistral import version
from mistral_lib.utils import inspect_utils as i_utils
RESOURCES_PATH = 'tests/resources/'
LOG = logging.getLogger(__name__)
@ -68,15 +67,6 @@ def get_context(default=True, admin=False):
})
def register_action_class(name, cls, attributes=None, desc=None):
action_manager.register_action_class(
name,
'%s.%s' % (cls.__module__, cls.__name__),
attributes or {},
input_str=i_utils.get_arg_list_as_str(cls.__init__)
)
class FakeHTTPResponse(object):
def __init__(self, text, status_code, reason=None, headers=None,
history=None, encoding='utf-8', url='', cookies=None,
@ -100,11 +90,32 @@ class BaseTest(base.BaseTestCase):
def setUp(self):
super(BaseTest, self).setUp()
# By default, retain only built-in actions so that the unit tests
# don't see unexpected actions (i.e. provided by action generators
# installed by other projects).
self.override_config(
'only_builtin_actions',
True,
'legacy_action_provider'
)
self.override_config(
'load_action_generators',
False,
'legacy_action_provider'
)
self.addCleanup(spec_parser.clear_caches)
def register_action_class(self, name, cls, attributes=None, desc=None):
# Added for convenience (to avoid unnecessary imports).
register_action_class(name, cls, attributes, desc)
def _cleanup_actions():
action_service.get_test_action_provider().cleanup()
self.addCleanup(_cleanup_actions)
def register_action_class(self, name, cls):
action_service.get_test_action_provider().register_python_action(
name,
cls
)
def assertRaisesWithMessage(self, exception, msg, func, *args, **kwargs):
try:
@ -269,11 +280,7 @@ class DbTestCase(BaseTest):
db_api.setup_db()
action_manager.sync_db()
def _clean_db(self):
db_api._ACTION_DEF_CACHE.clear()
contexts = [
get_context(default=False),
get_context(default=True)
@ -292,6 +299,7 @@ class DbTestCase(BaseTest):
db_api.delete_action_executions()
db_api.delete_workbooks()
db_api.delete_workflow_definitions()
db_api.delete_action_definitions()
db_api.delete_environments()
db_api.delete_resource_members()
db_api.delete_delayed_calls()
@ -308,6 +316,7 @@ class DbTestCase(BaseTest):
self.__heavy_init()
self.ctx = get_context()
auth_context.set_ctx(self.ctx)
self.addCleanup(auth_context.set_ctx, None)

View File

@ -28,10 +28,10 @@ from mistral.executors import executor_server
from mistral.notifiers import notification_server as notif_server
from mistral.rpc import base as rpc_base
from mistral.rpc import clients as rpc_clients
from mistral.services import actions as action_service
from mistral.tests.unit import base
from mistral.workflow import states
LOG = logging.getLogger(__name__)
# Default delay and timeout in seconds for await_xxx() functions.
@ -51,6 +51,12 @@ class EngineTestCase(base.DbTestCase):
def setUp(self):
super(EngineTestCase, self).setUp()
# We assume that most tests don't need a remote executor.
# But this option can be overridden on a test level, if needed,
# because an executor instance (local or a client to a remote one)
# is obtained by engine dynamically on every need.
self.override_config('type', 'local', 'executor')
# Get transport here to let oslo.messaging setup default config
# before changing the rpc_backend to the fake driver; otherwise,
# oslo.messaging will throw exception.
@ -75,8 +81,12 @@ class EngineTestCase(base.DbTestCase):
exe_svc = executor_server.get_oslo_service(setup_profiler=False)
self.executor = exe_svc.executor
self.threads.append(eventlet.spawn(launch_service, exe_svc))
self.addCleanup(exe_svc.stop, True)
else:
self.executor = exe.get_executor(cfg.CONF.executor.type)
# Start remote notifier.
if cfg.CONF.notifier.type == 'remote':
@ -104,6 +114,10 @@ class EngineTestCase(base.DbTestCase):
self.addOnException(self.print_executions)
self.addCleanup(self.kill_threads)
# This call ensures that all plugged in action providers are
# properly initialized.
action_service.get_system_action_provider()
# Make sure that both services fully started, otherwise
# the test may run too early.
if cfg.CONF.executor.type == 'remote':

View File

@ -1,123 +0,0 @@
# Copyright 2017 - Nokia Networks.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock
import cachetools
from oslo_config import cfg
from mistral.db.v2 import api as db_api
from mistral.services import actions as action_service
from mistral.services import workflows as wf_service
from mistral.tests.unit.engine import base
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
class LookupUtilsTest(base.EngineTestCase):
ACTION = """---
version: '2.0'
action1:
base: std.echo output='Hi'
output:
result: $
"""
WF_TEXT = """---
version: '2.0'
wf:
tasks:
task1:
action: action1
on-success: join_task
task2:
action: action1
on-success: join_task
join_task:
join: all
on-success: task4
task4:
action: action1
pause-before: true
"""
def test_action_definition_cache_ttl(self):
namespace = 'test_namespace'
wf_service.create_workflows(self.WF_TEXT, namespace=namespace)
# Create an action.
db_actions = action_service.create_actions(self.ACTION,
namespace=namespace)
self.assertEqual(1, len(db_actions))
self._assert_single_item(db_actions,
name='action1',
namespace=namespace)
# Explicitly mark the action to be deleted after the test execution.
self.addCleanup(db_api.delete_action_definitions,
name='action1',
namespace=namespace)
# Reinitialise the cache with reduced action_definition_cache_time
# to make sure the test environment is under control.
new_cache = cachetools.TTLCache(
maxsize=1000,
ttl=50 # 50 seconds
)
cache_patch = mock.patch.object(
db_api, '_ACTION_DEF_CACHE', new_cache)
cache_patch.start()
self.addCleanup(cache_patch.stop)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', wf_namespace=namespace)
self.await_workflow_paused(wf_ex.id)
# Check that 'action1' 'echo' and 'noop' are cached.
self.assertEqual(5, len(db_api._ACTION_DEF_CACHE))
self.assertIn('action1:test_namespace', db_api._ACTION_DEF_CACHE)
self.assertIn('std.noop:test_namespace', db_api._ACTION_DEF_CACHE)
self.assertIn('std.echo:test_namespace', db_api._ACTION_DEF_CACHE)
self.assertIn('std.noop', db_api._ACTION_DEF_CACHE)
self.assertIn('std.echo', db_api._ACTION_DEF_CACHE)
# Simulate cache expiry
new_cache.clear()
# Wait some time until cache expires
self._await(
lambda: len(db_api._ACTION_DEF_CACHE) == 0,
fail_message="No triggers were found"
)
self.assertEqual(0, len(db_api._ACTION_DEF_CACHE))
self.engine.resume_workflow(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Check all actions are cached again.
self.assertEqual(3, len(db_api._ACTION_DEF_CACHE))
self.assertIn('action1:test_namespace', db_api._ACTION_DEF_CACHE)
self.assertIn('std.echo', db_api._ACTION_DEF_CACHE)
self.assertIn('std.echo:test_namespace', db_api._ACTION_DEF_CACHE)

View File

@ -14,7 +14,6 @@ from unittest import mock
from mistral.db.v2 import api as db_api
from mistral.services import workflows as wf_service
from mistral.tests.unit import base as test_base
from mistral.tests.unit.engine import base
from mistral_lib import actions as actions_base
@ -31,7 +30,6 @@ wf:
class MyAction(actions_base.Action):
def run(self, context):
pass
@ -40,7 +38,7 @@ class ActionContextTest(base.EngineTestCase):
def setUp(self):
super(ActionContextTest, self).setUp()
test_base.register_action_class('my_action', MyAction)
self.register_action_class('my_action', MyAction)
@mock.patch.object(MyAction, 'run', return_value=None)
def test_context(self, mocked_run):
@ -52,6 +50,7 @@ class ActionContextTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
self.assertEqual(1, len(mocked_run.call_args_list))
action_context = mocked_run.call_args[0][0]
exec_context = action_context.execution

View File

@ -36,6 +36,8 @@ class ActionHeartbeatCheckerTest(base.EngineTestCase):
super(ActionHeartbeatCheckerTest, self).setUp()
self.override_config('type', 'remote', 'executor')
# Make sure actions are not sent to an executor.
@mock.patch.object(
rpc_clients.ExecutorClient,

View File

@ -15,6 +15,8 @@
from oslo_config import cfg
from mistral_lib import actions as ml_actions
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.services import workbooks as wb_service
@ -289,6 +291,7 @@ class AdhocActionsTest(base.EngineTestCase):
- my_param
base: std.async_noop
output: (((<% $ %>)))
workflows:
my_wf:
@ -301,7 +304,23 @@ class AdhocActionsTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('my_wb1.my_wf')
self.await_workflow_running(wf_ex.id)
self.await_workflow_running(wf_ex.id, timeout=4)
with db_api.transaction(read_only=True):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
wf_ex_id = wf_ex.id
a_ex_id = wf_ex.task_executions[0].action_executions[0].id
self.engine.on_action_complete(a_ex_id, ml_actions.Result(data='Hi!'))
self.await_action_success(a_ex_id)
self.await_workflow_success(wf_ex_id)
with db_api.transaction(read_only=True):
a_ex = db_api.get_action_execution(a_ex_id)
self.assertEqual('(((Hi!)))', a_ex.output.get('result'))
def test_adhoc_action_definition_with_namespace(self):
namespace1 = 'ad-hoc_test'
@ -375,22 +394,16 @@ class AdhocActionsTest(base.EngineTestCase):
wf_namespace=namespace
)
self.await_workflow_success(wf_ex.id)
self.await_workflow_success(wf_ex.id, timeout=5)
with db_api.transaction():
action_execs = db_api.get_action_executions(
name='std.echo',
name='my_wb.test_env',
workflow_namespace=namespace
)
self.assertEqual(1, len(action_execs))
context = action_execs[0].runtime_context
self.assertEqual(
'my_wb.test_env',
context.get('adhoc_action_name')
)
self.assertEqual(namespace, action_execs[0].workflow_namespace)
def test_adhoc_action_runtime_context_name(self):
@ -403,12 +416,6 @@ class AdhocActionsTest(base.EngineTestCase):
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
action_execs = db_api.get_action_executions(name='std.echo')
action_execs = db_api.get_action_executions(name='my_wb.test_env')
self.assertEqual(1, len(action_execs))
action_name = action_execs[0].runtime_context.get(
'adhoc_action_name'
)
self.assertEqual('my_wb.test_env', action_name)

View File

@ -338,6 +338,7 @@ class DefaultEngineTest(base.DbTestCase):
# Start workflow.
wf_service.create_workflows(workflow)
wf_ex = self.engine.start_workflow('wf_async')
self.assertIsNotNone(wf_ex)

View File

@ -1,5 +1,6 @@
# Copyright 2014 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
# Copyright 2020 Nokia Software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -283,7 +284,7 @@ class DirectWorkflowEngineTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf')
self.assertIn(
"Failed to find action [action_name=wrong.task]",
"Failed to find action [action_name=wrong.task, namespace=]",
wf_ex.state_info
)
self.assertEqual(states.ERROR, wf_ex.state)

View File

@ -15,6 +15,7 @@
from unittest import mock
from mistral.actions import std_actions
from mistral.db.v2 import api as db_api
from mistral.engine import engine_server
from mistral import exceptions as exc
@ -237,7 +238,5 @@ class DisabledYAQLConversionTest(engine_test_base.EngineTestCase):
# action parameters.
args = mocked.call_args[0]
self.assertEqual(action_ex.id, args[0])
self.assertEqual('mistral.actions.std_actions.EchoAction', args[1])
self.assertDictEqual({}, args[2])
self.assertDictEqual(action_ex.input, args[3])
self.assertIsInstance(args[0], std_actions.EchoAction)
self.assertEqual(action_ex.id, args[1])

View File

@ -16,6 +16,7 @@ from unittest import mock
from oslo_config import cfg
from mistral.actions import std_actions
from mistral.db.v2 import api as db_api
from mistral.executors import default_executor as d_exe
from mistral.executors import remote_executor as r_exe
@ -79,19 +80,16 @@ workflows:
"""
def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
params, safe_rerun, execution_context, target=None,
def _run_at_target(action, action_ex_id, safe_rerun, exec_ctx, target=None,
async_=True, timeout=None):
# We'll just call executor directly for testing purposes.
executor = d_exe.DefaultExecutor()
executor.run_action(
action,
action_ex_id,
action_cls_str,
action_cls_attrs,
params,
safe_rerun,
execution_context=execution_context,
exec_ctx=exec_ctx,
target=target,
async_=async_,
timeout=timeout
@ -109,6 +107,8 @@ class EnvironmentTest(base.EngineTestCase):
@mock.patch.object(r_exe.RemoteExecutor, 'run_action', MOCK_RUN_AT_TARGET)
def _test_subworkflow(self, env):
self.override_config('type', 'remote', 'executor')
wf2_ex = self.engine.start_workflow('my_wb.wf2', env=env)
# Execution of 'wf2'.
@ -172,10 +172,8 @@ class EnvironmentTest(base.EngineTestCase):
callback_url = '/v2/action_executions/%s' % a_ex.id
r_exe.RemoteExecutor.run_action.assert_any_call(
std_actions.EchoAction(**a_ex.input),
a_ex.id,
'mistral.actions.std_actions.EchoAction',
{},
a_ex.input,
False,
{
'task_execution_id': t_ex.id,

View File

@ -600,7 +600,7 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
self.assertIsNotNone(state_info)
self.assertGreater(state_info.find('error='), 0)
self.assertLess(state_info.find('error='), state_info.find('action='))
self.assertGreater(state_info.find('action='), 0)
def test_publish_bad_yaql(self):
wf_text = """---
@ -848,8 +848,8 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
task_ex = wf_ex.task_executions[0]
expected = (
"The action raised an exception [action_ex_id=%s, "
"msg='Fail action expected exception.'"
"The action raised an exception [action=Fail action, "
"action_ex_id=%s, msg='Fail action expected exception.'"
) % task_ex.action_executions[0].id
# Making sure that the actual error message goes before

View File

@ -16,7 +16,6 @@ from oslo_config import cfg
from mistral.db.v2 import api as db_api
from mistral.services import workflows as wf_service
from mistral.tests.unit import base as test_base
from mistral.tests.unit.engine import base
from mistral.workflow import data_flow
from mistral.workflow import states
@ -81,8 +80,8 @@ class ErrorResultTest(base.EngineTestCase):
def setUp(self):
super(ErrorResultTest, self).setUp()
test_base.register_action_class('my_action', MyAction)
test_base.register_action_class('my_async_action', MyAsyncAction)
self.register_action_class('my_action', MyAction)
self.register_action_class('my_async_action', MyAsyncAction)
def test_error_result1(self):
wf_service.create_workflows(WF.format(action_name="my_action"))

View File

@ -19,7 +19,6 @@ from mistral_lib import actions as actions_base
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.services import workflows as wf_service
from mistral.tests.unit import base as test_base
from mistral.tests.unit.engine import base
from mistral.workflow import states
@ -97,7 +96,7 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
group='engine'
)
test_base.register_action_class('my_action', MyAction)
self.register_action_class('my_action', MyAction)
def tearDown(self):
"""Restores the size limit config to default"""

View File

@ -18,7 +18,6 @@ import testtools
from mistral.db.v2 import api as db_api
from mistral.lang.v2 import tasks as tasks_lang
from mistral.services import workflows as wf_service
from mistral.tests.unit import base as test_base
from mistral.tests.unit.engine import base
from mistral.workflow import states
from mistral_lib import actions as actions_base
@ -35,14 +34,12 @@ class ActionWithExceptionInInit(actions_base.Action):
super(ActionWithExceptionInInit, self).__init__()
if aaa != "bbb":
raise Exception("Aaa doesn't equal bbb")
raise Exception("Aaa doesn't equal to bbb")
self.aaa = aaa
def run(self, context):
return actions_base.Result(
data=self.aaa
)
return actions_base.Result(data=self.aaa)
def test(self):
raise NotImplementedError
@ -1370,7 +1367,7 @@ class JoinEngineTest(base.EngineTestCase):
)
def test_join_task_with_input_error(self):
test_base.register_action_class(
self.register_action_class(
'my_action',
ActionWithExceptionInInit
)
@ -1397,7 +1394,7 @@ class JoinEngineTest(base.EngineTestCase):
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_error(wf_ex.id)
self.await_workflow_error(wf_ex.id, timeout=5)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
@ -1406,5 +1403,4 @@ class JoinEngineTest(base.EngineTestCase):
self._assert_single_item(t_execs, name='task1', state=states.SUCCESS)
self._assert_single_item(t_execs, name='task2', state=states.SUCCESS)
self._assert_single_item(t_execs, name='join_task',
state=states.ERROR)
self._assert_single_item(t_execs, name='join_task', state=states.ERROR)

View File

@ -19,7 +19,6 @@ import testtools
from mistral.db.v2 import api as db_api
from mistral.services import workflows as wf_service
from mistral.tests.unit import base as test_base
from mistral.tests.unit.engine import base
from mistral.workflow import states
from mistral_lib import actions as actions_base
@ -122,7 +121,7 @@ class EngineActionRaceConditionTest(base.EngineTestCase):
ACTION_SEMAPHORE = semaphore.Semaphore(1)
TEST_SEMAPHORE = semaphore.Semaphore(0)
test_base.register_action_class('test.block', BlockingAction)
self.register_action_class('test.block', BlockingAction)
@staticmethod
def block_action():

View File

@ -19,12 +19,13 @@ from oslo_config import cfg
from mistral.actions import std_actions
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.services import actions
from mistral.services import adhoc_actions
from mistral.tests.unit.engine import base
from mistral.workflow import states
from mistral_lib import actions as ml_actions
from mistral_lib import exceptions as ml_exc
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
@ -32,9 +33,8 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
class RunActionEngineTest(base.EngineTestCase):
@classmethod
def heavy_init(cls):
super(RunActionEngineTest, cls).heavy_init()
def setUp(self):
super(RunActionEngineTest, self).setUp()
action = """---
version: '2.0'
@ -93,7 +93,8 @@ class RunActionEngineTest(base.EngineTestCase):
input:
- output
"""
actions.create_actions(action)
adhoc_actions.create_actions(action)
def test_run_action_sync(self):
# Start action and see the result.
@ -127,7 +128,7 @@ class RunActionEngineTest(base.EngineTestCase):
- right
"""
actions.create_actions(action_text, namespace=namespace)
adhoc_actions.create_actions(action_text, namespace=namespace)
self.assertRaises(
exc.InvalidActionException,
@ -193,6 +194,7 @@ class RunActionEngineTest(base.EngineTestCase):
save_result=True,
namespace='namespace'
)
self.assertIsNotNone(action_ex)
@mock.patch.object(
@ -371,7 +373,7 @@ class RunActionEngineTest(base.EngineTestCase):
def test_run_action_wrong_input(self):
# Start action and see the result.
exception = self.assertRaises(
exc.InputException,
ml_exc.ActionException,
self.engine.start_action,
'std.http',
{'url': 'Hello, ', 'metod': 'John Doe!'}
@ -382,7 +384,7 @@ class RunActionEngineTest(base.EngineTestCase):
def test_adhoc_action_wrong_input(self):
# Start action and see the result.
exception = self.assertRaises(
exc.InputException,
ml_exc.ActionException,
self.engine.start_action,
'concat',
{'left': 'Hello, ', 'ri': 'John Doe!'}
@ -390,45 +392,21 @@ class RunActionEngineTest(base.EngineTestCase):
self.assertIn('concat', str(exception))
# TODO(rakhmerov): This is an example of a bad test. It pins to
# implementation details too much and prevents from making refactoring
# easily. When writing tests we should make assertions about
# consequences, not about how internal machinery works, i.e. we need to
# follow "black box" testing paradigm.
@mock.patch('mistral.engine.actions.resolve_action_definition')
@mock.patch('mistral.engine.utils.validate_input')
@mock.patch('mistral.services.action_manager.get_action_class')
@mock.patch('mistral.engine.actions.PythonAction.run')
def test_run_action_with_kwargs_input(self, run_mock, class_mock,
validate_mock, def_mock):
action_def = models.ActionDefinition()
action_def.update({
'name': 'fake_action',
'action_class': '',
'attributes': {},
'description': '',
'input': '**kwargs',
'is_system': True,
'scope': 'public'
})
def_mock.return_value = action_def
run_mock.return_value = ml_actions.Result(data='Hello')
def test_run_action_with_kwargs_input(self):
class FakeAction(ml_actions.Action):
def __init__(self, **kwargs):
super(FakeAction, self).__init__()
class_ret = mock.MagicMock()
class_mock.return_value = class_ret
self.kwargs = kwargs
self.engine.start_action('fake_action', {'input': 'Hello'})
def run(self, context):
return ml_actions.Result(data=self.kwargs)
self.assertEqual(1, def_mock.call_count)
def_mock.assert_called_with('fake_action', namespace='')
self.register_action_class('fake_action', FakeAction)
self.assertEqual(0, validate_mock.call_count)
class_ret.assert_called_once_with(input='Hello')
run_mock.assert_called_once_with(
{'input': 'Hello'},
None,
save=False,
timeout=None
action_ex = self.engine.start_action(
'fake_action',
{'param1': 'Hello'}
)
self.assertDictEqual({'param1': 'Hello'}, action_ex.input)

View File

@ -24,19 +24,16 @@ from mistral.workflow import data_flow
from mistral.workflow import states
def _run_at_target(action_ex_id, action_class_str, attributes,
action_params, safe_rerun, execution_context, target=None,
async_=True, timeout=None):
def _run_at_target(action, action_ex_id, safe_rerun, exec_ctx,
target=None, async_=True, timeout=None):
# We'll just call executor directly for testing purposes.
executor = d_exe.DefaultExecutor()
executor.run_action(
action,
action_ex_id,
action_class_str,
attributes,
action_params,
safe_rerun,
execution_context,
exec_ctx,
redelivered=True
)
@ -45,6 +42,10 @@ MOCK_RUN_AT_TARGET = mock.MagicMock(side_effect=_run_at_target)
class TestSafeRerun(base.EngineTestCase):
def setUp(self):
super(TestSafeRerun, self).setUp()
self.override_config('type', 'remote', 'executor')
@mock.patch.object(r_exe.RemoteExecutor, 'run_action', MOCK_RUN_AT_TARGET)
def test_safe_rerun_true(self):
@ -74,6 +75,7 @@ class TestSafeRerun(base.EngineTestCase):
# to true.
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_success(wf_ex.id)
@ -120,6 +122,7 @@ class TestSafeRerun(base.EngineTestCase):
# to true.
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_success(wf_ex.id)

View File

@ -23,7 +23,6 @@ from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.services import workbooks as wb_service
from mistral.services import workflows as wf_service
from mistral.tests.unit import base as test_base
from mistral.tests.unit.engine import base
from mistral.workflow import data_flow
from mistral.workflow import states
@ -553,7 +552,7 @@ class WithItemsEngineTest(base.EngineTestCase):
one_two_three: <% task(task1).result %>
"""
# Register random sleep action in the DB.
test_base.register_action_class('sleep_echo', RandomSleepEchoAction)
self.register_action_class('sleep_echo', RandomSleepEchoAction)
wb_service.create_workbook_v2(wb_text)

View File

@ -19,7 +19,6 @@ from oslo_log import log as logging
from mistral.actions import std_actions
from mistral.db.v2 import api as db_api
from mistral.executors import base as exe
from mistral.executors import remote_executor as r_exe
from mistral.services import workbooks as wb_svc
from mistral.tests.unit.executors import base
@ -40,17 +39,10 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
mock.MagicMock(return_value=None)
)
class LocalExecutorTest(base.ExecutorTestCase):
def setUp(self):
super(LocalExecutorTest, self).setUp()
@classmethod
def setUpClass(cls):
super(LocalExecutorTest, cls).setUpClass()
cfg.CONF.set_default('type', 'local', group='executor')
@classmethod
def tearDownClass(cls):
exe.cleanup()
cfg.CONF.set_default('type', 'remote', group='executor')
super(LocalExecutorTest, cls).tearDownClass()
self.override_config('type', 'local', 'executor')
@mock.patch.object(
std_actions.EchoAction,

View File

@ -17,12 +17,14 @@ from unittest import mock
from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral.services import adhoc_actions
from mistral.tests.unit.api import base
from mistral.tests.unit.mstrlfixtures import policy_fixtures
MOCK_DELETE = mock.MagicMock(return_value=None)
ACTION_DEFINITION = """
ADHOC_ACTION_YAML = """
---
version: '2.0'
@ -33,15 +35,17 @@ my_action:
base-input:
output: "{$.str1}{$.str2}"
"""
ACTION_DB = models.ActionDefinition(
ADHOC_ACTION_DEF = models.ActionDefinition(
id='123e4567-e89b-12d3-a456-426655440000',
name='my_action',
is_system=False,
description='My super cool action.',
tags=['test', 'v2'],
definition=ACTION_DEFINITION
definition=ADHOC_ACTION_YAML
)
MOCK_ACTION = mock.MagicMock(return_value=ACTION_DB)
MOCK_ACTION = mock.MagicMock(return_value=ADHOC_ACTION_DEF)
class TestActionPolicy(base.APITest):
@ -57,17 +61,19 @@ class TestActionPolicy(base.APITest):
"""
def setUp(self):
self.policy = self.useFixture(policy_fixtures.PolicyFixture())
super(TestActionPolicy, self).setUp()
self.policy = self.useFixture(policy_fixtures.PolicyFixture())
@mock.patch.object(db_api, "create_action_definition")
def test_action_create_not_allowed(self, mock_obj):
self.policy.change_policy_definition(
{"actions:create": "role:FAKE"}
)
resp = self.app.post(
'/v2/actions',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
@ -79,9 +85,10 @@ class TestActionPolicy(base.APITest):
self.policy.change_policy_definition(
{"actions:create": "role:FAKE or rule:admin_or_owner"}
)
resp = self.app.post(
'/v2/actions',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
@ -94,9 +101,10 @@ class TestActionPolicy(base.APITest):
"actions:create": "role:FAKE or rule:admin_or_owner",
"actions:publicize": "role:FAKE"
})
resp = self.app.post(
'/v2/actions?scope=public',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
@ -109,9 +117,10 @@ class TestActionPolicy(base.APITest):
"actions:create": "role:FAKE or rule:admin_or_owner",
"actions:publicize": "role:FAKE or rule:admin_or_owner"
})
resp = self.app.post(
'/v2/actions?scope=public',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
@ -124,10 +133,8 @@ class TestActionPolicy(base.APITest):
self.policy.change_policy_definition(
{"actions:delete": "role:FAKE"}
)
resp = self.app.delete(
'/v2/actions/123',
expect_errors=True
)
resp = self.app.delete('/v2/actions/123', expect_errors=True)
self.assertEqual(403, resp.status_int)
@ -137,34 +144,29 @@ class TestActionPolicy(base.APITest):
self.policy.change_policy_definition(
{"actions:delete": "role:FAKE or rule:admin_or_owner"}
)
resp = self.app.delete(
'/v2/actions/123',
expect_errors=True
)
resp = self.app.delete('/v2/actions/123', expect_errors=True)
self.assertEqual(204, resp.status_int)
@mock.patch.object(db_api, "get_action_definition", MOCK_ACTION)
def test_action_get_not_allowed(self):
self.policy.change_policy_definition(
{"actions:get": "role:FAKE"}
)
resp = self.app.get(
'/v2/actions/123',
expect_errors=True
)
resp = self.app.get('/v2/actions/123', expect_errors=True)
self.assertEqual(403, resp.status_int)
@mock.patch.object(db_api, "get_action_definition", MOCK_ACTION)
def test_action_get_allowed(self):
# Create an adhoc action for the purpose of the test.
adhoc_actions.create_actions(ADHOC_ACTION_YAML)
self.policy.change_policy_definition(
{"actions:get": "role:FAKE or rule:admin_or_owner"}
)
resp = self.app.get(
'/v2/actions/123',
expect_errors=True
)
resp = self.app.get('/v2/actions/my_action')
self.assertEqual(200, resp.status_int)
@ -172,10 +174,8 @@ class TestActionPolicy(base.APITest):
self.policy.change_policy_definition(
{"actions:list": "role:FAKE"}
)
resp = self.app.get(
'/v2/actions',
expect_errors=True
)
resp = self.app.get('/v2/actions', expect_errors=True)
self.assertEqual(403, resp.status_int)
@ -183,10 +183,8 @@ class TestActionPolicy(base.APITest):
self.policy.change_policy_definition(
{"actions:list": "role:FAKE or rule:admin_or_owner"}
)
resp = self.app.get(
'/v2/actions',
expect_errors=True
)
resp = self.app.get('/v2/actions', expect_errors=True)
self.assertEqual(200, resp.status_int)
@ -195,9 +193,10 @@ class TestActionPolicy(base.APITest):
self.policy.change_policy_definition(
{"actions:update": "role:FAKE"}
)
resp = self.app.put(
'/v2/actions',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
@ -209,9 +208,10 @@ class TestActionPolicy(base.APITest):
self.policy.change_policy_definition(
{"actions:update": "role:FAKE or rule:admin_or_owner"}
)
resp = self.app.put(
'/v2/actions',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
@ -224,9 +224,10 @@ class TestActionPolicy(base.APITest):
"actions:update": "role:FAKE or rule:admin_or_owner",
"actions:publicize": "role:FAKE"
})
resp = self.app.put(
'/v2/actions?scope=public',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)
@ -239,9 +240,10 @@ class TestActionPolicy(base.APITest):
"actions:update": "role:FAKE or rule:admin_or_owner",
"actions:publicize": "role:FAKE or rule:admin_or_owner"
})
resp = self.app.put(
'/v2/actions?scope=public',
ACTION_DEFINITION,
ADHOC_ACTION_YAML,
headers={'Content-Type': 'text/plain'},
expect_errors=True
)

View File

@ -1,50 +0,0 @@
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.db.v2 import api as db_api
from mistral.tests.unit import base
class ActionManagerTest(base.DbTestCase):
def test_action_input(self):
std_http = db_api.get_action_definition("std.http")
std_email = db_api.get_action_definition("std.email")
http_action_input = (
'url, method="GET", params=null, body=null, '
'json=null, headers=null, cookies=null, auth=null, '
'timeout=null, allow_redirects=null, '
'proxies=null, verify=null'
)
self.assertEqual(http_action_input, std_http.input)
std_email_input = (
"from_addr, to_addrs, smtp_server, reply_to=null, cc_addrs=null, "
"bcc_addrs=null, smtp_password=null, subject=null, body=null, "
"html_body=null"
)
self.assertEqual(std_email_input, std_email.input)
def test_action_description(self):
std_http = db_api.get_action_definition("std.http")
std_echo = db_api.get_action_definition("std.echo")
self.assertIn("HTTP action", std_http.description)
self.assertIn("param body: (optional) Dictionary, bytes",
std_http.description)
self.assertIn("This action just returns a configured value",
std_echo.description)

View File

@ -18,7 +18,7 @@ from oslo_config import cfg
from mistral.db.v2 import api as db_api
from mistral.exceptions import DBEntityNotFoundError
from mistral.lang import parser as spec_parser
from mistral.services import actions as action_service
from mistral.services import adhoc_actions as adhoc_action_service
from mistral.tests.unit import base
from mistral_lib import utils
@ -57,15 +57,15 @@ action1:
NAMESPACE = 'test_namespace'
class ActionServiceTest(base.DbTestCase):
class AdhocActionServiceTest(base.DbTestCase):
def setUp(self):
super(ActionServiceTest, self).setUp()
super(AdhocActionServiceTest, self).setUp()
self.addCleanup(db_api.delete_action_definitions, name='action1')
self.addCleanup(db_api.delete_action_definitions, name='action2')
def test_create_actions(self):
db_actions = action_service.create_actions(ACTION_LIST)
db_actions = adhoc_action_service.create_actions(ACTION_LIST)
self.assertEqual(2, len(db_actions))
@ -88,16 +88,24 @@ class ActionServiceTest(base.DbTestCase):
self.assertDictEqual({'output': 'Hey'}, action2_spec.get_base_input())
def test_create_actions_in_namespace(self):
db_actions = action_service.create_actions(ACTION_LIST,
namespace=NAMESPACE)
db_actions = adhoc_action_service.create_actions(
ACTION_LIST,
namespace=NAMESPACE
)
self.assertEqual(2, len(db_actions))
action1_db = self._assert_single_item(db_actions, name='action1')
self.assertEqual(NAMESPACE, action1_db.namespace)
self._assert_single_item(
db_actions,
name='action1',
namespace=NAMESPACE
)
action2_db = self._assert_single_item(db_actions, name='action2')
self.assertEqual(NAMESPACE, action2_db.namespace)
self._assert_single_item(
db_actions,
name='action2',
namespace=NAMESPACE
)
self.assertRaises(
DBEntityNotFoundError,
@ -107,8 +115,10 @@ class ActionServiceTest(base.DbTestCase):
)
def test_update_actions(self):
db_actions = action_service.create_actions(ACTION_LIST,
namespace=NAMESPACE)
db_actions = adhoc_action_service.create_actions(
ACTION_LIST,
namespace=NAMESPACE
)
self.assertEqual(2, len(db_actions))
@ -120,8 +130,10 @@ class ActionServiceTest(base.DbTestCase):
self.assertDictEqual({'output': 'Hi'}, action1_spec.get_base_input())
self.assertDictEqual({}, action1_spec.get_input())
db_actions = action_service.update_actions(UPDATED_ACTION_LIST,
namespace=NAMESPACE)
db_actions = adhoc_action_service.update_actions(
UPDATED_ACTION_LIST,
namespace=NAMESPACE
)
# Action 1.
action1_db = self._assert_single_item(db_actions, name='action1')
@ -139,7 +151,7 @@ class ActionServiceTest(base.DbTestCase):
self.assertRaises(
DBEntityNotFoundError,
action_service.update_actions,
adhoc_action_service.update_actions,
UPDATED_ACTION_LIST,
namespace=''
)
@ -147,7 +159,7 @@ class ActionServiceTest(base.DbTestCase):
def test_delete_action(self):
# Create action.
action_service.create_actions(ACTION_LIST, namespace=NAMESPACE)
adhoc_action_service.create_actions(ACTION_LIST, namespace=NAMESPACE)
action = db_api.get_action_definition('action1', namespace=NAMESPACE)
self.assertEqual(NAMESPACE, action.get('namespace'))

View File

@ -13,6 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from dateutil import parser as dateparser
EQUALS = 'eq'
NOT_EQUAL = 'neq'
LESS_THAN = 'lt'
@ -23,9 +27,17 @@ IN = 'in'
NOT_IN = 'nin'
HAS = 'has'
ALL = (GREATER_THAN_EQUALS, GREATER_THAN,
LESS_THAN_EQUALS, HAS, NOT_EQUAL,
LESS_THAN, IN, EQUALS, NOT_IN)
ALL = (
EQUALS,
NOT_EQUAL,
GREATER_THAN,
GREATER_THAN_EQUALS,
LESS_THAN,
LESS_THAN_EQUALS,
HAS,
IN,
NOT_IN
)
def create_filters_from_request_params(none_values=None, **params):
@ -95,4 +107,62 @@ def has_filters(value):
for filter_type in ALL:
if value.startswith(filter_type + ':'):
return True
return False
def match_filter(obj, attr_name, attr_filter):
# If the attribute doesn't exist we assume that any filter is
# not applicable and we ignore it.
if not hasattr(obj, attr_name):
return True
attr_val = getattr(obj, attr_name)
for op, val in attr_filter.items():
# If the attribute is a date and the given filter value is a string
# we try to convert the filter value into a data as well.
if isinstance(attr_val, datetime.datetime) and isinstance(val, str):
val = dateparser.isoparse(val)
if op not in ALL:
raise ValueError(
'Unknown filter operation encountered [operation=%s]' % op
)
if op == EQUALS and attr_val != val:
return False
if op == NOT_EQUAL and attr_val == val:
return False
if op == LESS_THAN and attr_val >= val:
return False
if op == LESS_THAN_EQUALS and attr_val > val:
return False
if op == GREATER_THAN and attr_val <= val:
return False
if op == GREATER_THAN_EQUALS and attr_val < val:
return False
if op == IN and attr_val not in val:
return False
if op == NOT_IN and attr_val in val:
return False
if op == HAS and val not in attr_val:
return False
return True
def match_filters(obj, filters):
for attr_name, attr_filter in filters.items():
if not match_filter(obj, attr_name, attr_filter):
return False
return True

View File

@ -47,7 +47,7 @@ class ContextView(dict):
def __init__(self, *dicts):
super(ContextView, self).__init__()
self.dicts = dicts or []
self.dicts = [d for d in dicts if d is not None]
def __getitem__(self, key):
for d in self.dicts:

View File

@ -11,7 +11,7 @@ Jinja2>=2.10 # BSD License (3 clause)
jsonschema>=3.2.0 # MIT
keystonemiddleware>=4.18.0 # Apache-2.0
kombu!=4.0.2,>=4.6.1 # BSD
mistral-lib>=1.4.0 # Apache-2.0
mistral-lib>=2.3.0 # Apache-2.0
networkx>=2.3 # BSD
oslo.concurrency>=3.26.0 # Apache-2.0
oslo.config>=5.2.0 # Apache-2.0

View File

@ -49,6 +49,10 @@ oslo.policy.policies =
oslo.policy.enforcer =
mistral = mistral.api.access_control:get_enforcer
mistral.action.providers =
legacy = mistral.actions.legacy:LegacyActionProvider
adhoc = mistral.actions.adhoc:AdHocActionProvider
mistral.actions =
std.async_noop = mistral.actions.std_actions:AsyncNoOpAction
std.noop = mistral.actions.std_actions:NoOpAction