Add support for actions in engine

Methods in MuranoPL now can be marked with Usage attribute
to specify whether particular method available for remote
call or not. By default usage is Runtime (not available for
remote call).

Workflow:
  migrateVm:
    Usage: Action
    Arguments:
      - killExisting:
          Contract: $.bool()
          Default: True

Change-Id: If3da3c6bf67aa79d522d82abbf3b5378f72e87ae
Partially-Implements: blueprint application-actions
This commit is contained in:
Serg Melikyan 2014-06-02 14:06:15 +04:00
parent 80efe502c6
commit 8c91052b47
5 changed files with 138 additions and 32 deletions

View File

@ -45,6 +45,7 @@ Workflow:
deploy: deploy:
Usage: Action
Body: Body:
- $.agentListener.start() - $.agentListener.start()
- If: len($.applications) = 0 - If: len($.applications) = 0

View File

@ -23,6 +23,7 @@ from murano.common import config
from murano.common.helpers import token_sanitizer from murano.common.helpers import token_sanitizer
from murano.common import rpc from murano.common import rpc
from murano.dsl import executor from murano.dsl import executor
from murano.dsl import murano_method
from murano.dsl import results_serializer from murano.dsl import results_serializer
from murano.engine import environment from murano.engine import environment
from murano.engine import package_class_loader from murano.engine import package_class_loader
@ -44,40 +45,20 @@ class TaskProcessingEndpoint(object):
LOG.info(_('Starting processing task: {task_desc}').format( LOG.info(_('Starting processing task: {task_desc}').format(
task_desc=anyjson.dumps(s_task))) task_desc=anyjson.dumps(s_task)))
env = environment.Environment()
env.token = task['token']
env.tenant_id = task['tenant_id']
LOG.debug('Processing new task: {0}'.format(task))
try: try:
with package_loader.ApiPackageLoader(env.token, env.tenant_id) as \ task_executor = TaskExecutor(task)
pkg_loader: result = task_executor.execute()
class_loader = package_class_loader.PackageClassLoader( rpc.api().process_result(result)
pkg_loader)
system_objects.register(class_loader, pkg_loader)
exc = executor.MuranoDslExecutor(class_loader, env)
obj = exc.load(task['model'])
try:
if obj is not None:
obj.type.invoke('deploy', exc, obj, {})
except Exception as e:
reporter = status_reporter.StatusReporter()
reporter.initialize(obj)
reporter.report_error(obj, '{0}'.format(e))
finally:
s_res = results_serializer.serialize(obj, exc)
rpc.api().process_result(s_res)
except Exception as e: except Exception as e:
# TODO(gokrokve) report error here # TODO(gokrokve) report error here
# TODO(slagun) code below needs complete rewrite and redesign # TODO(slagun) code below needs complete rewrite and redesign
LOG.exception("Error during task execution for tenant %s", LOG.exception("Error during task execution for tenant %s",
env.tenant_id) task['tenant_id'])
if task['model']['Objects']: if task['model']['Objects']:
msg_env = Environment(task['model']['Objects']['?']['id']) msg_env = Environment(task['model']['Objects']['?']['id'])
reporter = status_reporter.StatusReporter() reporter = status_reporter.StatusReporter()
reporter.initialize(msg_env) reporter.initialize(msg_env)
reporter.report_error(msg_env, '{0}'.format(e)) reporter.report_error(msg_env, str(e))
rpc.api().process_result(task['model']) rpc.api().process_result(task['model'])
@ -100,3 +81,67 @@ def get_rpc_service():
class Environment: class Environment:
def __init__(self, object_id): def __init__(self, object_id):
self.object_id = object_id self.object_id = object_id
class TaskExecutor(object):
@property
def action(self):
return self._action
@property
def environment(self):
return self._environment
@property
def model(self):
return self._model
def __init__(self, task):
self._action = task.get('action')
self._model = task['model']
self._environment = environment.Environment()
self._environment.token = task['token']
self._environment.tenant_id = task['tenant_id']
def execute(self):
token, tenant_id = self.environment.token, self.environment.tenant_id
with package_loader.ApiPackageLoader(token, tenant_id) as pkg_loader:
class_loader = package_class_loader.PackageClassLoader(pkg_loader)
system_objects.register(class_loader, pkg_loader)
exc = executor.MuranoDslExecutor(class_loader, self.environment)
obj = exc.load(self.model)
try:
# Skip execution of action in case of no action is provided.
# Model will be just loaded, cleaned-up and unloaded.
# Most of the time this is used for deletion of environments.
if self.action:
self._invoke(exc)
except Exception as e:
reporter = status_reporter.StatusReporter()
reporter.initialize(obj)
reporter.report_error(obj, str(e))
return results_serializer.serialize(obj, exc)
def _invoke(self, mpl_executor):
obj = mpl_executor.object_store.get(self.action['object_id'])
method_name, args = self.action['method'], self.action['args']
if obj is not None:
if self._is_action(obj, method_name) is False:
raise Exception('%s is not an action' % (method_name,))
obj.type.invoke(method_name, mpl_executor, obj, args)
@staticmethod
def _is_action(obj, method_name):
implementations = obj.type.find_method(method_name)
if len(implementations) < 1:
raise Exception('Action %s is not found' % (method_name,))
if len(implementations) > 1:
raise Exception('Action %s name is ambiguous' % (method_name,))
declaring_class, _ = implementations[0]
method = declaring_class.get_method(method_name)
return method.usage == murano_method.MethodUsages.Action

View File

@ -123,18 +123,23 @@ class SessionServices(object):
environment = unit.query(models.Environment).get( environment = unit.query(models.Environment).get(
session.environment_id) session.environment_id)
data = { task = {
'action': {
'object_id': environment.id,
'method': 'deploy',
'args': {}
},
'model': session.description, 'model': session.description,
'token': token, 'token': token,
'tenant_id': environment.tenant_id 'tenant_id': environment.tenant_id
} }
data['model']['Objects']['?']['id'] = environment.id task['model']['Objects']['?']['id'] = environment.id
data['model']['Objects']['applications'] = \ task['model']['Objects']['applications'] = \
data['model']['Objects'].get('services', []) task['model']['Objects'].get('services', [])
if 'services' in data['model']['Objects']: if 'services' in task['model']['Objects']:
del data['model']['Objects']['services'] del task['model']['Objects']['services']
session.state = SessionState.deploying session.state = SessionState.deploying
deployment = models.Deployment() deployment = models.Deployment()
@ -150,4 +155,4 @@ class SessionServices(object):
unit.add(session) unit.add(session)
unit.add(deployment) unit.add(deployment)
rpc.engine().handle_task(data) rpc.engine().handle_task(task)

View File

@ -25,6 +25,19 @@ except ImportError: # python2.6
from ordereddict import OrderedDict # noqa from ordereddict import OrderedDict # noqa
class MethodUsages(object):
Action = 'Action'
Runtime = 'Runtime'
All = set([Action, Runtime])
def methodusage(usage):
def wrapper(method):
method._murano_method_usage = usage
return method
return wrapper
class MuranoMethod(object): class MuranoMethod(object):
def __init__(self, namespace_resolver, def __init__(self, namespace_resolver,
murano_class, name, payload): murano_class, name, payload):
@ -34,9 +47,12 @@ class MuranoMethod(object):
if callable(payload): if callable(payload):
self._body = payload self._body = payload
self._arguments_scheme = self._generate_arguments_scheme(payload) self._arguments_scheme = self._generate_arguments_scheme(payload)
self._usage = getattr(payload, '_murano_method_usage',
MethodUsages.Runtime)
else: else:
payload = payload or {} payload = payload or {}
self._body = self._prepare_body(payload.get('Body') or []) self._body = self._prepare_body(payload.get('Body') or [])
self._usage = payload.get('Usage') or MethodUsages.Runtime
arguments_scheme = payload.get('Arguments') or [] arguments_scheme = payload.get('Arguments') or []
if isinstance(arguments_scheme, types.DictionaryType): if isinstance(arguments_scheme, types.DictionaryType):
arguments_scheme = [{key: value} for key, value in arguments_scheme = [{key: value} for key, value in
@ -64,6 +80,10 @@ class MuranoMethod(object):
def arguments_scheme(self): def arguments_scheme(self):
return self._arguments_scheme return self._arguments_scheme
@property
def usage(self):
return self._usage
@property @property
def body(self): def body(self):
return self._body return self._body

View File

@ -14,6 +14,8 @@
import types import types
import murano.dsl.helpers as helpers
import murano.dsl.murano_method as murano_method
import murano.dsl.murano_object as murano_object import murano.dsl.murano_object as murano_object
@ -56,6 +58,35 @@ def _cmp_objects(obj1, obj2):
return obj1.object_id == obj2.object_id return obj1.object_id == obj2.object_id
def _serialize_available_action(obj):
def _serialize(obj_type):
actions = {}
for name, method in obj_type.methods.iteritems():
if method.usage == murano_method.MethodUsages.Action:
actions[name] = {
'id': '%s_%s' % (obj.object_id, name),
'name': name,
'enabled': True,
}
for parent in obj_type.parents:
parent_actions = _serialize(parent)
actions = helpers.merge_dicts(parent_actions, actions)
return actions
return _serialize(obj.type).values()
def _merge_actions(list1, list2):
dict1 = dict([(action['id'], action) for action in list1])
dict2 = dict([(action['id'], action) for action in list2])
result = helpers.merge_dicts(dict1, dict2)
for action_id in dict1:
if action_id not in dict2:
del result[action_id]
return result.values()
def _pass1_serialize(value, parent, serialized_objects, def _pass1_serialize(value, parent, serialized_objects,
designer_attributes_getter): designer_attributes_getter):
if isinstance(value, (types.StringTypes, types.IntType, types.FloatType, if isinstance(value, (types.StringTypes, types.IntType, types.FloatType,
@ -69,6 +100,10 @@ def _pass1_serialize(value, parent, serialized_objects,
result = value.to_dictionary() result = value.to_dictionary()
if designer_attributes_getter is not None: if designer_attributes_getter is not None:
result['?'].update(designer_attributes_getter(value.object_id)) result['?'].update(designer_attributes_getter(value.object_id))
#deserialize and merge list of actions
actions = _serialize_available_action(value)
result['?']['_actions'] = _merge_actions(
result['?'].get('_actions', []), actions)
serialized_objects.add(value.object_id) serialized_objects.add(value.object_id)
return _pass1_serialize( return _pass1_serialize(
result, value, serialized_objects, designer_attributes_getter) result, value, serialized_objects, designer_attributes_getter)