From 8c91052b47b2a0e5358c3e62e285b3ecd89bf213 Mon Sep 17 00:00:00 2001 From: Serg Melikyan Date: Mon, 2 Jun 2014 14:06:15 +0400 Subject: [PATCH] Add support for actions in engine Methods in MuranoPL now can be marked with Usage attribute to specify whether particular method available for remote call or not. By default usage is Runtime (not available for remote call). Workflow: migrateVm: Usage: Action Arguments: - killExisting: Contract: $.bool() Default: True Change-Id: If3da3c6bf67aa79d522d82abbf3b5378f72e87ae Partially-Implements: blueprint application-actions --- meta/io.murano/Classes/Environment.yaml | 1 + murano/common/engine.py | 95 ++++++++++++++++++------- murano/db/services/sessions.py | 19 +++-- murano/dsl/murano_method.py | 20 ++++++ murano/dsl/results_serializer.py | 35 +++++++++ 5 files changed, 138 insertions(+), 32 deletions(-) diff --git a/meta/io.murano/Classes/Environment.yaml b/meta/io.murano/Classes/Environment.yaml index 0c2a199c..70541dc8 100644 --- a/meta/io.murano/Classes/Environment.yaml +++ b/meta/io.murano/Classes/Environment.yaml @@ -45,6 +45,7 @@ Workflow: deploy: + Usage: Action Body: - $.agentListener.start() - If: len($.applications) = 0 diff --git a/murano/common/engine.py b/murano/common/engine.py index cc405a25..e44a3e5d 100644 --- a/murano/common/engine.py +++ b/murano/common/engine.py @@ -23,6 +23,7 @@ from murano.common import config from murano.common.helpers import token_sanitizer from murano.common import rpc from murano.dsl import executor +from murano.dsl import murano_method from murano.dsl import results_serializer from murano.engine import environment from murano.engine import package_class_loader @@ -44,40 +45,20 @@ class TaskProcessingEndpoint(object): LOG.info(_('Starting processing task: {task_desc}').format( task_desc=anyjson.dumps(s_task))) - env = environment.Environment() - env.token = task['token'] - env.tenant_id = task['tenant_id'] - LOG.debug('Processing new task: {0}'.format(task)) try: - with package_loader.ApiPackageLoader(env.token, env.tenant_id) as \ - pkg_loader: - class_loader = package_class_loader.PackageClassLoader( - pkg_loader) - system_objects.register(class_loader, pkg_loader) - - exc = executor.MuranoDslExecutor(class_loader, env) - obj = exc.load(task['model']) - - try: - if obj is not None: - obj.type.invoke('deploy', exc, obj, {}) - except Exception as e: - reporter = status_reporter.StatusReporter() - reporter.initialize(obj) - reporter.report_error(obj, '{0}'.format(e)) - finally: - s_res = results_serializer.serialize(obj, exc) - rpc.api().process_result(s_res) + task_executor = TaskExecutor(task) + result = task_executor.execute() + rpc.api().process_result(result) except Exception as e: # TODO(gokrokve) report error here # TODO(slagun) code below needs complete rewrite and redesign LOG.exception("Error during task execution for tenant %s", - env.tenant_id) + task['tenant_id']) if task['model']['Objects']: msg_env = Environment(task['model']['Objects']['?']['id']) reporter = status_reporter.StatusReporter() reporter.initialize(msg_env) - reporter.report_error(msg_env, '{0}'.format(e)) + reporter.report_error(msg_env, str(e)) rpc.api().process_result(task['model']) @@ -100,3 +81,67 @@ def get_rpc_service(): class Environment: def __init__(self, object_id): self.object_id = object_id + + +class TaskExecutor(object): + @property + def action(self): + return self._action + + @property + def environment(self): + return self._environment + + @property + def model(self): + return self._model + + def __init__(self, task): + self._action = task.get('action') + self._model = task['model'] + self._environment = environment.Environment() + self._environment.token = task['token'] + self._environment.tenant_id = task['tenant_id'] + + def execute(self): + token, tenant_id = self.environment.token, self.environment.tenant_id + with package_loader.ApiPackageLoader(token, tenant_id) as pkg_loader: + class_loader = package_class_loader.PackageClassLoader(pkg_loader) + system_objects.register(class_loader, pkg_loader) + + exc = executor.MuranoDslExecutor(class_loader, self.environment) + obj = exc.load(self.model) + + try: + # Skip execution of action in case of no action is provided. + # Model will be just loaded, cleaned-up and unloaded. + # Most of the time this is used for deletion of environments. + if self.action: + self._invoke(exc) + except Exception as e: + reporter = status_reporter.StatusReporter() + reporter.initialize(obj) + reporter.report_error(obj, str(e)) + + return results_serializer.serialize(obj, exc) + + def _invoke(self, mpl_executor): + obj = mpl_executor.object_store.get(self.action['object_id']) + method_name, args = self.action['method'], self.action['args'] + + if obj is not None: + if self._is_action(obj, method_name) is False: + raise Exception('%s is not an action' % (method_name,)) + + obj.type.invoke(method_name, mpl_executor, obj, args) + + @staticmethod + def _is_action(obj, method_name): + implementations = obj.type.find_method(method_name) + if len(implementations) < 1: + raise Exception('Action %s is not found' % (method_name,)) + if len(implementations) > 1: + raise Exception('Action %s name is ambiguous' % (method_name,)) + declaring_class, _ = implementations[0] + method = declaring_class.get_method(method_name) + return method.usage == murano_method.MethodUsages.Action diff --git a/murano/db/services/sessions.py b/murano/db/services/sessions.py index a232471a..40a0bee7 100644 --- a/murano/db/services/sessions.py +++ b/murano/db/services/sessions.py @@ -123,18 +123,23 @@ class SessionServices(object): environment = unit.query(models.Environment).get( session.environment_id) - data = { + task = { + 'action': { + 'object_id': environment.id, + 'method': 'deploy', + 'args': {} + }, 'model': session.description, 'token': token, 'tenant_id': environment.tenant_id } - data['model']['Objects']['?']['id'] = environment.id - data['model']['Objects']['applications'] = \ - data['model']['Objects'].get('services', []) + task['model']['Objects']['?']['id'] = environment.id + task['model']['Objects']['applications'] = \ + task['model']['Objects'].get('services', []) - if 'services' in data['model']['Objects']: - del data['model']['Objects']['services'] + if 'services' in task['model']['Objects']: + del task['model']['Objects']['services'] session.state = SessionState.deploying deployment = models.Deployment() @@ -150,4 +155,4 @@ class SessionServices(object): unit.add(session) unit.add(deployment) - rpc.engine().handle_task(data) + rpc.engine().handle_task(task) diff --git a/murano/dsl/murano_method.py b/murano/dsl/murano_method.py index fdc9d461..6992492a 100644 --- a/murano/dsl/murano_method.py +++ b/murano/dsl/murano_method.py @@ -25,6 +25,19 @@ except ImportError: # python2.6 from ordereddict import OrderedDict # noqa +class MethodUsages(object): + Action = 'Action' + Runtime = 'Runtime' + All = set([Action, Runtime]) + + +def methodusage(usage): + def wrapper(method): + method._murano_method_usage = usage + return method + return wrapper + + class MuranoMethod(object): def __init__(self, namespace_resolver, murano_class, name, payload): @@ -34,9 +47,12 @@ class MuranoMethod(object): if callable(payload): self._body = payload self._arguments_scheme = self._generate_arguments_scheme(payload) + self._usage = getattr(payload, '_murano_method_usage', + MethodUsages.Runtime) else: payload = payload or {} self._body = self._prepare_body(payload.get('Body') or []) + self._usage = payload.get('Usage') or MethodUsages.Runtime arguments_scheme = payload.get('Arguments') or [] if isinstance(arguments_scheme, types.DictionaryType): arguments_scheme = [{key: value} for key, value in @@ -64,6 +80,10 @@ class MuranoMethod(object): def arguments_scheme(self): return self._arguments_scheme + @property + def usage(self): + return self._usage + @property def body(self): return self._body diff --git a/murano/dsl/results_serializer.py b/murano/dsl/results_serializer.py index 7d7a4806..122252d7 100644 --- a/murano/dsl/results_serializer.py +++ b/murano/dsl/results_serializer.py @@ -14,6 +14,8 @@ import types +import murano.dsl.helpers as helpers +import murano.dsl.murano_method as murano_method import murano.dsl.murano_object as murano_object @@ -56,6 +58,35 @@ def _cmp_objects(obj1, obj2): return obj1.object_id == obj2.object_id +def _serialize_available_action(obj): + def _serialize(obj_type): + actions = {} + for name, method in obj_type.methods.iteritems(): + if method.usage == murano_method.MethodUsages.Action: + actions[name] = { + 'id': '%s_%s' % (obj.object_id, name), + 'name': name, + 'enabled': True, + } + for parent in obj_type.parents: + parent_actions = _serialize(parent) + actions = helpers.merge_dicts(parent_actions, actions) + return actions + return _serialize(obj.type).values() + + +def _merge_actions(list1, list2): + dict1 = dict([(action['id'], action) for action in list1]) + dict2 = dict([(action['id'], action) for action in list2]) + + result = helpers.merge_dicts(dict1, dict2) + for action_id in dict1: + if action_id not in dict2: + del result[action_id] + + return result.values() + + def _pass1_serialize(value, parent, serialized_objects, designer_attributes_getter): if isinstance(value, (types.StringTypes, types.IntType, types.FloatType, @@ -69,6 +100,10 @@ def _pass1_serialize(value, parent, serialized_objects, result = value.to_dictionary() if designer_attributes_getter is not None: result['?'].update(designer_attributes_getter(value.object_id)) + #deserialize and merge list of actions + actions = _serialize_available_action(value) + result['?']['_actions'] = _merge_actions( + result['?'].get('_actions', []), actions) serialized_objects.add(value.object_id) return _pass1_serialize( result, value, serialized_objects, designer_attributes_getter)