f18661b586
The database constraints which were present were enforcing the global uniqueness of package FQNs and the names of classes defined in them. This behavior was not correct, as the uniqueness should be enforced per tenant, so the same package may be uploaded into two isolated tenants without affecting each other. This behavior lead to a very serious security issue: any tenant could upload a package, leave it private and thus block all other tenants of the cloud from uploading the package with the same name or even other packages which contain at least one class in common with it. This could be used to intentionally block all the operations of Murano on any public environments. This fix modifies the package name constraint to be unique only in combination with owner_id, i.e. makes packages unique per tenant. Also it removes the class name uniquness check from database (as there is no cross-DB way to check it in a proper way) and adds a check method in db.api module instead. As the packages may be made public, this introduces a potential collision: if the user owns some package, and there is a public package with the same fully-qualified-name (or defining same class(es)) then the class loader of the engine will have to choise between these packages and/or classes defined in them. To resolve this collision this commit adds a logic to fetch all the patching packages and then pick the best match. Packages owned by the current tenant are the most preferred, then the engine will pick public packages, and non-owned non-public packages are the least preferred (there may be no such packages now, they may appear when we add other ways of package sharing). Closes-bug: #1440094 Change-Id: I5c9b49642dfb6e955cf0c98b42f418da3b82060a
248 lines
8.5 KiB
Python
Executable File
248 lines
8.5 KiB
Python
Executable File
# Copyright (c) 2014 Mirantis Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import traceback
|
|
import uuid
|
|
|
|
import eventlet.debug
|
|
from oslo import messaging
|
|
from oslo.messaging import target
|
|
from oslo.serialization import jsonutils
|
|
|
|
from murano.common import auth_utils
|
|
from murano.common import config
|
|
from murano.common.helpers import token_sanitizer
|
|
from murano.common import plugin_loader
|
|
from murano.common import rpc
|
|
from murano.dsl import dsl_exception
|
|
from murano.dsl import executor
|
|
from murano.dsl import serializer
|
|
from murano.engine import client_manager
|
|
from murano.engine import environment
|
|
from murano.engine import package_class_loader
|
|
from murano.engine import package_loader
|
|
from murano.engine.system import status_reporter
|
|
import murano.engine.system.system_objects as system_objects
|
|
from murano.common.i18n import _LI, _LE
|
|
from murano.openstack.common import log as logging
|
|
from murano.policy import model_policy_enforcer as enforcer
|
|
|
|
|
|
RPC_SERVICE = None
|
|
PLUGIN_LOADER = None
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
eventlet.debug.hub_exceptions(False)
|
|
|
|
|
|
class TaskProcessingEndpoint(object):
|
|
@staticmethod
|
|
def handle_task(context, task):
|
|
s_task = token_sanitizer.TokenSanitizer().sanitize(task)
|
|
LOG.info(_LI('Starting processing task: {task_desc}').format(
|
|
task_desc=jsonutils.dumps(s_task)))
|
|
|
|
result = {'model': task['model']}
|
|
try:
|
|
task_executor = TaskExecutor(task)
|
|
result = task_executor.execute()
|
|
except Exception as e:
|
|
LOG.exception(_LE('Error during task execution for tenant %s'),
|
|
task['tenant_id'])
|
|
result['action'] = TaskExecutor.exception_result(e)
|
|
msg_env = Environment(task['id'])
|
|
reporter = status_reporter.StatusReporter()
|
|
reporter.initialize(msg_env)
|
|
reporter.report_error(msg_env, str(e))
|
|
finally:
|
|
rpc.api().process_result(result, task['id'])
|
|
|
|
|
|
def _prepare_rpc_service(server_id):
|
|
endpoints = [TaskProcessingEndpoint()]
|
|
|
|
transport = messaging.get_transport(config.CONF)
|
|
s_target = target.Target('murano', 'tasks', server=server_id)
|
|
return messaging.get_rpc_server(transport, s_target, endpoints, 'eventlet')
|
|
|
|
|
|
def get_rpc_service():
|
|
global RPC_SERVICE
|
|
|
|
if RPC_SERVICE is None:
|
|
RPC_SERVICE = _prepare_rpc_service(str(uuid.uuid4()))
|
|
return RPC_SERVICE
|
|
|
|
|
|
def get_plugin_loader():
|
|
global PLUGIN_LOADER
|
|
|
|
if PLUGIN_LOADER is None:
|
|
PLUGIN_LOADER = plugin_loader.PluginLoader()
|
|
return PLUGIN_LOADER
|
|
|
|
|
|
class Environment(object):
|
|
def __init__(self, object_id):
|
|
self.object_id = object_id
|
|
|
|
|
|
class TaskExecutor(object):
|
|
@property
|
|
def action(self):
|
|
return self._action
|
|
|
|
@property
|
|
def environment(self):
|
|
return self._environment
|
|
|
|
@property
|
|
def model(self):
|
|
return self._model
|
|
|
|
def __init__(self, task):
|
|
self._action = task.get('action')
|
|
self._model = task['model']
|
|
self._environment = environment.Environment()
|
|
self._environment.token = task['token']
|
|
self._environment.tenant_id = task['tenant_id']
|
|
self._environment.system_attributes = self._model.get('SystemData', {})
|
|
self._environment.clients = client_manager.ClientManager()
|
|
|
|
self._model_policy_enforcer = enforcer.ModelPolicyEnforcer(
|
|
self._environment)
|
|
|
|
def execute(self):
|
|
self._create_trust()
|
|
|
|
try:
|
|
# !!! please do not delete 2 commented lines of code below.
|
|
# Uncomment to make engine load packages from
|
|
# local folder rather than from API !!!
|
|
|
|
# pkg_loader = package_loader.DirectoryPackageLoader('./meta')
|
|
# return self._execute(pkg_loader)
|
|
|
|
murano_client_factory = lambda: \
|
|
self._environment.clients.get_murano_client(self._environment)
|
|
with package_loader.ApiPackageLoader(
|
|
murano_client_factory,
|
|
self._environment.tenant_id) as pkg_loader:
|
|
return self._execute(pkg_loader)
|
|
finally:
|
|
if self._model['Objects'] is None:
|
|
self._delete_trust()
|
|
|
|
def _execute(self, pkg_loader):
|
|
class_loader = package_class_loader.PackageClassLoader(pkg_loader)
|
|
system_objects.register(class_loader, pkg_loader)
|
|
get_plugin_loader().register_in_loader(class_loader)
|
|
|
|
exc = executor.MuranoDslExecutor(class_loader, self.environment)
|
|
obj = exc.load(self.model)
|
|
|
|
self._validate_model(obj, self.action, class_loader)
|
|
action_result = None
|
|
exception = None
|
|
exception_traceback = None
|
|
try:
|
|
LOG.info(_LI('Invoking pre-execution hooks'))
|
|
self.environment.start()
|
|
# Skip execution of action in case no action is provided.
|
|
# Model will be just loaded, cleaned-up and unloaded.
|
|
# Most of the time this is used for deletion of environments.
|
|
if self.action:
|
|
action_result = self._invoke(exc)
|
|
except Exception as e:
|
|
exception = e
|
|
if isinstance(e, dsl_exception.MuranoPlException):
|
|
LOG.error('\n' + e.format(prefix=' '))
|
|
else:
|
|
exception_traceback = traceback.format_exc()
|
|
LOG.exception(
|
|
_LE("Exception %(exc)s occured"
|
|
" during invocation of %(method)s"),
|
|
{'exc': e, 'method': self.action['method']})
|
|
reporter = status_reporter.StatusReporter()
|
|
reporter.initialize(obj)
|
|
reporter.report_error(obj, str(e))
|
|
finally:
|
|
LOG.info(_LI('Invoking post-execution hooks'))
|
|
self.environment.finish()
|
|
|
|
model = serializer.serialize_model(obj, exc)
|
|
model['SystemData'] = self._environment.system_attributes
|
|
result = {
|
|
'model': model,
|
|
'action': {
|
|
'result': None,
|
|
'isException': False
|
|
}
|
|
}
|
|
if exception is not None:
|
|
result['action'] = TaskExecutor.exception_result(
|
|
exception, exception_traceback)
|
|
else:
|
|
result['action']['result'] = serializer.serialize_object(
|
|
action_result)
|
|
|
|
return result
|
|
|
|
@staticmethod
|
|
def exception_result(exception, exception_traceback=None):
|
|
record = {
|
|
'isException': True,
|
|
'result': {
|
|
'message': str(exception),
|
|
}
|
|
}
|
|
if isinstance(exception, dsl_exception.MuranoPlException):
|
|
record['result']['details'] = exception.format()
|
|
else:
|
|
record['result']['details'] = exception_traceback
|
|
return record
|
|
|
|
def _validate_model(self, obj, action, class_loader):
|
|
if config.CONF.engine.enable_model_policy_enforcer:
|
|
if obj is not None:
|
|
if action is not None and action['method'] == 'deploy':
|
|
self._model_policy_enforcer.validate(obj.to_dictionary(),
|
|
class_loader)
|
|
|
|
def _invoke(self, mpl_executor):
|
|
obj = mpl_executor.object_store.get(self.action['object_id'])
|
|
method_name, args = self.action['method'], self.action['args']
|
|
|
|
if obj is not None:
|
|
return obj.type.invoke(method_name, mpl_executor, obj, args)
|
|
|
|
def _create_trust(self):
|
|
if not config.CONF.engine.use_trusts:
|
|
return
|
|
trust_id = self._environment.system_attributes.get('TrustId')
|
|
if not trust_id:
|
|
trust_id = auth_utils.create_trust(self._environment.token,
|
|
self._environment.tenant_id)
|
|
self._environment.system_attributes['TrustId'] = trust_id
|
|
self._environment.trust_id = trust_id
|
|
|
|
def _delete_trust(self):
|
|
trust_id = self._environment.trust_id
|
|
if trust_id:
|
|
auth_utils.delete_trust(self._environment.trust_id)
|
|
self._environment.system_attributes['TrustId'] = None
|
|
self._environment.trust_id = None
|