diff --git a/etc/config-generator.conf b/etc/config-generator.conf index ae9784372..b0cadfe7a 100644 --- a/etc/config-generator.conf +++ b/etc/config-generator.conf @@ -6,10 +6,12 @@ namespace = tacker.wsgi namespace = tacker.service namespace = tacker.nfvo.nfvo_plugin namespace = tacker.nfvo.drivers.vim.openstack_driver +namespace = tacker.nfvo.drivers.vim.kubernetes_driver namespace = tacker.keymgr namespace = tacker.vnfm.monitor namespace = tacker.vnfm.plugin namespace = tacker.vnfm.infra_drivers.openstack.openstack +namespace = tacker.vnfm.infra_drivers.kubernetes.kubernetes namespace = tacker.vnfm.mgmt_drivers.openwrt.openwrt namespace = tacker.vnfm.monitor_drivers.http_ping.http_ping namespace = tacker.vnfm.monitor_drivers.ping.ping diff --git a/samples/tosca-templates/vnfd/tosca-vnfd-containerized-two-containers.yaml b/samples/tosca-templates/vnfd/tosca-vnfd-containerized-two-containers.yaml new file mode 100644 index 000000000..61a7a6aac --- /dev/null +++ b/samples/tosca-templates/vnfd/tosca-vnfd-containerized-two-containers.yaml @@ -0,0 +1,37 @@ +tosca_definitions_version: tosca_simple_profile_for_nfv_1_0_0 +description: A sample containerized VNF with two containers per VDU + +metadata: + template_name: sample-tosca-vnfd + +topology_template: + node_templates: + VDU1: + type: tosca.nodes.nfv.VDU.Tacker + properties: + namespace: default + mapping_ports: + - "80:80" + - "88:88" + service_type: NodePort + vnfcs: + front_end: + num_cpus: 0.5 + mem_size: 512 MB + image: nginx + ports: + - "80" + rss_reader: + num_cpus: 0.5 + mem_size: 512 MB + image: nickchase/rss-php-nginx:v1 + ports: + - "88" + policies: + - SP1: + type: tosca.policies.tacker.Scaling + targets: [VDU1] + properties: + min_instances: 1 + max_instances: 3 + target_cpu_utilization_percentage: 40 \ No newline at end of file diff --git a/samples/tosca-templates/vnfd/tosca-vnfd-containerized.yaml b/samples/tosca-templates/vnfd/tosca-vnfd-containerized.yaml new file mode 100644 index 000000000..cc137768d --- /dev/null +++ b/samples/tosca-templates/vnfd/tosca-vnfd-containerized.yaml @@ -0,0 +1,36 @@ +tosca_definitions_version: tosca_simple_profile_for_nfv_1_0_0 +description: A sample containerized VNF with one container per VDU + +metadata: + template_name: sample-tosca-vnfd + +topology_template: + node_templates: + VDU1: + type: tosca.nodes.nfv.VDU.Tacker + properties: + namespace: default + mapping_ports: + - "80:80" + labels: + - "app: webserver" + service_type: ClusterIP + vnfcs: + web_server: + num_cpus: 0.2 + mem_size: 100 MB + image: k8s.gcr.io/hpa-example + ports: + - "80" + config: | + param0: key1 + param1: key2 + + policies: + - SP1: + type: tosca.policies.tacker.Scaling + targets: [VDU1] + properties: + min_instances: 1 + max_instances: 10 + target_cpu_utilization_percentage: 50 diff --git a/setup.cfg b/setup.cfg index 2788d5a47..fc2239d53 100644 --- a/setup.cfg +++ b/setup.cfg @@ -53,6 +53,7 @@ tacker.openstack.common.cache.backends = tacker.tacker.vnfm.drivers = noop = tacker.vnfm.infra_drivers.noop:DeviceNoop openstack = tacker.vnfm.infra_drivers.openstack.openstack:OpenStack + kubernetes = tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver:Kubernetes tacker.tacker.mgmt.drivers = noop = tacker.vnfm.mgmt_drivers.noop:DeviceMgmtNoop openwrt = tacker.vnfm.mgmt_drivers.openwrt.openwrt:DeviceMgmtOpenWRT @@ -79,6 +80,7 @@ oslo.config.opts = tacker.vnfm.monitor = tacker.vnfm.monitor:config_opts tacker.vnfm.plugin = tacker.vnfm.plugin:config_opts tacker.vnfm.infra_drivers.openstack.openstack= tacker.vnfm.infra_drivers.openstack.openstack:config_opts + tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver = tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver:config_opts tacker.vnfm.mgmt_drivers.openwrt.openwrt = tacker.vnfm.mgmt_drivers.openwrt.openwrt:config_opts tacker.vnfm.monitor_drivers.http_ping.http_ping = tacker.vnfm.monitor_drivers.http_ping.http_ping:config_opts tacker.vnfm.monitor_drivers.ping.ping = tacker.vnfm.monitor_drivers.ping.ping:config_opts diff --git a/tacker/common/container/kubernetes_utils.py b/tacker/common/container/kubernetes_utils.py index 90be5bbc8..98131f143 100644 --- a/tacker/common/container/kubernetes_utils.py +++ b/tacker/common/container/kubernetes_utils.py @@ -30,8 +30,8 @@ CONF = cfg.CONF class KubernetesHTTPAPI(object): - def get_k8sClient(self, auth_plugin): - config = client.ConfigurationObject() + def get_k8s_client(self, auth_plugin): + config = client.Configuration() config.host = auth_plugin['auth_url'] if ('username' in auth_plugin) and ('password' in auth_plugin)\ and (auth_plugin['password'] is not None): @@ -48,21 +48,25 @@ class KubernetesHTTPAPI(object): config.verify_ssl = True else: config.verify_ssl = False - k8s_client = api_client.ApiClient(config=config) + k8s_client = api_client.ApiClient(configuration=config) return k8s_client - def initialize_ExtensionApiClient(self, auth): - k8s_client = self.get_k8sClient(auth_plugin=auth) + def get_extension_api_client(self, auth): + k8s_client = self.get_k8s_client(auth_plugin=auth) return client.ExtensionsV1beta1Api(api_client=k8s_client) - def initialize_CoreApiV1Client(self, auth): - k8s_client = self.get_k8sClient(auth_plugin=auth) + def get_core_v1_api_client(self, auth): + k8s_client = self.get_k8s_client(auth_plugin=auth) return client.CoreV1Api(api_client=k8s_client) - def initialize_CoreApiClient(self, auth): - k8s_client = self.get_k8sClient(auth_plugin=auth) + def get_core_api_client(self, auth): + k8s_client = self.get_k8s_client(auth_plugin=auth) return client.CoreApi(api_client=k8s_client) + def get_scaling_api_client(self, auth): + k8s_client = self.get_k8s_client(auth_plugin=auth) + return client.AutoscalingV1Api(api_client=k8s_client) + @staticmethod def create_ca_cert_tmp_file(ca_cert): file_descriptor, file_path = tempfile.mkstemp() diff --git a/tacker/common/eventlet_utils.py b/tacker/common/eventlet_utils.py index c1c517934..ff9a50907 100644 --- a/tacker/common/eventlet_utils.py +++ b/tacker/common/eventlet_utils.py @@ -20,7 +20,7 @@ from oslo_utils import importutils def monkey_patch(): - eventlet.monkey_patch() + eventlet.monkey_patch(all=False, socket=True) if os.name != 'nt': p_c_e = importutils.import_module('pyroute2.config.eventlet') p_c_e.eventlet_config() diff --git a/tacker/extensions/vnfm.py b/tacker/extensions/vnfm.py index 749a56879..b13ed9ccf 100644 --- a/tacker/extensions/vnfm.py +++ b/tacker/extensions/vnfm.py @@ -156,6 +156,18 @@ class InvalidParamsForSM(exceptions.InvalidInput): message = _("Please provide parameters for substitution mappings") +class InvalidKubernetesScalingPolicyNumber(exceptions.InvalidInput): + message = _("Please provide only one Scaling policy") + + +class InvalidKubernetesNetworkNumber(exceptions.InvalidInput): + message = _("Please provide one network for all vdus") + + +class InvalidKubernetesInputParameter(exceptions.InvalidInput): + message = _("Found unsupported keys for %(found_keys)s ") + + def _validate_service_type_list(data, valid_values=None): if not isinstance(data, list): msg = _("invalid data format for service list: '%s'") % data diff --git a/tacker/nfvo/drivers/vim/kubernetes_driver.py b/tacker/nfvo/drivers/vim/kubernetes_driver.py index 984ddecad..54a96c899 100644 --- a/tacker/nfvo/drivers/vim/kubernetes_driver.py +++ b/tacker/nfvo/drivers/vim/kubernetes_driver.py @@ -86,11 +86,11 @@ class Kubernetes_Driver(abstract_vim_driver.VimAbstractDriver): def _validate_vim(self, auth, file_descriptor): # If Tacker can get k8s_info, Kubernetes authentication is valid # if not, it is invalid + auth_dict = dict(auth) try: - auth_dict = dict(auth) - k8s_coreClient = \ - self.kubernetes.initialize_CoreApiClient(auth_dict) - k8s_info = k8s_coreClient.get_api_versions() + core_api_client = \ + self.kubernetes.get_core_api_client(auth_dict) + k8s_info = core_api_client.get_api_versions() LOG.info(k8s_info) except Exception as e: LOG.info('VIM Kubernetes authentication is wrong.') @@ -98,18 +98,8 @@ class Kubernetes_Driver(abstract_vim_driver.VimAbstractDriver): self.clean_authenticate_vim(auth_dict, file_descriptor) raise nfvo.VimUnauthorizedException(message=str(e)) - def _initialize_k8s_extensionClient(self, auth): - k8s_extensionClient =\ - self.kubernetes.initialize_ExtensionApiClient(auth) - return k8s_extensionClient - - def _initialize_k8s_coreV1Client(self, auth): - k8s_coreV1Client =\ - self.kubernetes.initialize_CoreApiV1Client(auth) - return k8s_coreV1Client - - def _find_regions(self, k8s_coreV1Client): - list_namespaces = k8s_coreV1Client.list_namespace() + def _find_regions(self, core_v1_api_client): + list_namespaces = core_v1_api_client.list_namespace() namespaces = [namespace.metadata.name for namespace in list_namespaces.items] return namespaces @@ -122,9 +112,9 @@ class Kubernetes_Driver(abstract_vim_driver.VimAbstractDriver): # in Kubernetes environment, user can deploy resource # on specific namespace auth_cred, file_descriptor = self._get_auth_creds(vim_obj) - k8s_coreV1Client = \ - self.kubernetes.initialize_CoreApiV1Client(auth_cred) - namespace_list = self._find_regions(k8s_coreV1Client) + core_v1_api_client = \ + self.kubernetes.get_core_v1_api_client(auth_cred) + namespace_list = self._find_regions(core_v1_api_client) self.clean_authenticate_vim(auth_cred, file_descriptor) vim_obj['placement_attr'] = {'regions': namespace_list} return vim_obj diff --git a/tacker/tests/unit/nfvo/drivers/vim/test_kubernetes_driver.py b/tacker/tests/unit/nfvo/drivers/vim/test_kubernetes_driver.py index 4a7fcbbba..25b125c09 100644 --- a/tacker/tests/unit/nfvo/drivers/vim/test_kubernetes_driver.py +++ b/tacker/tests/unit/nfvo/drivers/vim/test_kubernetes_driver.py @@ -97,15 +97,15 @@ class TestKubernetes_Driver(base.TestCase): self._test_register_vim(self.vim_obj, mock_k8s_client, mock_k8s_coreV1Client) mock_k8s_coreV1Client.list_namespace.assert_called_once_with() - self.kubernetes_api.\ - initialize_CoreApiClient.assert_called_once_with(auth_obj) + self.kubernetes_api. \ + get_core_api_client.assert_called_once_with(auth_obj) def _test_register_vim(self, vim_obj, mock_k8s_client, mock_k8s_coreV1Client): - self.kubernetes_api.\ - initialize_CoreApiClient.return_value = mock_k8s_client - self.kubernetes_api.\ - initialize_CoreApiV1Client.return_value = mock_k8s_coreV1Client + self.kubernetes_api. \ + get_core_api_client.return_value = mock_k8s_client + self.kubernetes_api. \ + get_core_v1_api_client.return_value = mock_k8s_coreV1Client fernet_attrs = {'encrypt.return_value': 'encrypted_password'} mock_fernet_obj = mock.Mock(**fernet_attrs) mock_fernet_key = 'test_fernet_key' diff --git a/tacker/tosca/lib/tacker_defs.yaml b/tacker/tosca/lib/tacker_defs.yaml index c4c30f075..b7136be07 100644 --- a/tacker/tosca/lib/tacker_defs.yaml +++ b/tacker/tosca/lib/tacker_defs.yaml @@ -169,6 +169,36 @@ data_types: description: The mac address allowed to be paired with specific virtual IP. + tosca.datatypes.nfv.VnfcConfigurableProperties: + properties: + num_cpus: + type: float + required: false + mem_size: + type: string + required: false + image: + type: string + required: false + command: + type: list + entry_schema: + type: string + required: false + args: + type: list + entry_schema: + type: string + required: false + ports: + type: list + entry_schema: + type: string + required: false + config: + type: string + required: false + policy_types: tosca.policies.tacker.Placement: derived_from: tosca.policies.Root @@ -279,6 +309,10 @@ policy_types: type: string required: true description: List of Scaling nodes. + target_cpu_utilization_percentage: + type: integer + required: false + description: The target average CPU utilization over all the pods which is used in Kubernetes environment min_instances: type: integer required: true diff --git a/tacker/tosca/lib/tacker_nfv_defs.yaml b/tacker/tosca/lib/tacker_nfv_defs.yaml index b4bbd6ab4..8df6ab85e 100644 --- a/tacker/tosca/lib/tacker_nfv_defs.yaml +++ b/tacker/tosca/lib/tacker_nfv_defs.yaml @@ -248,6 +248,28 @@ node_types: type: string required: false + vnfcs: + type: map + required: false + entry_schema: + type: tosca.datatypes.nfv.VnfcConfigurableProperties + + namespace: + type: string + required: false + + mapping_ports: + type: list + required: false + entry_schema: + type: string + + labels: + type: list + required: false + entry_schema: + type: string + tosca.nodes.nfv.CP.Tacker: derived_from: tosca.nodes.nfv.CP properties: diff --git a/tacker/vnfm/infra_drivers/kubernetes/__init__.py b/tacker/vnfm/infra_drivers/kubernetes/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tacker/vnfm/infra_drivers/kubernetes/k8s/__init__.py b/tacker/vnfm/infra_drivers/kubernetes/k8s/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tacker/vnfm/infra_drivers/kubernetes/k8s/tosca_kube_object.py b/tacker/vnfm/infra_drivers/kubernetes/k8s/tosca_kube_object.py new file mode 100644 index 000000000..ef6aaf9e6 --- /dev/null +++ b/tacker/vnfm/infra_drivers/kubernetes/k8s/tosca_kube_object.py @@ -0,0 +1,241 @@ +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class ToscaKubeObject(object): + + """ToscaKubeObject holds the basic struct of a VDU. + + That is used for translating TOSCA to Kubernetes templates. + """ + + def __init__(self, name=None, namespace=None, mapping_ports=None, + containers=None, network_name=None, + mgmt_connection_point=False, scaling_object=None, + service_type=None, labels=None): + self._name = name + self._namespace = namespace + self._mapping_ports = mapping_ports + self._containers = containers + self._network_name = network_name + self._mgmt_connection_point = mgmt_connection_point + self._scaling_object = scaling_object + self._service_type = service_type + self._labels = labels + + @property + def name(self): + return self._name + + @name.setter + def name(self, name): + self._name = name + + @property + def namespace(self): + return self._namespace + + @namespace.setter + def namespace(self, namespace): + self._namespace = namespace + + @property + def mapping_ports(self): + return self._mapping_ports + + @mapping_ports.setter + def mapping_ports(self, mapping_ports): + self._mapping_ports = mapping_ports + + @property + def containers(self): + return self._containers + + @containers.setter + def containers(self, containers): + self._containers = containers + + @property + def network_name(self): + return self._network_name + + @network_name.setter + def network_name(self, network_name): + self._network_name = network_name + + @property + def mgmt_connection_point(self): + return self._mgmt_connection_point + + @mgmt_connection_point.setter + def mgmt_connection_point(self, mgmt_connection_point): + self._mgmt_connection_point = mgmt_connection_point + + @property + def scaling_object(self): + return self._scaling_object + + @scaling_object.setter + def scaling_object(self, scaling_object): + self._scaling_object = scaling_object + + @property + def service_type(self): + return self._service_type + + @service_type.setter + def service_type(self, service_type): + self._service_type = service_type + + @property + def labels(self): + return self._labels + + @labels.setter + def labels(self, labels): + self._labels = labels + + +class Container(object): + """Container holds the basic structs of a container""" + def __init__(self, name=None, num_cpus=None, mem_size=None, image=None, + command=None, args=None, ports=None, config=None): + self._name = name + self._num_cpus = num_cpus + self._mem_size = mem_size + self._image = image + self._command = command + self._args = args + self._ports = ports + self._config = config + + @property + def name(self): + return self._name + + @name.setter + def name(self, name): + self._name = name + + @property + def num_cpus(self): + return self._num_cpus + + @num_cpus.setter + def num_cpus(self, num_cpus): + self._num_cpus = num_cpus + + @property + def mem_size(self): + return self._mem_size + + @mem_size.setter + def mem_size(self, mem_size): + self._mem_size = mem_size + + @property + def image(self): + return self._image + + @image.setter + def image(self, image): + self._image = image + + @property + def command(self): + return self._command + + @command.setter + def command(self, command): + self._command = command + + @property + def args(self): + return self._args + + @args.setter + def args(self, args): + self._args = args + + @property + def ports(self): + return self._ports + + @ports.setter + def ports(self, ports): + self._ports = ports + + @property + def config(self): + return self._config + + @config.setter + def config(self, config): + self._config = config + + +class ScalingObject(object): + """ScalingObject holds the basic struct of a horizontal pod auto-scaling""" + def __init__(self, scaling_name=None, min_replicas=None, max_replicas=None, + scale_target_name=None, + target_cpu_utilization_percentage=None): + self._scaling_name = scaling_name + self._min_replicas = min_replicas + self._max_replicas = max_replicas + self._scale_target_name = scale_target_name + self._target_cpu_utilization_percentage = \ + target_cpu_utilization_percentage + + @property + def scaling_name(self): + return self._scaling_name + + @scaling_name.setter + def scaling_name(self, scaling_name): + self._scaling_name = scaling_name + + @property + def min_replicas(self): + return self._min_replicas + + @min_replicas.setter + def min_replicas(self, min_replicas): + self._min_replicas = min_replicas + + @property + def max_replicas(self): + return self._max_replicas + + @max_replicas.setter + def max_replicas(self, max_replicas): + self._max_replicas = max_replicas + + @property + def scale_target_name(self): + return self._scale_target_name + + @scale_target_name.setter + def scale_target_name(self, scale_target_name): + self._scale_target_name = scale_target_name + + @property + def target_cpu_utilization_percentage(self): + return self._target_cpu_utilization_percentage + + @target_cpu_utilization_percentage.setter + def target_cpu_utilization_percentage(self, + target_cpu_utilization_percentage): + self._target_cpu_utilization_percentage = \ + target_cpu_utilization_percentage diff --git a/tacker/vnfm/infra_drivers/kubernetes/k8s/translate_inputs.py b/tacker/vnfm/infra_drivers/kubernetes/k8s/translate_inputs.py new file mode 100644 index 000000000..6730cf40d --- /dev/null +++ b/tacker/vnfm/infra_drivers/kubernetes/k8s/translate_inputs.py @@ -0,0 +1,277 @@ +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import uuidutils + +from tacker.common import log +from tacker.extensions import vnfm +from tacker.tosca import utils as toscautils +from tacker.vnfm.infra_drivers.kubernetes.k8s import tosca_kube_object + +from toscaparser.functions import GetInput +from toscaparser import tosca_template +import toscaparser.utils.yamlparser + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + +YAML_LOADER = toscaparser.utils.yamlparser.load_yaml +SCALING = 'tosca.policies.Scaling' +TACKER_CP = 'tosca.nodes.nfv.CP.Tacker' +TACKER_VL = 'tosca.nodes.nfv.VL' +COLON_CHARACTER = ':' +WHITE_SPACE_CHARACTER = ' ' +NON_WHITE_SPACE_CHARACTER = '' +TOSCA_LINKS_TO = 'tosca.relationships.network.LinksTo' +TOSCA_BINDS_TO = 'tosca.relationships.network.BindsTo' + +ALLOWED_KUBERNETES_OBJECT_PROPS = ('namespace', 'mapping_ports', 'labels', + 'vnfcs', 'service_type', + 'mgmt_driver', 'config_drive') +ALLOWED_CONTAINER_OBJECT_PROPS = ('num_cpus', 'mem_size', 'image', 'config', + 'command', 'args', 'ports') +ALLOWED_SCALING_OBJECT_PROPS = ('min_instances', 'max_instances', + 'target_cpu_utilization_percentage') + +SCALAR_UNIT_DICT = {'B': 1, 'kB': 1000, 'KiB': 1024, 'MB': 1000000, + 'MiB': 1048576, 'GB': 1000000000, + 'GiB': 1073741824, 'TB': 1000000000000, + 'TiB': 1099511627776} + + +class Parser(object): + """Convert TOSCA template to Tosca Kube object""" + + def __init__(self, vnfd_dict): + self.vnfd_dict = vnfd_dict + + def loader(self): + """Load TOSCA template and start parsing""" + + try: + parserd_params = None + toscautils.updateimports(self.vnfd_dict) + tosca = tosca_template.\ + ToscaTemplate(parsed_params=parserd_params, + a_file=False, + yaml_dict_tpl=self.vnfd_dict) + except Exception as e: + LOG.debug("tosca-parser error: %s", str(e)) + raise vnfm.ToscaParserFailed(error_msg_details=str(e)) + + # Initiate a list tosca_kube_object which are defined from VDU + tosca_kube_objects = [] + vdus = toscautils.findvdus(tosca) + + for node_template in vdus: + vdu_name = node_template.name + tosca_kube_obj = self.tosca_to_kube_mapping(node_template) + + # Find network name in which VDU is attached + tosca_kube_obj.network_name = self.find_networks(tosca, vdu_name) + + # If connection_point is True, Tacker will manage its service ip + tosca_kube_obj.mgmt_connection_point = \ + self.check_mgmt_cp(tosca, vdu_name) + + # Find scaling policy that is used for this VDU, different to + # VM-based VNF, there are no alarm policies. + tosca_kube_obj.scaling_object = \ + self.get_scaling_policy(tosca, vdu_name) + tosca_kube_objects.append(tosca_kube_obj) + return tosca_kube_objects + + @log.log + def tosca_to_kube_mapping(self, node_template): + """Map TOSCA template to ToscaKubeObject properties""" + tosca_props = self.get_properties(node_template) + self.check_unsupported_key(tosca_props, + ALLOWED_KUBERNETES_OBJECT_PROPS) + tosca_kube_obj = tosca_kube_object.ToscaKubeObject() + + # tosca_kube_obj name is used for tracking Kubernetes resources + service_name = 'svc-' + node_template.name + '-' + \ + uuidutils.generate_uuid() + tosca_kube_obj.name = service_name[:15] + tosca_kube_obj.namespace = tosca_props.get('namespace') + tosca_kube_obj.mapping_ports = tosca_props.get('mapping_ports') + tosca_kube_obj.labels = tosca_props.get('labels') + + # Find config properties of VNFComponents in each VDU node + vnfc_config_props = tosca_props.get('vnfcs') + container_objects = self.vnfc_configurable_to_container_mapping( + vnfc_config_props) + tosca_kube_obj.containers = container_objects + + # In labels, we define which type of Service VNF will be deployed + service_labels = dict() + if tosca_kube_obj.labels: + for label in tosca_kube_obj.labels: + label = label.replace( + WHITE_SPACE_CHARACTER, + NON_WHITE_SPACE_CHARACTER) + labels = label.split(COLON_CHARACTER) + service_labels.update({labels[0]: labels[1]}) + tosca_kube_obj.labels = service_labels + tosca_kube_obj.service_type = tosca_props.get('service_type') + return tosca_kube_obj + + @log.log + def vnfc_configurable_to_container_mapping(self, vnfc_config_properties): + """Map VnfcConfigurableProperties to list of containers""" + containers = list() + for container_name, container_props in vnfc_config_properties.items(): + container = tosca_kube_object.Container() + container.name = container_name + self.check_unsupported_key(container_props, + ALLOWED_CONTAINER_OBJECT_PROPS) + container.num_cpus = container_props.get('num_cpus') + memory_size = container_props.get('mem_size') + container.mem_size = self.process_memory(memory_size) + container.image = container_props.get('image') + container.config = container_props.get('config') + container.command = container_props.get('command') + container.args = container_props.get('args') + container.ports = container_props.get('ports') + containers.append(container) + return containers + + @log.log + def process_memory(self, mem_value): + """Translate memory size with unit to a number of byte memory""" + # Memory size has the pattern e.g. 512 MB, 1024 MB or 1 GB + parser_memory = mem_value.split(WHITE_SPACE_CHARACTER) + memory_value = parser_memory[0] + memory_unit = parser_memory[1] + memory_real_value = 0 + + # Translate memory's byte size based on SCALAR_UNIT_DICT + if memory_unit in SCALAR_UNIT_DICT.keys(): + memory_real_value = \ + int(memory_value) * SCALAR_UNIT_DICT[memory_unit] + return memory_real_value + + @log.log + def get_scaling_policy(self, tosca, vdu_name): + """Find scaling policy which is used for VDU""" + if len(tosca.policies) == 0: + scaling_obj = None + else: + count = 0 + scaling_obj = tosca_kube_object.ScalingObject() + for policy in tosca.policies: + if policy.type_definition.is_derived_from(SCALING) \ + and vdu_name in policy.targets: + count = count + 1 + policy_props = policy.properties + self.check_unsupported_key(policy_props, + ALLOWED_SCALING_OBJECT_PROPS) + scaling_obj.scaling_name = policy.name + scaling_obj.target_cpu_utilization_percentage = \ + policy_props.get( + 'target_cpu_utilization_percentage') + scaling_obj.min_replicas = \ + policy_props.get('min_instances') + scaling_obj.max_replicas = \ + policy_props.get('max_instances') + + if count > 1: + # Because in Kubernetes environment, we can attach only one + # scaling policy to Deployment. If user provides more than one + # policy this error will happen when count > 1 + LOG.debug("Tacker only support one scaling policy per VDU") + raise vnfm.InvalidKubernetesScalingPolicyNumber + + return scaling_obj + + @log.log + def find_networks(self, tosca, vdu_name): + """Find networks which VDU is attached based on vdu_name.""" + networks = [] + network_names = [] + for node_template in tosca.nodetemplates: + if node_template.type_definition.is_derived_from(TACKER_CP): + match = False + links_to = None + binds_to = None + for rel, node in node_template.relationships.items(): + if not links_to and rel.is_derived_from(TOSCA_LINKS_TO): + links_to = node + elif not binds_to and rel.is_derived_from(TOSCA_BINDS_TO): + binds_to = node + if binds_to.name == vdu_name: + match = True + if match: + networks.append(links_to.name) + + for node_template in tosca.nodetemplates: + if node_template.type_definition.is_derived_from(TACKER_VL): + tosca_props = self.get_properties(node_template) + if node_template.name in networks: + for key, value in tosca_props.items(): + if key == 'network_name': + network_names.append(value) + + if len(network_names) > 1: + # Currently, Kubernetes doesn't support multiple networks. + # If user provides more than one network, the error will raise. + # TODO(anyone): support Multus or multiple networking + LOG.debug("Kubernetes feature only support one network") + raise vnfm.InvalidKubernetesNetworkNumber + if network_names: + return network_names[0] + else: + return None + + @log.log + def check_mgmt_cp(self, tosca, vdu_name): + """Check if management for connection point is enabled""" + mgmt_connection_point = False + for nt in tosca.nodetemplates: + if nt.type_definition.is_derived_from(TACKER_CP): + mgmt = nt.get_property_value('management') or None + if mgmt: + vdu = None + for rel, node in nt.relationships.items(): + if rel.is_derived_from(TOSCA_BINDS_TO): + vdu = node.name + break + if vdu == vdu_name: + mgmt_connection_point = True + LOG.debug('mgmt_connection_point: %s', mgmt_connection_point) + return mgmt_connection_point + + @log.log + def get_properties(self, node_template): + """Return a list of property node template objects.""" + tosca_props = {} + for prop in node_template.get_properties_objects(): + if isinstance(prop.value, GetInput): + tosca_props[prop.name] = {'get_param': prop.value.input_name} + else: + tosca_props[prop.name] = prop.value + return tosca_props + + def check_unsupported_key(self, input_values, support_key): + """collect all unsupported keys""" + found_keys = [] + for key in input_values: + if key not in support_key: + found_keys.append(key) + if len(found_keys) > 0: + raise vnfm.InvalidKubernetesInputParameter(found_keys=found_keys) diff --git a/tacker/vnfm/infra_drivers/kubernetes/k8s/translate_outputs.py b/tacker/vnfm/infra_drivers/kubernetes/k8s/translate_outputs.py new file mode 100644 index 000000000..df6199223 --- /dev/null +++ b/tacker/vnfm/infra_drivers/kubernetes/k8s/translate_outputs.py @@ -0,0 +1,351 @@ +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from kubernetes import client +from oslo_config import cfg +from oslo_log import log as logging +import toscaparser.utils.yamlparser + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + +YAML_LOADER = toscaparser.utils.yamlparser.load_yaml +NEWLINE_CHARACTER = "\n" +COLON_CHARACTER = ':' +WHITE_SPACE_CHARACTER = ' ' +NON_WHITE_SPACE_CHARACTER = '' +HYPHEN_CHARACTER = '-' +DASH_CHARACTER = '_' + + +class Transformer(object): + """Transform TOSCA template to Kubernetes resources""" + + def __init__(self, core_v1_api_client, extension_api_client, + scaling_api_client): + self.core_v1_api_client = core_v1_api_client + self.extension_api_client = extension_api_client + self.scaling_api_client = scaling_api_client + + def transform(self, tosca_kube_objects): + """transform function translates from tosca_kube_object to + + kubernetes_object (ConfigMap, Deployment, Service, HPA) + """ + + # kubernetes_objects store all kubernetes objects that are transformed + # from TOSCA VNF template + kubernetes_objects = dict() + for tosca_kube_obj in tosca_kube_objects: + namespace = tosca_kube_obj.namespace + kubernetes_objects['namespace'] = namespace + kubernetes_objects['objects'] = list() + kube_obj_name = tosca_kube_obj.name + new_kube_obj_name = self.pre_process_name(kube_obj_name) + + # translate environments to ConfigMap objects + for container in tosca_kube_obj.containers: + config_map_object = \ + self.init_configmap(container_props=container, + kube_obj_name=new_kube_obj_name) + if config_map_object: + kubernetes_objects['objects'].append(config_map_object) + + # translate Deployment object + deployment_object = \ + self.init_deployment(tosca_kube_obj=tosca_kube_obj, + kube_obj_name=new_kube_obj_name) + kubernetes_objects['objects'].append(deployment_object) + + # translate to Horizontal Pod Autoscaler object + hpa_object = self.init_hpa(tosca_kube_obj=tosca_kube_obj, + kube_obj_name=new_kube_obj_name) + if hpa_object: + kubernetes_objects['objects'].append(hpa_object) + + # translate to Service object + service_object = self.init_service(tosca_kube_obj=tosca_kube_obj, + kube_obj_name=new_kube_obj_name) + kubernetes_objects['objects'].append(service_object) + + return kubernetes_objects + + def deploy(self, kubernetes_objects): + """Deploy Kubernetes objects on Kubernetes VIM and return + + a list name of services + """ + + deployment_names = list() + namespace = kubernetes_objects.get('namespace') + k8s_objects = kubernetes_objects.get('objects') + + for k8s_object in k8s_objects: + object_type = k8s_object.kind + + if object_type == 'ConfigMap': + self.core_v1_api_client.create_namespaced_config_map( + namespace=namespace, + body=k8s_object) + LOG.debug('Successfully created ConfigMap %s', + k8s_object.metadata.name) + elif object_type == 'Deployment': + self.extension_api_client.create_namespaced_deployment( + namespace=namespace, + body=k8s_object) + LOG.debug('Successfully created Deployment %s', + k8s_object.metadata.name) + elif object_type == 'HorizontalPodAutoscaler': + self.scaling_api_client.\ + create_namespaced_horizontal_pod_autoscaler( + namespace=namespace, + body=k8s_object) + LOG.debug('Successfully created Horizontal Pod Autoscaler %s', + k8s_object.metadata.name) + elif object_type == 'Service': + self.core_v1_api_client.create_namespaced_service( + namespace=namespace, + body=k8s_object) + LOG.debug('Successfully created Service %s', + k8s_object.metadata.name) + deployment_names.append(namespace) + deployment_names.append(k8s_object.metadata.name) + + # return a string that contains all deployment namespace and names + # for tracking resources pattern: + # namespace1,deployment1,namespace2,deployment2,namespace3,deployment3 + return ",".join(deployment_names) + + # config_labels configures label + def config_labels(self, deployment_name=None, scaling_name=None): + label = dict() + if deployment_name: + label.update({"selector": deployment_name}) + if scaling_name: + label.update({"scaling_name": scaling_name}) + return label + + # Init resource requirement for container + def init_resource_requirements(self, container): + limits = dict() + requests = dict() + if container.num_cpus: + limits.update({'cpu': container.num_cpus}) + requests.update({'cpu': container.num_cpus}) + if container.mem_size: + limits.update({'memory': container.mem_size}) + requests.update({'memory': container.mem_size}) + return client.V1ResourceRequirements(limits=limits, + requests=requests) + + def init_envs(self, container_props, name): + config = container_props.config + config_dict = self.pre_process_config(config) + configmap_name = name + + list_envs = [] + for key in config_dict: + config_map_ref = client.V1ConfigMapKeySelector( + key=key, + name=configmap_name) + env_var = client.V1EnvVarSource( + config_map_key_ref=config_map_ref) + env_object = client.V1EnvVar( + name=key, + value_from=env_var) + list_envs.append(env_object) + return list_envs + + # Init container object + def init_containers(self, container_props, limit_resource, name): + list_env_var = self.init_envs(container_props, name) + container_name = self.pre_process_name(container_props.name) + list_container_port = list() + if container_props.ports: + for container_port in container_props.ports: + port = int(container_port) + cport = client.V1ContainerPort(container_port=port) + list_container_port.append(cport) + container = client.V1Container( + name=container_name, + image=container_props.image, + ports=list_container_port, + resources=limit_resource, + command=container_props.command, + args=container_props.args, + env=list_env_var, + image_pull_policy="IfNotPresent") + return container + + # init_deployment initializes Kubernetes Pod object + def init_deployment(self, tosca_kube_obj, kube_obj_name): + """Instantiate the deployment object""" + + deployment_name = kube_obj_name + # Create a list of container, which made a Pod + containers = list() + for container_prop in tosca_kube_obj.containers: + limit_resource = self.init_resource_requirements(container_prop) + container = self.init_containers( + container_props=container_prop, + limit_resource=limit_resource, + name=deployment_name) + containers.append(container) + + # Make a label with pattern {"selector": "deployment_name"} + if tosca_kube_obj.scaling_object: + scaling_name = tosca_kube_obj.scaling_object.scaling_name + update_label = self.config_labels(deployment_name=deployment_name, + scaling_name=scaling_name) + else: + update_label = self.config_labels(deployment_name=deployment_name) + if tosca_kube_obj.labels: + if 'selector' in update_label: + del update_label['selector'] + labels = dict(tosca_kube_obj.labels.items() + update_label.items()) + else: + labels = update_label + + # Create and configure a spec section + pod_template = client.V1PodTemplateSpec( + metadata=client.V1ObjectMeta(labels=labels), + spec=client.V1PodSpec(containers=containers)) + + # Create the specification of deployment + deployment_spec = client.ExtensionsV1beta1DeploymentSpec( + template=pod_template) + metadata = client.V1ObjectMeta(name=deployment_name) + + # Instantiate the deployment object + deployment = client.ExtensionsV1beta1Deployment( + api_version="extensions/v1beta1", + kind="Deployment", + metadata=metadata, + spec=deployment_spec) + return deployment + + # init_hpa initializes Kubernetes Horizon Pod Auto-scaling object + def init_hpa(self, tosca_kube_obj, kube_obj_name): + scaling_props = tosca_kube_obj.scaling_object + hpa = None + if scaling_props: + min_replicas = scaling_props.min_replicas + max_replicas = scaling_props.max_replicas + cpu_util = scaling_props.target_cpu_utilization_percentage + deployment_name = kube_obj_name + + # Create target Deployment object + target = client.V1CrossVersionObjectReference( + api_version="extensions/v1beta1", + kind="Deployment", + name=deployment_name) + # Create the specification of horizon pod auto-scaling + hpa_spec = client.V1HorizontalPodAutoscalerSpec( + min_replicas=min_replicas, + max_replicas=max_replicas, + target_cpu_utilization_percentage=cpu_util, + scale_target_ref=target) + metadata = client.V1ObjectMeta(name=deployment_name) + # Create Horizon Pod Auto-Scaling + hpa = client.V1HorizontalPodAutoscaler( + api_version="autoscaling/v1", + kind="HorizontalPodAutoscaler", + spec=hpa_spec, + metadata=metadata) + return hpa + + # init_service initializes Kubernetes service object + def init_service(self, tosca_kube_obj, kube_obj_name): + list_service_port = list() + service_label = tosca_kube_obj.labels + for port in tosca_kube_obj.mapping_ports: + if COLON_CHARACTER in port: + ports = port.split(COLON_CHARACTER) + published_port = int(ports[0]) + target_port = int(ports[1]) + else: + target_port = published_port = int(port) + service_port = client.V1ServicePort( + name=str(published_port), + port=published_port, + target_port=target_port) + list_service_port.append(service_port) + + deployment_name = kube_obj_name + selector_by_name = self.config_labels(deployment_name) + if tosca_kube_obj.labels: + selectors = tosca_kube_obj.labels.copy() + else: + selectors = selector_by_name + if tosca_kube_obj.mgmt_connection_point: + service_label['management_connection'] = 'True' + if tosca_kube_obj.network_name: + service_label['network_name'] = tosca_kube_obj.network_name + service_label['vdu_name'] = tosca_kube_obj.name + + metadata = client.V1ObjectMeta(name=deployment_name, + labels=service_label) + if tosca_kube_obj.service_type: + service_type = tosca_kube_obj.service_type + else: + service_type = None + service_spec = client.V1ServiceSpec( + selector=selectors, + ports=list_service_port, + type=service_type) + + service = client.V1Service( + api_version="v1", + kind="Service", + spec=service_spec, + metadata=metadata) + return service + + # init_config_map initializes Kubernetes ConfigMap object + def init_configmap(self, container_props, kube_obj_name): + config_map = None + if container_props.config: + configmap_name = kube_obj_name + metadata = client.V1ObjectMeta(name=configmap_name) + config_dict = self.pre_process_config(container_props.config) + config_map = client.V1ConfigMap( + api_version="v1", + kind="ConfigMap", + data=config_dict, + metadata=metadata) + return config_map + + def pre_process_name(self, name): + # replace '_' by '-' to meet Kubernetes' requirement + new_name = name.replace(DASH_CHARACTER, HYPHEN_CHARACTER).lower() + return new_name + + def pre_process_config(self, config): + # Split by separating lines + config_dict = {} + if config: + configs = config.split(NEWLINE_CHARACTER) + for config_item in configs: + # Ignore if config_item is null + if config_item: + # Strip all types of white-space characters + config_item = config_item.replace( + WHITE_SPACE_CHARACTER, + NON_WHITE_SPACE_CHARACTER) + config_prop = config_item.split(COLON_CHARACTER) + config_dict[config_prop[0]] = config_prop[1] + # config_dict has the pattern such as + # {'param1': 'key1', 'param0': 'key0'} + return config_dict diff --git a/tacker/vnfm/infra_drivers/kubernetes/kubernetes_driver.py b/tacker/vnfm/infra_drivers/kubernetes/kubernetes_driver.py new file mode 100644 index 000000000..b166ba524 --- /dev/null +++ b/tacker/vnfm/infra_drivers/kubernetes/kubernetes_driver.py @@ -0,0 +1,547 @@ +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time +import yaml + +from kubernetes import client +from oslo_config import cfg +from oslo_log import log as logging +from oslo_serialization import jsonutils + +from tacker.common.container import kubernetes_utils +from tacker.common import log +from tacker.common import utils +from tacker.extensions import vnfm +from tacker.vnfm.infra_drivers import abstract_driver +from tacker.vnfm.infra_drivers.kubernetes import translate_template +from tacker.vnfm.infra_drivers import scale_driver + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + +OPTS = [ + cfg.IntOpt('stack_retries', + default=100, + help=_("Number of attempts to retry for stack" + " creation/deletion")), + cfg.IntOpt('stack_retry_wait', + default=5, + help=_("Wait time (in seconds) between consecutive stack" + " create/delete retries")), +] + +CONF.register_opts(OPTS, group='kubernetes_vim') + + +def config_opts(): + return [('kubernetes_vim', OPTS)] + + +SCALING_POLICY = 'tosca.policies.tacker.Scaling' +COMMA_CHARACTER = ',' + + +def get_scaling_policy_name(action, policy_name): + return '%s_scale_%s' % (policy_name, action) + + +class Kubernetes(abstract_driver.DeviceAbstractDriver, + scale_driver.VnfScaleAbstractDriver): + """Kubernetes infra driver for hosting containerized vnfs""" + + def __init__(self): + super(Kubernetes, self).__init__() + self.STACK_RETRIES = cfg.CONF.kubernetes_vim.stack_retries + self.STACK_RETRY_WAIT = cfg.CONF.kubernetes_vim.stack_retry_wait + self.kubernetes = kubernetes_utils.KubernetesHTTPAPI() + + def get_type(self): + return 'kubernetes' + + def get_name(self): + return 'kubernetes' + + def get_description(self): + return 'Kubernetes infra driver' + + @log.log + def create(self, plugin, context, vnf, auth_attr): + """Create function + + Create ConfigMap, Deployment, Service and Horizontal Pod Autoscaler + objects. Return a string that contains all deployment namespace and + names for tracking resources. + """ + LOG.debug('vnf %s', vnf) + # initialize Kubernetes APIs + auth_cred, file_descriptor = self._get_auth_creds(auth_attr) + try: + core_v1_api_client = self.kubernetes.get_core_v1_api_client( + auth=auth_cred) + extension_api_client = self.kubernetes.get_extension_api_client( + auth=auth_cred) + scaling_api_client = self.kubernetes.get_scaling_api_client( + auth=auth_cred) + tosca_to_kubernetes = translate_template.TOSCAToKubernetes( + vnf=vnf, + core_v1_api_client=core_v1_api_client, + extension_api_client=extension_api_client, + scaling_api_client=scaling_api_client) + deployment_names = tosca_to_kubernetes.deploy_kubernetes_objects() + except Exception as e: + LOG.error('Creating VNF got an error due to %s', e) + raise + finally: + self.clean_authenticate_vim(auth_cred, file_descriptor) + return deployment_names + + def create_wait(self, plugin, context, vnf_dict, vnf_id, auth_attr): + """Create wait function + + Create wait function will marked VNF is ACTIVE when all status state + from Pod objects is RUNNING. + """ + # initialize Kubernetes APIs + auth_cred, file_descriptor = self._get_auth_creds(auth_attr) + try: + core_v1_api_client = \ + self.kubernetes.get_core_v1_api_client(auth=auth_cred) + deployment_info = vnf_id.split(COMMA_CHARACTER) + mgmt_ips = dict() + pods_information = self._get_pods_information( + core_v1_api_client=core_v1_api_client, + deployment_info=deployment_info) + status = self._get_pod_status(pods_information) + stack_retries = self.STACK_RETRIES + error_reason = None + while status == 'Pending' and stack_retries > 0: + time.sleep(self.STACK_RETRY_WAIT) + pods_information = \ + self._get_pods_information( + core_v1_api_client=core_v1_api_client, + deployment_info=deployment_info) + status = self._get_pod_status(pods_information) + LOG.debug('status: %s', status) + stack_retries = stack_retries - 1 + + LOG.debug('VNF initializing status: %(service_name)s %(status)s', + {'service_name': str(deployment_info), 'status': status}) + if stack_retries == 0 and status != 'Running': + error_reason = _("Resource creation is not completed within" + " {wait} seconds as creation of stack {stack}" + " is not completed").format( + wait=(self.STACK_RETRIES * + self.STACK_RETRY_WAIT), + stack=vnf_id) + LOG.warning("VNF Creation failed: %(reason)s", + {'reason': error_reason}) + raise vnfm.VNFCreateWaitFailed(reason=error_reason) + elif stack_retries != 0 and status != 'Running': + raise vnfm.VNFCreateWaitFailed(reason=error_reason) + + for i in range(0, len(deployment_info), 2): + namespace = deployment_info[i] + deployment_name = deployment_info[i + 1] + service_info = core_v1_api_client.read_namespaced_service( + name=deployment_name, + namespace=namespace) + if service_info.metadata.labels.get("management_connection"): + vdu_name = service_info.metadata.labels.\ + get("vdu_name").split("-")[1] + mgmt_ip = service_info.spec.cluster_ip + mgmt_ips.update({vdu_name: mgmt_ip}) + vnf_dict['mgmt_url'] = jsonutils.dumps(mgmt_ips) + except Exception as e: + LOG.error('Creating wait VNF got an error due to %s', e) + raise + finally: + self.clean_authenticate_vim(auth_cred, file_descriptor) + + def _get_pods_information(self, core_v1_api_client, deployment_info): + """Get pod information""" + pods_information = list() + for i in range(0, len(deployment_info), 2): + namespace = deployment_info[i] + deployment_name = deployment_info[i + 1] + respone = \ + core_v1_api_client.list_namespaced_pod(namespace=namespace) + for item in respone.items: + if deployment_name in item.metadata.name: + pods_information.append(item) + return pods_information + + def _get_pod_status(self, pods_information): + pending_flag = False + unknown_flag = False + for pod_info in pods_information: + status = pod_info.status.phase + if status == 'Pending': + pending_flag = True + elif status == 'Unknown': + unknown_flag = True + if unknown_flag: + status = 'Unknown' + elif pending_flag: + status = 'Pending' + else: + status = 'Running' + return status + + @log.log + def update(self, plugin, context, vnf_id, vnf_dict, vnf, auth_attr): + """Update containerized VNF through ConfigMap data + + In Kubernetes VIM, updating VNF will be updated by updating + ConfigMap data + """ + # initialize Kubernetes APIs + auth_cred, file_descriptor = self._get_auth_creds(auth_attr) + try: + core_v1_api_client = \ + self.kubernetes.get_core_v1_api_client(auth=auth_cred) + + # update config attribute + config_yaml = vnf_dict.get('attributes', {}).get('config', '') + update_yaml = vnf['vnf'].get('attributes', {}).get('config', '') + LOG.debug('yaml orig %(orig)s update %(update)s', + {'orig': config_yaml, 'update': update_yaml}) + # If config_yaml is None, yaml.safe_load() will raise Attribute + # Error. So set config_yaml to {}, if it is None. + if not config_yaml: + config_dict = {} + else: + config_dict = yaml.safe_load(config_yaml) or {} + update_dict = yaml.safe_load(update_yaml) + if not update_dict: + return + LOG.debug('dict orig %(orig)s update %(update)s', + {'orig': config_dict, 'update': update_dict}) + utils.deep_update(config_dict, update_dict) + LOG.debug('dict new %(new)s update %(update)s', + {'new': config_dict, 'update': update_dict}) + new_yaml = yaml.safe_dump(config_dict) + vnf_dict.setdefault('attributes', {})['config'] = new_yaml + + deployment_info = vnf_id.split(",") + for i in range(0, len(deployment_info), 2): + namespace = deployment_info[i] + deployment_name = deployment_info[i + 1] + configmap_resp = core_v1_api_client.read_namespaced_config_map( + namespace=namespace, + name=deployment_name) + configmap_data = configmap_resp.data + new_configmap = {key: update_dict.get(key, configmap_data[key]) + for key in configmap_data} + configmap_resp.data = new_configmap + core_v1_api_client.\ + patch_namespaced_config_map(namespace=namespace, + name=deployment_name, + body=configmap_resp) + except Exception as e: + LOG.error('Updating VNF got an error due to %s', e) + raise + finally: + self.clean_authenticate_vim(auth_cred, file_descriptor) + + @log.log + def update_wait(self, plugin, context, vnf_id, auth_attr, + region_name=None): + """Update wait function""" + # TODO(phuoc): do nothing, will update it if we need actions + pass + + def delete(self, plugin, context, vnf_id, auth_attr, region_name=None): + """Delete function""" + # initialize Kubernetes APIs + auth_cred, file_descriptor = self._get_auth_creds(auth_attr) + try: + core_v1_api_client = self.kubernetes.get_core_v1_api_client( + auth=auth_cred) + extension_api_client = self.kubernetes.get_extension_api_client( + auth=auth_cred) + scaling_api_client = self.kubernetes.get_scaling_api_client( + auth=auth_cred) + deployment_names = vnf_id.split(COMMA_CHARACTER) + + for i in range(0, len(deployment_names), 2): + namespace = deployment_names[i] + deployment_name = deployment_names[i + 1] + # delete ConfigMap if it exists + try: + body = {} + core_v1_api_client.delete_namespaced_config_map( + namespace=namespace, + name=deployment_name, + body=body) + LOG.debug('Successfully deleted ConfigMap %s', + deployment_name) + except Exception as e: + LOG.debug(e) + pass + # delete Service if it exists + try: + core_v1_api_client.delete_namespaced_service( + namespace=namespace, + name=deployment_name) + LOG.debug('Successfully deleted Service %s', + deployment_name) + except Exception as e: + LOG.debug(e) + pass + # delete Horizon Pod Auto-scaling if it exists + try: + body = client.V1DeleteOptions() + scaling_api_client.\ + delete_namespaced_horizontal_pod_autoscaler( + namespace=namespace, + name=deployment_name, + body=body) + LOG.debug('Successfully deleted Horizon Pod Auto-Scaling' + '%s', deployment_name) + except Exception as e: + LOG.debug(e) + pass + # delete Deployment if it exists + try: + body = client.V1DeleteOptions( + propagation_policy='Foreground', + grace_period_seconds=5) + extension_api_client.delete_namespaced_deployment( + namespace=namespace, + name=deployment_name, + body=body) + LOG.debug('Successfully deleted Deployment %s', + deployment_name) + except Exception as e: + LOG.debug(e) + pass + except Exception as e: + LOG.error('Deleting VNF got an error due to %s', e) + raise + finally: + self.clean_authenticate_vim(auth_cred, file_descriptor) + + def delete_wait(self, plugin, context, vnf_id, auth_attr, + region_name=None): + """Delete wait function + + This function is used to checking a containerized VNF is deleted + completely or not. We do it by get information of Kubernetes objects. + When Tacker can not get any information about service, the VNF will be + marked as deleted. + """ + # initialize Kubernetes APIs + auth_cred, file_descriptor = self._get_auth_creds(auth_attr) + try: + core_v1_api_client = self.kubernetes.get_core_v1_api_client( + auth=auth_cred) + extension_api_client = self.kubernetes.get_extension_api_client( + auth=auth_cred) + scaling_api_client = self.kubernetes.get_scaling_api_client( + auth=auth_cred) + + deployment_names = vnf_id.split(COMMA_CHARACTER) + keep_going = True + stack_retries = self.STACK_RETRIES + while keep_going and stack_retries > 0: + count = 0 + for i in range(0, len(deployment_names), 2): + namespace = deployment_names[i] + deployment_name = deployment_names[i + 1] + try: + core_v1_api_client.read_namespaced_config_map( + namespace=namespace, + name=deployment_name) + count = count + 1 + except Exception: + pass + try: + core_v1_api_client.read_namespaced_service( + namespace=namespace, + name=deployment_name) + count = count + 1 + except Exception: + pass + try: + scaling_api_client.\ + read_namespaced_horizontal_pod_autoscaler( + namespace=namespace, + name=deployment_name) + count = count + 1 + except Exception: + pass + try: + extension_api_client.read_namespaced_deployment( + namespace=namespace, + name=deployment_name) + count = count + 1 + except Exception: + pass + stack_retries = stack_retries - 1 + # If one of objects is still alive, keeps on waiting + if count > 0: + keep_going = True + else: + keep_going = False + except Exception as e: + LOG.error('Deleting wait VNF got an error due to %s', e) + raise + finally: + self.clean_authenticate_vim(auth_cred, file_descriptor) + + @log.log + def scale(self, context, plugin, auth_attr, policy, region_name): + """Scale function + + Scaling VNF is implemented by updating replicas through Kubernetes API. + The min_replicas and max_replicas is limited by the number of replicas + of policy scaling when user define VNF descriptor. + """ + LOG.debug("VNF are scaled by updating instance of deployment") + # initialize Kubernetes APIs + auth_cred, file_descriptor = self._get_auth_creds(auth_attr) + try: + extension_api_client = self.kubernetes.get_extension_api_client( + auth=auth_cred) + scaling_api_client = self.kubernetes.get_scaling_api_client( + auth=auth_cred) + deployment_names = policy['instance_id'].split(COMMA_CHARACTER) + policy_name = policy['name'] + policy_action = policy['action'] + + for i in range(0, len(deployment_names), 2): + namespace = deployment_names[i] + deployment_name = deployment_names[i + 1] + deployment_info = extension_api_client.\ + read_namespaced_deployment(namespace=namespace, + name=deployment_name) + scaling_info = scaling_api_client.\ + read_namespaced_horizontal_pod_autoscaler( + namespace=namespace, + name=deployment_name) + + replicas = deployment_info.status.replicas + scale_replicas = replicas + vnf_scaling_name = deployment_info.metadata.labels.\ + get("scaling_name") + if vnf_scaling_name == policy_name: + if policy_action == 'out': + scale_replicas = replicas + 1 + elif policy_action == 'in': + scale_replicas = replicas - 1 + + min_replicas = scaling_info.spec.min_replicas + max_replicas = scaling_info.spec.max_replicas + if (scale_replicas < min_replicas) or \ + (scale_replicas > max_replicas): + LOG.debug("Scaling replicas is out of range. The number of" + " replicas keeps %(number)s replicas", + {'number': replicas}) + scale_replicas = replicas + deployment_info.spec.replicas = scale_replicas + extension_api_client.patch_namespaced_deployment_scale( + namespace=namespace, + name=deployment_name, + body=deployment_info) + except Exception as e: + LOG.error('Scaling VNF got an error due to %s', e) + raise + finally: + self.clean_authenticate_vim(auth_cred, file_descriptor) + + def scale_wait(self, context, plugin, auth_attr, policy, region_name, + last_event_id): + """Scale wait function + + Scale wait function will marked VNF is ACTIVE when all status state + from Pod objects is RUNNING. + """ + # initialize Kubernetes APIs + auth_cred, file_descriptor = self._get_auth_creds(auth_attr) + try: + core_v1_api_client = self.kubernetes.get_core_v1_api_client( + auth=auth_cred) + deployment_info = policy['instance_id'].split(",") + + pods_information = self._get_pods_information( + core_v1_api_client=core_v1_api_client, + deployment_info=deployment_info) + status = self._get_pod_status(pods_information) + + stack_retries = self.STACK_RETRIES + error_reason = None + while status == 'Pending' and stack_retries > 0: + time.sleep(self.STACK_RETRY_WAIT) + + pods_information = self._get_pods_information( + core_v1_api_client=core_v1_api_client, + deployment_info=deployment_info) + status = self._get_pod_status(pods_information) + + # LOG.debug('status: %s', status) + stack_retries = stack_retries - 1 + + LOG.debug('VNF initializing status: %(service_name)s %(status)s', + {'service_name': str(deployment_info), 'status': status}) + + if stack_retries == 0 and status != 'Running': + error_reason = _("Resource creation is not completed within" + " {wait} seconds as creation of stack {stack}" + " is not completed").format( + wait=(self.STACK_RETRIES * + self.STACK_RETRY_WAIT), + stack=policy['instance_id']) + LOG.error("VNF Creation failed: %(reason)s", + {'reason': error_reason}) + raise vnfm.VNFCreateWaitFailed(reason=error_reason) + + elif stack_retries != 0 and status != 'Running': + raise vnfm.VNFCreateWaitFailed(reason=error_reason) + except Exception as e: + LOG.error('Scaling wait VNF got an error due to %s', e) + raise + finally: + self.clean_authenticate_vim(auth_cred, file_descriptor) + + @log.log + def get_resource_info(self, plugin, context, vnf_info, auth_attr, + region_name=None): + # TODO(phuoc): will update it for other components + pass + + def _get_auth_creds(self, auth_cred): + file_descriptor = self._create_ssl_ca_file(auth_cred) + if ('username' not in auth_cred) and ('password' not in auth_cred): + auth_cred['username'] = 'None' + auth_cred['password'] = None + return auth_cred, file_descriptor + + def _create_ssl_ca_file(self, auth_attr): + ca_cert = auth_attr['ssl_ca_cert'] + if ca_cert is not None: + file_descriptor, file_path = \ + self.kubernetes.create_ca_cert_tmp_file(ca_cert) + auth_attr['ca_cert_file'] = file_path + return file_descriptor + else: + return None + + def clean_authenticate_vim(self, vim_auth, file_descriptor): + # remove ca_cert_file from vim_obj if it exists + # close and delete temp ca_cert_file + if file_descriptor is not None: + file_path = vim_auth.pop('ca_cert_file') + self.kubernetes.close_tmp_file(file_descriptor, file_path) diff --git a/tacker/vnfm/infra_drivers/kubernetes/translate_template.py b/tacker/vnfm/infra_drivers/kubernetes/translate_template.py new file mode 100644 index 000000000..7b6cd9400 --- /dev/null +++ b/tacker/vnfm/infra_drivers/kubernetes/translate_template.py @@ -0,0 +1,118 @@ +# All Rights Reserved. +# +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import yaml + +from oslo_config import cfg +from oslo_log import log as logging +from toscaparser.utils import yamlparser + +from tacker.common import log +from tacker.extensions import common_services as cs +from tacker.extensions import vnfm +from tacker.vnfm.infra_drivers.kubernetes.k8s import translate_inputs +from tacker.vnfm.infra_drivers.kubernetes.k8s import translate_outputs + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class TOSCAToKubernetes(object): + + def __init__(self, vnf, core_v1_api_client, + extension_api_client, scaling_api_client): + self.vnf = vnf + self.core_v1_api_client = core_v1_api_client + self.extension_api_client = extension_api_client + self.scaling_api_client = scaling_api_client + self.attributes = {} + self.vnfd_yaml = None + + def generate_tosca_kube_objects(self): + """Load TOSCA template and return tosca_kube_objects""" + + vnfd_dict = self.process_input() + parser = translate_inputs.Parser(vnfd_dict) + return parser.loader() + + def deploy_kubernetes_objects(self): + """Translate tosca_kube_objects to Kubernetes objects and deploy them. + + Return a string that contains all deployment namespace and names + """ + + tosca_kube_objects = self.generate_tosca_kube_objects() + transformer = translate_outputs.Transformer( + core_v1_api_client=self.core_v1_api_client, + extension_api_client=self.extension_api_client, + scaling_api_client=self.scaling_api_client) + kubernetes_objects = transformer.transform(tosca_kube_objects) + deployment_names = transformer.deploy( + kubernetes_objects=kubernetes_objects) + # return namespaces and service names for tracking resources + return deployment_names + + def process_input(self): + """Process input of vnfd template""" + + self.attributes = self.vnf['vnfd']['attributes'].copy() + self.vnfd_yaml = self.attributes.pop('vnfd', None) + if self.vnfd_yaml is None: + LOG.info("VNFD is not provided, so no vnf is created !!") + return + LOG.debug('vnfd_yaml %s', self.vnfd_yaml) + vnfd_dict = yamlparser.simple_ordered_parse(self.vnfd_yaml) + LOG.debug('vnfd_dict %s', vnfd_dict) + + # Read parameter and process inputs + if 'get_input' in str(vnfd_dict): + self._process_parameterized_input(self.vnf['attributes'], + vnfd_dict) + return vnfd_dict + + @log.log + def _update_params(self, original, paramvalues): + for key, value in (original).items(): + if not isinstance(value, dict) or 'get_input' not in str(value): + pass + elif isinstance(value, dict): + if 'get_input' in value: + if value['get_input'] in paramvalues: + original[key] = paramvalues[value['get_input']] + else: + LOG.debug('Key missing Value: %s', key) + raise cs.InputValuesMissing(key=key) + else: + self._update_params(value, paramvalues) + + @log.log + def _process_parameterized_input(self, attrs, vnfd_dict): + param_vattrs_yaml = attrs.pop('param_values', None) + if param_vattrs_yaml: + try: + param_vattrs_dict = yaml.safe_load(param_vattrs_yaml) + LOG.debug('param_vattrs_yaml', param_vattrs_dict) + for node in vnfd_dict['topology_template']['node_templates'].\ + values(): + if 'get_input' in str(node): + self._update_params(node, param_vattrs_dict) + except Exception as e: + LOG.debug("Not Well Formed: %s", str(e)) + raise vnfm.ParamYAMLNotWellFormed( + error_msg_details=str(e)) + else: + self._update_params(vnfd_dict, param_vattrs_dict) + else: + raise cs.ParamYAMLInputMissing() diff --git a/tacker/vnfm/plugin.py b/tacker/vnfm/plugin.py index 94b5a2449..0ca66b1d0 100644 --- a/tacker/vnfm/plugin.py +++ b/tacker/vnfm/plugin.py @@ -115,7 +115,7 @@ class VNFMPlugin(vnfm_db.VNFMPluginDb, VNFMMgmtMixin): """ OPTS_INFRA_DRIVER = [ cfg.ListOpt( - 'infra_driver', default=['noop', 'openstack'], + 'infra_driver', default=['noop', 'openstack', 'kubernetes'], help=_('Hosting vnf drivers tacker plugin will use')), ] cfg.CONF.register_opts(OPTS_INFRA_DRIVER, 'tacker') @@ -332,8 +332,10 @@ class VNFMPlugin(vnfm_db.VNFMPluginDb, VNFMMgmtMixin): context, vnf) if not vnf.get('id') else vnf vnf_id = vnf_dict['id'] LOG.debug('vnf_dict %s', vnf_dict) - self.mgmt_create_pre(context, vnf_dict) - self.add_alarm_url_to_vnf(context, vnf_dict) + if driver_name == 'openstack': + self.mgmt_create_pre(context, vnf_dict) + self.add_alarm_url_to_vnf(context, vnf_dict) + try: instance_id = self._vnf_manager.invoke( driver_name, 'create', plugin=self, @@ -366,6 +368,14 @@ class VNFMPlugin(vnfm_db.VNFMPluginDb, VNFMMgmtMixin): 'service_types': [{'service_type': 'vnfd'}]}} vnf_info['vnfd_id'] = self.create_vnfd(context, vnfd).get('id') + infra_driver, vim_auth = self._get_infra_driver(context, vnf_info) + if infra_driver not in self._vnf_manager: + LOG.debug('unknown vim driver ' + '%(infra_driver)s in %(drivers)s', + {'infra_driver': infra_driver, + 'drivers': cfg.CONF.tacker.infra_driver}) + raise vnfm.InvalidInfraDriver(vim_name=infra_driver) + vnf_attributes = vnf_info['attributes'] if vnf_attributes.get('param_values'): param = vnf_attributes['param_values'] @@ -385,13 +395,6 @@ class VNFMPlugin(vnfm_db.VNFMPluginDb, VNFMMgmtMixin): vnf_attributes['config'] = yaml.safe_dump(config) else: self._report_deprecated_yaml_str() - infra_driver, vim_auth = self._get_infra_driver(context, vnf_info) - if infra_driver not in self._vnf_manager: - LOG.debug('unknown vim driver ' - '%(infra_driver)s in %(drivers)s', - {'infra_driver': infra_driver, - 'drivers': cfg.CONF.tacker.infra_driver}) - raise vnfm.InvalidInfraDriver(vim_name=infra_driver) vnf_dict = self._create_vnf(context, vnf_info, vim_auth, infra_driver) diff --git a/tacker/vnfm/vim_client.py b/tacker/vnfm/vim_client.py index 1b93d16b4..eaa03d7c7 100644 --- a/tacker/vnfm/vim_client.py +++ b/tacker/vnfm/vim_client.py @@ -70,9 +70,29 @@ class VimClient(object): def _build_vim_auth(self, context, vim_info): LOG.debug('VIM id is %s', vim_info['id']) vim_auth = vim_info['auth_cred'] - vim_auth['password'] = self._decode_vim_auth(context, - vim_info['id'], - vim_auth) + + # decode password + if ('password' in vim_auth) and (vim_auth['password'] is not None): + vim_auth['password'] = self._decode_vim_auth(context, + vim_info['id'], + vim_auth, + vim_auth['password']) + # decode bearer_token + if 'bearer_token' in vim_auth: + vim_auth['bearer_token'] = self.\ + _decode_vim_auth(context, + vim_info['id'], + vim_auth, + vim_auth['bearer_token']) + # decode ssl_ca_cert + if ('ssl_ca_cert' in vim_auth) and \ + (vim_auth['ssl_ca_cert'] is not None): + vim_auth['ssl_ca_cert'] = self.\ + _decode_vim_auth(context, + vim_info['id'], + vim_auth, + vim_auth['ssl_ca_cert']) + vim_auth['auth_url'] = vim_info['auth_url'] # These attributes are needless for authentication @@ -83,13 +103,13 @@ class VimClient(object): vim_auth.pop(attr, None) return vim_auth - def _decode_vim_auth(self, context, vim_id, auth): + def _decode_vim_auth(self, context, vim_id, auth, secret_value): """Decode Vim credentials Decrypt VIM cred, get fernet Key from local_file_system or barbican. """ - cred = auth['password'].encode('utf-8') + cred = secret_value.encode('utf-8') if auth.get('key_type') == 'barbican_key': keystone_conf = CONF.keystone_authtoken secret_uuid = auth['secret_uuid']