tacker/tacker/vnfm/infra_drivers/kubernetes/kubernetes_driver.py

2258 lines
103 KiB
Python

# All Rights Reserved.
#
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import re
import time
import urllib.request as urllib2
import yaml
from kubernetes import client
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from toscaparser import tosca_template
from tacker._i18n import _
from tacker.common.container import kubernetes_utils
from tacker.common import exceptions
from tacker.common import log
from tacker.common import utils
from tacker.extensions import vnfm
from tacker import objects
from tacker.objects import vnf_package as vnf_package_obj
from tacker.objects import vnf_package_vnfd as vnfd_obj
from tacker.objects import vnf_resources as vnf_resource_obj
from tacker.vnflcm import utils as vnflcm_utils
from tacker.vnfm.infra_drivers import abstract_driver
from tacker.vnfm.infra_drivers.kubernetes.k8s import translate_outputs
from tacker.vnfm.infra_drivers.kubernetes import translate_template
from tacker.vnfm.infra_drivers import scale_driver
from urllib.parse import urlparse
# String constant 'lcm-kubernetes-def-files'; it is not referenced in this
# part of the file (presumably a key of the CNF instantiation parameters -
# TODO confirm against the callers).
CNF_TARGET_FILES_KEY = 'lcm-kubernetes-def-files'
# Module-level logger and handle on the global oslo.config object.
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
# Marker string; not referenced in this part of the file.
VNFC_POD_NOT_FOUND = "POD_NOT_FOUND"
# Retry settings read by Kubernetes.__init__ and used by the polling loops
# of this driver (number of iterations and pause between them).
OPTS = [
cfg.IntOpt('stack_retries',
default=100,
help=_("Number of attempts to retry for stack"
" creation/deletion")),
cfg.IntOpt('stack_retry_wait',
default=5,
help=_("Wait time (in seconds) between consecutive stack"
" create/delete retries")),
]
# Registered at import time under the [kubernetes_vim] group so that
# cfg.CONF.kubernetes_vim.stack_retries / stack_retry_wait can be read.
CONF.register_opts(OPTS, group='kubernetes_vim')
def config_opts():
    """Return this module's options paired with their config group name."""
    group_name = 'kubernetes_vim'
    return [(group_name, OPTS)]
# TOSCA type name of a Tacker scaling policy (not referenced in this part of
# the file).
SCALING_POLICY = 'tosca.policies.tacker.Scaling'
# Separator of the "namespace,name[,namespace,name...]" identifier strings
# and of the "api_version,kind" resource type strings split by this driver.
COMMA_CHARACTER = ','
def get_scaling_policy_name(action, policy_name):
    """Build the name of the scaling policy for one scaling direction.

    :param action: scaling action appended as suffix (e.g. 'in' or 'out')
    :param policy_name: base name of the policy, used as prefix
    :returns: string of the form '<policy_name>_scale_<action>'
    """
    name_parts = (policy_name, action)
    return '%s_scale_%s' % name_parts
class Kubernetes(abstract_driver.VnfAbstractDriver,
scale_driver.VnfScaleAbstractDriver):
"""Kubernetes infra driver for hosting containerized vnfs"""
def __init__(self):
    """Read the retry settings and prepare the Kubernetes API factory."""
    super(Kubernetes, self).__init__()
    vim_conf = cfg.CONF.kubernetes_vim
    # Upper bound of polling iterations and the pause between two of them.
    self.STACK_RETRIES = vim_conf.stack_retries
    self.STACK_RETRY_WAIT = vim_conf.stack_retry_wait
    self.kubernetes = kubernetes_utils.KubernetesHTTPAPI()
    # Kinds for which create_wait_k8s() runs a dedicated status check; any
    # other kind is marked as created without polling.
    self.CHECK_DICT_KEY = [
        "Pod", "Service", "PersistentVolumeClaim", "Namespace", "Node",
        "PersistentVolume", "APIService", "DaemonSet", "Deployment",
        "ReplicaSet", "StatefulSet", "Job", "VolumeAttachment",
    ]
def get_type(self):
    """Return the type identifier of this infra driver."""
    driver_type = 'kubernetes'
    return driver_type
def get_name(self):
    """Return the name of this infra driver."""
    driver_name = 'kubernetes'
    return driver_name
def get_description(self):
    """Return a short human readable description of this driver."""
    description = 'Kubernetes infra driver'
    return description
@log.log
def create(self, plugin, context, vnf, auth_attr):
    """Deploy the Kubernetes objects translated from the VNF template.

    The three API clients (core, apps, autoscaling) are handed to
    TOSCAToKubernetes, which performs the deployment.

    :param vnf: VNF dict passed to the TOSCA translator
    :param auth_attr: VIM authentication attributes
    :returns: whatever deploy_kubernetes_objects() returns (used by the
        other legacy methods as a "namespace,name,..." identifier)
    """
    LOG.debug('vnf %s', vnf)
    auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
    try:
        k8s_api = self.kubernetes
        core_api = k8s_api.get_core_v1_api_client(auth=auth_cred)
        apps_api = k8s_api.get_app_v1_api_client(auth=auth_cred)
        autoscaling_api = k8s_api.get_scaling_api_client(auth=auth_cred)
        translator = translate_template.TOSCAToKubernetes(
            vnf=vnf,
            core_v1_api_client=core_api,
            app_v1_api_client=apps_api,
            scaling_api_client=autoscaling_api)
        return translator.deploy_kubernetes_objects()
    except Exception as exc:
        LOG.error('Creating VNF got an error due to %s', exc)
        raise
    finally:
        # Always release the temporary credentials, also on failure.
        self.clean_authenticate_vim(auth_cred, file_descriptor)
def create_wait(self, plugin, context, vnf_dict, vnf_id, auth_attr):
    """Wait until the pods of a legacy VNF run and record management IPs.

    The pods are polled while their aggregated status is 'Pending', at
    most STACK_RETRIES times with STACK_RETRY_WAIT seconds in between.
    Afterwards the Services named in ``vnf_id`` are read and the cluster
    IP of every Service labelled ``management_connection`` is stored in
    ``vnf_dict['mgmt_ip_address']``.

    :param vnf_dict: VNF dict, updated in place with 'mgmt_ip_address'
    :param vnf_id: "namespace,name[,namespace,name...]" identifier; the
        method does nothing when it contains '{' or '}'
    :param auth_attr: VIM authentication attributes
    :raises vnfm.VNFCreateWaitFailed: the pods did not reach 'Running'
    """
    if '{' in vnf_id or '}' in vnf_id:
        return
    # initialize Kubernetes APIs
    auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
    try:
        core_v1_api_client = \
            self.kubernetes.get_core_v1_api_client(auth=auth_cred)
        deployment_info = vnf_id.split(COMMA_CHARACTER)
        mgmt_ips = dict()
        pods_information = self._get_pods_information(
            core_v1_api_client=core_v1_api_client,
            deployment_info=deployment_info)
        status = self._get_pod_status(pods_information)
        stack_retries = self.STACK_RETRIES
        while status == 'Pending' and stack_retries > 0:
            time.sleep(self.STACK_RETRY_WAIT)
            pods_information = self._get_pods_information(
                core_v1_api_client=core_v1_api_client,
                deployment_info=deployment_info)
            status = self._get_pod_status(pods_information)
            LOG.debug('status: %s', status)
            stack_retries = stack_retries - 1
        LOG.debug('VNF initializing status: %(service_name)s '
                  '%(status)s',
                  {'service_name': str(deployment_info),
                   'status': status})
        if status != 'Running':
            if stack_retries == 0:
                error_reason = _(
                    "Resource creation is not completed within"
                    " {wait} seconds as creation of stack {stack}"
                    " is not completed").format(
                        wait=(self.STACK_RETRIES *
                              self.STACK_RETRY_WAIT),
                        stack=vnf_id)
            else:
                # The loop ended early because the pods left 'Pending'
                # without running; report that instead of reason=None.
                error_reason = _(
                    "Pods of stack {stack} are in unexpected status"
                    " {status}").format(stack=vnf_id, status=status)
            LOG.warning("VNF Creation failed: %(reason)s",
                        {'reason': error_reason})
            raise vnfm.VNFCreateWaitFailed(reason=error_reason)
        for i in range(0, len(deployment_info), 2):
            namespace = deployment_info[i]
            deployment_name = deployment_info[i + 1]
            service_info = core_v1_api_client.read_namespaced_service(
                name=deployment_name,
                namespace=namespace)
            # A Service without any label reports labels as None.
            labels = service_info.metadata.labels or {}
            if labels.get("management_connection"):
                vdu_name = labels.get("vdu_name").split("-")[1]
                mgmt_ip = service_info.spec.cluster_ip
                mgmt_ips.update({vdu_name: mgmt_ip})
                vnf_dict['mgmt_ip_address'] = jsonutils.dump_as_bytes(
                    mgmt_ips)
    except Exception as e:
        LOG.error('Creating wait VNF got an error due to %s', e)
        raise
    finally:
        self.clean_authenticate_vim(auth_cred, file_descriptor)
def create_wait_k8s(self, k8s_objs, k8s_client_dict, vnf_instance):
    """Poll the created Kubernetes objects until all of them are ready.

    Every entry whose status is 'Creating' is checked with the method
    selected by its kind (kinds outside CHECK_DICT_KEY are marked
    'Create_complete' directly). Polling stops when every entry is
    'Create_complete' or after STACK_RETRIES rounds.

    :param k8s_objs: list of dicts holding 'object', 'namespace' and
        'status'; 'status' and 'message' are updated in place
    :param k8s_client_dict: API clients keyed by apiVersion
    :param vnf_instance: VNF instance, only its id is used for the error
    :returns: the (updated) ``k8s_objs`` list
    :raises vnfm.CNFCreateWaitFailed: not all objects became ready in time
    """
    try:
        time.sleep(self.STACK_RETRY_WAIT)
        keep_going = True
        stack_retries = self.STACK_RETRIES
        while keep_going and stack_retries > 0:
            for k8s_obj in k8s_objs:
                resource = k8s_obj.get('object')
                kind = resource.kind
                namespace = k8s_obj.get('namespace')
                name = getattr(resource.metadata, 'name', '')
                api_version = resource.api_version
                if k8s_obj.get('status') != 'Creating':
                    continue
                if kind in self.CHECK_DICT_KEY:
                    check_method = self._select_check_status_by_kind(kind)
                    check_method(k8s_client_dict, k8s_obj,
                                 namespace, name, api_version)
                else:
                    k8s_obj['status'] = 'Create_complete'
            keep_going = False
            for k8s_obj in k8s_objs:
                if k8s_obj.get('status') != 'Create_complete':
                    keep_going = True
                    continue
                resource = k8s_obj.get('object', '')
                metadata = resource.metadata
                LOG.debug(
                    'Resource namespace: {namespace},'
                    'name:{name},kind: {kind} '
                    'is create complete'.format(
                        namespace=k8s_obj.get('namespace'),
                        name=metadata.name if metadata else '',
                        kind=resource.kind)
                )
            if keep_going:
                time.sleep(self.STACK_RETRY_WAIT)
                stack_retries -= 1
        if stack_retries == 0 and keep_going:
            LOG.error('It is time out, When instantiate cnf,'
                      'waiting for resource creation.')
            for k8s_obj in k8s_objs:
                if k8s_obj.get('status') == 'Creating':
                    k8s_obj['status'] = 'Wait_failed'
                    # 'message' is only set by a status check; use get()
                    # so an object that was never checked cannot mask the
                    # timeout with a KeyError.
                    err_reason = _("The resources are creating time out."
                                   "namespace: {namespace}, name:{name}, "
                                   "kind: {kind}).Reason: {message}").\
                        format(namespace=k8s_obj.get('namespace'),
                               name=k8s_obj.get('object').metadata.name,
                               kind=k8s_obj.get('object').kind,
                               message=k8s_obj.get('message'))
                    LOG.error(err_reason)
            error_reason = _(
                "Resource creation is not completed within"
                " {wait} seconds as creation of stack {stack}"
                " is not completed").format(
                    wait=(self.STACK_RETRIES * self.STACK_RETRY_WAIT),
                    stack=vnf_instance.id
            )
            raise vnfm.CNFCreateWaitFailed(reason=error_reason)
        return k8s_objs
    except Exception as e:
        LOG.error('Creating wait CNF got an error due to %s', e)
        # bare raise keeps the original traceback
        raise
def _select_check_status_by_kind(self, kind):
check_dict = {
"Pod": self._check_status_pod,
"Service": self._check_status_service,
"PersistentVolumeClaim":
self._check_status_persistent_volume_claim,
"Namespace": self._check_status_namespace,
"Node": self._check_status_node,
"PersistentVolume": self._check_status_persistent_volume,
"APIService": self._check_status_api_service,
"DaemonSet": self._check_status_daemon_set,
"Deployment": self._check_status_deployment,
"ReplicaSet": self._check_status_replica_set,
"StatefulSet": self._check_status_stateful_set,
"Job": self._check_status_job,
"VolumeAttachment": self._check_status_volume_attachment
}
return check_dict[kind]
def _check_is_ip(self, ip_str):
if re.match(r'^\d{,3}.\d{,3}.\d{,3}.\d{,3}$', ip_str):
num_list = [int(x) for x in ip_str.split('.')]
for i in num_list:
if i > 255 or i < 0:
return False
return True
else:
return False
def _check_status_stateful_set(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
stateful_set = k8s_client_dict[api_version]. \
read_namespaced_stateful_set(namespace=namespace, name=name)
if stateful_set.status.replicas != \
stateful_set.status.ready_replicas:
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "Pod in StatefulSet is still creating. " \
"The pod is ready {value1}/{value2}".format(
value1=stateful_set.status.ready_replicas,
value2=stateful_set.status.replicas
)
else:
for i in range(0, stateful_set.spec.replicas):
volume_claim_templates = stateful_set.spec.\
volume_claim_templates
for volume_claim_template in volume_claim_templates:
pvc_name = "-".join(
[volume_claim_template.metadata.name, name, str(i)])
persistent_volume_claim = k8s_client_dict['v1']. \
read_namespaced_persistent_volume_claim(
namespace=namespace, name=pvc_name)
if persistent_volume_claim.status.phase != 'Bound':
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "PersistentVolumeClaim in " \
"StatefulSet is still " \
"creating." \
"The status is " \
"{status}".format(
status=persistent_volume_claim.status.phase)
else:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = 'StatefulSet is created'
def _check_status_pod(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
pod = k8s_client_dict[api_version].read_namespaced_pod(
namespace=namespace, name=name)
if pod.status.phase != 'Running':
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "Pod is still creating. The status is " \
"{status}".format(status=pod.
status.phase)
else:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = "Pod is created"
def _check_status_service(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
service = k8s_client_dict[api_version].read_namespaced_service(
namespace=namespace, name=name)
status_flag = False
if service.spec.cluster_ip in ['', None] or \
self._check_is_ip(service.spec.cluster_ip):
try:
endpoint = k8s_client_dict['v1'].\
read_namespaced_endpoints(namespace=namespace, name=name)
if endpoint:
status_flag = True
except Exception as e:
msg = _('read endpoinds failed.kind:{kind}.reason:{e}'.format(
kind=service.kind, e=e))
LOG.error(msg)
raise exceptions.ReadEndpoindsFalse(error=msg)
if status_flag:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = "Service is created"
else:
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "Service is still creating." \
"The status is False"
def _check_status_persistent_volume_claim(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
claim = k8s_client_dict[api_version].\
read_namespaced_persistent_volume_claim(
namespace=namespace, name=name)
if claim.status.phase != 'Bound':
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "PersistentVolumeClaim is still creating."\
"The status is {status}".\
format(status=claim.status.phase)
else:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = "PersistentVolumeClaim is created"
def _check_status_namespace(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
name_space = k8s_client_dict[api_version].read_namespace(name=name)
if name_space.status.phase != 'Active':
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "Namespace is still creating." \
"The status is {status}". \
format(status=name_space.status.phase)
else:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = "Namespace is created"
def _check_status_node(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
node = k8s_client_dict[api_version].read_node(name=name)
status_flag = False
for condition in node.status.conditions:
if condition.type == 'Ready':
if condition.status == 'True':
status_flag = True
break
else:
continue
if status_flag:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = "Node is created"
else:
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "Node is still creating." \
"The status is False"
def _check_status_persistent_volume(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
volume = k8s_client_dict[api_version].\
read_persistent_volume(name=name)
if volume.status.phase != 'Available' and \
volume.status.phase != 'Bound':
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "PersistentVolume is still creating." \
"The status is {status}". \
format(status=volume.status.phase)
else:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = "PersistentVolume is created"
def _check_status_api_service(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
api_service = k8s_client_dict[api_version].read_api_service(name=name)
status_flag = False
for condition in api_service.status.conditions:
if condition.type == 'Available':
if condition.status == 'True':
status_flag = True
break
else:
continue
if status_flag:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = "APIService is created"
else:
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "APIService is still creating." \
"The status is False"
def _check_status_daemon_set(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
daemon_set = k8s_client_dict[api_version].\
read_namespaced_daemon_set(namespace=namespace, name=name)
if daemon_set.status.desired_number_scheduled != \
daemon_set.status.number_ready:
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "DaemonSet is still creating. " \
"The DaemonSet is ready {value1}/{value2}".\
format(value1=daemon_set.status.number_ready,
value2=daemon_set.status.desired_number_scheduled)
else:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = 'DaemonSet is created'
def _check_status_deployment(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
deployment = k8s_client_dict[api_version].\
read_namespaced_deployment(namespace=namespace, name=name)
if deployment.status.replicas != deployment.status.ready_replicas:
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "Deployment is still creating. " \
"The Deployment is ready {value1}/{value2}".\
format(value1=deployment.status.ready_replicas,
value2=deployment.status.replicas
)
else:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = 'Deployment is created'
def _check_status_replica_set(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
replica_set = k8s_client_dict[api_version].\
read_namespaced_replica_set(namespace=namespace, name=name)
if replica_set.status.replicas != replica_set.status.ready_replicas:
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "ReplicaSet is still creating. " \
"The ReplicaSet is ready {value1}/{value2}".\
format(value1=replica_set.status.ready_replicas,
value2=replica_set.status.replicas
)
else:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = 'ReplicaSet is created'
def _check_status_job(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
job = k8s_client_dict[api_version].\
read_namespaced_job(namespace=namespace, name=name)
if job.spec.completions != job.status.succeeded:
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "Job is still creating." \
"The status is {status}". \
format(status=job.spec.completions)
else:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = 'Job is created'
def _check_status_volume_attachment(self, k8s_client_dict, k8s_obj,
namespace, name, api_version):
volume = k8s_client_dict[api_version].\
read_volume_attachment(name=name)
if not volume.status.attached:
k8s_obj['status'] = 'Creating'
k8s_obj['message'] = "VolumeAttachment is still creating." \
"The status is {status}". \
format(status=volume.status.attached)
else:
k8s_obj['status'] = 'Create_complete'
k8s_obj['message'] = 'VolumeAttachment is created'
def _get_pods_information(self, core_v1_api_client, deployment_info):
"""Get pod information"""
pods_information = list()
for i in range(0, len(deployment_info), 2):
namespace = deployment_info[i]
deployment_name = deployment_info[i + 1]
respone = \
core_v1_api_client.list_namespaced_pod(namespace=namespace)
for item in respone.items:
if deployment_name in item.metadata.name:
pods_information.append(item)
return pods_information
def _get_pod_status(self, pods_information):
pending_flag = False
unknown_flag = False
for pod_info in pods_information:
status = pod_info.status.phase
if status == 'Pending':
pending_flag = True
elif status == 'Unknown':
unknown_flag = True
if unknown_flag:
status = 'Unknown'
elif pending_flag:
status = 'Pending'
else:
status = 'Running'
return status
@log.log
def update(self, plugin, context, vnf_id, vnf_dict, vnf, auth_attr):
    """Update a containerized VNF through its ConfigMap data.

    The 'config' attribute of ``vnf`` is merged into the 'config'
    attribute of ``vnf_dict``, and every ConfigMap named in ``vnf_id`` gets
    the values of its existing keys replaced by the updated ones. Keys
    that are not already present in a ConfigMap are not added to it.

    :param vnf_id: "namespace,name[,namespace,name...]" identifier
    :param vnf_dict: stored VNF dict; 'attributes'/'config' is rewritten
    :param vnf: update request, read at vnf['vnf']['attributes']['config']
    :param auth_attr: VIM authentication attributes
    """
    # initialize Kubernetes APIs
    auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
    try:
        core_v1_api_client = \
            self.kubernetes.get_core_v1_api_client(auth=auth_cred)
        # update config attribute
        config_yaml = vnf_dict.get('attributes', {}).get('config', '')
        update_yaml = vnf['vnf'].get('attributes', {}).get('config', '')
        LOG.debug('yaml orig %(orig)s update %(update)s',
                  {'orig': config_yaml, 'update': update_yaml})
        # yaml.safe_load() raises AttributeError on None, so both the
        # stored and the requested config are only parsed when non-empty.
        if not config_yaml:
            config_dict = {}
        else:
            config_dict = yaml.safe_load(config_yaml) or {}
        update_dict = yaml.safe_load(update_yaml) if update_yaml else None
        if not update_dict:
            return
        LOG.debug('dict orig %(orig)s update %(update)s',
                  {'orig': config_dict, 'update': update_dict})
        utils.deep_update(config_dict, update_dict)
        LOG.debug('dict new %(new)s update %(update)s',
                  {'new': config_dict, 'update': update_dict})
        new_yaml = yaml.safe_dump(config_dict)
        vnf_dict.setdefault('attributes', {})['config'] = new_yaml
        deployment_info = vnf_id.split(",")
        for i in range(0, len(deployment_info), 2):
            namespace = deployment_info[i]
            deployment_name = deployment_info[i + 1]
            configmap_resp = core_v1_api_client.read_namespaced_config_map(
                namespace=namespace,
                name=deployment_name)
            configmap_data = configmap_resp.data
            if not configmap_data:
                # A ConfigMap without data has no key to update.
                continue
            configmap_resp.data = {
                key: update_dict.get(key, value)
                for key, value in configmap_data.items()}
            core_v1_api_client.\
                patch_namespaced_config_map(namespace=namespace,
                                            name=deployment_name,
                                            body=configmap_resp)
    except Exception as e:
        LOG.error('Updating VNF got an error due to %s', e)
        raise
    finally:
        self.clean_authenticate_vim(auth_cred, file_descriptor)
@log.log
def update_wait(self, plugin, context, vnf_id, auth_attr,
                region_name=None):
    """Update wait function (currently a no-op)."""
    # TODO(phuoc): do nothing, will update it if we need actions
    return None
def _delete_legacy(self, vnf_id, auth_cred):
    """Delete the Kubernetes objects of a legacy VNF on a best-effort basis.

    For every namespace/name pair in ``vnf_id`` the ConfigMap, Service,
    HorizontalPodAutoscaler and Deployment of that name are deleted. A
    failing delete is only logged at debug level, so missing objects do
    not abort the deletion.

    :param vnf_id: "namespace,name[,namespace,name...]" identifier
    :param auth_cred: credentials used to build the API clients
    """
    # initialize Kubernetes APIs
    core_api = self.kubernetes.get_core_v1_api_client(auth=auth_cred)
    apps_api = self.kubernetes.get_app_v1_api_client(auth=auth_cred)
    autoscaling_api = self.kubernetes.get_scaling_api_client(
        auth=auth_cred)
    id_fields = vnf_id.split(COMMA_CHARACTER)
    for idx in range(0, len(id_fields), 2):
        namespace = id_fields[idx]
        deployment_name = id_fields[idx + 1]

        # delete ConfigMap if it exists
        try:
            core_api.delete_namespaced_config_map(
                namespace=namespace, name=deployment_name, body={})
            LOG.debug('Successfully deleted ConfigMap %s',
                      deployment_name)
        except Exception as exc:
            LOG.debug(exc)

        # delete Service if it exists
        try:
            core_api.delete_namespaced_service(
                namespace=namespace, name=deployment_name)
            LOG.debug('Successfully deleted Service %s',
                      deployment_name)
        except Exception as exc:
            LOG.debug(exc)

        # delete Horizon Pod Auto-scaling if it exists
        try:
            hpa_options = client.V1DeleteOptions()
            autoscaling_api.delete_namespaced_horizontal_pod_autoscaler(
                namespace=namespace, name=deployment_name,
                body=hpa_options)
            LOG.debug('Successfully deleted Horizon Pod Auto-Scaling '
                      '%s', deployment_name)
        except Exception as exc:
            LOG.debug(exc)

        # delete Deployment if it exists
        try:
            deployment_options = client.V1DeleteOptions(
                propagation_policy='Foreground',
                grace_period_seconds=5)
            apps_api.delete_namespaced_deployment(
                namespace=namespace, name=deployment_name,
                body=deployment_options)
            LOG.debug('Successfully deleted Deployment %s',
                      deployment_name)
        except Exception as exc:
            LOG.debug(exc)
def _select_delete_api(self, k8s_client_dict, namespace, name,
                       kind, api_version, body):
    """Select the Kubernetes delete API for ``kind`` and call it.

    The method name is derived from the snake_case form of ``kind``:
    ``delete_namespaced_<kind>`` when the Transformer's ``method_value``
    entry for the kind contains 'namespaced', ``delete_<kind>`` otherwise.

    :param k8s_client_dict: API clients keyed by apiVersion
    :param body: delete options passed through to the API call
    :returns: response of the delete call
    """
    def convert(name):
        # CamelCase -> snake_case, e.g. 'StatefulSet' -> 'stateful_set'
        name_with_underscores = re.sub(
            '(.)([A-Z][a-z]+)', r'\1_\2', name)
        return re.sub('([a-z0-9])([A-Z])', r'\1_\2',
                      name_with_underscores).lower()
    snake_case_kind = convert(kind)
    kubernetes = translate_outputs.Transformer(
        None, None, None, None)
    if 'namespaced' in kubernetes.method_value.get(kind):
        method_name = 'delete_namespaced_%s' % snake_case_kind
        call_kwargs = dict(name=name, namespace=namespace, body=body)
    else:
        method_name = 'delete_%s' % snake_case_kind
        call_kwargs = dict(name=name, body=body)
    # getattr() resolves the method by name without evaluating a string
    # built from ``kind`` as Python code (the former eval()).
    delete_api = getattr(k8s_client_dict[api_version], method_name)
    return delete_api(**call_kwargs)
def _get_pvc_list_for_delete(self, k8s_client_dict, sfs_name, namespace):
pvc_list_for_delete = list()
try:
resp_read_sfs = k8s_client_dict['apps/v1'].\
read_namespaced_stateful_set(sfs_name, namespace)
sfs_spec = resp_read_sfs.spec
volume_claim_templates = list()
volume_claim_templates = sfs_spec.volume_claim_templates
try:
resp_list_pvc = k8s_client_dict['v1'].\
list_namespaced_persistent_volume_claim(namespace)
pvc_list = resp_list_pvc.items
for volume_claim_template in volume_claim_templates:
pvc_template_metadata = volume_claim_template.metadata
match_pattern = '-'.join(
[pvc_template_metadata.name, sfs_name, ""])
for pvc in pvc_list:
pvc_metadata = pvc.metadata
pvc_name = pvc_metadata.name
match_result = re.match(
match_pattern + '[0-9]+$', pvc_name)
if match_result is not None:
pvc_list_for_delete.append(pvc_name)
except Exception as e:
LOG.debug(e)
pass
except Exception as e:
LOG.debug(e)
pass
return pvc_list_for_delete
@log.log
def _delete_k8s_obj(self, kind, k8s_client_dict, vnf_resource, body):
    """Delete one Kubernetes object described by a VNF resource record.

    ``vnf_resource.resource_name`` is read as "namespace,name" and
    ``vnf_resource.resource_type`` as "api_version,kind". For a
    StatefulSet the PersistentVolumeClaims generated from its templates
    are collected first and deleted afterwards. All delete failures are
    only logged at debug level.

    :param kind: kind of the object to delete
    :param k8s_client_dict: API clients keyed by apiVersion
    :param body: delete options passed to every delete call
    """
    name_fields = vnf_resource.resource_name.split(COMMA_CHARACTER)
    namespace = name_fields[0]
    name = name_fields[1]
    api_version = vnf_resource.resource_type.split(COMMA_CHARACTER)[0]

    # The claim names must be gathered while the StatefulSet still exists.
    generated_pvc_names = []
    if kind == 'StatefulSet':
        generated_pvc_names = self._get_pvc_list_for_delete(
            k8s_client_dict=k8s_client_dict,
            sfs_name=name,
            namespace=namespace)

    # delete target k8s obj
    try:
        self._select_delete_api(
            k8s_client_dict=k8s_client_dict,
            namespace=namespace,
            name=name,
            kind=kind,
            api_version=api_version,
            body=body)
        LOG.debug('Successfully deleted resource: '
                  'kind=%(kind)s, name=%(name)s',
                  {"kind": kind, "name": name})
    except Exception as exc:
        LOG.debug(exc)

    for pvc_name in generated_pvc_names:
        try:
            k8s_client_dict['v1'].\
                delete_namespaced_persistent_volume_claim(
                    name=pvc_name,
                    namespace=namespace,
                    body=body)
        except Exception as exc:
            LOG.debug(exc)
@log.log
def delete(self, plugin, context, vnf_id, auth_attr, region_name=None,
           vnf_instance=None, terminate_vnf_req=None):
    """Delete the Kubernetes objects of a VNF.

    Without ``vnf_instance`` the legacy path deletes the objects named in
    ``vnf_id``. Otherwise the resources recorded for the instance are
    deleted kind by kind in a fixed order that resolves dependencies,
    ending with Namespaces.

    :param vnf_id: legacy "namespace,name,..." identifier
    :param auth_attr: VIM authentication attributes
    :param vnf_instance: VNF instance whose recorded resources are deleted
    :param terminate_vnf_req: optional terminate request; 'GRACEFUL' uses
        its graceful_termination_timeout as grace period, 'FORCEFUL' uses
        0, any other termination type sets no grace period
    """
    auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
    try:
        if not vnf_instance:
            # execute legacy delete method
            self._delete_legacy(vnf_id, auth_cred)
        else:
            # initialize Kubernetes APIs
            k8s_client_dict = self.kubernetes.\
                get_k8s_client_dict(auth=auth_cred)
            vnf_resources = objects.VnfResourceList.get_by_vnf_instance_id(
                context, vnf_instance.id)
            # get V1DeleteOptions for deleting an API object
            if terminate_vnf_req:
                # Default to None so an unexpected termination type does
                # not fail with an unbound local variable.
                grace_period_seconds = None
                if terminate_vnf_req.termination_type == 'GRACEFUL':
                    grace_period_seconds = terminate_vnf_req.\
                        graceful_termination_timeout
                elif terminate_vnf_req.termination_type == 'FORCEFUL':
                    grace_period_seconds = 0
                body = client.V1DeleteOptions(
                    propagation_policy='Foreground',
                    grace_period_seconds=grace_period_seconds)
            else:
                body = client.V1DeleteOptions(
                    propagation_policy='Foreground')
            # follow the order below to resolve dependency when deleting
            # NOTE(review): resources whose kind is not listed here are
            # never deleted by this loop - confirm that is intended.
            ordered_kind = [
                # 1.
                'Deployment', 'Job', 'DaemonSet', 'StatefulSet',
                # 2.
                'Pod',
                # 3.
                'PersistentVolumeClaim', 'ConfigMap', 'Secret',
                'PriorityClass',
                # 4.
                'PersistentVolume',
                # 5.
                'StorageClass',
                # 6. Except for 1 to 5 above, delete before `Namespace`
                'Service', 'LimitRange', 'PodTemplate', 'Node',
                'ResourceQuota', 'ServiceAccount', 'APIService',
                'ReplicaSet', 'ControllerRevision',
                'HorizontalPodAutoscaler', 'Lease', 'NetworkPolicy',
                'ClusterRole', 'ClusterRoleBinding', 'Role', 'RoleBinding',
                'VolumeAttachment',
                # 7. Delete `Namespace` finally
                'Namespace'
            ]
            for kind in ordered_kind:
                for vnf_resource in vnf_resources:
                    # resource_type is stored as "api_version,kind"
                    obj_kind = vnf_resource.resource_type.\
                        split(COMMA_CHARACTER)[1]
                    if obj_kind == kind:
                        self._delete_k8s_obj(
                            kind=obj_kind,
                            k8s_client_dict=k8s_client_dict,
                            vnf_resource=vnf_resource,
                            body=body)
    except Exception as e:
        LOG.error('Deleting VNF got an error due to %s', e)
        raise
    finally:
        self.clean_authenticate_vim(auth_cred, file_descriptor)
def _delete_wait_legacy(self, vnf_id, auth_cred):
    """Wait until the objects of a legacy VNF can no longer be read.

    For every namespace/name pair in ``vnf_id`` the ConfigMap, Service,
    HorizontalPodAutoscaler and Deployment of that name are read. As long
    as at least one read succeeds, the check is repeated after
    STACK_RETRY_WAIT seconds, at most STACK_RETRIES times. The method
    returns silently when the retries are used up.

    :param vnf_id: "namespace,name[,namespace,name...]" identifier
    :param auth_cred: credentials used to build the API clients
    """
    try:
        core_api = self.kubernetes.get_core_v1_api_client(auth=auth_cred)
        apps_api = self.kubernetes.get_app_v1_api_client(auth=auth_cred)
        autoscaling_api = self.kubernetes.get_scaling_api_client(
            auth=auth_cred)
        # (client, read method) pairs probed for every deployment name
        probes = (
            (core_api, 'read_namespaced_config_map'),
            (core_api, 'read_namespaced_service'),
            (autoscaling_api, 'read_namespaced_horizontal_pod_autoscaler'),
            (apps_api, 'read_namespaced_deployment'),
        )
        id_fields = vnf_id.split(COMMA_CHARACTER)
        objects_remain = True
        remaining_retries = self.STACK_RETRIES
        while objects_remain and remaining_retries > 0:
            alive = 0
            for idx in range(0, len(id_fields), 2):
                namespace = id_fields[idx]
                deployment_name = id_fields[idx + 1]
                for api_client, reader_name in probes:
                    try:
                        getattr(api_client, reader_name)(
                            namespace=namespace,
                            name=deployment_name)
                    except Exception:
                        # a failed read is taken as "object is gone"
                        continue
                    alive += 1
            remaining_retries -= 1
            # If one of objects is still alive, keeps on waiting
            objects_remain = alive > 0
            if objects_remain:
                time.sleep(self.STACK_RETRY_WAIT)
    except Exception as exc:
        LOG.error('Deleting wait VNF got an error due to %s', exc)
        raise
def _select_k8s_obj_read_api(self, k8s_client_dict, namespace, name,
kind, api_version):
"""select kubernetes read api and call"""
def convert(name):
name_with_underscores = re.sub(
'(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2',
name_with_underscores).lower()
snake_case_kind = convert(kind)
try:
if namespace:
read_api = eval('k8s_client_dict[api_version].'
'read_namespaced_%s' % snake_case_kind)
response = read_api(name=name, namespace=namespace)
else:
read_api = eval('k8s_client_dict[api_version].'
'read_%s' % snake_case_kind)
response = read_api(name=name)
except Exception:
raise
return response
@log.log
def delete_wait(self, plugin, context, vnf_id, auth_attr,
                region_name=None, vnf_instance=None):
    """Wait until the Kubernetes objects of a VNF are deleted.

    Without ``vnf_instance`` the legacy wait for the objects named in
    ``vnf_id`` is used. Otherwise every resource recorded for the
    instance is read; as long as at least one read succeeds, the check is
    repeated after STACK_RETRY_WAIT seconds, at most STACK_RETRIES times.
    The method returns silently when the retries are used up.
    """
    # initialize Kubernetes APIs
    auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
    try:
        if not vnf_instance:
            # execute legacy delete_wait method
            self._delete_wait_legacy(vnf_id, auth_cred)
            return
        vnf_resources = objects.VnfResourceList.\
            get_by_vnf_instance_id(context, vnf_instance.id)
        k8s_client_dict = self.kubernetes.\
            get_k8s_client_dict(auth=auth_cred)
        resources_remain = True
        remaining_retries = self.STACK_RETRIES
        while resources_remain and remaining_retries > 0:
            alive = 0
            for vnf_resource in vnf_resources:
                # resource_name is "namespace,name",
                # resource_type is "api_version,kind"
                name_fields = vnf_resource.resource_name.split(
                    COMMA_CHARACTER)
                namespace = name_fields[0]
                name = name_fields[1]
                type_fields = vnf_resource.resource_type.split(
                    COMMA_CHARACTER)
                api_version = type_fields[0]
                kind = type_fields[1]
                try:
                    self._select_k8s_obj_read_api(
                        k8s_client_dict=k8s_client_dict,
                        namespace=namespace,
                        name=name,
                        kind=kind,
                        api_version=api_version)
                except Exception:
                    # a failed read is taken as "object is gone"
                    continue
                alive += 1
            remaining_retries -= 1
            # If one of objects is still alive, keeps on waiting
            resources_remain = alive > 0
            if resources_remain:
                time.sleep(self.STACK_RETRY_WAIT)
    except Exception as exc:
        LOG.error('Deleting wait VNF got an error due to %s', exc)
        raise
    finally:
        self.clean_authenticate_vim(auth_cred, file_descriptor)
def _scale_legacy(self, policy, auth_cred):
    """Scale the Deployments of a legacy VNF by one replica.

    For every namespace/name pair in ``policy['instance_id']`` the
    Deployment and its HorizontalPodAutoscaler are read. If the
    Deployment's 'scaling_name' label equals the policy name, the replica
    count is raised ('out') or lowered ('in') by one; a result outside
    the autoscaler's min/max range keeps the current count. The
    Deployment is patched in either case.

    :param policy: dict with 'instance_id', 'name' and 'action'
    :param auth_cred: credentials used to build the API clients
    """
    LOG.debug("VNF are scaled by updating instance of deployment")
    apps_api = self.kubernetes.get_app_v1_api_client(auth=auth_cred)
    autoscaling_api = self.kubernetes.get_scaling_api_client(
        auth=auth_cred)
    id_fields = policy['instance_id'].split(COMMA_CHARACTER)
    policy_name = policy['name']
    policy_action = policy['action']
    for idx in range(0, len(id_fields), 2):
        namespace = id_fields[idx]
        deployment_name = id_fields[idx + 1]
        deployment = apps_api.read_namespaced_deployment(
            namespace=namespace, name=deployment_name)
        autoscaler = autoscaling_api.\
            read_namespaced_horizontal_pod_autoscaler(
                namespace=namespace, name=deployment_name)
        current = deployment.status.replicas
        target = current
        scaling_label = deployment.metadata.labels.get("scaling_name")
        if scaling_label == policy_name:
            if policy_action == 'out':
                target = current + 1
            elif policy_action == 'in':
                target = current - 1
        lower_bound = autoscaler.spec.min_replicas
        upper_bound = autoscaler.spec.max_replicas
        if (target < lower_bound) or (target > upper_bound):
            LOG.debug("Scaling replicas is out of range. The number of"
                      " replicas keeps %(number)s replicas",
                      {'number': current})
            target = current
        deployment.spec.replicas = target
        apps_api.patch_namespaced_deployment_scale(
            namespace=namespace,
            name=deployment_name,
            body=deployment)
def _call_read_scale_api(self, app_v1_api_client, namespace, name, kind):
"""select kubernetes read scale api and call"""
def convert(name):
name_with_underscores = re.sub(
'(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2',
name_with_underscores).lower()
snake_case_kind = convert(kind)
try:
read_scale_api = eval('app_v1_api_client.'
'read_namespaced_%s_scale' % snake_case_kind)
response = read_scale_api(name=name, namespace=namespace)
except Exception as e:
error_reason = _("Failed the request to read a scale information."
" namespace: {namespace}, name: {name},"
" kind: {kind}, Reason: {exception}").format(
namespace=namespace, name=name, kind=kind, exception=e)
raise vnfm.CNFScaleFailed(reason=error_reason)
return response
def _call_patch_scale_api(self, app_v1_api_client, namespace, name,
kind, body):
"""select kubernetes patch scale api and call"""
def convert(name):
name_with_underscores = re.sub(
'(.)([A-Z][a-z]+)', r'\1_\2', name)
return re.sub('([a-z0-9])([A-Z])', r'\1_\2',
name_with_underscores).lower()
snake_case_kind = convert(kind)
try:
patch_scale_api = eval('app_v1_api_client.'
'patch_namespaced_%s_scale' % snake_case_kind)
response = patch_scale_api(name=name, namespace=namespace,
body=body)
except Exception as e:
error_reason = _("Failed the request to update a scale information"
". namespace: {namespace}, name: {name},"
" kind: {kind}, Reason: {exception}").format(
namespace=namespace, name=name, kind=kind, exception=e)
raise vnfm.CNFScaleFailed(reason=error_reason)
return response
@log.log
def scale(self, context, plugin, auth_attr, policy, region_name):
    """Scale function

    Scaling VNF is implemented by updating replicas through Kubernetes API.
    The min_replicas and max_replicas is limited by the number of replicas
    of policy scaling when user define VNF descriptor.

    Without ``policy['vnf_instance_id']`` the legacy path
    (``_scale_legacy``) is used.  Otherwise the VnfResource whose name
    equals the ``properties.name`` of one of ``policy['vdu_defs']`` is
    scaled by ``policy['delta_num']`` replicas in the direction of
    ``policy['action']`` (``out`` or ``in``).

    :raises vnfm.CNFScaleFailed: no matching VnfResource, unsupported
        kind or action, replica count out of the VDU profile range, or
        a failure of the scale API calls
    """
    # initialize Kubernetes APIs
    auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
    try:
        if not policy.get('vnf_instance_id'):
            # execute legacy scale method
            self._scale_legacy(policy, auth_cred)
        else:
            vnf_resources = objects.VnfResourceList.get_by_vnf_instance_id(
                context, policy['vnf_instance_id'])
            app_v1_api_client = self.kubernetes.get_app_v1_api_client(
                auth=auth_cred)
            aspect_id = policy['name']
            vdu_defs = policy['vdu_defs']
            is_found = False
            for vnf_resource in vnf_resources:
                # The resource that matches the following is the resource
                # to be scaled:
                # The `name` of the resource stored in vnf_resource (the
                # name defined in `metadata.name` of Kubernetes object
                # file) matches the value of `properties.name` of VDU
                # defined in VNFD.
                name = vnf_resource.resource_name.\
                    split(COMMA_CHARACTER)[1]
                for vdu_def in vdu_defs.values():
                    vdu_properties = vdu_def.get('properties')
                    if name == vdu_properties.get('name'):
                        namespace = vnf_resource.resource_name.\
                            split(COMMA_CHARACTER)[0]
                        kind = vnf_resource.resource_type.\
                            split(COMMA_CHARACTER)[1]
                        is_found = True
                        break
                if is_found:
                    break
            else:
                # for/else: no resource matched any VDU definition
                error_reason = _(
                    "Target VnfResource for aspectId"
                    " {aspect_id} is not found in DB").format(
                    aspect_id=aspect_id)
                raise vnfm.CNFScaleFailed(reason=error_reason)

            target_kinds = ["Deployment", "ReplicaSet", "StatefulSet"]
            if kind not in target_kinds:
                error_reason = _(
                    "Target kind {kind} is out of scale target").\
                    format(kind=kind)
                raise vnfm.CNFScaleFailed(reason=error_reason)

            scale_info = self._call_read_scale_api(
                app_v1_api_client=app_v1_api_client,
                namespace=namespace,
                name=name,
                kind=kind)

            current_replicas = scale_info.status.replicas
            # vdu_properties still refers to the VDU matched above
            vdu_profile = vdu_properties.get('vdu_profile')
            if policy['action'] == 'out':
                scale_replicas = current_replicas + policy['delta_num']
            elif policy['action'] == 'in':
                scale_replicas = current_replicas - policy['delta_num']
            else:
                # Fail explicitly instead of hitting an unbound
                # scale_replicas further down.
                error_reason = _(
                    "Scale action {action} is not supported").format(
                    action=policy['action'])
                raise vnfm.CNFScaleFailed(reason=error_reason)

            max_replicas = vdu_profile.get('max_number_of_instances')
            min_replicas = vdu_profile.get('min_number_of_instances')
            if (scale_replicas < min_replicas) or \
                    (scale_replicas > max_replicas):
                error_reason = _(
                    "The number of target replicas after"
                    " scaling [{after_replicas}] is out of range").\
                    format(
                        after_replicas=scale_replicas)
                raise vnfm.CNFScaleFailed(reason=error_reason)

            scale_info.spec.replicas = scale_replicas
            self._call_patch_scale_api(
                app_v1_api_client=app_v1_api_client,
                namespace=namespace,
                name=name,
                kind=kind,
                body=scale_info)
    except Exception as e:
        LOG.error('Scaling VNF got an error due to %s', e)
        raise
    finally:
        self.clean_authenticate_vim(auth_cred, file_descriptor)
def _scale_wait_legacy(self, policy, auth_cred):
    """Wait until the Pods of a legacy VNF leave the Pending status.

    The Pods behind ``policy['instance_id']`` are polled every
    ``STACK_RETRY_WAIT`` seconds, at most ``STACK_RETRIES`` times.

    :raises vnfm.VNFCreateWaitFailed: when the retries are exhausted
        before the Pods are Running, or when polling ends with any
        status other than Running
    """
    core_v1_api_client = self.kubernetes.get_core_v1_api_client(
        auth=auth_cred)
    deployment_info = policy['instance_id'].split(",")

    pods_information = self._get_pods_information(
        core_v1_api_client=core_v1_api_client,
        deployment_info=deployment_info)
    status = self._get_pod_status(pods_information)

    stack_retries = self.STACK_RETRIES
    while status == 'Pending' and stack_retries > 0:
        time.sleep(self.STACK_RETRY_WAIT)

        pods_information = self._get_pods_information(
            core_v1_api_client=core_v1_api_client,
            deployment_info=deployment_info)
        status = self._get_pod_status(pods_information)
        stack_retries = stack_retries - 1

    LOG.debug('VNF initializing status: %(service_name)s %(status)s',
              {'service_name': str(deployment_info), 'status': status})

    if stack_retries == 0 and status != 'Running':
        error_reason = _("Resource creation is not completed within"
                         " {wait} seconds as creation of stack {stack}"
                         " is not completed").format(
            wait=(self.STACK_RETRIES *
                  self.STACK_RETRY_WAIT),
            stack=policy['instance_id'])
        LOG.error("VNF Creation failed: %(reason)s",
                  {'reason': error_reason})
        raise vnfm.VNFCreateWaitFailed(reason=error_reason)

    elif stack_retries != 0 and status != 'Running':
        # Polling stopped early on a status that is neither Pending nor
        # Running; report it rather than raising with an empty reason.
        error_reason = _("Resource creation of stack {stack} ended with"
                         " unexpected Pod status {status}").format(
            stack=policy['instance_id'], status=status)
        LOG.error("VNF Creation failed: %(reason)s",
                  {'reason': error_reason})
        raise vnfm.VNFCreateWaitFailed(reason=error_reason)
def _is_match_pod_naming_rule(self, rsc_kind, rsc_name, pod_name):
match_result = None
if rsc_kind == 'Pod':
# Expected example: name
if rsc_name == pod_name:
match_result = True
elif rsc_kind == 'Deployment':
# Expected example: name-012789abef-019az
# NOTE(horie): The naming rule of Pod in deployment is
# "(deployment name)-(pod template hash)-(5 charactors)".
# The "pod template hash" string is generated from 32 bit hash.
# This may be from 1 to 10 caracters but not sure the lower limit
# from the source code of Kubernetes.
match_result = re.match(
rsc_name + '-([0-9a-f]{1,10})-([0-9a-z]{5})+$',
pod_name)
elif rsc_kind == 'ReplicaSet' or rsc_kind == 'DaemonSet':
# Expected example: name-019az
match_result = re.match(
rsc_name + '-([0-9a-z]{5})+$',
pod_name)
elif rsc_kind == 'StatefulSet':
# Expected example: name-0
match_result = re.match(
rsc_name + '-[0-9]+$',
pod_name)
if match_result:
return True
else:
return False
def scale_wait(self, context, plugin, auth_attr, policy, region_name,
               last_event_id):
    """Scale wait function

    Scale wait function will marked VNF is ACTIVE when all status state
    from Pod objects is RUNNING.

    Without ``policy['vnf_instance_id']`` the legacy wait
    (``_scale_wait_legacy``) is used.  Otherwise the Pods of the scaled
    resource are polled until their status is Running and their number
    equals the ``spec.replicas`` read once before polling starts.

    :raises vnfm.CNFScaleWaitFailed: no matching VnfResource, Pod status
        Unknown, or retries exhausted before the Pods are Running
    :raises vnfm.CNFScaleFailed: when reading the scale information
        fails (raised by ``_call_read_scale_api``)
    """
    # initialize Kubernetes APIs
    auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
    try:
        if not policy.get('vnf_instance_id'):
            # execute legacy scale_wait method
            self._scale_wait_legacy(policy, auth_cred)
        else:
            vnf_resources = objects.VnfResourceList.get_by_vnf_instance_id(
                context, policy['vnf_instance_id'])
            core_v1_api_client = self.kubernetes.get_core_v1_api_client(
                auth=auth_cred)
            app_v1_api_client = self.kubernetes.get_app_v1_api_client(
                auth=auth_cred)
            aspect_id = policy['name']
            vdu_defs = policy['vdu_defs']
            is_found = False
            error_reason = None
            # Find the stored resource ("namespace,name") whose name
            # equals `properties.name` of one of the VDU definitions.
            for vnf_resource in vnf_resources:
                name = vnf_resource.resource_name.\
                    split(COMMA_CHARACTER)[1]
                for vdu_id, vdu_def in vdu_defs.items():
                    vdu_properties = vdu_def.get('properties')
                    if name == vdu_properties.get('name'):
                        namespace = vnf_resource.resource_name.\
                            split(COMMA_CHARACTER)[0]
                        kind = vnf_resource.resource_type.\
                            split(COMMA_CHARACTER)[1]
                        is_found = True
                        break
                if is_found:
                    break
            else:
                # for/else: reached only when no resource matched
                error_reason = _(
                    "Target VnfResource for aspectId {aspect_id}"
                    " is not found in DB").format(
                    aspect_id=aspect_id)
                raise vnfm.CNFScaleWaitFailed(reason=error_reason)

            # Expected replica count; read once, not refreshed in the loop
            scale_info = self._call_read_scale_api(
                app_v1_api_client=app_v1_api_client,
                namespace=namespace,
                name=name,
                kind=kind)

            status = 'Pending'
            stack_retries = self.STACK_RETRIES
            error_reason = None
            while status == 'Pending' and stack_retries > 0:
                # Collect the Pods of the namespace that follow the
                # naming rule of the scaled resource.
                pods_information = list()
                respone = core_v1_api_client.list_namespaced_pod(
                    namespace=namespace)
                for pod in respone.items:
                    match_result = self._is_match_pod_naming_rule(
                        kind, name, pod.metadata.name)
                    if match_result:
                        pods_information.append(pod)

                status = self._get_pod_status(pods_information)
                # Running Pods do not count as done while their number
                # differs from the requested replicas.
                if status == 'Running' and \
                        scale_info.spec.replicas != len(pods_information):
                    status = 'Pending'

                if status == 'Pending':
                    stack_retries = stack_retries - 1
                    time.sleep(self.STACK_RETRY_WAIT)
                elif status == 'Unknown':
                    error_reason = _(
                        "CNF Scale failed caused by the Pod status"
                        " is Unknown")
                    raise vnfm.CNFScaleWaitFailed(reason=error_reason)

            # Retries used up while the Pods were still not Running
            if stack_retries == 0 and status != 'Running':
                error_reason = _(
                    "CNF Scale failed to complete within"
                    " {wait} seconds while waiting for the aspect_id"
                    " {aspect_id} to be scaled").format(
                    wait=(self.STACK_RETRIES *
                          self.STACK_RETRY_WAIT),
                    aspect_id=aspect_id)
                LOG.error("CNF Scale failed: %(reason)s",
                          {'reason': error_reason})
                raise vnfm.CNFScaleWaitFailed(reason=error_reason)
    except Exception as e:
        LOG.error('Scaling wait CNF got an error due to %s', e)
        raise
    finally:
        # Always release the temporary CA certificate file, if any
        self.clean_authenticate_vim(auth_cred, file_descriptor)
@log.log
def get_resource_info(self, plugin, context, vnf_info, auth_attr,
                      region_name=None):
    """Placeholder; no resource information is collected yet.

    :returns: always ``None``
    """
    # TODO(phuoc): will update it for other components
    return None
def _get_auth_creds(self, auth_cred):
file_descriptor = self._create_ssl_ca_file(auth_cred)
if ('username' not in auth_cred) and ('password' not in auth_cred):
auth_cred['username'] = 'None'
auth_cred['password'] = None
return auth_cred, file_descriptor
def _create_ssl_ca_file(self, auth_attr):
    """Write the VIM CA certificate to a temporary file, if one is set.

    The path of the created file is stored in ``auth_attr`` under
    ``ca_cert_file``.

    :returns: the file descriptor of the temporary file, or ``None``
        when ``ssl_ca_cert`` resolves to ``None``
    """
    ca_cert = utils.none_from_string(auth_attr.get('ssl_ca_cert'))
    if ca_cert is None:
        return None
    tmp_file = self.kubernetes.create_ca_cert_tmp_file(ca_cert)
    file_descriptor, file_path = tmp_file
    auth_attr['ca_cert_file'] = file_path
    return file_descriptor
def clean_authenticate_vim(self, vim_auth, file_descriptor):
    """Dispose of the temporary CA certificate file of a VIM.

    Nothing happens when ``file_descriptor`` is ``None``.  Otherwise
    ``ca_cert_file`` is removed from ``vim_auth`` and the file is handed
    to ``self.kubernetes.close_tmp_file`` together with the descriptor.
    """
    if file_descriptor is None:
        return
    ca_cert_path = vim_auth.pop('ca_cert_file')
    self.kubernetes.close_tmp_file(file_descriptor, ca_cert_path)
def heal_vdu(self, plugin, context, vnf_dict, heal_request_data):
    """No-op: this driver does nothing for VDU healing.

    :returns: always ``None``
    """
    return None
def _get_target_k8s_files(self, instantiate_vnf_req):
    """Return the Kubernetes definition files named in the request.

    The files are read from ``additional_params`` of the request under
    the key ``CNF_TARGET_FILES_KEY``.

    :returns: the stored value when the key is present, otherwise a new
        empty list
    """
    additional_params = instantiate_vnf_req.additional_params
    # Use the module constant for both the check and the lookup so the
    # two can never drift apart.
    if additional_params and CNF_TARGET_FILES_KEY in additional_params:
        return additional_params[CNF_TARGET_FILES_KEY]
    return list()
def pre_instantiation_vnf(self, context, vnf_instance,
                          vim_connection_info, vnf_software_images,
                          instantiate_vnf_req, vnf_package_path):
    """Build VnfResource objects from the requested Kubernetes files.

    Every requested definition file must be registered as an artifact of
    the VNF package.  Each file is then read (from a URL, or relative to
    ``vnf_package_path``) and one VnfResource is created per YAML
    document, holding ``"namespace,name"`` as resource_name and
    ``"apiVersion,kind"`` as resource_type.

    :returns: dict mapping the index of each requested file to the list
        of VnfResource objects built from it; empty when the request
        names no definition files
    :raises exceptions.VnfArtifactNotFound: the package has no artifacts
    :raises vnfm.CnfDefinitionNotFound: a requested file is not an
        artifact of the package
    """
    vnf_resources = dict()
    target_k8s_files = self._get_target_k8s_files(instantiate_vnf_req)
    if not target_k8s_files:
        # if artifact_files is not provided in request,
        # we consider k8s info in provided by TOSCA-based VNFD
        # and we will push the request to existed code
        return vnf_resources

    def reset_vnf_instance():
        # Roll the instance back before reporting a missing artifact
        setattr(vnf_instance, 'vim_connection_info', [])
        setattr(vnf_instance, 'task_state', None)
        vnf_instance.save()

    vnfd = vnfd_obj.VnfPackageVnfd.get_by_id(
        context, vnf_instance.vnfd_id)
    package_uuid = vnfd.package_uuid
    vnf_package = vnf_package_obj.VnfPackage.get_by_id(
        context, package_uuid, expected_attrs=['vnf_artifacts'])
    vnf_artifacts = vnf_package.vnf_artifacts
    if not vnf_artifacts:
        LOG.debug('VNF Artifact {path} is not found.'.format(
            path=vnf_package.vnf_artifacts))
        reset_vnf_instance()
        raise exceptions.VnfArtifactNotFound(id=vnf_package.id)

    # All requested files are validated before any of them is read
    for target_k8s_file in target_k8s_files:
        if any(vnf_artifact.artifact_path == target_k8s_file
               for vnf_artifact in vnf_artifacts):
            continue
        LOG.debug('CNF Artifact {path} is not found.'.format(
            path=target_k8s_file))
        reset_vnf_instance()
        raise vnfm.CnfDefinitionNotFound(path=target_k8s_file)

    for target_k8s_index, target_k8s_file \
            in enumerate(target_k8s_files):
        parsed_url = urlparse(target_k8s_file)
        if parsed_url.scheme == 'file' or \
                (parsed_url.scheme and parsed_url.netloc):
            # Close the response once read instead of leaving it to
            # the garbage collector.
            with urllib2.urlopen(target_k8s_file) as response:
                file_content = response.read()
        else:
            target_k8s_file_path = os.path.join(
                vnf_package_path, target_k8s_file)
            with open(target_k8s_file_path, 'r') as f:
                file_content = f.read()
        file_content_dict_list = yaml.safe_load_all(file_content)
        vnf_resources_temp = []
        # NOTE(review): an empty YAML document is loaded as None and
        # would fail on `.get` below - confirm inputs never contain one.
        for file_content_dict in file_content_dict_list:
            vnf_resource = vnf_resource_obj.VnfResource(
                context=context)
            vnf_resource.vnf_instance_id = vnf_instance.id
            vnf_resource.resource_name = ','.join([
                file_content_dict.get('metadata', {}).get(
                    'namespace', ''),
                file_content_dict.get('metadata', {}).get(
                    'name', '')])
            vnf_resource.resource_type = ','.join([
                file_content_dict.get('apiVersion', ''),
                file_content_dict.get('kind', '')])
            vnf_resource.resource_identifier = ''
            vnf_resource.resource_status = ''
            vnf_resources_temp.append(vnf_resource)
        vnf_resources[target_k8s_index] = vnf_resources_temp
    return vnf_resources
def delete_vnf_instance_resource(self, context, vnf_instance,
                                 vim_connection_info, vnf_resource):
    """No-op: this driver does nothing per VNF resource on delete.

    :returns: always ``None``
    """
    return None
def instantiate_vnf(self, context, vnf_instance, vnfd_dict,
                    vim_connection_info, instantiate_vnf_req,
                    grant_response, vnf_package_path,
                    plugin=None):
    """Deploy the Kubernetes objects of a CNF and return its instance id.

    When the request names no Kubernetes definition files, the call is
    delegated to ``self.create``.  Otherwise the objects described by the
    files are deployed and waited for, and a description of every
    deployed object (namespace, name, apiVersion, kind, status) is
    joined with ``;`` into the instance id.  That id is also stored in
    ``vnfd_dict['instance_id']``.

    :returns: the instance id string
    """
    target_k8s_files = self._get_target_k8s_files(instantiate_vnf_req)
    auth_attr = vim_connection_info.access_info
    if not target_k8s_files:
        # The case is based on TOSCA for CNF operation.
        instance_id = self.create(
            None, context, vnf_instance, auth_attr)
        return instance_id

    auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
    try:
        k8s_client_dict = self.kubernetes.get_k8s_client_dict(auth_cred)
        transformer = translate_outputs.Transformer(
            None, None, None, k8s_client_dict)
        deployment_dict_list = list()
        k8s_objs = transformer.\
            get_k8s_objs_from_yaml(target_k8s_files, vnf_package_path)
        k8s_objs = transformer.deploy_k8s(k8s_objs)
        k8s_objs = self.create_wait_k8s(
            k8s_objs, k8s_client_dict, vnf_instance)
        for k8s_obj in k8s_objs:
            deployment_dict = dict()
            deployment_dict['namespace'] = k8s_obj.get('namespace')
            if k8s_obj.get('object').metadata:
                deployment_dict['name'] = k8s_obj.get('object').\
                    metadata.name
            else:
                deployment_dict['name'] = ''
            deployment_dict['apiVersion'] = k8s_obj.get(
                'object').api_version
            deployment_dict['kind'] = k8s_obj.get('object').kind
            deployment_dict['status'] = k8s_obj.get('status')
            deployment_dict_list.append(deployment_dict)
        deployment_str_list = [str(x) for x in deployment_dict_list]
        # all the deployment object will store into resource_info_str.
        # and the instance_id is created from all deployment_dict.
        resource_info_str = ';'.join(deployment_str_list)
    finally:
        # Release the temporary CA certificate file even when the
        # deployment fails, like the other operations of this driver.
        self.clean_authenticate_vim(auth_cred, file_descriptor)
    vnfd_dict['instance_id'] = resource_info_str
    return resource_info_str
def post_vnf_instantiation(self, context, vnf_instance,
                           vim_connection_info, instantiate_vnf_req):
    """Record the Pods created by instantiation as VnfcResourceInfo.

    For every Pod, Deployment, DaemonSet, StatefulSet and ReplicaSet in
    the requested Kubernetes files, the Pods of its namespace whose name
    follows the naming rule of that resource are looked up.  Each one
    becomes a VnfcResourceInfo holding the Pod name, the resource kind,
    the JSON-dumped metadata and the id of the VDU whose
    ``properties.name`` equals the resource name.  A non-empty result
    replaces ``instantiated_vnf_info.vnfc_resource_info``.
    """
    auth_cred, file_descriptor = self._get_auth_creds(
        vim_connection_info.access_info)
    try:
        # Kubernetes objects described by the requested definition files
        k8s_files = self._get_target_k8s_files(instantiate_vnf_req)
        package_path = vnflcm_utils._get_vnf_package_path(
            context, vnf_instance.vnfd_id)
        transformer = translate_outputs.Transformer(
            None, None, None, None)
        k8s_objs = transformer.get_k8s_objs_from_yaml(
            k8s_files, package_path)

        # Map VDU name (as resource name) -> VDU id from the VNFD
        vnfd_dict = vnflcm_utils._get_vnfd_dict(
            context, vnf_instance.vnfd_id,
            vnf_instance.instantiated_vnf_info.flavour_id)
        tosca = tosca_template.ToscaTemplate(
            parsed_params={}, a_file=False, yaml_dict_tpl=vnfd_dict)
        vdu_id_by_name = {}
        for node_tpl in tosca.topology_template.nodetemplates:
            for node_name, node_value in node_tpl.templates.items():
                if node_value.get('type') != "tosca.nodes.nfv.Vdu.Compute":
                    continue
                vdu_name = node_value.get('properties').get('name')
                vdu_id_by_name[vdu_name] = node_name

        core_v1_api_client = self.kubernetes.get_core_v1_api_client(
            auth=auth_cred)
        pod_owner_kinds = ("Pod", "Deployment", "DaemonSet",
                           "StatefulSet", "ReplicaSet")
        pods_by_namespace = {}
        collected = []
        for k8s_obj in k8s_objs:
            rsc_kind = k8s_obj.get('object').kind
            if rsc_kind not in pod_owner_kinds:
                continue
            rsc_name = k8s_obj.get('object').metadata.name
            namespace = k8s_obj.get('object').metadata.namespace \
                or "default"

            # List the Pods of each namespace only once
            if namespace not in pods_by_namespace:
                pods_by_namespace[namespace] = \
                    core_v1_api_client.list_namespaced_pod(
                        namespace=namespace)
            pod_list = pods_by_namespace[namespace]

            for pod in pod_list.items:
                pod_name = pod.metadata.name
                if not self._is_match_pod_naming_rule(
                        rsc_kind, rsc_name, pod_name):
                    continue
                metadata = {
                    rsc_kind: jsonutils.dumps(
                        k8s_obj.get('object').metadata.to_dict()),
                }
                if rsc_kind != 'Pod':
                    # Keep the Pod template metadata of the controller
                    metadata['Pod'] = jsonutils.dumps(
                        k8s_obj.get('object').spec.template.metadata.
                        to_dict())

                vnfc_resource = objects.VnfcResourceInfo()
                vnfc_resource.id = uuidutils.generate_uuid()
                vnfc_resource.vdu_id = vdu_id_by_name.get(rsc_name)
                handle = objects.ResourceHandle()
                handle.resource_id = pod_name
                handle.vim_level_resource_type = rsc_kind
                vnfc_resource.compute_resource = handle
                vnfc_resource.metadata = metadata
                collected.append(vnfc_resource)

        if collected:
            vnf_instance.instantiated_vnf_info.vnfc_resource_info = \
                collected
    except Exception as e:
        LOG.error('Update vnfc resource info got an error due to %s', e)
        raise
    finally:
        self.clean_authenticate_vim(auth_cred, file_descriptor)
def _get_vnfc_rscs_with_vnfc_id(self, inst_vnf_info, heal_vnf_request):
if not heal_vnf_request.vnfc_instance_id:
# include all vnfc resources
return [resource for resource in inst_vnf_info.vnfc_resource_info]
vnfc_resources = []
for vnfc_resource in inst_vnf_info.vnfc_resource_info:
if vnfc_resource.id in heal_vnf_request.vnfc_instance_id:
vnfc_resources.append(vnfc_resource)
return vnfc_resources
def _get_added_pod_names(self, core_v1_api_client, inst_vnf_info, vdu_id,
                         vnfc_resource, pod_list_dict):
    """Compare the Pods on Kubernetes with those in vnfcResourceInfo.

    The Pods of the resource behind ``vnfc_resource`` are listed, newest
    first.  ``pod_list_dict`` caches the Pod list per namespace and is
    filled in place.

    :returns: tuple of (names of the Pods currently matching the
        resource, those of these names that no vnfcResourceInfo of the
        same vdu_id refers to)
    """
    rsc_kind = vnfc_resource.compute_resource.vim_level_resource_type
    rsc_metadata = jsonutils.loads(vnfc_resource.metadata.get(rsc_kind))
    namespace = rsc_metadata.get('namespace') or "default"
    rsc_name = rsc_metadata.get('name')

    # Query Kubernetes only for namespaces not listed before
    if namespace not in pod_list_dict:
        pod_list_dict[namespace] = core_v1_api_client.list_namespaced_pod(
            namespace=namespace)
    pod_list = pod_list_dict[namespace]

    # Newest Pods first
    pods_newest_first = sorted(
        pod_list.items,
        key=lambda pod: pod.metadata.creation_timestamp,
        reverse=True)

    # Pods that actually run for this resource
    actual_pod_names = [
        pod.metadata.name for pod in pods_newest_first
        if self._is_match_pod_naming_rule(
            rsc_kind, rsc_name, pod.metadata.name)
    ]

    # Pod names already recorded for the same VDU
    stored_pod_names = [
        stored.compute_resource.resource_id
        for stored in inst_vnf_info.vnfc_resource_info
        if stored.vdu_id == vnfc_resource.vdu_id
    ]

    # Running Pods that are not recorded yet
    added_pod_names = [
        pod_name for pod_name in actual_pod_names
        if pod_name not in stored_pod_names
    ]
    return actual_pod_names, added_pod_names
def heal_vnf(self, context, vnf_instance, vim_connection_info,
             heal_vnf_request):
    """Heal function

    This function heals vnfc instances (mapped as Pod),
    and update vnfcResourceInfo which are not the target of healing
    before healing operation.

    A bare Pod is deleted, waited for until it is gone and re-created
    from its retained spec.  A Pod owned by a Deployment, DaemonSet,
    StatefulSet or ReplicaSet is only deleted; when it is already gone
    its resource_id is set to ``VNFC_POD_NOT_FOUND``.

    :raises vnfm.CNFHealFailed: unsupported resource kind, failure of a
        Pod read/delete request, or the Pod not being deleted within
        the retry limit
    """
    # initialize Kubernetes APIs
    auth_attr = vim_connection_info.access_info
    auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
    inst_vnf_info = vnf_instance.instantiated_vnf_info
    try:
        core_v1_api_client = self.kubernetes.get_core_v1_api_client(
            auth=auth_cred)
        # get vnfc_resource_info list for healing
        vnfc_resources = self._get_vnfc_rscs_with_vnfc_id(
            inst_vnf_info=inst_vnf_info,
            heal_vnf_request=heal_vnf_request
        )
        # Updates resource_id in vnfc_resource_info which are not the
        # target of healing before heal operation because they may have
        # been re-created by kubelet of Kubernetes automatically and their
        # resource_id (as Pod name) have been already changed
        updated_vdu_ids = []
        pod_list_dict = {}
        for vnfc_resource in vnfc_resources:
            vdu_id = vnfc_resource.vdu_id
            if vdu_id in updated_vdu_ids:
                # For updated vdu_id, go to the next Loop
                continue
            actual_pod_names, added_pod_names = self._get_added_pod_names(
                core_v1_api_client, inst_vnf_info, vdu_id, vnfc_resource,
                pod_list_dict)

            if added_pod_names:
                heal_target_ids = heal_vnf_request.vnfc_instance_id
                for vnfc_rsc in inst_vnf_info.vnfc_resource_info:
                    stored_pod_name = vnfc_rsc.compute_resource.resource_id
                    # Updated vnfcResourceInfo of the same vdu_id other
                    # than heal target
                    if (vnfc_rsc.id not in heal_target_ids) and\
                            (vdu_id == vnfc_rsc.vdu_id) and\
                            (stored_pod_name not in actual_pod_names):
                        pod_name = added_pod_names.pop()
                        vnfc_rsc.compute_resource.resource_id = pod_name
                        LOG.warning("Update resource_id before healing,"
                                    " vnfc_resource_info.id:%(vnfc_id)s,"
                                    " pod_name:%(pod_name)s",
                                    {'vnfc_id': vnfc_rsc.id,
                                     'pod_name': pod_name})
                    if not added_pod_names:
                        break
            updated_vdu_ids.append(vdu_id)

        for vnfc_resource in vnfc_resources:
            body = client.V1DeleteOptions(propagation_policy='Foreground')
            compute_resource = vnfc_resource.compute_resource
            rsc_kind = compute_resource.vim_level_resource_type
            pod_name = compute_resource.resource_id
            rsc_metadata = jsonutils.loads(
                vnfc_resource.metadata.get(rsc_kind))
            namespace = rsc_metadata.get('namespace')
            if not namespace:
                namespace = "default"

            if rsc_kind == 'Pod':
                rsc_name = rsc_metadata.get('name')
                # Get pod information for re-creation before deletion
                pod_info = core_v1_api_client.read_namespaced_pod(
                    namespace=namespace,
                    name=rsc_name
                )
                # Delete Pod
                core_v1_api_client.delete_namespaced_pod(
                    namespace=namespace,
                    name=pod_name,
                    body=body
                )
                # Check and wait that the Pod is deleted
                for _cnt in range(self.STACK_RETRIES):
                    try:
                        core_v1_api_client.read_namespaced_pod(
                            namespace=namespace,
                            name=pod_name
                        )
                    except Exception as e:
                        # Not every exception carries a `status`
                        # (e.g. connection errors); read it defensively
                        # so the real cause is reported.
                        if getattr(e, 'status', None) == 404:
                            break
                        error_reason = _("Failed the request to read a"
                            " Pod information. namespace: {namespace},"
                            " pod_name: {name}, kind: {kind}, Reason: "
                            "{exception}").format(
                            namespace=namespace, name=pod_name,
                            kind=rsc_kind, exception=e)
                        raise vnfm.CNFHealFailed(reason=error_reason)
                    time.sleep(self.STACK_RETRY_WAIT)
                else:
                    # Number of retries exceeded retry count: the Pod
                    # was still readable on every attempt
                    error_reason = _("Resource healing is not completed"
                        " within {wait} seconds").format(wait=(
                            self.STACK_RETRIES * self.STACK_RETRY_WAIT))
                    LOG.error("CNF Healing failed: %(reason)s",
                              {'reason': error_reason})
                    raise vnfm.CNFHealFailed(reason=error_reason)

                # Recreate pod using retained pod_info
                transformer = translate_outputs.Transformer(
                    None, None, None, None)
                metadata = transformer.get_object_meta(rsc_metadata)
                body = client.V1Pod(metadata=metadata, spec=pod_info.spec)
                core_v1_api_client.create_namespaced_pod(
                    namespace=namespace,
                    body=body
                )
            elif (rsc_kind in ['Deployment', 'DaemonSet', 'StatefulSet',
                               'ReplicaSet']):
                try:
                    # Delete Pod (Pod is automatically re-created)
                    core_v1_api_client.delete_namespaced_pod(
                        namespace=namespace,
                        name=pod_name,
                        body=body
                    )
                except Exception as e:
                    if getattr(e, 'status', None) == 404:
                        # If when the pod to be deleted does not exist,
                        # change resource_id to "POD_NOT_FOUND"
                        compute_resource = vnfc_resource.compute_resource
                        compute_resource.resource_id = VNFC_POD_NOT_FOUND
                        LOG.warning("Target pod to delete is not found,"
                                    " vnfc_resource_info.id:%(vnfc_id)s,"
                                    " pod_name:%(pod_name)s",
                                    {'vnfc_id': vnfc_resource.id,
                                     'pod_name': pod_name})
                    else:
                        error_reason = _("Failed the request to delete a "
                            "Pod. namespace: {namespace}, pod_name: {name}"
                            ", kind: {kind}, Reason: {exception}").format(
                            namespace=namespace, name=pod_name,
                            kind=rsc_kind, exception=e)
                        raise vnfm.CNFHealFailed(reason=error_reason)
            else:
                error_reason = _(
                    "{vnfc_instance_id} is a kind of Kubertnetes"
                    " resource that is not covered").format(
                    vnfc_instance_id=vnfc_resource.id)
                LOG.error("CNF Heal failed: %(reason)s",
                          {'reason': error_reason})
                raise vnfm.CNFHealFailed(reason=error_reason)
    except Exception as e:
        LOG.error('Healing CNF got an error due to %s', e)
        raise
    finally:
        self.clean_authenticate_vim(auth_cred, file_descriptor)
def heal_vnf_wait(self, context, vnf_instance,
                  vim_connection_info, heal_vnf_request):
    """heal wait function

    Wait until all status from Pod objects is RUNNING.

    Entries whose Pod was not found during heal (resource_id equal to
    ``VNFC_POD_NOT_FOUND``) are ignored; when nothing is left the method
    returns at once.  For Deployment, ReplicaSet and StatefulSet the
    number of matching Pods must also equal the replicas read before
    polling starts.

    :raises vnfm.CNFHealWaitFailed: Pod status Unknown, or the retries
        are exhausted before the Pods are Running
    """
    # initialize Kubernetes APIs
    auth_attr = vim_connection_info.access_info
    auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
    try:
        core_v1_api_client = self.kubernetes.get_core_v1_api_client(
            auth=auth_cred)
        app_v1_api_client = self.kubernetes.get_app_v1_api_client(
            auth=auth_cred)
        vnfc_resources = self._get_vnfc_rscs_with_vnfc_id(
            inst_vnf_info=vnf_instance.instantiated_vnf_info,
            heal_vnf_request=heal_vnf_request)
        # Exclude entries where pods were not found when heal
        vnfc_resources = [rsc for rsc in vnfc_resources
                          if rsc.compute_resource.
                          resource_id != VNFC_POD_NOT_FOUND]

        if not vnfc_resources:
            # If heal is not running, wait is no need
            return

        # Get kubernetes resource information from target vnfcResourceInfo
        k8s_resources = list()
        for vnfc_resource in vnfc_resources:
            info = {}
            compute_resource = vnfc_resource.compute_resource
            info['kind'] = compute_resource.vim_level_resource_type
            rsc_metadata = jsonutils.loads(
                vnfc_resource.metadata.get(info['kind']))
            info['name'] = rsc_metadata.get('name')
            # Fall back to "default" like heal_vnf does, so that a
            # missing namespace is never passed to the API calls.
            info['namespace'] = rsc_metadata.get('namespace') or "default"
            k8s_resources.append(info)

        # exclude duplicate entries
        k8s_resources = list(map(jsonutils.loads,
                                 set(map(jsonutils.dumps, k8s_resources))))
        # get replicas of scalable resources for checking number of pod
        scalable_kinds = ["Deployment", "ReplicaSet", "StatefulSet"]
        for k8s_resource in k8s_resources:
            if k8s_resource.get('kind') in scalable_kinds:
                scale_info = self._call_read_scale_api(
                    app_v1_api_client=app_v1_api_client,
                    namespace=k8s_resource.get('namespace'),
                    name=k8s_resource.get('name'),
                    kind=k8s_resource.get('kind'))
                k8s_resource['replicas'] = scale_info.spec.replicas

        stack_retries = self.STACK_RETRIES
        status = 'Pending'
        while status == 'Pending' and stack_retries > 0:
            pods_information = []
            # Pod lists are cached per namespace within one polling round
            pod_list_dict = {}
            is_unmatch_pods_num = False
            # Get related pod information and check status
            for k8s_resource in k8s_resources:
                namespace = k8s_resource.get('namespace')
                if namespace in pod_list_dict:
                    pod_list = pod_list_dict.get(namespace)
                else:
                    pod_list = core_v1_api_client.list_namespaced_pod(
                        namespace=namespace)
                    pod_list_dict[namespace] = pod_list
                tmp_pods_info = list()
                for pod in pod_list.items:
                    match_result = self._is_match_pod_naming_rule(
                        k8s_resource.get('kind'),
                        k8s_resource.get('name'),
                        pod.metadata.name)
                    if match_result:
                        tmp_pods_info.append(pod)
                # NOTE(ueha): The status of pod being deleted is retrieved
                # as "Running", which cause incorrect information to be
                # stored in vnfcResouceInfo. Therefore, for the scalable
                # kinds, by comparing the actual number of pods with the
                # replicas, it can wait until the pod deletion is complete
                # and store correct information to vnfcResourceInfo.
                if k8s_resource.get('kind') in scalable_kinds and \
                        k8s_resource.get('replicas') != len(tmp_pods_info):
                    LOG.warning("Unmatch number of pod. (kind: %(kind)s,"
                        " name: %(name)s, replicas: %(replicas)s,"
                        " actual_pod_num: %(actual_pod_num)s)", {
                            'kind': k8s_resource.get('kind'),
                            'name': k8s_resource.get('name'),
                            'replicas': str(k8s_resource.get('replicas')),
                            'actual_pod_num': str(len(tmp_pods_info))})
                    is_unmatch_pods_num = True
                pods_information.extend(tmp_pods_info)
            status = self._get_pod_status(pods_information)

            if status == 'Unknown':
                error_reason = _("Pod status is found Unknown")
                LOG.warning("CNF Healing failed: %(reason)s",
                            {'reason': error_reason})
                raise vnfm.CNFHealWaitFailed(reason=error_reason)
            elif status == 'Pending' or is_unmatch_pods_num:
                time.sleep(self.STACK_RETRY_WAIT)
                stack_retries = stack_retries - 1
                # A Pod count mismatch is treated as still pending
                status = 'Pending'

        if stack_retries == 0 and status != 'Running':
            error_reason = _("Resource healing is not completed within"
                             " {wait} seconds").format(
                wait=(self.STACK_RETRIES *
                      self.STACK_RETRY_WAIT))
            LOG.error("CNF Healing failed: %(reason)s",
                      {'reason': error_reason})
            raise vnfm.CNFHealWaitFailed(reason=error_reason)
    except Exception as e:
        LOG.error('Healing wait CNF got an error due to %s', e)
        raise
    finally:
        self.clean_authenticate_vim(auth_cred, file_descriptor)
def post_heal_vnf(self, context, vnf_instance, vim_connection_info,
                  heal_vnf_request):
    """Update VnfcResourceInfo after healing

    For every healed vnfc resource whose kind is neither Pod nor
    StatefulSet, the Pod names not yet recorded for its VDU are assigned
    as new resource_id: first to the entries marked
    ``VNFC_POD_NOT_FOUND``, then to the entries named in
    ``heal_vnf_request.vnfc_instance_id``.  The vnfc resource objects
    are modified in place; each vdu_id is processed once.
    """
    # initialize Kubernetes APIs
    auth_attr = vim_connection_info.access_info
    auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
    inst_vnf_info = vnf_instance.instantiated_vnf_info
    try:
        core_v1_api_client = self.kubernetes.get_core_v1_api_client(
            auth=auth_cred)
        vnfc_resources = self._get_vnfc_rscs_with_vnfc_id(
            inst_vnf_info=inst_vnf_info,
            heal_vnf_request=heal_vnf_request
        )
        # initialize
        updated_vdu_ids = []
        # Cache of Pod lists per namespace, filled by _get_added_pod_names
        pod_list_dict = {}
        for vnfc_resource in vnfc_resources:
            vdu_id = vnfc_resource.vdu_id
            if vdu_id in updated_vdu_ids:
                # For updated vdu_id, go to the next Loop
                continue
            compute_resource = vnfc_resource.compute_resource
            rsc_kind = compute_resource.vim_level_resource_type
            # This value is reassigned below before it is ever read
            pod_name = compute_resource.resource_id

            if rsc_kind == 'Pod' or rsc_kind == 'StatefulSet':
                # No update required as the pod name does not change
                continue

            # Update vnfcResourceInfo when other rsc_kind
            # (Deployment, DaemonSet, ReplicaSet)
            actual_pod_names, added_pod_names = self._get_added_pod_names(
                core_v1_api_client, inst_vnf_info, vdu_id, vnfc_resource,
                pod_list_dict)

            updated_vnfc_ids = []
            # Update entries that pod was not found when heal_vnf method
            if added_pod_names:
                for vnfc_rsc in vnfc_resources:
                    rsc_id = vnfc_rsc.compute_resource.resource_id
                    if vdu_id == vnfc_rsc.vdu_id and \
                            rsc_id == VNFC_POD_NOT_FOUND:
                        # pop() takes names from the end of the list
                        pod_name = added_pod_names.pop()
                        vnfc_rsc.compute_resource.resource_id = pod_name
                        LOG.warning("Update resource_id of the"
                                    " entry where the pod was not found,"
                                    " vnfc_resource_info.id:%(vnfc_id)s,"
                                    " new podname:%(pod_name)s",
                                    {'vnfc_id': vnfc_rsc.id,
                                     'pod_name': pod_name})
                        updated_vnfc_ids.append(vnfc_rsc.id)
                    # Stop as soon as no unassigned Pod name is left
                    if not added_pod_names:
                        break
            # Update entries that was healed successful
            if added_pod_names:
                for vnfc_rsc_id in heal_vnf_request.vnfc_instance_id:
                    if vnfc_rsc_id in updated_vnfc_ids:
                        # If the entry has already been updated,
                        # go to the next loop
                        continue
                    for vnfc_rsc in vnfc_resources:
                        if vdu_id == vnfc_rsc.vdu_id and \
                                vnfc_rsc_id == vnfc_rsc.id:
                            pod_name = added_pod_names.pop()
                            compute_resource = vnfc_rsc.compute_resource
                            compute_resource.resource_id = pod_name
                    # Stop before pop() could run on an empty list
                    if not added_pod_names:
                        break
            updated_vdu_ids.append(vdu_id)
    except Exception as e:
        LOG.error('Post healing CNF got an error due to %s', e)
        raise
    finally:
        # Always release the temporary CA certificate file, if any
        self.clean_authenticate_vim(auth_cred, file_descriptor)
def change_ext_conn_vnf(self, context, vnf_instance, vnf_dict,
                        vim_connection_info, change_ext_conn_req):
    """Changing external connectivity is not implemented by this driver.

    :raises NotImplementedError: always
    """
    raise NotImplementedError
def change_ext_conn_vnf_wait(self, context, vnf_instance,
                             vim_connection_info):
    """Wait for a change of external connectivity (not implemented).

    This driver provides no implementation for the operation; every
    argument is ignored.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
def post_change_ext_conn_vnf(self, context, vnf_instance,
                             vim_connection_info):
    """Post-process a change of external connectivity (not implemented).

    This driver provides no implementation for the operation; every
    argument is ignored.

    :raises NotImplementedError: always
    """
    raise NotImplementedError()
def get_scale_ids(self,
                  plugin,
                  context,
                  vnf_dict,
                  auth_attr,
                  region_name):
    """Return the resource ids targeted by a scale operation.

    This implementation ignores all of its arguments and always reports
    that there are no ids.

    :returns: a new empty list on every call
    """
    return []
def get_scale_in_ids(self,
                     plugin,
                     context,
                     vnf_dict,
                     is_reverse,
                     auth_attr,
                     region_name,
                     number_of_steps):
    """Return the resources to be removed by a scale-in operation.

    This implementation ignores all of its arguments and always returns
    empty placeholders.

    :returns: a 4-tuple ``(id_list, name_list, grp_id, res_num)`` that
        is always ``([], [], None, None)``; the lists are new objects on
        every call
    """
    id_list, name_list = [], []
    return id_list, name_list, None, None
def scale_resource_update(self, context, vnf_instance,
                          scale_vnf_request,
                          vim_connection_info):
    """Update VnfcResourceInfo after scaling.

    Finds the stored VnfResource whose name matches a VDU of the aspect
    given by ``scale_vnf_request.aspect_id``, lists the Pods currently
    present in that resource's namespace and reconciles
    ``vnf_instance.instantiated_vnf_info.vnfc_resource_info`` in place:
    entries of the target VDU whose Pod no longer exists are removed,
    and a new entry is appended for every matching Pod that is not
    stored yet.

    :param context: request context used for the DB and VNFD lookups
    :param vnf_instance: VNF instance whose ``instantiated_vnf_info`` is
        updated in place
    :param scale_vnf_request: scale request; only ``aspect_id`` is read
    :param vim_connection_info: VIM connection info; its ``access_info``
        is used to build the Kubernetes credentials
    :raises vnfm.CNFScaleFailed: when no stored VnfResource matches a VDU
        of the requested aspect
    :raises Exception: any other error is logged and re-raised unchanged
    """
    auth_attr = vim_connection_info.access_info
    auth_cred, file_descriptor = self._get_auth_creds(auth_attr)
    inst_vnf_info = vnf_instance.instantiated_vnf_info
    try:
        # initialize Kubernetes APIs
        core_v1_api_client = self.kubernetes.get_core_v1_api_client(
            auth=auth_cred)
        vnf_resources = objects.VnfResourceList.get_by_vnf_instance_id(
            context, vnf_instance.id)

        # get scale target information
        vnfd_dict = vnflcm_utils._get_vnfd_dict(context,
                                                vnf_instance.vnfd_id,
                                                inst_vnf_info.flavour_id)
        tosca = tosca_template.ToscaTemplate(parsed_params={},
                                             a_file=False,
                                             yaml_dict_tpl=vnfd_dict)
        extract_policy_infos = vnflcm_utils.get_extract_policy_infos(tosca)
        vdu_defs = vnflcm_utils.get_target_vdu_def_dict(
            extract_policy_infos=extract_policy_infos,
            aspect_id=scale_vnf_request.aspect_id,
            tosca=tosca)

        # For CNF operations, Kubernetes resource information is
        # stored in vnf_resource as follows:
        #   - resource_name : "namespace,name"
        #   - resource_type : "api_version,kind"
        target_resource = None
        target_vdu_id = None
        for vnf_resource in vnf_resources:
            rsc_name = vnf_resource.resource_name.split(',')[1]
            for vdu_id, vdu_def in vdu_defs.items():
                vdu_properties = vdu_def.get('properties')
                if rsc_name == vdu_properties.get('name'):
                    target_resource = vnf_resource
                    target_vdu_id = vdu_id
                    break
            if target_resource is not None:
                break

        if target_resource is None:
            # Without this guard the code below would fail with an
            # UnboundLocalError that hides the real cause.
            # NOTE(review): assumes vnfm.CNFScaleFailed accepts a
            # `reason` keyword - confirm against tacker.extensions.vnfm.
            error_reason = _(
                "Target VnfResource for aspectId"
                " %(aspect_id)s is not found in DB") % {
                    'aspect_id': scale_vnf_request.aspect_id}
            raise vnfm.CNFScaleFailed(reason=error_reason)

        namespace = target_resource.resource_name.split(',')[0]
        rsc_name = target_resource.resource_name.split(',')[1]
        rsc_kind = target_resource.resource_type.split(',')[1]

        # extract stored Pod names by vdu_id
        stored_pod_names = set()
        metadata = None
        for vnfc_resource in inst_vnf_info.vnfc_resource_info:
            if vnfc_resource.vdu_id != target_vdu_id:
                continue
            stored_pod_names.add(
                vnfc_resource.compute_resource.resource_id)
            if not metadata:
                # get metadata for new VnfcResourceInfo entry
                metadata = vnfc_resource.metadata

        # get actual Pod name list
        pod_list = core_v1_api_client.list_namespaced_pod(
            namespace=namespace)
        actual_pod_list = [
            pod.metadata.name for pod in pod_list.items
            if self._is_match_pod_naming_rule(
                rsc_kind, rsc_name, pod.metadata.name)]
        actual_pod_names = set(actual_pod_list)

        # Remove the reduced pods from VnfcResourceInfo. Indexes are
        # collected first and popped in reverse so that earlier removals
        # do not shift the positions still to be removed.
        del_index = [
            index for index, vnfc
            in enumerate(inst_vnf_info.vnfc_resource_info)
            if vnfc.vdu_id == target_vdu_id and
            vnfc.compute_resource.resource_id not in actual_pod_names]
        for ind in reversed(del_index):
            inst_vnf_info.vnfc_resource_info.pop(ind)

        # Add the increased pods to VnfcResourceInfo
        for actual_pod_name in actual_pod_list:
            if actual_pod_name in stored_pod_names:
                continue
            add_vnfc_resource = objects.VnfcResourceInfo()
            add_vnfc_resource.id = uuidutils.generate_uuid()
            add_vnfc_resource.vdu_id = target_vdu_id
            resource = objects.ResourceHandle()
            resource.resource_id = actual_pod_name
            resource.vim_level_resource_type = rsc_kind
            add_vnfc_resource.compute_resource = resource
            add_vnfc_resource.metadata = metadata
            inst_vnf_info.vnfc_resource_info.append(
                add_vnfc_resource)
    except Exception as e:
        LOG.error('Update vnfc resource info got an error due to %s', e)
        raise
    finally:
        self.clean_authenticate_vim(auth_cred, file_descriptor)
def scale_in_reverse(self,
                     context,
                     plugin,
                     auth_attr,
                     vnf_info,
                     scale_vnf_request,
                     region_name,
                     scale_name_list,
                     grp_id):
    """Scale in the requested aspect by delegating to ``self.scale``.

    Builds a scale policy with action ``'in'`` for
    ``scale_vnf_request.aspect_id`` from the VNFD of the instance
    referenced by ``vnf_info['vnf_lcm_op_occ']`` and hands it to
    ``self.scale``. ``scale_name_list`` and ``grp_id`` are not used.
    """
    # NOTE(ueha): The `is_reverse` option is not supported in kubernetes
    # VIM, and returns an error response to the user if `is_reverse` is
    # true. However, since this method is called in the sequence of
    # rollback operation, implementation is required.
    instance_id = vnf_info['vnf_lcm_op_occ'].vnf_instance_id
    target_aspect = scale_vnf_request.aspect_id
    instance = objects.VnfInstance.get_by_id(context, instance_id)

    # Parse the VNFD of the instantiated flavour to find the scale policy.
    vnfd_id = instance.vnfd_id
    flavour_id = instance.instantiated_vnf_info.flavour_id
    vnfd = vnflcm_utils._get_vnfd_dict(context, vnfd_id, flavour_id)
    template = tosca_template.ToscaTemplate(
        parsed_params={}, a_file=False, yaml_dict_tpl=vnfd)
    policy_infos = vnflcm_utils.get_extract_policy_infos(template)

    target_vdu_defs = vnflcm_utils.get_target_vdu_def_dict(
        extract_policy_infos=policy_infos,
        aspect_id=target_aspect,
        tosca=template)
    delta = vnflcm_utils.get_scale_delta_num(
        extract_policy_infos=policy_infos,
        aspect_id=target_aspect)

    scale_policy = {
        'name': target_aspect,
        'action': 'in',
        'vnf_instance_id': instance_id,
        'vdu_defs': target_vdu_defs,
        'delta_num': delta,
    }
    self.scale(context, plugin, auth_attr, scale_policy, region_name)
def scale_out_initial(self,
                      context,
                      plugin,
                      auth_attr,
                      vnf_info,
                      scale_vnf_request,
                      region_name):
    """Do nothing; this implementation is an intentional no-op.

    All arguments are ignored.

    :returns: None
    """
    return None
def scale_update_wait(self,
                      context,
                      plugin,
                      auth_attr,
                      vnf_info,
                      region_name):
    """Wait for a scale operation by delegating to ``self.scale_wait``.

    Rebuilds the ScaleVnfRequest from the ``operation_params`` of
    ``vnf_info['vnf_lcm_op_occ']``, derives the target VDU definitions
    for its aspect from the instance's VNFD and passes the resulting
    policy to ``self.scale_wait``.
    """
    op_occ = vnf_info.get('vnf_lcm_op_occ')
    instance_id = op_occ.get('vnf_instance_id')

    # The original scale request is stored as JSON on the LCM op occ.
    request_params = jsonutils.loads(op_occ.get('operation_params'))
    scale_request = objects.ScaleVnfRequest.obj_from_primitive(
        request_params, context=context)
    target_aspect = scale_request.aspect_id

    instance = objects.VnfInstance.get_by_id(context, instance_id)
    vnfd_id = instance.vnfd_id
    flavour_id = instance.instantiated_vnf_info.flavour_id
    vnfd = vnflcm_utils._get_vnfd_dict(context, vnfd_id, flavour_id)
    template = tosca_template.ToscaTemplate(
        parsed_params={}, a_file=False, yaml_dict_tpl=vnfd)
    policy_infos = vnflcm_utils.get_extract_policy_infos(template)

    wait_policy = {
        'name': target_aspect,
        'vnf_instance_id': instance_id,
        'vdu_defs': vnflcm_utils.get_target_vdu_def_dict(
            extract_policy_infos=policy_infos,
            aspect_id=target_aspect,
            tosca=template),
    }
    # The last positional argument is passed as None, as before.
    self.scale_wait(context, plugin, auth_attr, wait_policy,
                    region_name, None)
def get_cinder_list(self,
                    vnf_info):
    """Do nothing; this implementation is an intentional no-op.

    ``vnf_info`` is ignored.

    :returns: None
    """
    return None
def get_grant_resource(self,
                       vnf_instance,
                       vnf_info,
                       scale_vnf_request,
                       placement_obj_list,
                       vim_connection_info,
                       del_list):
    """Do nothing; this implementation is an intentional no-op.

    All arguments are ignored.

    :returns: None
    """
    return None
def get_rollback_ids(self,
                     plugin,
                     context,
                     vnf_dict,
                     aspect_id,
                     auth_attr,
                     region_name):
    """Return the resources to be handled by a rollback operation.

    This implementation ignores all of its arguments and always returns
    empty placeholders.

    :returns: a 3-tuple ``(id_list, name_list, grp_id)`` that is always
        ``([], [], None)``; the lists are new objects on every call
    """
    id_list, name_list = [], []
    return id_list, name_list, None