Add main entities related to action providers

* This patch adds the base infrastructure for the new concept
  that we call Action Providers. It will allow to refactor
  Mistral in a way that all action management will be encapsulated
  with action providers. For example, Mistral Engine needs to
  schedule actions while processing a workflow. Now it has to
  access DB itself to find an action definition (meta info about
  what this action is, like python class, input params etc.),
  then prepare input parameters, validate parameters and send a
  message to an executor to run it. In many other places, Mistral
  also directly access DB to fetch an action definition. It's
  not flexible because we assume that definitions of Mistral
  actions must always be stored in DB. The concept of an Action
  Provider allows moving away from that limitation. A particular
  implementation of the ActionProvider interface can store action
  definitions in any suitable way, even fetching them using
  various transport protocols like AMQP, HTTP, TCP, or anything
  else. An action provider may also generate action definitions
  dynamically, like creating wrappers around a set of operating
  system commands. But for the rest of the system it won't matter
  what happens inside action providers. All details will be
  hidden behind a unified interface.
* Added ActionDescriptor interface and its convenience base
  abstract implementation. ActionDescriptor is an entity
  representing an action before it's instantiated. It carries
  the most important meta information about an action like name,
  description, input parameters etc. It is also responsible for
  validating action input parameters and instantiating a real
  action.
* Added PythonActionDescriptor which represents a regular action
  written as a Python class.
* Added CompositeActionProvider that delegates calls find() and
  find_all() to a collection of other action providers.
* Minor style changes.

Partially implements: bp/mistral-action-providers

Change-Id: Ic9108c9293731b3576081c75f2786e1156ba0ccd
This commit is contained in:
Renat Akhmerov 2020-07-20 12:19:01 +07:00
parent 301c17e7ea
commit f818f1f02c
10 changed files with 625 additions and 13 deletions

View File

@ -12,7 +12,19 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral_lib.actions.base import Action
from mistral_lib.actions.base import ActionDescriptor
from mistral_lib.actions.base import ActionProvider
from mistral_lib.actions.providers.composite import CompositeActionProvider
from mistral_lib.actions.providers.python import PythonActionDescriptor
from mistral_lib.actions.types import Result
__all__ = ['Action', 'Result']
__all__ = [
'Action',
'Result',
'ActionDescriptor',
'ActionProvider',
'PythonActionDescriptor',
'CompositeActionProvider'
]

View File

@ -15,8 +15,13 @@
import abc
from oslo_utils import importutils
class Action(object):
from mistral_lib import serialization
from mistral_lib.utils import inspect_utils as i_utils
class Action(serialization.MistralSerializable):
"""Action.
Action is a means in Mistral to perform some useful work associated with
@ -54,7 +59,7 @@ class Action(object):
Result can be of two types:
1) Any serializable value meaningful from a user perspective (such
as string, number or dict).
2) Instance of {mistral.workflow.utils.Result} which has field "data"
2) Instance of {mistral_lib.types.Result} which has field "data"
for success result and field "error" for keeping so called "error
result" like HTTP error code and similar. Using the second type
allows to communicate a result even in case of error and hence to have
@ -76,3 +81,252 @@ class Action(object):
doesn't override this method then the action is synchronous.
"""
return True
@classmethod
def get_serialization_key(cls):
# By default we use the same serializer key for every action
# assuming that most action class can use the same serializer.
return "%s.%s" % (Action.__module__, Action.__name__)
class ActionSerializer(serialization.DictBasedSerializer):
def serialize_to_dict(self, entity):
cls = type(entity)
return {
'cls': '%s.%s' % (cls.__module__, cls.__name__),
'cls_attrs': i_utils.get_public_fields(cls),
'data': vars(entity),
}
def deserialize_from_dict(self, entity_dict):
cls_str = entity_dict['cls']
# Rebuild action class and restore attributes.
cls = importutils.import_class(cls_str)
cls_attrs = entity_dict['cls_attrs']
if cls_attrs:
# If we have serialized class attributes it means that we need
# to create a dynamic class.
cls = type(cls.__name__, (cls,), cls_attrs)
# NOTE(rakhmerov): We use this hacky was of instantiating
# the action here because we can't use normal __init__(),
# we don't know the parameters. And even if we find out
# what they are the real internal state of the object is
# what was stored as vars() method that just returns all
# fields. So we have to bypass __init__() and set attributes
# one by one. Of course, this is a serious limitation since
# action creators will need to keep in mind to avoid having
# some complex initialisation logic in __init__() that
# does something not reflecting in an instance state.
# However, this all applies to the case when the action
# has to be sent to a remote executor.
action = cls.__new__(cls)
for k, v in entity_dict['data'].items():
setattr(action, k, v)
return action
# NOTE: Every action implementation can register its own serializer
# if needed, but this serializer should work for vast of majority of
# actions.
serialization.register_serializer(Action, ActionSerializer())
class ActionDescriptor(abc.ABC):
"""Provides required information about a certain type of actions.
Action descriptor is not an action itself. It rather carries all
important meta information about a particular action before the
action is instantiated. In some sense it is similar to a class but
the difference is that one type of action descriptor may support
many different actions. This abstraction is responsible for
validation of input parameters and instantiation of an action.
"""
@property
@abc.abstractmethod
def name(self):
"""The name of the action."""
pass
@property
@abc.abstractmethod
def description(self):
"""The description of the action."""
pass
@property
@abc.abstractmethod
def params_spec(self):
"""Comma-separated string with input parameter names.
Each parameter name can be either just a name or a string
"param=val" where "param" is the name of the parameter
and "val" its default value. All names and values in the
string must be JSON-compatible.
"""
pass
@property
@abc.abstractmethod
def namespace(self):
"""The namespace of the action.
NOTE: Not all actions have to support namespaces.
"""
pass
@property
@abc.abstractmethod
def project_id(self):
"""The ID of the project (tenant) this action belongs to.
If it's not specified then the action can be used within
all projects (tenants).
NOTE: Not all actions have to support projects(tenants).
"""
pass
@property
@abc.abstractmethod
def scope(self):
"""The scope of the action within a project (tenant).
It makes sense only if the "project_id" property is not None.
It should be assigned with the "public" value if the action
is available in all projects and "private" if it's accessible
only by users of the specified project.
NOTE: Not all actions have to support projects(tenants).
"""
pass
@property
@abc.abstractmethod
def action_class_name(self):
"""String representation of the Python class of the action.
Can be None in case if the action is dynamically generated with
some kind of wrapper.
"""
pass
@property
@abc.abstractmethod
def action_class_attributes(self):
"""The attributes of the action Python class, if relevant.
If the action has a static Python class associated with it
and this method returns not an empty dictionary then the
action can be instantiated from a new dynamic class
based on the property "action_class_string" and this property
that essentially carries public class field values.
"""
pass
@abc.abstractmethod
def instantiate(self, input_dict, wf_ctx):
"""Instantiate the required action with the given parameters.
:param input_dict: Action parameters as a dictionary where keys
are parameter names and values are parameter values.
:param wf_ctx: Workflow context relevant for the point when
action is about to start.
:return: An instance of mistral_lib.actions.Action.
"""
pass
@abc.abstractmethod
def check_parameters(self, params):
"""Validate action parameters.
The method does a preliminary check of the given actual action
parameters and raises an exception if they don't match the
action signature. However, a successful invocation of this
method does not guarantee a further successful run of the
instantiated action.
:param params: Action parameters as a dictionary where keys
are parameter names and values are parameter values.
:return: None or raises an exception if the given parameters
are not valid.
"""
pass
@abc.abstractmethod
def post_process_result(self, result):
"""Converts the given action result.
A certain action implementation may need to do an additional
conversion of the action result by its descriptor. This approach
allows to implement wrapper actions running asynchronously
because in such cases, the initial action result depends on a 3rd
party that's responsible for delivering it to Mistral. But when
it comes to Mistral we still have a chance to apply needed
transformations defined by this method.
:param result: Action result.
An instance of mistral_lib.types.Result.
:return: Converted action result.
"""
pass
class ActionProvider(abc.ABC):
"""Serves as a source of actions for the system.
A concrete implementation of this interface can use its own
way of delivering actions to the system. It can store actions
in a database, get them over HTTP, AMQP or any other transport.
Or it can simply provide a static collection of actions and keep
them in memory throughout the cycle of the application.
"""
def __init__(self, name):
self._name = name
@property
def name(self):
"""The name of the action provider.
Different action providers can use it differently.
Some may completely ignore it, others may use it, for example,
for searching actions in a certain way.
"""
return self._name
@abc.abstractmethod
def find(self, action_name, namespace=None):
"""Finds action descriptor by name.
:param action_name: Action name.
:param namespace: Action namespace. None is used for the default
namespace.
:return: An instance of ActionDescriptor or None, if not found.
"""
pass
@abc.abstractmethod
def find_all(self, namespace=None, limit=None, sort_fields=None,
sort_dirs=None, filters=None):
"""Finds all action descriptors for this provider.
:param namespace: Optional. Action namespace.
:param limit: Positive integer or None. Maximum number of action
descriptors to return in a single result.
:param sort_fields: Optional. A list of action descriptor fields
that define sorting of the result set.
:param sort_dirs: Optional. A list of sorting orders ("asc" or "desc")
in addition to the "sort_fields" argument.
:param filters: Optional. A dictionary describing AND-joined filters.
:return: List of ActionDescriptor instances.
"""
pass

View File

@ -0,0 +1,118 @@
# Copyright 2020 Nokia Software.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import copy
from mistral_lib import actions
from mistral_lib import exceptions as exc
from mistral_lib import utils
def _compare_parameters(expected_params, actual_params):
"""Compares the expected parameters with the actual parameters.
:param expected_params: Expected dict of parameters.
:param actual_params: Actual dict of parameters.
:return: Tuple {missing parameter names, unexpected parameter names}
"""
missing_params = []
unexpected_params = copy.deepcopy(list((actual_params or {}).keys()))
for p_name, p_value in expected_params.items():
if p_value is utils.NotDefined and p_name not in unexpected_params:
missing_params.append(str(p_name))
if p_name in unexpected_params:
unexpected_params.remove(p_name)
return missing_params, unexpected_params
class ActionDescriptorBase(actions.ActionDescriptor, abc.ABC):
def __init__(self, name, desc, params_spec, namespace=None,
project_id=None, scope=None):
self._name = name
self._desc = desc
self._params_spec = params_spec
self._namespace = namespace
self._project_id = project_id
self._scope = scope
@property
def name(self):
return self._name
@property
def description(self):
return self._desc
@property
def params_spec(self):
return self._params_spec
@property
def namespace(self):
return self._namespace
@property
def project_id(self):
return self._project_id
@property
def scope(self):
return self._scope
@property
def action_class_name(self):
return None
@property
def action_class_attributes(self):
return None
def check_parameters(self, params):
# Don't validate action input if action initialization
# method contains ** argument.
if '**' in self.params_spec:
return
expected_params = utils.get_dict_from_string(self.params_spec)
actual_params = params or {}
missing, unexpected = _compare_parameters(
expected_params,
actual_params
)
if missing or unexpected:
msg = 'Invalid input [name=%s, class=%s'
msg_props = [self.name, self.action_class_name]
if missing:
msg += ', missing=%s'
msg_props.append(missing)
if unexpected:
msg += ', unexpected=%s'
msg_props.append(unexpected)
msg += ']'
raise exc.ActionException(msg % tuple(msg_props))
def post_process_result(self, result):
return result

View File

@ -0,0 +1,53 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral_lib.actions import base
class CompositeActionProvider(base.ActionProvider):
def __init__(self, name, delegates):
super().__init__(name)
self._delegates = delegates
def find(self, action_name, namespace=None):
for d in self._delegates:
action_desc = d.find(action_name, namespace)
if action_desc is not None:
return action_desc
return None
def find_all(self, namespace=None, limit=None, sort_fields=None,
sort_dirs=None, **filters):
# TODO(rakhmerov): Implement the algorithm that takes ordering/sorting
# parameters into account correctly. For now, they are just passed to
# delegates.
res = []
for d in self._delegates:
action_descriptors = d.find_all(
namespace=namespace,
limit=limit,
sort_fields=sort_fields,
sort_dirs=sort_dirs,
**filters
)
if action_descriptors is not None:
res.extend(action_descriptors)
return res
def add_action_provider(self, action_provider):
self._delegates.append(action_provider)

View File

@ -0,0 +1,66 @@
# Copyright 2020 Nokia Software.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral_lib.actions.providers import base
from mistral_lib.utils import inspect_utils as i_utils
class PythonActionDescriptor(base.ActionDescriptorBase):
def __init__(self, name, action_cls, action_cls_attrs=None, namespace=None,
project_id=None, scope=None):
super(PythonActionDescriptor, self).__init__(
name,
i_utils.get_docstring(action_cls),
i_utils.get_arg_list_as_str(action_cls.__init__),
namespace,
project_id,
scope
)
self._action_cls = action_cls
self._action_cls_attrs = action_cls_attrs
def __repr__(self):
return 'Python action [name=%s, cls=%s]' % (
self.name,
self._action_cls
)
def instantiate(self, params, wf_ctx):
if not self._action_cls_attrs:
# No need to create new dynamic type.
return self._action_cls(**params)
dynamic_cls = type(
self._action_cls.__name__,
(self._action_cls,),
self._action_cls_attrs
)
return dynamic_cls(**params)
@property
def action_class(self):
return self._action_cls
@property
def action_class_name(self):
return "{}.{}".format(
self._action_cls.__module__,
self._action_cls.__name__
)
@property
def action_class_attributes(self):
return self._action_cls_attrs

View File

@ -19,7 +19,7 @@ from mistral_lib import utils
class Result(serialization.MistralSerializable):
"""Explicit data structure containing a result of task execution."""
"""Action result."""
def __init__(self, data=None, error=None, cancel=False):
self.data = data
@ -60,8 +60,7 @@ class Result(serialization.MistralSerializable):
return not self.__eq__(other)
def to_dict(self):
return ({'result': self.data}
if self.is_success() else {'result': self.error})
return {'result': self.data if self.is_success() else self.error}
class ResultSerializer(serialization.DictBasedSerializer):

View File

@ -75,3 +75,7 @@ class MistralException(Exception):
"""
message = "An unknown exception occurred"
class ActionException(MistralException):
message = "Failed to process an action"

View File

@ -0,0 +1,105 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mistral_lib import actions
from mistral_lib.actions.providers import python
from mistral_lib.tests import base as tests_base
class HelloAction(actions.Action):
"""I help with testing."""
def __init__(self, f_name, l_name):
super(HelloAction, self).__init__()
self._f_name = f_name
self._l_name = l_name
def run(self, context):
return 'Hello %s %s!' % (self._f_name, self._l_name)
class TestActionProvider(actions.ActionProvider):
def __init__(self, name):
super(TestActionProvider, self).__init__(name)
self.action_descs = {}
def add_action_descriptor(self, action_desc):
self.action_descs[action_desc.name] = action_desc
def find(self, action_name, namespace=None):
return self.action_descs.get(action_name)
def find_all(self, namespace=None, limit=None, sort_fields=None,
sort_dirs=None, **filters):
return self.action_descs.values()
class TestActionProviders(tests_base.TestCase):
def test_python_action_descriptor(self):
action_desc = python.PythonActionDescriptor('test_action', HelloAction)
# Check descriptor attributes.
self.assertEqual('test_action', action_desc.name)
self.assertEqual(HelloAction, action_desc.action_class)
self.assertEqual(
HelloAction.__module__ + '.' + HelloAction.__name__,
action_desc.action_class_name
)
self.assertIsNone(action_desc.action_class_attributes)
self.assertEqual('I help with testing.', action_desc.description)
self.assertEqual('f_name, l_name', action_desc.params_spec)
# Instantiate the action and check how it works.
action = action_desc.instantiate(
{'f_name': 'Jhon', 'l_name': 'Doe'},
{}
)
res = action.run(None)
self.assertEqual('Hello Jhon Doe!', res)
def test_composite_action_provider(self):
# Check empty provider.
composite_provider = actions.CompositeActionProvider('test', [])
self.assertEqual(0, len(composite_provider.find_all()))
# Add two test providers.
provider1 = TestActionProvider('provider1')
action_desc1 = python.PythonActionDescriptor('action1', HelloAction)
action_desc2 = python.PythonActionDescriptor('action2', HelloAction)
action_desc3 = python.PythonActionDescriptor('action3', HelloAction)
action_desc4 = python.PythonActionDescriptor('action4', HelloAction)
provider1.add_action_descriptor(action_desc1)
provider1.add_action_descriptor(action_desc2)
provider2 = TestActionProvider('provider2')
provider2.add_action_descriptor(action_desc3)
provider2.add_action_descriptor(action_desc4)
composite_provider = actions.CompositeActionProvider(
'test',
[provider1, provider2]
)
self.assertEqual(4, len(composite_provider.find_all()))
self.assertEqual(action_desc1, composite_provider.find('action1'))
self.assertEqual(action_desc2, composite_provider.find('action2'))
self.assertEqual(action_desc3, composite_provider.find('action3'))
self.assertEqual(action_desc4, composite_provider.find('action4'))

View File

@ -35,14 +35,12 @@ def _fake_context():
workflow_name='workflow_name',
callback_url='callback_url')
ctx = context.ActionContext(security_ctx, execution_ctx)
return ctx
return context.ActionContext(security_ctx, execution_ctx)
class TestActionsBase(tests_base.TestCase):
def test_empty_context(self):
ctx = context.ActionContext(
context.SecurityContext(),
context.ExecutionContext()
@ -55,7 +53,6 @@ class TestActionsBase(tests_base.TestCase):
self.assertEqual(ctx.execution.workflow_name, None)
def test_deprecated_properties(self):
ctx = _fake_context()
deprecated_properties = [
@ -68,28 +65,32 @@ class TestActionsBase(tests_base.TestCase):
for deprecated in deprecated_properties:
old = getattr(ctx, deprecated)
new = getattr(ctx.security, deprecated)
self.assertEqual(old, new)
class TestActionContextSerializer(tests_base.TestCase):
def test_serialization(self):
ctx = _fake_context()
serialiser = context.ActionContextSerializer()
dict_ctx = serialiser.serialize_to_dict(ctx)
self.assertEqual(dict_ctx['security'], vars(ctx.security))
self.assertEqual(dict_ctx['execution'], vars(ctx.execution))
def test_deserialization(self):
ctx = _fake_context()
serialiser = context.ActionContextSerializer()
dict_ctx = serialiser.serialize_to_dict(ctx)
ctx_2 = serialiser.deserialize_from_dict(dict_ctx)
self.assertEqual(ctx.security.auth_uri, ctx_2.security.auth_uri)
self.assertEqual(
ctx.execution.workflow_name,
ctx_2.execution.workflow_name)
ctx_2.execution.workflow_name
)