# Copyright (C) 2019 NTT DATA # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import os import shutil from oslo_log import log as logging from oslo_utils import encodeutils from oslo_utils import excutils from toscaparser.tosca_template import ToscaTemplate import zipfile from tacker.common import exceptions import tacker.conf CONF = tacker.conf.CONF LOG = logging.getLogger(__name__) def _check_type(custom_def, node_type, type_list): for node_data_type, node_data_type_value in custom_def.items(): if node_data_type == node_type and node_type in type_list: return True, node_data_type_value for k, v in node_data_type_value.items(): if k == 'derived_from': if v in type_list and node_type == node_data_type: return True, node_data_type_value return False, None def _get_sw_image_artifact(artifacts): if not artifacts: return for artifact_value in artifacts.values(): if 'type' in artifact_value: if artifact_value['type'] == 'tosca.artifacts.nfv.SwImage': return artifact_value def _update_default_vnfd_data(node_value, node_type_value): vnf_properties = node_value['properties'] type_properties = node_type_value['properties'] for property_key, property_value in type_properties.items(): if property_key == 'descriptor_id': # if descriptor_id is parameterized, then get the value from the # default property and set it in the vnf_properties. if vnf_properties and isinstance( vnf_properties.get('descriptor_id'), dict): vnf_properties['descriptor_id'] = property_value.get("default") return vnf_properties def _get_vnf_data(nodetemplates): type_list = ['tosca.nodes.nfv.VNF'] for nt in nodetemplates: for node_name, node_value in nt.templates.items(): type_status, node_type_value = _check_type(nt.custom_def, node_value['type'], type_list) if type_status and node_type_value: return _update_default_vnfd_data(node_value, node_type_value) def _get_instantiation_levels(policies): if policies: for policy in policies: if policy.type_definition.type == \ 'tosca.policies.nfv.InstantiationLevels': return policy.properties def _update_flavour_data_from_vnf(custom_defs, node_tpl, flavour): type_list = ['tosca.nodes.nfv.VNF'] type_status, _ = _check_type(custom_defs, node_tpl['type'], type_list) if type_status and node_tpl['properties']: vnf_properties = node_tpl['properties'] if 'flavour_description' in vnf_properties: flavour.update( {'flavour_description': vnf_properties[ 'flavour_description']}) if 'flavour_id' in vnf_properties and isinstance( vnf_properties['flavour_id'], str): flavour.update({'flavour_id': vnf_properties['flavour_id']}) def _get_software_image(custom_defs, nodetemplate_name, node_tpl): type_list = ['tosca.nodes.nfv.Vdu.Compute', 'tosca.nodes.nfv.Vdu.VirtualBlockStorage'] type_status, _ = _check_type(custom_defs, node_tpl['type'], type_list) if type_status: properties = node_tpl['properties'] sw_image_artifact = _get_sw_image_artifact(node_tpl.get('artifacts')) if sw_image_artifact: properties['sw_image_data'].update( {'software_image_id': nodetemplate_name}) sw_image_data = properties['sw_image_data'] if 'metadata' in sw_image_artifact: sw_image_data.update({'metadata': sw_image_artifact['metadata']}) return sw_image_data def _populate_flavour_data(tosca): flavours = [] if tosca.nested_tosca_templates_with_topology: for tp in tosca.nested_tosca_templates_with_topology: _get_flavour_data(tp, flavours) else: _get_flavour_data(tosca.topology_template, flavours) return flavours def _get_flavour_data(tp, flavours): sw_image_list = [] # Setting up flavour data flavour_id = tp.substitution_mappings.properties.get('flavour_id') if flavour_id: if isinstance(flavour_id, dict): error_msg = "flavour_id should be string and given" \ " {}".format(flavour_id) raise exceptions.InvalidCSAR(error_msg) flavour = {'flavour_id': flavour_id} else: flavour = {} instantiation_levels = _get_instantiation_levels(tp.policies) if instantiation_levels: flavour.update({'instantiation_levels': instantiation_levels}) for template_name, node_tpl in tp.tpl.get('node_templates').items(): # check the flavour property in vnf data _update_flavour_data_from_vnf(tp.custom_defs, node_tpl, flavour) # Update the software image data sw_image = _get_software_image(tp.custom_defs, template_name, node_tpl) if sw_image: sw_image_list.append(sw_image) # Add software images for flavour if sw_image_list: flavour.update({'sw_images': sw_image_list}) if flavour: flavours.append(flavour) def _get_instantiation_levels_from_policy(tpl_policies): """Get defined instantiation levels Getting instantiation levels defined under policy type 'tosca.policies.nfv.InstantiationLevels'. """ levels = [] for policy in tpl_policies: for key, value in policy.items(): if value.get('type') == 'tosca.policies.nfv.InstantiationLevels'\ and value.get('properties', {}).get('levels', {}): levels = value.get('properties').get('levels').keys() default_level = value.get( 'properties').get('default_level') if default_level and default_level not in levels: error_msg = "Level {} not found in defined levels" \ " {}".format(default_level, ",".join(sorted(levels))) raise exceptions.InvalidCSAR(error_msg) return levels def _validate_instantiation_levels(policy, instantiation_levels): expected_policy_type = ['tosca.policies.nfv.VduInstantiationLevels', 'tosca.policies.nfv.' 'VirtualLinkInstantiationLevels'] for policy_name, policy_tpl in policy.items(): if policy_tpl.get('type') not in expected_policy_type: return if not instantiation_levels: msg = ('Policy of type' ' "tosca.policies.nfv.InstantiationLevels is not defined.') raise exceptions.InvalidCSAR(msg) if policy_tpl.get('properties'): levels_in_policy = policy_tpl.get( 'properties').get('levels') if levels_in_policy: invalid_levels = set(levels_in_policy.keys()) - set( instantiation_levels) else: invalid_levels = set() if invalid_levels: error_msg = "Level(s) {} not found in defined levels" \ " {}".format(",".join(sorted(invalid_levels)), ",".join(sorted(instantiation_levels) )) raise exceptions.InvalidCSAR(error_msg) def _validate_sw_image_data_for_artifact(node_tpl, template_name): artifact_type = [] artifacts = node_tpl.get('artifacts') if artifacts: for key, value in artifacts.items(): if value.get('type') == 'tosca.artifacts.nfv.SwImage': artifact_type.append(value.get('type')) if len(artifact_type) > 1: error_msg = ('artifacts of type "tosca.artifacts.nfv.SwImage"' ' is added more than one time for' ' node %(node)s.') % {'node': template_name} raise exceptions.InvalidCSAR(error_msg) if artifact_type and node_tpl.get('properties'): if not node_tpl.get('properties').get('sw_image_data'): error_msg = ('Node property "sw_image_data" is missing for' ' artifact type %(type)s for ' 'node %(node)s.') % { 'type': artifact_type[0], 'node': template_name} raise exceptions.InvalidCSAR(error_msg) def _validate_sw_image_data_for_artifacts(tosca): for tp in tosca.nested_tosca_templates_with_topology: for template_name, node_tpl in tp.tpl.get('node_templates').items(): _validate_sw_image_data_for_artifact(node_tpl, template_name) for template in tosca.nodetemplates: _validate_sw_image_data_for_artifact( template.entity_tpl, template.name) def _get_data_from_csar(tosca, context, id): for tp in tosca.nested_tosca_templates_with_topology: policies = tp.tpl.get("policies") if policies: levels = _get_instantiation_levels_from_policy(policies) for policy_tpl in policies: _validate_instantiation_levels(policy_tpl, levels) _validate_sw_image_data_for_artifacts(tosca) vnf_data = _get_vnf_data(tosca.nodetemplates) if not vnf_data: error_msg = "VNF properties are mandatory" raise exceptions.InvalidCSAR(error_msg) flavours = _populate_flavour_data(tosca) if not flavours: error_msg = "No VNF flavours are available" raise exceptions.InvalidCSAR(error_msg) return vnf_data, flavours def extract_csar_zip_file(file_path, extract_path): try: with zipfile.ZipFile(file_path, 'r') as zf: zf.extractall(extract_path) except (RuntimeError, zipfile.BadZipfile) as exp: with excutils.save_and_reraise_exception(): LOG.error("Error encountered while extracting " "csar zip file %(path)s. Error: %(error)s.", {'path': file_path, 'error': encodeutils.exception_to_unicode(exp)}) exp.reraise = False raise exceptions.InvalidZipFile(path=file_path) def load_csar_data(context, package_uuid, zip_path): extract_zip_path = os.path.join(CONF.vnf_package.vnf_package_csar_path, package_uuid) extract_csar_zip_file(zip_path, extract_zip_path) try: tosca = ToscaTemplate(zip_path, None, True) return _get_data_from_csar(tosca, context, package_uuid) except exceptions.InvalidCSAR as exp: with excutils.save_and_reraise_exception(): LOG.error("Error processing CSAR file %(path)s for vnf package" " %(uuid)s: Error: %(error)s. ", {'path': zip_path, 'uuid': package_uuid, 'error': encodeutils.exception_to_unicode(exp)}) except Exception as exp: with excutils.save_and_reraise_exception(): LOG.error("Tosca parser failed for vnf package %(uuid)s: " "Error: %(error)s. ", {'uuid': package_uuid, 'error': encodeutils.exception_to_unicode(exp)}) exp.reraise = False raise exceptions.InvalidCSAR(encodeutils.exception_to_unicode (exp)) def delete_csar_data(package_uuid): # Remove zip and folder from the vnf_package_csar_path csar_zip_temp_path = os.path.join(CONF.vnf_package.vnf_package_csar_path, package_uuid) csar_path = os.path.join(CONF.vnf_package.vnf_package_csar_path, package_uuid + ".zip") try: shutil.rmtree(csar_zip_temp_path) os.remove(csar_path) except OSError as exc: exc_message = encodeutils.exception_to_unicode(exc) msg = _('Failed to delete csar folder: ' '%(csar_path)s, Error: %(exc)s') LOG.warning(msg, {'csar_path': csar_path, 'exc': exc_message})