Implementation containerized VNF in Kubernetes type

This spec proposes implementing containerized VNF. We choose
to use Kubernetes resources (ConfigMap, Deployment, Horizontal
Pod Autoscaler, Service, etc) to define a containerized VNF.

1. Add "configurable_properties" type in VDU node.
2. Support translate TOSCA to Kubernetes templates.
3. Add kubernetes infra driver to support CRUD containerized
VNF.
Implements: blueprint containerised-vnfs

Change-Id: I706f1f325ca8c2b33debd7e6a13e81535245a5ad
This commit is contained in:
Cong Phuoc Hoang 2017-12-20 01:51:09 +09:00
parent afce4f054c
commit 18ac452c38
20 changed files with 1746 additions and 50 deletions

View File

@ -6,10 +6,12 @@ namespace = tacker.wsgi
namespace = tacker.service
namespace = tacker.nfvo.nfvo_plugin
namespace = tacker.nfvo.drivers.vim.openstack_driver
namespace = tacker.nfvo.drivers.vim.kubernetes_driver
namespace = tacker.keymgr
namespace = tacker.vnfm.monitor
namespace = tacker.vnfm.plugin
namespace = tacker.vnfm.infra_drivers.openstack.openstack
namespace = tacker.vnfm.infra_drivers.kubernetes.kubernetes
namespace = tacker.vnfm.mgmt_drivers.openwrt.openwrt
namespace = tacker.vnfm.monitor_drivers.http_ping.http_ping
namespace = tacker.vnfm.monitor_drivers.ping.ping

View File

@ -0,0 +1,37 @@
tosca_definitions_version: tosca_simple_profile_for_nfv_1_0_0
description: A sample containerized VNF with two containers per VDU
metadata:
template_name: sample-tosca-vnfd
topology_template:
node_templates:
VDU1:
type: tosca.nodes.nfv.VDU.Tacker
properties:
namespace: default
mapping_ports:
- "80:80"
- "88:88"
service_type: NodePort
vnfcs:
front_end:
num_cpus: 0.5
mem_size: 512 MB
image: nginx
ports:
- "80"
rss_reader:
num_cpus: 0.5
mem_size: 512 MB
image: nickchase/rss-php-nginx:v1
ports:
- "88"
policies:
- SP1:
type: tosca.policies.tacker.Scaling
targets: [VDU1]
properties:
min_instances: 1
max_instances: 3
target_cpu_utilization_percentage: 40

View File

@ -0,0 +1,36 @@
tosca_definitions_version: tosca_simple_profile_for_nfv_1_0_0
description: A sample containerized VNF with one container per VDU
metadata:
template_name: sample-tosca-vnfd
topology_template:
node_templates:
VDU1:
type: tosca.nodes.nfv.VDU.Tacker
properties:
namespace: default
mapping_ports:
- "80:80"
labels:
- "app: webserver"
service_type: ClusterIP
vnfcs:
web_server:
num_cpus: 0.2
mem_size: 100 MB
image: k8s.gcr.io/hpa-example
ports:
- "80"
config: |
param0: key1
param1: key2
policies:
- SP1:
type: tosca.policies.tacker.Scaling
targets: [VDU1]
properties:
min_instances: 1
max_instances: 10
target_cpu_utilization_percentage: 50

View File

@ -53,6 +53,7 @@ tacker.openstack.common.cache.backends =
tacker.tacker.vnfm.drivers =
noop = tacker.vnfm.infra_drivers.noop:DeviceNoop
openstack = tacker.vnfm.infra_drivers.openstack.openstack:OpenStack
kubernetes = tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver:Kubernetes
tacker.tacker.mgmt.drivers =
noop = tacker.vnfm.mgmt_drivers.noop:DeviceMgmtNoop
openwrt = tacker.vnfm.mgmt_drivers.openwrt.openwrt:DeviceMgmtOpenWRT
@ -79,6 +80,7 @@ oslo.config.opts =
tacker.vnfm.monitor = tacker.vnfm.monitor:config_opts
tacker.vnfm.plugin = tacker.vnfm.plugin:config_opts
tacker.vnfm.infra_drivers.openstack.openstack= tacker.vnfm.infra_drivers.openstack.openstack:config_opts
tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver = tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver:config_opts
tacker.vnfm.mgmt_drivers.openwrt.openwrt = tacker.vnfm.mgmt_drivers.openwrt.openwrt:config_opts
tacker.vnfm.monitor_drivers.http_ping.http_ping = tacker.vnfm.monitor_drivers.http_ping.http_ping:config_opts
tacker.vnfm.monitor_drivers.ping.ping = tacker.vnfm.monitor_drivers.ping.ping:config_opts

View File

@ -30,8 +30,8 @@ CONF = cfg.CONF
class KubernetesHTTPAPI(object):
def get_k8sClient(self, auth_plugin):
config = client.ConfigurationObject()
def get_k8s_client(self, auth_plugin):
config = client.Configuration()
config.host = auth_plugin['auth_url']
if ('username' in auth_plugin) and ('password' in auth_plugin)\
and (auth_plugin['password'] is not None):
@ -48,21 +48,25 @@ class KubernetesHTTPAPI(object):
config.verify_ssl = True
else:
config.verify_ssl = False
k8s_client = api_client.ApiClient(config=config)
k8s_client = api_client.ApiClient(configuration=config)
return k8s_client
def initialize_ExtensionApiClient(self, auth):
k8s_client = self.get_k8sClient(auth_plugin=auth)
def get_extension_api_client(self, auth):
k8s_client = self.get_k8s_client(auth_plugin=auth)
return client.ExtensionsV1beta1Api(api_client=k8s_client)
def initialize_CoreApiV1Client(self, auth):
k8s_client = self.get_k8sClient(auth_plugin=auth)
def get_core_v1_api_client(self, auth):
k8s_client = self.get_k8s_client(auth_plugin=auth)
return client.CoreV1Api(api_client=k8s_client)
def initialize_CoreApiClient(self, auth):
k8s_client = self.get_k8sClient(auth_plugin=auth)
def get_core_api_client(self, auth):
k8s_client = self.get_k8s_client(auth_plugin=auth)
return client.CoreApi(api_client=k8s_client)
def get_scaling_api_client(self, auth):
k8s_client = self.get_k8s_client(auth_plugin=auth)
return client.AutoscalingV1Api(api_client=k8s_client)
@staticmethod
def create_ca_cert_tmp_file(ca_cert):
file_descriptor, file_path = tempfile.mkstemp()

View File

@ -20,7 +20,7 @@ from oslo_utils import importutils
def monkey_patch():
eventlet.monkey_patch()
eventlet.monkey_patch(all=False, socket=True)
if os.name != 'nt':
p_c_e = importutils.import_module('pyroute2.config.eventlet')
p_c_e.eventlet_config()

View File

@ -156,6 +156,18 @@ class InvalidParamsForSM(exceptions.InvalidInput):
message = _("Please provide parameters for substitution mappings")
class InvalidKubernetesScalingPolicyNumber(exceptions.InvalidInput):
message = _("Please provide only one Scaling policy")
class InvalidKubernetesNetworkNumber(exceptions.InvalidInput):
message = _("Please provide one network for all vdus")
class InvalidKubernetesInputParameter(exceptions.InvalidInput):
message = _("Found unsupported keys for %(found_keys)s ")
def _validate_service_type_list(data, valid_values=None):
if not isinstance(data, list):
msg = _("invalid data format for service list: '%s'") % data

View File

@ -86,11 +86,11 @@ class Kubernetes_Driver(abstract_vim_driver.VimAbstractDriver):
def _validate_vim(self, auth, file_descriptor):
# If Tacker can get k8s_info, Kubernetes authentication is valid
# if not, it is invalid
auth_dict = dict(auth)
try:
auth_dict = dict(auth)
k8s_coreClient = \
self.kubernetes.initialize_CoreApiClient(auth_dict)
k8s_info = k8s_coreClient.get_api_versions()
core_api_client = \
self.kubernetes.get_core_api_client(auth_dict)
k8s_info = core_api_client.get_api_versions()
LOG.info(k8s_info)
except Exception as e:
LOG.info('VIM Kubernetes authentication is wrong.')
@ -98,18 +98,8 @@ class Kubernetes_Driver(abstract_vim_driver.VimAbstractDriver):
self.clean_authenticate_vim(auth_dict, file_descriptor)
raise nfvo.VimUnauthorizedException(message=str(e))
def _initialize_k8s_extensionClient(self, auth):
k8s_extensionClient =\
self.kubernetes.initialize_ExtensionApiClient(auth)
return k8s_extensionClient
def _initialize_k8s_coreV1Client(self, auth):
k8s_coreV1Client =\
self.kubernetes.initialize_CoreApiV1Client(auth)
return k8s_coreV1Client
def _find_regions(self, k8s_coreV1Client):
list_namespaces = k8s_coreV1Client.list_namespace()
def _find_regions(self, core_v1_api_client):
list_namespaces = core_v1_api_client.list_namespace()
namespaces = [namespace.metadata.name
for namespace in list_namespaces.items]
return namespaces
@ -122,9 +112,9 @@ class Kubernetes_Driver(abstract_vim_driver.VimAbstractDriver):
# in Kubernetes environment, user can deploy resource
# on specific namespace
auth_cred, file_descriptor = self._get_auth_creds(vim_obj)
k8s_coreV1Client = \
self.kubernetes.initialize_CoreApiV1Client(auth_cred)
namespace_list = self._find_regions(k8s_coreV1Client)
core_v1_api_client = \
self.kubernetes.get_core_v1_api_client(auth_cred)
namespace_list = self._find_regions(core_v1_api_client)
self.clean_authenticate_vim(auth_cred, file_descriptor)
vim_obj['placement_attr'] = {'regions': namespace_list}
return vim_obj

View File

@ -97,15 +97,15 @@ class TestKubernetes_Driver(base.TestCase):
self._test_register_vim(self.vim_obj, mock_k8s_client,
mock_k8s_coreV1Client)
mock_k8s_coreV1Client.list_namespace.assert_called_once_with()
self.kubernetes_api.\
initialize_CoreApiClient.assert_called_once_with(auth_obj)
self.kubernetes_api. \
get_core_api_client.assert_called_once_with(auth_obj)
def _test_register_vim(self, vim_obj, mock_k8s_client,
mock_k8s_coreV1Client):
self.kubernetes_api.\
initialize_CoreApiClient.return_value = mock_k8s_client
self.kubernetes_api.\
initialize_CoreApiV1Client.return_value = mock_k8s_coreV1Client
self.kubernetes_api. \
get_core_api_client.return_value = mock_k8s_client
self.kubernetes_api. \
get_core_v1_api_client.return_value = mock_k8s_coreV1Client
fernet_attrs = {'encrypt.return_value': 'encrypted_password'}
mock_fernet_obj = mock.Mock(**fernet_attrs)
mock_fernet_key = 'test_fernet_key'

View File

@ -169,6 +169,36 @@ data_types:
description: The mac address allowed to be paired with specific virtual IP.
tosca.datatypes.nfv.VnfcConfigurableProperties:
properties:
num_cpus:
type: float
required: false
mem_size:
type: string
required: false
image:
type: string
required: false
command:
type: list
entry_schema:
type: string
required: false
args:
type: list
entry_schema:
type: string
required: false
ports:
type: list
entry_schema:
type: string
required: false
config:
type: string
required: false
policy_types:
tosca.policies.tacker.Placement:
derived_from: tosca.policies.Root
@ -279,6 +309,10 @@ policy_types:
type: string
required: true
description: List of Scaling nodes.
target_cpu_utilization_percentage:
type: integer
required: false
description: The target average CPU utilization over all the pods which is used in Kubernetes environment
min_instances:
type: integer
required: true

View File

@ -248,6 +248,28 @@ node_types:
type: string
required: false
vnfcs:
type: map
required: false
entry_schema:
type: tosca.datatypes.nfv.VnfcConfigurableProperties
namespace:
type: string
required: false
mapping_ports:
type: list
required: false
entry_schema:
type: string
labels:
type: list
required: false
entry_schema:
type: string
tosca.nodes.nfv.CP.Tacker:
derived_from: tosca.nodes.nfv.CP
properties:

View File

@ -0,0 +1,241 @@
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class ToscaKubeObject(object):
"""ToscaKubeObject holds the basic struct of a VDU.
That is used for translating TOSCA to Kubernetes templates.
"""
def __init__(self, name=None, namespace=None, mapping_ports=None,
containers=None, network_name=None,
mgmt_connection_point=False, scaling_object=None,
service_type=None, labels=None):
self._name = name
self._namespace = namespace
self._mapping_ports = mapping_ports
self._containers = containers
self._network_name = network_name
self._mgmt_connection_point = mgmt_connection_point
self._scaling_object = scaling_object
self._service_type = service_type
self._labels = labels
@property
def name(self):
return self._name
@name.setter
def name(self, name):
self._name = name
@property
def namespace(self):
return self._namespace
@namespace.setter
def namespace(self, namespace):
self._namespace = namespace
@property
def mapping_ports(self):
return self._mapping_ports
@mapping_ports.setter
def mapping_ports(self, mapping_ports):
self._mapping_ports = mapping_ports
@property
def containers(self):
return self._containers
@containers.setter
def containers(self, containers):
self._containers = containers
@property
def network_name(self):
return self._network_name
@network_name.setter
def network_name(self, network_name):
self._network_name = network_name
@property
def mgmt_connection_point(self):
return self._mgmt_connection_point
@mgmt_connection_point.setter
def mgmt_connection_point(self, mgmt_connection_point):
self._mgmt_connection_point = mgmt_connection_point
@property
def scaling_object(self):
return self._scaling_object
@scaling_object.setter
def scaling_object(self, scaling_object):
self._scaling_object = scaling_object
@property
def service_type(self):
return self._service_type
@service_type.setter
def service_type(self, service_type):
self._service_type = service_type
@property
def labels(self):
return self._labels
@labels.setter
def labels(self, labels):
self._labels = labels
class Container(object):
"""Container holds the basic structs of a container"""
def __init__(self, name=None, num_cpus=None, mem_size=None, image=None,
command=None, args=None, ports=None, config=None):
self._name = name
self._num_cpus = num_cpus
self._mem_size = mem_size
self._image = image
self._command = command
self._args = args
self._ports = ports
self._config = config
@property
def name(self):
return self._name
@name.setter
def name(self, name):
self._name = name
@property
def num_cpus(self):
return self._num_cpus
@num_cpus.setter
def num_cpus(self, num_cpus):
self._num_cpus = num_cpus
@property
def mem_size(self):
return self._mem_size
@mem_size.setter
def mem_size(self, mem_size):
self._mem_size = mem_size
@property
def image(self):
return self._image
@image.setter
def image(self, image):
self._image = image
@property
def command(self):
return self._command
@command.setter
def command(self, command):
self._command = command
@property
def args(self):
return self._args
@args.setter
def args(self, args):
self._args = args
@property
def ports(self):
return self._ports
@ports.setter
def ports(self, ports):
self._ports = ports
@property
def config(self):
return self._config
@config.setter
def config(self, config):
self._config = config
class ScalingObject(object):
"""ScalingObject holds the basic struct of a horizontal pod auto-scaling"""
def __init__(self, scaling_name=None, min_replicas=None, max_replicas=None,
scale_target_name=None,
target_cpu_utilization_percentage=None):
self._scaling_name = scaling_name
self._min_replicas = min_replicas
self._max_replicas = max_replicas
self._scale_target_name = scale_target_name
self._target_cpu_utilization_percentage = \
target_cpu_utilization_percentage
@property
def scaling_name(self):
return self._scaling_name
@scaling_name.setter
def scaling_name(self, scaling_name):
self._scaling_name = scaling_name
@property
def min_replicas(self):
return self._min_replicas
@min_replicas.setter
def min_replicas(self, min_replicas):
self._min_replicas = min_replicas
@property
def max_replicas(self):
return self._max_replicas
@max_replicas.setter
def max_replicas(self, max_replicas):
self._max_replicas = max_replicas
@property
def scale_target_name(self):
return self._scale_target_name
@scale_target_name.setter
def scale_target_name(self, scale_target_name):
self._scale_target_name = scale_target_name
@property
def target_cpu_utilization_percentage(self):
return self._target_cpu_utilization_percentage
@target_cpu_utilization_percentage.setter
def target_cpu_utilization_percentage(self,
target_cpu_utilization_percentage):
self._target_cpu_utilization_percentage = \
target_cpu_utilization_percentage

View File

@ -0,0 +1,277 @@
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import uuidutils
from tacker.common import log
from tacker.extensions import vnfm
from tacker.tosca import utils as toscautils
from tacker.vnfm.infra_drivers.kubernetes.k8s import tosca_kube_object
from toscaparser.functions import GetInput
from toscaparser import tosca_template
import toscaparser.utils.yamlparser
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
YAML_LOADER = toscaparser.utils.yamlparser.load_yaml
SCALING = 'tosca.policies.Scaling'
TACKER_CP = 'tosca.nodes.nfv.CP.Tacker'
TACKER_VL = 'tosca.nodes.nfv.VL'
COLON_CHARACTER = ':'
WHITE_SPACE_CHARACTER = ' '
NON_WHITE_SPACE_CHARACTER = ''
TOSCA_LINKS_TO = 'tosca.relationships.network.LinksTo'
TOSCA_BINDS_TO = 'tosca.relationships.network.BindsTo'
ALLOWED_KUBERNETES_OBJECT_PROPS = ('namespace', 'mapping_ports', 'labels',
'vnfcs', 'service_type',
'mgmt_driver', 'config_drive')
ALLOWED_CONTAINER_OBJECT_PROPS = ('num_cpus', 'mem_size', 'image', 'config',
'command', 'args', 'ports')
ALLOWED_SCALING_OBJECT_PROPS = ('min_instances', 'max_instances',
'target_cpu_utilization_percentage')
SCALAR_UNIT_DICT = {'B': 1, 'kB': 1000, 'KiB': 1024, 'MB': 1000000,
'MiB': 1048576, 'GB': 1000000000,
'GiB': 1073741824, 'TB': 1000000000000,
'TiB': 1099511627776}
class Parser(object):
"""Convert TOSCA template to Tosca Kube object"""
def __init__(self, vnfd_dict):
self.vnfd_dict = vnfd_dict
def loader(self):
"""Load TOSCA template and start parsing"""
try:
parserd_params = None
toscautils.updateimports(self.vnfd_dict)
tosca = tosca_template.\
ToscaTemplate(parsed_params=parserd_params,
a_file=False,
yaml_dict_tpl=self.vnfd_dict)
except Exception as e:
LOG.debug("tosca-parser error: %s", str(e))
raise vnfm.ToscaParserFailed(error_msg_details=str(e))
# Initiate a list tosca_kube_object which are defined from VDU
tosca_kube_objects = []
vdus = toscautils.findvdus(tosca)
for node_template in vdus:
vdu_name = node_template.name
tosca_kube_obj = self.tosca_to_kube_mapping(node_template)
# Find network name in which VDU is attached
tosca_kube_obj.network_name = self.find_networks(tosca, vdu_name)
# If connection_point is True, Tacker will manage its service ip
tosca_kube_obj.mgmt_connection_point = \
self.check_mgmt_cp(tosca, vdu_name)
# Find scaling policy that is used for this VDU, different to
# VM-based VNF, there are no alarm policies.
tosca_kube_obj.scaling_object = \
self.get_scaling_policy(tosca, vdu_name)
tosca_kube_objects.append(tosca_kube_obj)
return tosca_kube_objects
@log.log
def tosca_to_kube_mapping(self, node_template):
"""Map TOSCA template to ToscaKubeObject properties"""
tosca_props = self.get_properties(node_template)
self.check_unsupported_key(tosca_props,
ALLOWED_KUBERNETES_OBJECT_PROPS)
tosca_kube_obj = tosca_kube_object.ToscaKubeObject()
# tosca_kube_obj name is used for tracking Kubernetes resources
service_name = 'svc-' + node_template.name + '-' + \
uuidutils.generate_uuid()
tosca_kube_obj.name = service_name[:15]
tosca_kube_obj.namespace = tosca_props.get('namespace')
tosca_kube_obj.mapping_ports = tosca_props.get('mapping_ports')
tosca_kube_obj.labels = tosca_props.get('labels')
# Find config properties of VNFComponents in each VDU node
vnfc_config_props = tosca_props.get('vnfcs')
container_objects = self.vnfc_configurable_to_container_mapping(
vnfc_config_props)
tosca_kube_obj.containers = container_objects
# In labels, we define which type of Service VNF will be deployed
service_labels = dict()
if tosca_kube_obj.labels:
for label in tosca_kube_obj.labels:
label = label.replace(
WHITE_SPACE_CHARACTER,
NON_WHITE_SPACE_CHARACTER)
labels = label.split(COLON_CHARACTER)
service_labels.update({labels[0]: labels[1]})
tosca_kube_obj.labels = service_labels
tosca_kube_obj.service_type = tosca_props.get('service_type')
return tosca_kube_obj
@log.log
def vnfc_configurable_to_container_mapping(self, vnfc_config_properties):
"""Map VnfcConfigurableProperties to list of containers"""
containers = list()
for container_name, container_props in vnfc_config_properties.items():
container = tosca_kube_object.Container()
container.name = container_name
self.check_unsupported_key(container_props,
ALLOWED_CONTAINER_OBJECT_PROPS)
container.num_cpus = container_props.get('num_cpus')
memory_size = container_props.get('mem_size')
container.mem_size = self.process_memory(memory_size)
container.image = container_props.get('image')
container.config = container_props.get('config')
container.command = container_props.get('command')
container.args = container_props.get('args')
container.ports = container_props.get('ports')
containers.append(container)
return containers
@log.log
def process_memory(self, mem_value):
"""Translate memory size with unit to a number of byte memory"""
# Memory size has the pattern e.g. 512 MB, 1024 MB or 1 GB
parser_memory = mem_value.split(WHITE_SPACE_CHARACTER)
memory_value = parser_memory[0]
memory_unit = parser_memory[1]
memory_real_value = 0
# Translate memory's byte size based on SCALAR_UNIT_DICT
if memory_unit in SCALAR_UNIT_DICT.keys():
memory_real_value = \
int(memory_value) * SCALAR_UNIT_DICT[memory_unit]
return memory_real_value
@log.log
def get_scaling_policy(self, tosca, vdu_name):
"""Find scaling policy which is used for VDU"""
if len(tosca.policies) == 0:
scaling_obj = None
else:
count = 0
scaling_obj = tosca_kube_object.ScalingObject()
for policy in tosca.policies:
if policy.type_definition.is_derived_from(SCALING) \
and vdu_name in policy.targets:
count = count + 1
policy_props = policy.properties
self.check_unsupported_key(policy_props,
ALLOWED_SCALING_OBJECT_PROPS)
scaling_obj.scaling_name = policy.name
scaling_obj.target_cpu_utilization_percentage = \
policy_props.get(
'target_cpu_utilization_percentage')
scaling_obj.min_replicas = \
policy_props.get('min_instances')
scaling_obj.max_replicas = \
policy_props.get('max_instances')
if count > 1:
# Because in Kubernetes environment, we can attach only one
# scaling policy to Deployment. If user provides more than one
# policy this error will happen when count > 1
LOG.debug("Tacker only support one scaling policy per VDU")
raise vnfm.InvalidKubernetesScalingPolicyNumber
return scaling_obj
@log.log
def find_networks(self, tosca, vdu_name):
"""Find networks which VDU is attached based on vdu_name."""
networks = []
network_names = []
for node_template in tosca.nodetemplates:
if node_template.type_definition.is_derived_from(TACKER_CP):
match = False
links_to = None
binds_to = None
for rel, node in node_template.relationships.items():
if not links_to and rel.is_derived_from(TOSCA_LINKS_TO):
links_to = node
elif not binds_to and rel.is_derived_from(TOSCA_BINDS_TO):
binds_to = node
if binds_to.name == vdu_name:
match = True
if match:
networks.append(links_to.name)
for node_template in tosca.nodetemplates:
if node_template.type_definition.is_derived_from(TACKER_VL):
tosca_props = self.get_properties(node_template)
if node_template.name in networks:
for key, value in tosca_props.items():
if key == 'network_name':
network_names.append(value)
if len(network_names) > 1:
# Currently, Kubernetes doesn't support multiple networks.
# If user provides more than one network, the error will raise.
# TODO(anyone): support Multus or multiple networking
LOG.debug("Kubernetes feature only support one network")
raise vnfm.InvalidKubernetesNetworkNumber
if network_names:
return network_names[0]
else:
return None
@log.log
def check_mgmt_cp(self, tosca, vdu_name):
"""Check if management for connection point is enabled"""
mgmt_connection_point = False
for nt in tosca.nodetemplates:
if nt.type_definition.is_derived_from(TACKER_CP):
mgmt = nt.get_property_value('management') or None
if mgmt:
vdu = None
for rel, node in nt.relationships.items():
if rel.is_derived_from(TOSCA_BINDS_TO):
vdu = node.name
break
if vdu == vdu_name:
mgmt_connection_point = True
LOG.debug('mgmt_connection_point: %s', mgmt_connection_point)
return mgmt_connection_point
@log.log
def get_properties(self, node_template):
"""Return a list of property node template objects."""
tosca_props = {}
for prop in node_template.get_properties_objects():
if isinstance(prop.value, GetInput):
tosca_props[prop.name] = {'get_param': prop.value.input_name}
else:
tosca_props[prop.name] = prop.value
return tosca_props
def check_unsupported_key(self, input_values, support_key):
"""collect all unsupported keys"""
found_keys = []
for key in input_values:
if key not in support_key:
found_keys.append(key)
if len(found_keys) > 0:
raise vnfm.InvalidKubernetesInputParameter(found_keys=found_keys)

View File

@ -0,0 +1,351 @@
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kubernetes import client
from oslo_config import cfg
from oslo_log import log as logging
import toscaparser.utils.yamlparser
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
YAML_LOADER = toscaparser.utils.yamlparser.load_yaml
NEWLINE_CHARACTER = "\n"
COLON_CHARACTER = ':'
WHITE_SPACE_CHARACTER = ' '
NON_WHITE_SPACE_CHARACTER = ''
HYPHEN_CHARACTER = '-'
DASH_CHARACTER = '_'
class Transformer(object):
"""Transform TOSCA template to Kubernetes resources"""
def __init__(self, core_v1_api_client, extension_api_client,
scaling_api_client):
self.core_v1_api_client = core_v1_api_client
self.extension_api_client = extension_api_client
self.scaling_api_client = scaling_api_client
def transform(self, tosca_kube_objects):
"""transform function translates from tosca_kube_object to
kubernetes_object (ConfigMap, Deployment, Service, HPA)
"""
# kubernetes_objects store all kubernetes objects that are transformed
# from TOSCA VNF template
kubernetes_objects = dict()
for tosca_kube_obj in tosca_kube_objects:
namespace = tosca_kube_obj.namespace
kubernetes_objects['namespace'] = namespace
kubernetes_objects['objects'] = list()
kube_obj_name = tosca_kube_obj.name
new_kube_obj_name = self.pre_process_name(kube_obj_name)
# translate environments to ConfigMap objects
for container in tosca_kube_obj.containers:
config_map_object = \
self.init_configmap(container_props=container,
kube_obj_name=new_kube_obj_name)
if config_map_object:
kubernetes_objects['objects'].append(config_map_object)
# translate Deployment object
deployment_object = \
self.init_deployment(tosca_kube_obj=tosca_kube_obj,
kube_obj_name=new_kube_obj_name)
kubernetes_objects['objects'].append(deployment_object)
# translate to Horizontal Pod Autoscaler object
hpa_object = self.init_hpa(tosca_kube_obj=tosca_kube_obj,
kube_obj_name=new_kube_obj_name)
if hpa_object:
kubernetes_objects['objects'].append(hpa_object)
# translate to Service object
service_object = self.init_service(tosca_kube_obj=tosca_kube_obj,
kube_obj_name=new_kube_obj_name)
kubernetes_objects['objects'].append(service_object)
return kubernetes_objects
def deploy(self, kubernetes_objects):
"""Deploy Kubernetes objects on Kubernetes VIM and return
a list name of services
"""
deployment_names = list()
namespace = kubernetes_objects.get('namespace')
k8s_objects = kubernetes_objects.get('objects')
for k8s_object in k8s_objects:
object_type = k8s_object.kind
if object_type == 'ConfigMap':
self.core_v1_api_client.create_namespaced_config_map(
namespace=namespace,
body=k8s_object)
LOG.debug('Successfully created ConfigMap %s',
k8s_object.metadata.name)
elif object_type == 'Deployment':
self.extension_api_client.create_namespaced_deployment(
namespace=namespace,
body=k8s_object)
LOG.debug('Successfully created Deployment %s',
k8s_object.metadata.name)
elif object_type == 'HorizontalPodAutoscaler':
self.scaling_api_client.\
create_namespaced_horizontal_pod_autoscaler(
namespace=namespace,
body=k8s_object)
LOG.debug('Successfully created Horizontal Pod Autoscaler %s',
k8s_object.metadata.name)
elif object_type == 'Service':
self.core_v1_api_client.create_namespaced_service(
namespace=namespace,
body=k8s_object)
LOG.debug('Successfully created Service %s',
k8s_object.metadata.name)
deployment_names.append(namespace)
deployment_names.append(k8s_object.metadata.name)
# return a string that contains all deployment namespace and names
# for tracking resources pattern:
# namespace1,deployment1,namespace2,deployment2,namespace3,deployment3
return ",".join(deployment_names)
# config_labels configures label
def config_labels(self, deployment_name=None, scaling_name=None):
label = dict()
if deployment_name:
label.update({"selector": deployment_name})
if scaling_name:
label.update({"scaling_name": scaling_name})
return label
# Init resource requirement for container
def init_resource_requirements(self, container):
limits = dict()
requests = dict()
if container.num_cpus:
limits.update({'cpu': container.num_cpus})
requests.update({'cpu': container.num_cpus})
if container.mem_size:
limits.update({'memory': container.mem_size})
requests.update({'memory': container.mem_size})
return client.V1ResourceRequirements(limits=limits,
requests=requests)
def init_envs(self, container_props, name):
config = container_props.config
config_dict = self.pre_process_config(config)
configmap_name = name
list_envs = []
for key in config_dict:
config_map_ref = client.V1ConfigMapKeySelector(
key=key,
name=configmap_name)
env_var = client.V1EnvVarSource(
config_map_key_ref=config_map_ref)
env_object = client.V1EnvVar(
name=key,
value_from=env_var)
list_envs.append(env_object)
return list_envs
# Init container object
def init_containers(self, container_props, limit_resource, name):
list_env_var = self.init_envs(container_props, name)
container_name = self.pre_process_name(container_props.name)
list_container_port = list()
if container_props.ports:
for container_port in container_props.ports:
port = int(container_port)
cport = client.V1ContainerPort(container_port=port)
list_container_port.append(cport)
container = client.V1Container(
name=container_name,
image=container_props.image,
ports=list_container_port,
resources=limit_resource,
command=container_props.command,
args=container_props.args,
env=list_env_var,
image_pull_policy="IfNotPresent")
return container
# init_deployment initializes Kubernetes Pod object
def init_deployment(self, tosca_kube_obj, kube_obj_name):
"""Instantiate the deployment object"""
deployment_name = kube_obj_name
# Create a list of container, which made a Pod
containers = list()
for container_prop in tosca_kube_obj.containers:
limit_resource = self.init_resource_requirements(container_prop)
container = self.init_containers(
container_props=container_prop,
limit_resource=limit_resource,
name=deployment_name)
containers.append(container)
# Make a label with pattern {"selector": "deployment_name"}
if tosca_kube_obj.scaling_object:
scaling_name = tosca_kube_obj.scaling_object.scaling_name
update_label = self.config_labels(deployment_name=deployment_name,
scaling_name=scaling_name)
else:
update_label = self.config_labels(deployment_name=deployment_name)
if tosca_kube_obj.labels:
if 'selector' in update_label:
del update_label['selector']
labels = dict(tosca_kube_obj.labels.items() + update_label.items())
else:
labels = update_label
# Create and configure a spec section
pod_template = client.V1PodTemplateSpec(
metadata=client.V1ObjectMeta(labels=labels),
spec=client.V1PodSpec(containers=containers))
# Create the specification of deployment
deployment_spec = client.ExtensionsV1beta1DeploymentSpec(
template=pod_template)
metadata = client.V1ObjectMeta(name=deployment_name)
# Instantiate the deployment object
deployment = client.ExtensionsV1beta1Deployment(
api_version="extensions/v1beta1",
kind="Deployment",
metadata=metadata,
spec=deployment_spec)
return deployment
# init_hpa initializes Kubernetes Horizon Pod Auto-scaling object
def init_hpa(self, tosca_kube_obj, kube_obj_name):
scaling_props = tosca_kube_obj.scaling_object
hpa = None
if scaling_props:
min_replicas = scaling_props.min_replicas
max_replicas = scaling_props.max_replicas
cpu_util = scaling_props.target_cpu_utilization_percentage
deployment_name = kube_obj_name
# Create target Deployment object
target = client.V1CrossVersionObjectReference(
api_version="extensions/v1beta1",
kind="Deployment",
name=deployment_name)
# Create the specification of horizon pod auto-scaling
hpa_spec = client.V1HorizontalPodAutoscalerSpec(
min_replicas=min_replicas,
max_replicas=max_replicas,
target_cpu_utilization_percentage=cpu_util,
scale_target_ref=target)
metadata = client.V1ObjectMeta(name=deployment_name)
# Create Horizon Pod Auto-Scaling
hpa = client.V1HorizontalPodAutoscaler(
api_version="autoscaling/v1",
kind="HorizontalPodAutoscaler",
spec=hpa_spec,
metadata=metadata)
return hpa
# init_service initializes Kubernetes service object
def init_service(self, tosca_kube_obj, kube_obj_name):
list_service_port = list()
service_label = tosca_kube_obj.labels
for port in tosca_kube_obj.mapping_ports:
if COLON_CHARACTER in port:
ports = port.split(COLON_CHARACTER)
published_port = int(ports[0])
target_port = int(ports[1])
else:
target_port = published_port = int(port)
service_port = client.V1ServicePort(
name=str(published_port),
port=published_port,
target_port=target_port)
list_service_port.append(service_port)
deployment_name = kube_obj_name
selector_by_name = self.config_labels(deployment_name)
if tosca_kube_obj.labels:
selectors = tosca_kube_obj.labels.copy()
else:
selectors = selector_by_name
if tosca_kube_obj.mgmt_connection_point:
service_label['management_connection'] = 'True'
if tosca_kube_obj.network_name:
service_label['network_name'] = tosca_kube_obj.network_name
service_label['vdu_name'] = tosca_kube_obj.name
metadata = client.V1ObjectMeta(name=deployment_name,
labels=service_label)
if tosca_kube_obj.service_type:
service_type = tosca_kube_obj.service_type
else:
service_type = None
service_spec = client.V1ServiceSpec(
selector=selectors,
ports=list_service_port,
type=service_type)
service = client.V1Service(
api_version="v1",
kind="Service",
spec=service_spec,
metadata=metadata)
return service
# init_config_map initializes Kubernetes ConfigMap object
def init_configmap(self, container_props, kube_obj_name):
config_map = None
if container_props.config:
configmap_name = kube_obj_name
metadata = client.V1ObjectMeta(name=configmap_name)
config_dict = self.pre_process_config(container_props.config)
config_map = client.V1ConfigMap(
api_version="v1",
kind="ConfigMap",
data=config_dict,
metadata=metadata)
return config_map
def pre_process_name(self, name):
# replace '_' by '-' to meet Kubernetes' requirement
new_name = name.replace(DASH_CHARACTER, HYPHEN_CHARACTER).lower()
return new_name
def pre_process_config(self, config):
# Split by separating lines
config_dict = {}
if config:
configs = config.split(NEWLINE_CHARACTER)
for config_item in configs:
# Ignore if config_item is null
if config_item:
# Strip all types of white-space characters
config_item = config_item.replace(
WHITE_SPACE_CHARACTER,
NON_WHITE_SPACE_CHARACTER)
config_prop = config_item.split(COLON_CHARACTER)
config_dict[config_prop[0]] = config_prop[1]
# config_dict has the pattern such as
# {'param1': 'key1', 'param0': 'key0'}
return config_dict

View File

@ -0,0 +1,547 @@
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
import yaml
from kubernetes import client
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from tacker.common.container import kubernetes_utils
from tacker.common import log
from tacker.common import utils
from tacker.extensions import vnfm
from tacker.vnfm.infra_drivers import abstract_driver
from tacker.vnfm.infra_drivers.kubernetes import translate_template
from tacker.vnfm.infra_drivers import scale_driver
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
OPTS = [
cfg.IntOpt('stack_retries',
default=100,
help=_("Number of attempts to retry for stack"
" creation/deletion")),
cfg.IntOpt('stack_retry_wait',
default=5,
help=_("Wait time (in seconds) between consecutive stack"
" create/delete retries")),
]
CONF.register_opts(OPTS, group='kubernetes_vim')
def config_opts():
return [('kubernetes_vim', OPTS)]
SCALING_POLICY = 'tosca.policies.tacker.Scaling'
COMMA_CHARACTER = ','
def get_scaling_policy_name(action, policy_name):
return '%s_scale_%s' % (policy_name, action)
class Kubernetes(abstract_driver.DeviceAbstractDriver,
scale_driver.VnfScaleAbstractDriver):
"""Kubernetes infra driver for hosting containerized vnfs"""
def __init__(self):
super(Kubernetes, self).__init__()
self.STACK_RETRIES = cfg.CONF.kubernetes_vim.stack_retries
self.STACK_RETRY_WAIT = cfg.CONF.kubernetes_vim.stack_retry_wait
self.kubernetes = kubernetes_utils.KubernetesHTTPAPI()
def get_type(self):
return 'kubernetes'
def get_name(self):
return 'kubernetes'
def get_description(self):
return 'Kubernetes infra driver'
@log.log
def create(self, plugin, context, vnf, auth_attr):
"""Create function
Create ConfigMap, Deployment, Service and Horizontal Pod Autoscaler
objects. Return a string that contains all deployment namespace and
names for tracking resources.
"""
LOG.debug('vnf %s', vnf)
# initialize Kubernetes APIs
auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
try:
core_v1_api_client = self.kubernetes.get_core_v1_api_client(
auth=auth_cred)
extension_api_client = self.kubernetes.get_extension_api_client(
auth=auth_cred)
scaling_api_client = self.kubernetes.get_scaling_api_client(
auth=auth_cred)
tosca_to_kubernetes = translate_template.TOSCAToKubernetes(
vnf=vnf,
core_v1_api_client=core_v1_api_client,
extension_api_client=extension_api_client,
scaling_api_client=scaling_api_client)
deployment_names = tosca_to_kubernetes.deploy_kubernetes_objects()
except Exception as e:
LOG.error('Creating VNF got an error due to %s', e)
raise
finally:
self.clean_authenticate_vim(auth_cred, file_descriptor)
return deployment_names
def create_wait(self, plugin, context, vnf_dict, vnf_id, auth_attr):
"""Create wait function
Create wait function will marked VNF is ACTIVE when all status state
from Pod objects is RUNNING.
"""
# initialize Kubernetes APIs
auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
try:
core_v1_api_client = \
self.kubernetes.get_core_v1_api_client(auth=auth_cred)
deployment_info = vnf_id.split(COMMA_CHARACTER)
mgmt_ips = dict()
pods_information = self._get_pods_information(
core_v1_api_client=core_v1_api_client,
deployment_info=deployment_info)
status = self._get_pod_status(pods_information)
stack_retries = self.STACK_RETRIES
error_reason = None
while status == 'Pending' and stack_retries > 0:
time.sleep(self.STACK_RETRY_WAIT)
pods_information = \
self._get_pods_information(
core_v1_api_client=core_v1_api_client,
deployment_info=deployment_info)
status = self._get_pod_status(pods_information)
LOG.debug('status: %s', status)
stack_retries = stack_retries - 1
LOG.debug('VNF initializing status: %(service_name)s %(status)s',
{'service_name': str(deployment_info), 'status': status})
if stack_retries == 0 and status != 'Running':
error_reason = _("Resource creation is not completed within"
" {wait} seconds as creation of stack {stack}"
" is not completed").format(
wait=(self.STACK_RETRIES *
self.STACK_RETRY_WAIT),
stack=vnf_id)
LOG.warning("VNF Creation failed: %(reason)s",
{'reason': error_reason})
raise vnfm.VNFCreateWaitFailed(reason=error_reason)
elif stack_retries != 0 and status != 'Running':
raise vnfm.VNFCreateWaitFailed(reason=error_reason)
for i in range(0, len(deployment_info), 2):
namespace = deployment_info[i]
deployment_name = deployment_info[i + 1]
service_info = core_v1_api_client.read_namespaced_service(
name=deployment_name,
namespace=namespace)
if service_info.metadata.labels.get("management_connection"):
vdu_name = service_info.metadata.labels.\
get("vdu_name").split("-")[1]
mgmt_ip = service_info.spec.cluster_ip
mgmt_ips.update({vdu_name: mgmt_ip})
vnf_dict['mgmt_url'] = jsonutils.dumps(mgmt_ips)
except Exception as e:
LOG.error('Creating wait VNF got an error due to %s', e)
raise
finally:
self.clean_authenticate_vim(auth_cred, file_descriptor)
def _get_pods_information(self, core_v1_api_client, deployment_info):
"""Get pod information"""
pods_information = list()
for i in range(0, len(deployment_info), 2):
namespace = deployment_info[i]
deployment_name = deployment_info[i + 1]
respone = \
core_v1_api_client.list_namespaced_pod(namespace=namespace)
for item in respone.items:
if deployment_name in item.metadata.name:
pods_information.append(item)
return pods_information
def _get_pod_status(self, pods_information):
pending_flag = False
unknown_flag = False
for pod_info in pods_information:
status = pod_info.status.phase
if status == 'Pending':
pending_flag = True
elif status == 'Unknown':
unknown_flag = True
if unknown_flag:
status = 'Unknown'
elif pending_flag:
status = 'Pending'
else:
status = 'Running'
return status
@log.log
def update(self, plugin, context, vnf_id, vnf_dict, vnf, auth_attr):
"""Update containerized VNF through ConfigMap data
In Kubernetes VIM, updating VNF will be updated by updating
ConfigMap data
"""
# initialize Kubernetes APIs
auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
try:
core_v1_api_client = \
self.kubernetes.get_core_v1_api_client(auth=auth_cred)
# update config attribute
config_yaml = vnf_dict.get('attributes', {}).get('config', '')
update_yaml = vnf['vnf'].get('attributes', {}).get('config', '')
LOG.debug('yaml orig %(orig)s update %(update)s',
{'orig': config_yaml, 'update': update_yaml})
# If config_yaml is None, yaml.safe_load() will raise Attribute
# Error. So set config_yaml to {}, if it is None.
if not config_yaml:
config_dict = {}
else:
config_dict = yaml.safe_load(config_yaml) or {}
update_dict = yaml.safe_load(update_yaml)
if not update_dict:
return
LOG.debug('dict orig %(orig)s update %(update)s',
{'orig': config_dict, 'update': update_dict})
utils.deep_update(config_dict, update_dict)
LOG.debug('dict new %(new)s update %(update)s',
{'new': config_dict, 'update': update_dict})
new_yaml = yaml.safe_dump(config_dict)
vnf_dict.setdefault('attributes', {})['config'] = new_yaml
deployment_info = vnf_id.split(",")
for i in range(0, len(deployment_info), 2):
namespace = deployment_info[i]
deployment_name = deployment_info[i + 1]
configmap_resp = core_v1_api_client.read_namespaced_config_map(
namespace=namespace,
name=deployment_name)
configmap_data = configmap_resp.data
new_configmap = {key: update_dict.get(key, configmap_data[key])
for key in configmap_data}
configmap_resp.data = new_configmap
core_v1_api_client.\
patch_namespaced_config_map(namespace=namespace,
name=deployment_name,
body=configmap_resp)
except Exception as e:
LOG.error('Updating VNF got an error due to %s', e)
raise
finally:
self.clean_authenticate_vim(auth_cred, file_descriptor)
@log.log
def update_wait(self, plugin, context, vnf_id, auth_attr,
region_name=None):
"""Update wait function"""
# TODO(phuoc): do nothing, will update it if we need actions
pass
def delete(self, plugin, context, vnf_id, auth_attr, region_name=None):
"""Delete function"""
# initialize Kubernetes APIs
auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
try:
core_v1_api_client = self.kubernetes.get_core_v1_api_client(
auth=auth_cred)
extension_api_client = self.kubernetes.get_extension_api_client(
auth=auth_cred)
scaling_api_client = self.kubernetes.get_scaling_api_client(
auth=auth_cred)
deployment_names = vnf_id.split(COMMA_CHARACTER)
for i in range(0, len(deployment_names), 2):
namespace = deployment_names[i]
deployment_name = deployment_names[i + 1]
# delete ConfigMap if it exists
try:
body = {}
core_v1_api_client.delete_namespaced_config_map(
namespace=namespace,
name=deployment_name,
body=body)
LOG.debug('Successfully deleted ConfigMap %s',
deployment_name)
except Exception as e:
LOG.debug(e)
pass
# delete Service if it exists
try:
core_v1_api_client.delete_namespaced_service(
namespace=namespace,
name=deployment_name)
LOG.debug('Successfully deleted Service %s',
deployment_name)
except Exception as e:
LOG.debug(e)
pass
# delete Horizon Pod Auto-scaling if it exists
try:
body = client.V1DeleteOptions()
scaling_api_client.\
delete_namespaced_horizontal_pod_autoscaler(
namespace=namespace,
name=deployment_name,
body=body)
LOG.debug('Successfully deleted Horizon Pod Auto-Scaling'
'%s', deployment_name)
except Exception as e:
LOG.debug(e)
pass
# delete Deployment if it exists
try:
body = client.V1DeleteOptions(
propagation_policy='Foreground',
grace_period_seconds=5)
extension_api_client.delete_namespaced_deployment(
namespace=namespace,
name=deployment_name,
body=body)
LOG.debug('Successfully deleted Deployment %s',
deployment_name)
except Exception as e:
LOG.debug(e)
pass
except Exception as e:
LOG.error('Deleting VNF got an error due to %s', e)
raise
finally:
self.clean_authenticate_vim(auth_cred, file_descriptor)
def delete_wait(self, plugin, context, vnf_id, auth_attr,
region_name=None):
"""Delete wait function
This function is used to checking a containerized VNF is deleted
completely or not. We do it by get information of Kubernetes objects.
When Tacker can not get any information about service, the VNF will be
marked as deleted.
"""
# initialize Kubernetes APIs
auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
try:
core_v1_api_client = self.kubernetes.get_core_v1_api_client(
auth=auth_cred)
extension_api_client = self.kubernetes.get_extension_api_client(
auth=auth_cred)
scaling_api_client = self.kubernetes.get_scaling_api_client(
auth=auth_cred)
deployment_names = vnf_id.split(COMMA_CHARACTER)
keep_going = True
stack_retries = self.STACK_RETRIES
while keep_going and stack_retries > 0:
count = 0
for i in range(0, len(deployment_names), 2):
namespace = deployment_names[i]
deployment_name = deployment_names[i + 1]
try:
core_v1_api_client.read_namespaced_config_map(
namespace=namespace,
name=deployment_name)
count = count + 1
except Exception:
pass
try:
core_v1_api_client.read_namespaced_service(
namespace=namespace,
name=deployment_name)
count = count + 1
except Exception:
pass
try:
scaling_api_client.\
read_namespaced_horizontal_pod_autoscaler(
namespace=namespace,
name=deployment_name)
count = count + 1
except Exception:
pass
try:
extension_api_client.read_namespaced_deployment(
namespace=namespace,
name=deployment_name)
count = count + 1
except Exception:
pass
stack_retries = stack_retries - 1
# If one of objects is still alive, keeps on waiting
if count > 0:
keep_going = True
else:
keep_going = False
except Exception as e:
LOG.error('Deleting wait VNF got an error due to %s', e)
raise
finally:
self.clean_authenticate_vim(auth_cred, file_descriptor)
@log.log
def scale(self, context, plugin, auth_attr, policy, region_name):
"""Scale function
Scaling VNF is implemented by updating replicas through Kubernetes API.
The min_replicas and max_replicas is limited by the number of replicas
of policy scaling when user define VNF descriptor.
"""
LOG.debug("VNF are scaled by updating instance of deployment")
# initialize Kubernetes APIs
auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
try:
extension_api_client = self.kubernetes.get_extension_api_client(
auth=auth_cred)
scaling_api_client = self.kubernetes.get_scaling_api_client(
auth=auth_cred)
deployment_names = policy['instance_id'].split(COMMA_CHARACTER)
policy_name = policy['name']
policy_action = policy['action']
for i in range(0, len(deployment_names), 2):
namespace = deployment_names[i]
deployment_name = deployment_names[i + 1]
deployment_info = extension_api_client.\
read_namespaced_deployment(namespace=namespace,
name=deployment_name)
scaling_info = scaling_api_client.\
read_namespaced_horizontal_pod_autoscaler(
namespace=namespace,
name=deployment_name)
replicas = deployment_info.status.replicas
scale_replicas = replicas
vnf_scaling_name = deployment_info.metadata.labels.\
get("scaling_name")
if vnf_scaling_name == policy_name:
if policy_action == 'out':
scale_replicas = replicas + 1
elif policy_action == 'in':
scale_replicas = replicas - 1
min_replicas = scaling_info.spec.min_replicas
max_replicas = scaling_info.spec.max_replicas
if (scale_replicas < min_replicas) or \
(scale_replicas > max_replicas):
LOG.debug("Scaling replicas is out of range. The number of"
" replicas keeps %(number)s replicas",
{'number': replicas})
scale_replicas = replicas
deployment_info.spec.replicas = scale_replicas
extension_api_client.patch_namespaced_deployment_scale(
namespace=namespace,
name=deployment_name,
body=deployment_info)
except Exception as e:
LOG.error('Scaling VNF got an error due to %s', e)
raise
finally:
self.clean_authenticate_vim(auth_cred, file_descriptor)
def scale_wait(self, context, plugin, auth_attr, policy, region_name,
last_event_id):
"""Scale wait function
Scale wait function will marked VNF is ACTIVE when all status state
from Pod objects is RUNNING.
"""
# initialize Kubernetes APIs
auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
try:
core_v1_api_client = self.kubernetes.get_core_v1_api_client(
auth=auth_cred)
deployment_info = policy['instance_id'].split(",")
pods_information = self._get_pods_information(
core_v1_api_client=core_v1_api_client,
deployment_info=deployment_info)
status = self._get_pod_status(pods_information)
stack_retries = self.STACK_RETRIES
error_reason = None
while status == 'Pending' and stack_retries > 0:
time.sleep(self.STACK_RETRY_WAIT)
pods_information = self._get_pods_information(
core_v1_api_client=core_v1_api_client,
deployment_info=deployment_info)
status = self._get_pod_status(pods_information)
# LOG.debug('status: %s', status)
stack_retries = stack_retries - 1
LOG.debug('VNF initializing status: %(service_name)s %(status)s',
{'service_name': str(deployment_info), 'status': status})
if stack_retries == 0 and status != 'Running':
error_reason = _("Resource creation is not completed within"
" {wait} seconds as creation of stack {stack}"
" is not completed").format(
wait=(self.STACK_RETRIES *
self.STACK_RETRY_WAIT),
stack=policy['instance_id'])
LOG.error("VNF Creation failed: %(reason)s",
{'reason': error_reason})
raise vnfm.VNFCreateWaitFailed(reason=error_reason)
elif stack_retries != 0 and status != 'Running':
raise vnfm.VNFCreateWaitFailed(reason=error_reason)
except Exception as e:
LOG.error('Scaling wait VNF got an error due to %s', e)
raise
finally:
self.clean_authenticate_vim(auth_cred, file_descriptor)
@log.log
def get_resource_info(self, plugin, context, vnf_info, auth_attr,
region_name=None):
# TODO(phuoc): will update it for other components
pass
def _get_auth_creds(self, auth_cred):
file_descriptor = self._create_ssl_ca_file(auth_cred)
if ('username' not in auth_cred) and ('password' not in auth_cred):
auth_cred['username'] = 'None'
auth_cred['password'] = None
return auth_cred, file_descriptor
def _create_ssl_ca_file(self, auth_attr):
ca_cert = auth_attr['ssl_ca_cert']
if ca_cert is not None:
file_descriptor, file_path = \
self.kubernetes.create_ca_cert_tmp_file(ca_cert)
auth_attr['ca_cert_file'] = file_path
return file_descriptor
else:
return None
def clean_authenticate_vim(self, vim_auth, file_descriptor):
# remove ca_cert_file from vim_obj if it exists
# close and delete temp ca_cert_file
if file_descriptor is not None:
file_path = vim_auth.pop('ca_cert_file')
self.kubernetes.close_tmp_file(file_descriptor, file_path)

View File

@ -0,0 +1,118 @@
# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import yaml
from oslo_config import cfg
from oslo_log import log as logging
from toscaparser.utils import yamlparser
from tacker.common import log
from tacker.extensions import common_services as cs
from tacker.extensions import vnfm
from tacker.vnfm.infra_drivers.kubernetes.k8s import translate_inputs
from tacker.vnfm.infra_drivers.kubernetes.k8s import translate_outputs
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class TOSCAToKubernetes(object):
def __init__(self, vnf, core_v1_api_client,
extension_api_client, scaling_api_client):
self.vnf = vnf
self.core_v1_api_client = core_v1_api_client
self.extension_api_client = extension_api_client
self.scaling_api_client = scaling_api_client
self.attributes = {}
self.vnfd_yaml = None
def generate_tosca_kube_objects(self):
"""Load TOSCA template and return tosca_kube_objects"""
vnfd_dict = self.process_input()
parser = translate_inputs.Parser(vnfd_dict)
return parser.loader()
def deploy_kubernetes_objects(self):
"""Translate tosca_kube_objects to Kubernetes objects and deploy them.
Return a string that contains all deployment namespace and names
"""
tosca_kube_objects = self.generate_tosca_kube_objects()
transformer = translate_outputs.Transformer(
core_v1_api_client=self.core_v1_api_client,
extension_api_client=self.extension_api_client,
scaling_api_client=self.scaling_api_client)
kubernetes_objects = transformer.transform(tosca_kube_objects)
deployment_names = transformer.deploy(
kubernetes_objects=kubernetes_objects)
# return namespaces and service names for tracking resources
return deployment_names
def process_input(self):
"""Process input of vnfd template"""
self.attributes = self.vnf['vnfd']['attributes'].copy()
self.vnfd_yaml = self.attributes.pop('vnfd', None)
if self.vnfd_yaml is None:
LOG.info("VNFD is not provided, so no vnf is created !!")
return
LOG.debug('vnfd_yaml %s', self.vnfd_yaml)
vnfd_dict = yamlparser.simple_ordered_parse(self.vnfd_yaml)
LOG.debug('vnfd_dict %s', vnfd_dict)
# Read parameter and process inputs
if 'get_input' in str(vnfd_dict):
self._process_parameterized_input(self.vnf['attributes'],
vnfd_dict)
return vnfd_dict
@log.log
def _update_params(self, original, paramvalues):
for key, value in (original).items():
if not isinstance(value, dict) or 'get_input' not in str(value):
pass
elif isinstance(value, dict):
if 'get_input' in value:
if value['get_input'] in paramvalues:
original[key] = paramvalues[value['get_input']]
else:
LOG.debug('Key missing Value: %s', key)
raise cs.InputValuesMissing(key=key)
else:
self._update_params(value, paramvalues)
@log.log
def _process_parameterized_input(self, attrs, vnfd_dict):
param_vattrs_yaml = attrs.pop('param_values', None)
if param_vattrs_yaml:
try:
param_vattrs_dict = yaml.safe_load(param_vattrs_yaml)
LOG.debug('param_vattrs_yaml', param_vattrs_dict)
for node in vnfd_dict['topology_template']['node_templates'].\
values():
if 'get_input' in str(node):
self._update_params(node, param_vattrs_dict)
except Exception as e:
LOG.debug("Not Well Formed: %s", str(e))
raise vnfm.ParamYAMLNotWellFormed(
error_msg_details=str(e))
else:
self._update_params(vnfd_dict, param_vattrs_dict)
else:
raise cs.ParamYAMLInputMissing()

View File

@ -115,7 +115,7 @@ class VNFMPlugin(vnfm_db.VNFMPluginDb, VNFMMgmtMixin):
"""
OPTS_INFRA_DRIVER = [
cfg.ListOpt(
'infra_driver', default=['noop', 'openstack'],
'infra_driver', default=['noop', 'openstack', 'kubernetes'],
help=_('Hosting vnf drivers tacker plugin will use')),
]
cfg.CONF.register_opts(OPTS_INFRA_DRIVER, 'tacker')
@ -332,8 +332,10 @@ class VNFMPlugin(vnfm_db.VNFMPluginDb, VNFMMgmtMixin):
context, vnf) if not vnf.get('id') else vnf
vnf_id = vnf_dict['id']
LOG.debug('vnf_dict %s', vnf_dict)
self.mgmt_create_pre(context, vnf_dict)
self.add_alarm_url_to_vnf(context, vnf_dict)
if driver_name == 'openstack':
self.mgmt_create_pre(context, vnf_dict)
self.add_alarm_url_to_vnf(context, vnf_dict)
try:
instance_id = self._vnf_manager.invoke(
driver_name, 'create', plugin=self,
@ -366,6 +368,14 @@ class VNFMPlugin(vnfm_db.VNFMPluginDb, VNFMMgmtMixin):
'service_types': [{'service_type': 'vnfd'}]}}
vnf_info['vnfd_id'] = self.create_vnfd(context, vnfd).get('id')
infra_driver, vim_auth = self._get_infra_driver(context, vnf_info)
if infra_driver not in self._vnf_manager:
LOG.debug('unknown vim driver '
'%(infra_driver)s in %(drivers)s',
{'infra_driver': infra_driver,
'drivers': cfg.CONF.tacker.infra_driver})
raise vnfm.InvalidInfraDriver(vim_name=infra_driver)
vnf_attributes = vnf_info['attributes']
if vnf_attributes.get('param_values'):
param = vnf_attributes['param_values']
@ -385,13 +395,6 @@ class VNFMPlugin(vnfm_db.VNFMPluginDb, VNFMMgmtMixin):
vnf_attributes['config'] = yaml.safe_dump(config)
else:
self._report_deprecated_yaml_str()
infra_driver, vim_auth = self._get_infra_driver(context, vnf_info)
if infra_driver not in self._vnf_manager:
LOG.debug('unknown vim driver '
'%(infra_driver)s in %(drivers)s',
{'infra_driver': infra_driver,
'drivers': cfg.CONF.tacker.infra_driver})
raise vnfm.InvalidInfraDriver(vim_name=infra_driver)
vnf_dict = self._create_vnf(context, vnf_info, vim_auth, infra_driver)

View File

@ -70,9 +70,29 @@ class VimClient(object):
def _build_vim_auth(self, context, vim_info):
LOG.debug('VIM id is %s', vim_info['id'])
vim_auth = vim_info['auth_cred']
vim_auth['password'] = self._decode_vim_auth(context,
vim_info['id'],
vim_auth)
# decode password
if ('password' in vim_auth) and (vim_auth['password'] is not None):
vim_auth['password'] = self._decode_vim_auth(context,
vim_info['id'],
vim_auth,
vim_auth['password'])
# decode bearer_token
if 'bearer_token' in vim_auth:
vim_auth['bearer_token'] = self.\
_decode_vim_auth(context,
vim_info['id'],
vim_auth,
vim_auth['bearer_token'])
# decode ssl_ca_cert
if ('ssl_ca_cert' in vim_auth) and \
(vim_auth['ssl_ca_cert'] is not None):
vim_auth['ssl_ca_cert'] = self.\
_decode_vim_auth(context,
vim_info['id'],
vim_auth,
vim_auth['ssl_ca_cert'])
vim_auth['auth_url'] = vim_info['auth_url']
# These attributes are needless for authentication
@ -83,13 +103,13 @@ class VimClient(object):
vim_auth.pop(attr, None)
return vim_auth
def _decode_vim_auth(self, context, vim_id, auth):
def _decode_vim_auth(self, context, vim_id, auth, secret_value):
"""Decode Vim credentials
Decrypt VIM cred, get fernet Key from local_file_system or
barbican.
"""
cred = auth['password'].encode('utf-8')
cred = secret_value.encode('utf-8')
if auth.get('key_type') == 'barbican_key':
keystone_conf = CONF.keystone_authtoken
secret_uuid = auth['secret_uuid']