Support PVs for k8s cluster with MgmtDriver

Support deploying Kubernetes cluster VNF which has a storage server
with Cinder volume. This feature enables users to deploy CNF which
has PersistentVolume on it. The following changes are added for
MgmtDriver.

1.Create a storage server VM with Cinder volume.
2.Expose Cinder volume as NFS shared directories in the storage
server.
3.Register NFS shared directories as Kubernetes PersistentVolumes.
4.Install NFS client on newly created Master/Worker VMs in all LCM
operations.

Implements: blueprint pv-k8s-cluster
Change-Id: Iaab9b6393e6f2007c637f83a82de0d27cf9db257
This commit is contained in:
Yi Feng 2021-08-26 19:18:40 +09:00
parent a7de7a589b
commit 6929cbc3f1
2 changed files with 543 additions and 21 deletions

View File

@ -0,0 +1,12 @@
---
features:
- |
Support deploying Kubernetes cluster VNF which has a storage server with
Cinder volume. This feature enables users to deploy CNF which has
PersistentVolume on it. The following changes are added for MgmtDriver.
- Create a storage server VM with Cinder volume.
- Expose Cinder volume as NFS shared directories in the storage server.
- Register NFS shared directories as Kubernetes PersistentVolumes.
- Install NFS client on newly created Master/Worker VMs in all LCM
operations.

View File

@ -19,6 +19,7 @@ import json
import os import os
import re import re
import time import time
import yaml
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import uuidutils from oslo_utils import uuidutils
@ -34,13 +35,18 @@ from tacker.vnflcm import utils as vnflcm_utils
from tacker.vnfm.infra_drivers.openstack import heat_client as hc from tacker.vnfm.infra_drivers.openstack import heat_client as hc
from tacker.vnfm.mgmt_drivers import vnflcm_abstract_driver from tacker.vnfm.mgmt_drivers import vnflcm_abstract_driver
LOG = logging.getLogger(__name__) CHECK_PV_AVAILABLE_RETRY = 5
K8S_CMD_TIMEOUT = 30 CHECK_PV_DEL_COMPLETE_RETRY = 5
K8S_INSTALL_TIMEOUT = 2700 CONNECT_MASTER_RETRY_TIMES = 4
HELM_CMD_TIMEOUT = 30 HELM_CMD_TIMEOUT = 30
HELM_INSTALL_TIMEOUT = 300 HELM_INSTALL_TIMEOUT = 300
HELM_CHART_DIR = "/var/tacker/helm" HELM_CHART_DIR = "/var/tacker/helm"
HELM_CHART_CMP_PATH = "/tmp/tacker-helm.tgz" HELM_CHART_CMP_PATH = "/tmp/tacker-helm.tgz"
K8S_CMD_TIMEOUT = 30
K8S_INSTALL_TIMEOUT = 2700
LOG = logging.getLogger(__name__)
NFS_CMD_TIMEOUT = 30
NFS_INSTALL_TIMEOUT = 2700
SERVER_WAIT_COMPLETE_TIME = 60 SERVER_WAIT_COMPLETE_TIME = 60
# CLI timeout period when setting private registries connection # CLI timeout period when setting private registries connection
@ -452,6 +458,21 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
raise paramiko.SSHException() raise paramiko.SSHException()
time.sleep(SERVER_WAIT_COMPLETE_TIME) time.sleep(SERVER_WAIT_COMPLETE_TIME)
def _init_commander(self, user, password, host, retry=4):
while retry > 0:
try:
commander = cmd_executer.RemoteCommandExecutor(
user=user, password=password, host=host,
timeout=K8S_INSTALL_TIMEOUT)
return commander
except Exception as e:
LOG.debug(e)
retry -= 1
if retry == 0:
LOG.error(e)
raise
time.sleep(SERVER_WAIT_COMPLETE_TIME)
def _get_vm_cidr_list(self, master_ip, proxy): def _get_vm_cidr_list(self, master_ip, proxy):
# ha and scale: get vm cidr list # ha and scale: get vm cidr list
vm_cidr_list = [] vm_cidr_list = []
@ -977,6 +998,48 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
return vim_connection_info return vim_connection_info
def _check_ss_installation_params(self, ss_installation_params):
first_level_keys = ('ssh_cp_name', 'username', 'password')
for key in first_level_keys:
if not ss_installation_params.get(key):
LOG.error(f"The {key} in the ss_installation_params "
"does not exist.")
raise exceptions.MgmtDriverNotFound(
param=key)
cinder_volume_setup_params = \
ss_installation_params.get('cinder_volume_setup_params')
cinder_volume_setup_param_keys = ('volume_resource_id', 'mount_to')
for cinder_volume_setup_param in cinder_volume_setup_params:
for key in cinder_volume_setup_param_keys:
if not cinder_volume_setup_param.get(key):
LOG.error(f"The {key} in the cinder_volume_setup_params "
"does not exist.")
raise exceptions.MgmtDriverNotFound(
param=key)
nfs_server_setup_params = \
ss_installation_params.get('nfs_server_setup_params')
nfs_server_setup_param_keys = ('export_dir', 'export_to')
for nfs_server_setup_param in nfs_server_setup_params:
for key in nfs_server_setup_param_keys:
if not nfs_server_setup_param.get(key):
LOG.error(f"The {key} in the nfs_server_setup_params "
"does not exist.")
raise exceptions.MgmtDriverNotFound(
param=key)
def _check_pv_registration_params(self, pv_registration_params):
pv_registration_param_keys = ('pv_manifest_file_path',
'nfs_server_cp')
for pv_registration_param in pv_registration_params:
for key in pv_registration_param_keys:
if not pv_registration_param.get(key):
LOG.error(f"The {key} in the pv_registration_params "
"does not exist.")
raise exceptions.MgmtDriverNotFound(
param=key)
def instantiate_end(self, context, vnf_instance, def instantiate_end(self, context, vnf_instance,
instantiate_vnf_request, grant, instantiate_vnf_request, grant,
grant_request, **kwargs): grant_request, **kwargs):
@ -995,6 +1058,7 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
context, vnf_instance) context, vnf_instance)
additional_param = instantiate_vnf_request.additional_params.get( additional_param = instantiate_vnf_request.additional_params.get(
'k8s_cluster_installation_param', {}) 'k8s_cluster_installation_param', {})
self._check_values(additional_param)
script_path = additional_param.get('script_path') script_path = additional_param.get('script_path')
vim_name = additional_param.get('vim_name') vim_name = additional_param.get('vim_name')
master_node = additional_param.get('master_node', {}) master_node = additional_param.get('master_node', {})
@ -1028,6 +1092,15 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
else: else:
additional_param['master_node']['cluster_cidr'] = '10.96.0.0/12' additional_param['master_node']['cluster_cidr'] = '10.96.0.0/12'
# check storage_server/pv_registration_params
ss_installation_params = additional_param.get('storage_server')
pv_registration_params = \
additional_param.get('pv_registration_params')
if ss_installation_params:
self._check_ss_installation_params(ss_installation_params)
if pv_registration_params:
self._check_pv_registration_params(pv_registration_params)
# get private_registry_connection_info param # get private_registry_connection_info param
pr_connection_info = additional_param.get( pr_connection_info = additional_param.get(
'private_registry_connection_info') 'private_registry_connection_info')
@ -1085,11 +1158,48 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
helm_inst_script_path, helm_inst_script_path,
pr_connection_info) pr_connection_info)
if ss_installation_params:
self._install_storage_server(context, vnf_instance, proxy,
ss_installation_params)
for vm_dict in master_vm_dict_list + worker_vm_dict_list:
self._install_nfs_client(
vm_dict.get('ssh', {}).get('username'),
vm_dict.get('ssh', {}).get('password'),
vm_dict.get('ssh', {}).get('ipaddr'))
commander, master_vm_dict = \
self._get_connect_master(master_vm_dict_list)
self._register_persistent_volumes(
context, vnf_instance, commander,
master_vm_dict.get('ssh', {}).get('username'),
master_vm_dict.get('ssh', {}).get('password'),
master_vm_dict.get('ssh', {}).get('ipaddr'),
pv_registration_params)
# register vim with kubernetes cluster info # register vim with kubernetes cluster info
self._create_vim(context, vnf_instance, server, self._create_vim(context, vnf_instance, server,
bearer_token, ssl_ca_cert, vim_name, project_name, bearer_token, ssl_ca_cert, vim_name, project_name,
master_vm_dict_list, masternode_ip_list) master_vm_dict_list, masternode_ip_list)
def _get_connect_master(self, master_vm_dict_list):
for vm_dict in master_vm_dict_list:
retry = CONNECT_MASTER_RETRY_TIMES
while retry > 0:
try:
commander = cmd_executer.RemoteCommandExecutor(
user=vm_dict.get('ssh', {}).get('username'),
password=vm_dict.get('ssh', {}).get('password'),
host=vm_dict.get('ssh', {}).get('ipaddr'),
timeout=K8S_CMD_TIMEOUT)
return commander, vm_dict
except (exceptions.NotAuthorized, paramiko.SSHException,
paramiko.ssh_exception.NoValidConnectionsError) as e:
LOG.debug(e)
retry -= 1
time.sleep(SERVER_WAIT_COMPLETE_TIME)
else:
LOG.error('Failed to execute remote command.')
raise exceptions.MgmtDriverRemoteCommandError()
def terminate_start(self, context, vnf_instance, def terminate_start(self, context, vnf_instance,
terminate_vnf_request, grant, terminate_vnf_request, grant,
grant_request, **kwargs): grant_request, **kwargs):
@ -1635,6 +1745,12 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
commander, worker_nic_ip, commander, worker_nic_ip,
host_compute_dict.get(worker_nic_ip), host_compute_dict.get(worker_nic_ip),
zone_id_dict.get(worker_nic_ip)) zone_id_dict.get(worker_nic_ip))
storage_server_param = vnf_instance.instantiated_vnf_info \
.additional_params.get('k8s_cluster_installation_param')\
.get('storage_server')
if storage_server_param:
self._install_nfs_client(worker_username, worker_password,
worker_ip)
hosts_str = '\\n'.join(add_worker_hosts) hosts_str = '\\n'.join(add_worker_hosts)
# set /etc/hosts on master node and normal worker node # set /etc/hosts on master node and normal worker node
@ -2028,17 +2144,35 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
def _get_target_physical_resource_ids(self, vnf_instance, def _get_target_physical_resource_ids(self, vnf_instance,
heal_vnf_request): heal_vnf_request):
target_physical_resource_ids = [] target_node_physical_resource_ids = []
target_ss_physical_resource_ids = []
storage_server_param = vnf_instance.instantiated_vnf_info \
.additional_params.get('k8s_cluster_installation_param')\
.get('storage_server', {})
target_ss_cp_name = storage_server_param.get('ssh_cp_name', None)
for vnfc_instance_id in heal_vnf_request.vnfc_instance_id: for vnfc_instance_id in heal_vnf_request.vnfc_instance_id:
instantiated_vnf_info = vnf_instance.instantiated_vnf_info instantiated_vnf_info = vnf_instance.instantiated_vnf_info
vnfc_resource_info = instantiated_vnf_info.vnfc_resource_info vnfc_resource_info = instantiated_vnf_info.vnfc_resource_info
vnfc_resource = self._get_vnfc_resource_id( vnfc_resource = self._get_vnfc_resource_id(
vnfc_resource_info, vnfc_instance_id) vnfc_resource_info, vnfc_instance_id)
if vnfc_resource: if vnfc_resource:
target_physical_resource_ids.append( storage_server_flag = False
vnfc_resource.compute_resource.resource_id) if hasattr(vnfc_resource, 'vnfc_cp_info') and \
target_ss_cp_name:
vnfc_cp_infos = vnfc_resource.vnfc_cp_info
for vnfc_cp_info in vnfc_cp_infos:
if vnfc_cp_info.cpd_id == target_ss_cp_name:
storage_server_flag = True
break
if storage_server_flag:
target_ss_physical_resource_ids.append(
vnfc_resource.compute_resource.resource_id)
else:
target_node_physical_resource_ids.append(
vnfc_resource.compute_resource.resource_id)
return target_physical_resource_ids return target_node_physical_resource_ids, \
target_ss_physical_resource_ids
def _prepare_for_restoring_helm(self, commander, master_ip): def _prepare_for_restoring_helm(self, commander, master_ip):
helm_info = {} helm_info = {}
@ -2121,13 +2255,21 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
if vim_info.vim_id == k8s_vim_info.id: if vim_info.vim_id == k8s_vim_info.id:
vnf_instance.vim_connection_info.remove(vim_info) vnf_instance.vim_connection_info.remove(vim_info)
else: else:
target_physical_resource_ids = \ target_node_physical_resource_ids, \
target_ss_physical_resource_ids = \
self._get_target_physical_resource_ids( self._get_target_physical_resource_ids(
vnf_instance, heal_vnf_request) vnf_instance, heal_vnf_request)
self._delete_node_to_be_healed( if target_node_physical_resource_ids:
heatclient, stack_id, target_physical_resource_ids, self._delete_node_to_be_healed(
master_username, master_password, worker_resource_name, heatclient, stack_id, target_node_physical_resource_ids,
master_resource_name, master_node, worker_node) master_username, master_password, worker_resource_name,
master_resource_name, master_node, worker_node)
if target_ss_physical_resource_ids:
self._heal_start_storage_server(
context, vnf_instance,
vnf_additional_params, heatclient, stack_id,
target_node_physical_resource_ids, master_username,
master_password, master_resource_name, master_node)
def _fix_master_node( def _fix_master_node(
self, not_fixed_master_infos, hosts_str, self, not_fixed_master_infos, hosts_str,
@ -2330,6 +2472,8 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
vm_cidr_list.append(str(network_ip)) vm_cidr_list.append(str(network_ip))
master_node = k8s_cluster_installation_param.get('master_node') master_node = k8s_cluster_installation_param.get('master_node')
script_path = k8s_cluster_installation_param.get('script_path') script_path = k8s_cluster_installation_param.get('script_path')
ss_installation_params = \
k8s_cluster_installation_param.get('storage_server')
pod_cidr = master_node.get('pod_cidr', '192.168.0.0/16') pod_cidr = master_node.get('pod_cidr', '192.168.0.0/16')
cluster_cidr = master_node.get("cluster_cidr", '10.96.0.0/12') cluster_cidr = master_node.get("cluster_cidr", '10.96.0.0/12')
if proxy.get("http_proxy") and proxy.get("https_proxy"): if proxy.get("http_proxy") and proxy.get("https_proxy"):
@ -2391,6 +2535,12 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
script_path, cluster_ip, pod_cidr, cluster_cidr, script_path, cluster_ip, pod_cidr, cluster_cidr,
kubeadm_token, ssl_ca_cert_hash, ha_flag, helm_info, kubeadm_token, ssl_ca_cert_hash, ha_flag, helm_info,
pr_connection_info, http_private_registries) pr_connection_info, http_private_registries)
for fixed_master_info in fixed_master_infos.values():
node_ip = fixed_master_info.get('master_ssh_ip')
if ss_installation_params:
self._install_nfs_client(
master_username, master_password, node_ip)
if flag_worker: if flag_worker:
self._fix_worker_node( self._fix_worker_node(
fixed_worker_infos, fixed_worker_infos,
@ -2399,6 +2549,12 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
kubeadm_token, ssl_ca_cert_hash, ha_flag, kubeadm_token, ssl_ca_cert_hash, ha_flag,
pr_connection_info, http_private_registries) pr_connection_info, http_private_registries)
for fixed_worker_info in fixed_worker_infos.values():
node_ip = fixed_worker_info.get('worker_ssh_ip')
if ss_installation_params:
self._install_nfs_client(worker_username, worker_password,
node_ip)
if self.SET_NODE_LABEL_FLAG: if self.SET_NODE_LABEL_FLAG:
for fixed_worker_name, fixed_worker in fixed_worker_infos.items(): for fixed_worker_name, fixed_worker in fixed_worker_infos.items():
commander, _ = self._connect_ssh_scale( commander, _ = self._connect_ssh_scale(
@ -2492,17 +2648,24 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
heatclient = hc.HeatClient(vim_connection_info.access_info) heatclient = hc.HeatClient(vim_connection_info.access_info)
# get all target physical resource id # get all target physical resource id
target_physical_resource_ids = \ target_node_physical_resource_ids, \
target_ss_physical_resource_ids = \
self._get_target_physical_resource_ids( self._get_target_physical_resource_ids(
vnf_instance, heal_vnf_request) vnf_instance, heal_vnf_request)
if target_node_physical_resource_ids:
self._heal_and_join_k8s_node( self._heal_and_join_k8s_node(
heatclient, stack_id, target_physical_resource_ids, heatclient, stack_id, target_node_physical_resource_ids,
vnf_additional_params, master_resource_name, vnf_additional_params, master_resource_name,
master_username, master_password, vnf_package_path, master_username, master_password, vnf_package_path,
worker_resource_name, worker_username, worker_password, worker_resource_name, worker_username, worker_password,
cluster_resource_name, master_node, worker_node, cluster_resource_name, master_node, worker_node,
vnf_instance, grant) vnf_instance, grant)
if target_ss_physical_resource_ids:
self._heal_end_storage_server(context, vnf_instance,
vnf_additional_params, stack_id, master_node,
master_resource_name, heatclient,
target_node_physical_resource_ids,
master_username, master_password)
def change_external_connectivity_start( def change_external_connectivity_start(
self, context, vnf_instance, self, context, vnf_instance,
@ -2515,3 +2678,350 @@ class KubernetesMgmtDriver(vnflcm_abstract_driver.VnflcmMgmtAbstractDriver):
change_ext_conn_request, grant, change_ext_conn_request, grant,
grant_request, **kwargs): grant_request, **kwargs):
pass pass
def _check_envi(self, commander):
ssh_command = 'cat /etc/os-release | grep "PRETTY_NAME=" | ' \
'grep -c "Ubuntu 20.04"; arch | grep -c x86_64'
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 3)
results = [int(item.replace('\n', '')) for item in result]
if not (results[0] and results[1]):
raise exceptions.MgmtDriverOtherError(
error_message="Storage server VM setup failed."
"Your OS does not support at present."
"It only supports Ubuntu 20.04 (x86_64)"
)
def _install_storage_server(self, context, vnf_instance, proxy,
ss_installation_params):
ssh_cp_name = ss_installation_params.get('ssh_cp_name')
ssh_username = ss_installation_params.get('username')
ssh_password = ss_installation_params.get('password')
cinder_volume_setup_params = \
ss_installation_params.get('cinder_volume_setup_params')
nfs_server_setup_params = \
ss_installation_params.get('nfs_server_setup_params')
stack_id = vnf_instance.instantiated_vnf_info.instance_id
vim_connection_info = self._get_vim_connection_info(
context, vnf_instance)
heatclient = hc.HeatClient(vim_connection_info.access_info)
resource_info = heatclient.resources.get(
stack_id=stack_id,
resource_name=ssh_cp_name
)
ssh_ip_address = resource_info.attributes \
.get('fixed_ips')[0].get('ip_address')
if not ssh_ip_address:
raise exceptions.MgmtDriverOtherError(
error_message="Failed to get IP address for "
"Storage server VM")
commander = self._init_commander(ssh_username, ssh_password,
ssh_ip_address)
self._check_envi(commander)
for setup_info in cinder_volume_setup_params:
volume_resource_id = setup_info.get('volume_resource_id')
volume_mount_to = setup_info.get('mount_to')
resource_info = heatclient.resources.get(
stack_id=stack_id,
resource_name=volume_resource_id
)
volume_device = resource_info.attributes.get(
'attachments')[0].get('device')
if not volume_device:
raise exceptions.MgmtDriverOtherError(
error_message=f"Failed to get device information for "
f"Cinder volume.volume_resource_id:"
f"{volume_resource_id}")
ssh_command = \
f"sudo mkfs -t ext4 {volume_device} 2>/dev/null && " \
f"sudo mkdir -p {volume_mount_to} && " \
f"sudo mount {volume_device} {volume_mount_to} -t ext4"
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT * 5, 'common', 3)
ssh_command = f"df | grep -c {volume_device}"
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 3)
results = [int(item.replace('\n', '')) for item in result]
if not results[0]:
raise exceptions.MgmtDriverOtherError(
error_message=f"Failed to setup Cinder volume device on "
f"Storage Server VM"
f"(device:{volume_device}).")
http_proxy = proxy.get('http_proxy')
https_proxy = proxy.get('https_proxy')
if http_proxy and https_proxy:
ssh_command = \
f'echo -e "Acquire::http::Proxy \\"{http_proxy}\\";\n' \
f'Acquire::https::Proxy \\"{https_proxy}\\";" | ' \
f'sudo tee /etc/apt/apt.conf.d/proxy.conf >/dev/null ' \
f'&& ' \
f'sudo apt-get update && ' \
f'export DEBIAN_FRONTEND=noninteractive;' \
f'sudo -E apt-get install -y nfs-kernel-server'
else:
ssh_command = "sudo apt-get update && " \
"export DEBIAN_FRONTEND=noninteractive;" \
"sudo -E apt-get install -y nfs-kernel-server"
result = self._execute_command(
commander, ssh_command, NFS_INSTALL_TIMEOUT, 'common', 3)
for setup_info in nfs_server_setup_params:
export_dir = setup_info.get('export_dir')
export_to = setup_info.get('export_to')
export_str = f"{export_dir} {export_to}" \
f"(rw,sync,no_subtree_check,insecure,all_squash)"
ssh_command = f'sudo mkdir -p {export_dir} && ' \
f'sudo chown nobody.nogroup {export_dir} && ' \
f'echo "{export_str}" | ' \
f'sudo tee -a /etc/exports >/dev/null'
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT * 10, 'common', 3)
ssh_command = "sudo exportfs -ra"
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 3)
ssh_command = "sudo exportfs"
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 3)
for setup_info in nfs_server_setup_params:
export_dir = setup_info.get('export_dir')
if export_dir not in ','.join(result):
raise exceptions.MgmtDriverOtherError(
error_message=f"Failed to setup NFS export on Storage "
f"Server VM(export_dir:{export_dir})")
def _install_nfs_client(self, node_username, node_password, node_ip):
# commander = cmd_executer.RemoteCommandExecutor(
# user=node_username, password=node_password, host=node_ip,
# timeout=K8S_CMD_TIMEOUT)
commander = self._init_commander(node_username, node_password, node_ip)
ssh_command = "sudo apt-get update && " \
"export DEBIAN_FRONTEND=noninteractive;" \
"sudo -E apt-get install -y nfs-common"
result = self._execute_command(
commander, ssh_command, NFS_INSTALL_TIMEOUT, 'common', 3)
ssh_command = "sudo apt list --installed 2>/dev/null | " \
"grep -c nfs-common"
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 3)
results = [int(item.replace('\n', '')) for item in result]
if not results[0]:
raise exceptions.MgmtDriverOtherError(
error_message=f"Failed to install NFS client"
f"(node ip:{node_ip})")
def _register_persistent_volumes(self, context, vnf_instance, commander,
master_username, master_password,
master_ip, pv_registration_params):
vnf_package_path = vnflcm_utils._get_vnf_package_path(
context, vnf_instance.vnfd_id)
pv_file_list = []
for pv_info in pv_registration_params:
pv_manifest_file_path = pv_info.get('pv_manifest_file_path')
nfs_server_cp = pv_info.get('nfs_server_cp')
local_file_path = os.path.join(vnf_package_path,
pv_manifest_file_path)
if not os.path.exists(local_file_path):
raise exceptions.MgmtDriverParamInvalid(
param=f"pv_manifest_file_path"
f"(path:{pv_manifest_file_path})")
with open(local_file_path, 'r', encoding='utf-8') as f:
nfs_pv = yaml.safe_load(f)
pv_name = nfs_pv.get('metadata', {}).get('name')
if not pv_name:
raise exceptions.MgmtDriverOtherError(
error_message=f"Failed to get Kubernetes PersistentVolume"
f" name from manifest file"
f"(path:{pv_manifest_file_path})")
remote_file_path = os.path.join('/tmp', pv_manifest_file_path)
remote_dir_path = os.path.dirname(remote_file_path)
ssh_command = f"mkdir -p {remote_dir_path}"
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 3)
pv_file_list.append(
(local_file_path, remote_file_path, nfs_server_cp, pv_name))
with paramiko.Transport(master_ip, 22) as connect:
connect.connect(
username=master_username, password=master_password)
sftp_client = paramiko.SFTPClient.from_transport(connect)
for pv_info in pv_file_list:
sftp_client.put(pv_info[0], pv_info[1])
vim_connection_info = self._get_vim_connection_info(
context, vnf_instance)
heatclient = hc.HeatClient(vim_connection_info.access_info)
stack_id = vnf_instance.instantiated_vnf_info.instance_id
nfs_server_cp_dict = {}
for pv_info in pv_file_list:
pv_file_path = pv_info[1]
nfs_server_cp = pv_info[2]
pv_name = pv_info[3]
nfs_server_ip = nfs_server_cp_dict.get(nfs_server_cp)
if not nfs_server_ip:
resource_info = heatclient.resources.get(
stack_id, nfs_server_cp)
nfs_server_ip = resource_info.attributes \
.get('fixed_ips')[0].get('ip_address')
if not nfs_server_ip:
raise exceptions.MgmtDriverOtherError(
error_message=f"Failed to get NFS server IP address"
f"(nfs_server_cp:{nfs_server_cp})")
nfs_server_cp_dict[nfs_server_cp] = nfs_server_ip
ssh_command = \
f'sed -i -E ' \
f'"s/server *: *[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+/server: ' \
f'{nfs_server_ip}/g" {pv_file_path} && ' \
f'kubectl apply -f {pv_file_path}'
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 3)
ssh_command = f"kubectl get pv {pv_name} " \
"-o jsonpath='{.status.phase}'"
for _ in range(CHECK_PV_AVAILABLE_RETRY):
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 5)
if 'Available' in ','.join(result):
break
else:
time.sleep(30)
else:
ssh_command = f"kubectl delete pv {pv_name}"
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 5)
raise exceptions.MgmtDriverOtherError(
error_message='Failed to register Persistent volume'
'(Status is not "Available" state)')
ssh_command = f"rm -f {pv_file_path}"
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 5)
def _heal_start_storage_server(
self, context, vnf_instance, vnf_additional_params, heatclient,
stack_id, target_node_physical_resource_ids, master_username,
master_password, master_resource_name, master_node):
master_ssh_ips = []
if master_node.get('aspect_id'):
master_resource_list = self._get_resources_list(
heatclient, stack_id, master_resource_name)
flag_master, fixed_master_infos, not_fixed_master_infos = \
self._get_master_node_name(
heatclient, master_resource_list,
target_node_physical_resource_ids,
master_node)
for value in not_fixed_master_infos.values():
master_ssh_ips.append(value.get('master_ssh_ip'))
else:
resource_info = heatclient.resources.get(
stack_id, master_node.get('ssh_cp_name'))
master_ssh_ips.append(resource_info.attributes.get(
'fixed_ips')[0].get('ip_address'))
commander, master_ip = self._connect_ssh_scale(
master_ssh_ips, master_username,
master_password)
vnf_package_path = vnflcm_utils._get_vnf_package_path(
context, vnf_instance.vnfd_id)
pv_registration_params = \
vnf_additional_params.get(
'k8s_cluster_installation_param').get(
'pv_registration_params')
pv_name_list = []
for pv_info in pv_registration_params:
pv_manifest_file_path = pv_info.get('pv_manifest_file_path')
pv_file_path = \
os.path.join(vnf_package_path, pv_manifest_file_path)
with open(pv_file_path, 'r', encoding='utf-8') as f:
nfs_pv = yaml.safe_load(f)
pv_name = nfs_pv.get('metadata').get('name')
pv_name_list.append(pv_name)
ssh_command = 'kubectl get pv ' \
'-o jsonpath=\'{range.items[*]}' \
'status:{@.status.phase},' \
'name:{@.metadata.name}{"\\n"}{end}\' | ' \
'grep "status:Bound"'
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 3)
for result_line in result:
in_use_pv_name = \
result_line.replace('\n', '').split(',')[1].split(':')[1]
if in_use_pv_name in pv_name_list:
raise exceptions.MgmtDriverOtherError(
error_message=f"heal_start failed({in_use_pv_name} "
f"Persistent volume is in use)")
for pv_name in pv_name_list:
ssh_command = f"kubectl delete pv {pv_name}"
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 5)
ssh_command = 'kubectl get pv ' \
'-o jsonpath=\'{range .items[*]}' \
'{@.metadata.name}{"\\n"}{end}\''
for _ in range(CHECK_PV_DEL_COMPLETE_RETRY):
result = self._execute_command(
commander, ssh_command, NFS_CMD_TIMEOUT, 'common', 5)
pv_delete_comp_flag = True
for result_line in result:
if result_line.replace('\n', '') in pv_name_list:
pv_delete_comp_flag = False
break
if pv_delete_comp_flag:
break
else:
time.sleep(30)
else:
raise exceptions.MgmtDriverOtherError(
error_message='heal_start failed'
'(Persistent volume deletion timeout)')
def _heal_end_storage_server(self, context, vnf_instance,
vnf_additional_params, stack_id, master_node,
master_resource_name, heatclient,
target_node_physical_resource_ids,
master_username, master_password):
k8s_cluster_installation_param = vnf_additional_params.get(
'k8s_cluster_installation_param', {})
proxy = k8s_cluster_installation_param.get('proxy', {})
ss_installation_params = vnf_additional_params\
.get('k8s_cluster_installation_param', {}).get('storage_server')
self._install_storage_server(context, vnf_instance, proxy,
ss_installation_params)
master_ssh_ips = []
if master_node.get('aspect_id'):
master_resource_list = self._get_resources_list(
heatclient, stack_id, master_resource_name)
flag_master, fixed_master_infos, not_fixed_master_infos = \
self._get_master_node_name(
heatclient, master_resource_list,
target_node_physical_resource_ids,
master_node)
for value in not_fixed_master_infos.values():
master_ssh_ips.append(value.get('master_ssh_ip'))
else:
resource_info = heatclient.resources.get(
stack_id, master_node.get('ssh_cp_name'))
master_ssh_ips.append(resource_info.attributes.get(
'fixed_ips')[0].get('ip_address'))
commander, master_ip = self._connect_ssh_scale(
master_ssh_ips, master_username,
master_password)
pv_registration_params = vnf_additional_params\
.get('k8s_cluster_installation_param')\
.get('pv_registration_params')
self._register_persistent_volumes(
context, vnf_instance, commander, master_username,
master_password, master_ip, pv_registration_params)