Adds API to obtain action result

Also adds File type to core library for common convention type for files

Partially implements: blueprint actions-return-result

Change-Id: I5cbfb9ed6f4ae56e931815841f9c042f25a1d0ca
This commit is contained in:
Stan Lagun 2015-02-16 22:09:54 +03:00
parent 4c5a723331
commit 73f8368024
15 changed files with 183 additions and 35 deletions

View File

@ -0,0 +1,16 @@
Namespaces:
=: io.murano
Name: File
Properties:
base64Content:
Contract: $.string().notNull()
Default: ''
mimeType:
Contract: $.string().notNull()
Default: 'application/octet-stream'
filename:
Contract: $.string()

View File

@ -20,6 +20,7 @@ Classes:
io.murano.Exception: Exception.yaml
io.murano.StackTrace: StackTrace.yaml
io.murano.SharedIp: SharedIp.yaml
io.murano.File: File.yaml
io.murano.system.SecurityGroupManager: SecurityGroupManager.yaml

View File

@ -30,12 +30,7 @@ LOG = logging.getLogger(__name__)
class Controller(object):
def execute(self, request, environment_id, action_id, body):
policy.check("execute_action", request.context, {})
LOG.debug('Action:Execute <ActionId: {0}>'.format(action_id))
unit = db_session.get_session()
def _validate_environment(self, unit, request, environment_id):
environment = unit.query(models.Environment).get(environment_id)
if environment is None:
@ -48,6 +43,14 @@ class Controller(object):
'this tenant resources.'))
raise exc.HTTPUnauthorized
def execute(self, request, environment_id, action_id, body):
policy.check("execute_action", request.context, {})
LOG.debug('Action:Execute <ActionId: {0}>'.format(action_id))
unit = db_session.get_session()
self._validate_environment(unit, request, environment_id)
# no new session can be opened if environment has deploying status
env_status = envs.EnvironmentServices.get_status(environment_id)
if env_status in (states.EnvironmentStatus.DEPLOYING,
@ -65,8 +68,18 @@ class Controller(object):
'is invalid').format(session.id))
raise exc.HTTPForbidden()
actions.ActionServices.execute(action_id, session, unit,
request.context.auth_token, body or {})
task_id = actions.ActionServices.execute(
action_id, session, unit, request.context.auth_token, body or {})
return {'task_id': task_id}
def get_result(self, request, environment_id, task_id):
policy.check("execute_action", request.context, {})
LOG.debug('Action:GetResult <TaskId: {0}>'.format(task_id))
unit = db_session.get_session()
self._validate_environment(unit, request, environment_id)
return actions.ActionServices.get_result(environment_id, task_id, unit)
def create_resource():

View File

@ -147,6 +147,10 @@ class API(wsgi.Router):
controller=actions_resource,
action='execute',
conditions={'method': ['POST']})
mapper.connect('/environments/{environment_id}/actions/{task_id}',
controller=actions_resource,
action='get_result',
conditions={'method': ['GET']})
catalog_resource = catalog.create_resource()
mapper.connect('/catalog/packages/categories',

View File

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import traceback
import uuid
import eventlet.debug
@ -25,7 +27,7 @@ from murano.common.helpers import token_sanitizer
from murano.common import rpc
from murano.dsl import dsl_exception
from murano.dsl import executor
from murano.dsl import results_serializer
from murano.dsl import serializer
from murano.engine import auth_utils
from murano.engine import client_manager
from murano.engine import environment
@ -51,13 +53,14 @@ class TaskProcessingEndpoint(object):
LOG.info(_LI('Starting processing task: {task_desc}').format(
task_desc=jsonutils.dumps(s_task)))
result = task['model']
result = {'model': task['model']}
try:
task_executor = TaskExecutor(task)
result = task_executor.execute()
except Exception as e:
LOG.exception(_LE('Error during task execution for tenant %s'),
task['tenant_id'])
result['action'] = TaskExecutor.exception_result(e)
msg_env = Environment(task['id'])
reporter = status_reporter.StatusReporter()
reporter.initialize(msg_env)
@ -116,6 +119,13 @@ class TaskExecutor(object):
self._create_trust()
try:
# !!! please do not delete 2 commented lines of code below.
# Uncomment to make engine load packages from
# local folder rather than from API !!!
# pkg_loader = package_loader.DirectoryPackageLoader('./meta')
# return self._execute(pkg_loader)
murano_client_factory = lambda: \
self._environment.clients.get_murano_client(self._environment)
with package_loader.ApiPackageLoader(
@ -133,17 +143,21 @@ class TaskExecutor(object):
obj = exc.load(self.model)
self._validate_model(obj, self.action, class_loader)
action_result = None
exception = None
exception_traceback = None
try:
# Skip execution of action in case no action is provided.
# Model will be just loaded, cleaned-up and unloaded.
# Most of the time this is used for deletion of environments.
if self.action:
self._invoke(exc)
action_result = self._invoke(exc)
except Exception as e:
exception = e
if isinstance(e, dsl_exception.MuranoPlException):
LOG.error('\n' + e.format(prefix=' '))
else:
exception_traceback = traceback.format_exc()
LOG.exception(
_LE("Exception %(exc)s occured"
" during invocation of %(method)s"),
@ -152,10 +166,38 @@ class TaskExecutor(object):
reporter.initialize(obj)
reporter.report_error(obj, str(e))
result = results_serializer.serialize(obj, exc)
result['SystemData'] = self._environment.system_attributes
model = serializer.serialize_model(obj, exc)
model['SystemData'] = self._environment.system_attributes
result = {
'model': model,
'action': {
'result': None,
'isException': False
}
}
if exception is not None:
result['action'] = TaskExecutor.exception_result(
exception, exception_traceback)
else:
result['action']['result'] = serializer.serialize_object(
action_result)
return result
@staticmethod
def exception_result(exception, exception_traceback):
record = {
'isException': True,
'result': {
'message': str(exception),
}
}
if isinstance(exception, dsl_exception.MuranoPlException):
record['result']['details'] = exception.format()
else:
record['result']['details'] = exception_traceback
return record
def _validate_model(self, obj, action, class_loader):
if config.CONF.engine.enable_model_policy_enforcer:
if obj is not None:
@ -168,7 +210,7 @@ class TaskExecutor(object):
method_name, args = self.action['method'], self.action['args']
if obj is not None:
obj.type.invoke(method_name, mpl_executor, obj, args)
return obj.type.invoke(method_name, mpl_executor, obj, args)
def _create_trust(self):
if not config.CONF.engine.use_trusts:

View File

@ -44,6 +44,9 @@ class ResultEndpoint(object):
LOG.debug('Got result from orchestration '
'engine:\n{0}'.format(secure_result))
model = result['model']
action_result = result.get('action', {})
unit = session.get_session()
environment = unit.query(models.Environment).get(environment_id)
@ -52,11 +55,11 @@ class ResultEndpoint(object):
'specified environment not found in database'))
return
if result['Objects'] is None and result.get('ObjectsCopy', {}) is None:
if model['Objects'] is None and model.get('ObjectsCopy', {}) is None:
environments.EnvironmentServices.remove(environment_id)
return
environment.description = result
environment.description = model
if environment.description['Objects'] is not None:
environment.description['Objects']['services'] = \
environment.description['Objects'].pop('applications', [])
@ -72,6 +75,7 @@ class ResultEndpoint(object):
# close deployment
deployment = get_last_deployment(unit, environment.id)
deployment.finished = timeutils.utcnow()
deployment.result = action_result
num_errors = unit.query(models.Status)\
.filter_by(level='error', task_id=deployment.id).count()
@ -112,7 +116,7 @@ class ResultEndpoint(object):
_('Failed') if num_errors + num_warnings > 0 else _('Successful'),
', '.join(map(
lambda a: a['?']['type'],
result['Objects']['services']
model['Objects']['services']
))
)
LOG.info(message)

View File

@ -0,0 +1,41 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""empty message
Revision ID: 006
Revises: None
Create Date: 2015-02-15 12:14:12
"""
# revision identifiers, used by Alembic.
revision = '006'
down_revision = '005'
from alembic import op
import sqlalchemy as sa
MYSQL_ENGINE = 'InnoDB'
MYSQL_CHARSET = 'utf8'
def upgrade():
op.add_column('task', sa.Column('result', sa.types.Text()))
# end Alembic commands #
def downgrade():
op.drop_column('task', 'result')
# end Alembic commands #

View File

@ -133,6 +133,7 @@ class Task(Base, TimestampMixin):
statuses = sa_orm.relationship("Status", backref='task',
cascade='save-update, merge, delete')
result = sa.Column(st.JsonBlob(), nullable=True, default={})
def to_dict(self):
dictionary = super(Task, self).to_dict()

View File

@ -38,3 +38,4 @@ def update_task(action, session, task, unit):
with unit.begin():
unit.add(session)
unit.add(task_info)
return task_info.id

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import types
import murano.dsl.helpers as helpers
@ -24,7 +25,20 @@ class ObjRef(object):
self.ref_obj = obj
def _serialize_tree(root_object, designer_attributes):
def serialize_object(obj):
if isinstance(obj, (collections.Sequence, collections.Set)):
return [serialize_object(t) for t in obj]
elif isinstance(obj, collections.Mapping):
result = {}
for key, value in obj.iteritems():
result[key] = serialize_object(value)
return result
elif isinstance(obj, murano_object.MuranoObject):
return _serialize_object(obj, None)[0]
return obj
def _serialize_object(root_object, designer_attributes=None):
serialized_objects = set()
tree = _pass1_serialize(
root_object, None, serialized_objects, designer_attributes)
@ -32,15 +46,15 @@ def _serialize_tree(root_object, designer_attributes):
return tree, serialized_objects
def serialize(root_object, executor):
def serialize_model(root_object, executor):
if root_object is None:
tree = None
tree_copy = None
attributes = []
else:
tree, serialized_objects = _serialize_tree(
tree, serialized_objects = _serialize_object(
root_object, executor.object_store.designer_attributes)
tree_copy, _ = _serialize_tree(root_object, None)
tree_copy, _ = _serialize_object(root_object, None)
attributes = executor.attribute_store.serialize(serialized_objects)
return {

View File

@ -62,8 +62,9 @@ class ActionServices(object):
task = ActionServices.create_action_task(
action_name, target_obj, args,
environment, session, token)
actions_db.update_task(action_name, session, task, unit)
task_id = actions_db.update_task(action_name, session, task, unit)
rpc.engine().handle_task(task)
return task_id
@staticmethod
def execute(action_id, session, unit, token, args={}):
@ -74,8 +75,9 @@ class ActionServices(object):
if not action[1].get('enabled', True):
raise ValueError('Cannot execute disabled action')
ActionServices.submit_task(action[1]['name'], action[0],
args, environment, session, token, unit)
return ActionServices.submit_task(
action[1]['name'], action[0], args, environment,
session, token, unit)
@staticmethod
def find_action(model, action_id):
@ -104,3 +106,9 @@ class ActionServices(object):
return result
else:
return None
@staticmethod
def get_result(environment_id, task_id, unit):
task = unit.query(models.Task).filter_by(
id=task_id, environment_id=environment_id).first()
return task.result

View File

@ -94,5 +94,4 @@ class TestActionsApi(tb.ControllerTest, tb.MuranoApiTestCase):
self.mock_engine_rpc.handle_task.assert_called_once_with(rpc_task)
# Should this be expected behavior?
self.assertEqual(None, result)
self.assertIn('task_id', result)

View File

@ -135,6 +135,10 @@ class MuranoMigrationsCheckers(object):
self.assertColumnExists(engine, 'environment-template', 'tenant_id')
self.assertColumnExists(engine, 'environment-template', 'name')
def _check_006(self, engine, data):
self.assertEqual('006', migration.version(engine))
self.assertColumnExists(engine, 'task', 'result')
class TestMigrationsMySQL(MuranoMigrationsCheckers,
base.BaseWalkMigrationTestCase,

View File

@ -19,7 +19,7 @@ import types
from murano.dsl import dsl_exception
from murano.dsl import executor
from murano.dsl import murano_object
from murano.dsl import results_serializer
from murano.dsl import serializer
from murano.engine import environment
from murano.tests.unit.dsl.foundation import object_model
@ -95,7 +95,7 @@ class Runner(object):
@property
def serialized_model(self):
return results_serializer.serialize(self._root, self.executor)
return serializer.serialize_model(self._root, self.executor)
@property
def preserve_exception(self):

View File

@ -15,7 +15,7 @@
import mock
from murano.dsl import murano_method
from murano.dsl import results_serializer
from murano.dsl import serializer
from murano.services import actions
from murano.tests.unit import base
@ -35,7 +35,7 @@ class TestActionsSerializer(base.MuranoTestCase):
'action3': {'name': 'name3', 'enabled': True},
}
result = results_serializer._merge_actions(old, new)
result = serializer._merge_actions(old, new)
self.assertEqual(2, len(result))
self.assertNotIn('action1', result)
@ -50,7 +50,7 @@ class TestActionsSerializer(base.MuranoTestCase):
'action2': {'name': 'name3', 'enabled': True},
}
result = results_serializer._merge_actions(old, new)
result = serializer._merge_actions(old, new)
self.assertFalse(result['action1']['enabled'])
@ -76,7 +76,7 @@ class TestActionsSerializer(base.MuranoTestCase):
def test_object_actions_serialization(self):
obj = self._get_mocked_obj()
obj_actions = results_serializer._serialize_available_action(obj)
obj_actions = serializer._serialize_available_action(obj)
expected_result = {'name': 'method1', 'enabled': True}
self.assertIn('id1_method1', obj_actions)
@ -84,13 +84,13 @@ class TestActionsSerializer(base.MuranoTestCase):
def test_that_only_actions_are_serialized(self):
obj = self._get_mocked_obj()
obj_actions = results_serializer._serialize_available_action(obj)
obj_actions = serializer._serialize_available_action(obj)
self.assertNotIn('id1_method2', obj_actions)
def test_parent_actions_are_serialized(self):
obj = self._get_mocked_obj()
obj_actions = results_serializer._serialize_available_action(obj)
obj_actions = serializer._serialize_available_action(obj)
expected_result = {'name': 'method3', 'enabled': True}
self.assertIn('id1_method3', obj_actions)