Change to use Helm cli during scale with helm

Change scale operation of CNF that is instantiated with Helm chart to
perform using Helm cli by specifying replica count parameters for each
aspect_id at instantiation in advance.

And this patch includes some refactoring of helm_client.

Implements: blueprint helmchart-k8s-vim

Change-Id: I12ca7966d61f4894f50d3e1fc46bc89d69af832d
This commit is contained in:
Ayumu Ueha 2022-01-20 01:52:58 +00:00
parent 5951218ee8
commit 7e36af6c37
8 changed files with 431 additions and 67 deletions

View File

@ -0,0 +1,9 @@
---
upgrade:
- |
Change scale operation of CNF that is instantiated with Helm chart to
perform using Helm cli by specifying replica count parameters for each
aspect_id at instantiation in advance.
This change makes that upgrading the Helm values (e.g. nodeSelector or
tolerations) directly in Kubernetes VIM after a scale operation from Tacker
does not change the number of replicas.

View File

@ -137,6 +137,10 @@ class HelmClientRemoteCommandError(exceptions.TackerException):
message = _('Failed to execute remote command.')
class HelmClientMissingParamsError(exceptions.TackerException):
message = _('The specified value %(value)s was not found.')
class HelmClientOtherError(exceptions.TackerException):
message = _('An error occurred in HelmClient: %(error_message)s.')

View File

@ -126,13 +126,18 @@ class VnfLcmKubernetesHelmTest(vnflcm_base.BaseVnfLcmKubernetesTest):
"helmchartname": "apache",
"exthelmrepo_url": "https://charts.bitnami.com/bitnami"
}
]
],
"helm_replica_values": {
"vdu1_aspect": "replicaCount",
"vdu2_aspect": "replicaCount"
}
}
vnf_instance = self._create_and_instantiate_vnf_instance(
self.vnfd_id, "helmchart", vnf_instance_name,
vnf_instance_description, inst_additional_param)
self._test_scale_cnf(vnf_instance, aspect_id="vdu1_aspect")
self._test_scale_cnf(vnf_instance, aspect_id="vdu2_aspect")
self._test_heal_cnf_with_sol002(vnf_instance)
self._test_heal_cnf_with_sol003(vnf_instance)

View File

@ -1023,7 +1023,8 @@ def fake_pod_list():
)
def get_scale_policy(type, aspect_id='vdu1', delta_num=1, is_legacy=False):
def get_scale_policy(type, aspect_id='vdu1', delta_num=1, is_legacy=False,
vdu_name='fake_name'):
policy = dict()
policy['action'] = type
policy['name'] = aspect_id
@ -1036,7 +1037,7 @@ def get_scale_policy(type, aspect_id='vdu1', delta_num=1, is_legacy=False):
'VDU1': {
'type': 'tosca.nodes.nfv.Vdu.Compute',
'properties': {
'name': 'fake_name',
'name': vdu_name,
'description': 'test description',
'vdu_profile': {
'min_number_of_instances': 1,
@ -1169,10 +1170,12 @@ def fake_inst_vnf_req_for_helmchart(external=True, local=True, namespace=None):
}
)
additional_params['using_helm_install_param'] = using_helm_install_param
additional_params['helm_replica_values'] = {"vdu1_aspect": "replicaCount"}
if namespace:
additional_params['namespace'] = namespace
return objects.InstantiateVnfRequest(additional_params=additional_params)
return objects.InstantiateVnfRequest(
flavour_id="simple", additional_params=additional_params)
def execute_cmd_helm_client(*args, **kwargs):
@ -1198,6 +1201,8 @@ def execute_cmd_helm_client(*args, **kwargs):
' containers:\n',
' - name: nginx\n'
]
elif 'helm get values' in ssh_command:
result = ['{"replicaCount":2}']
else:
result = ""
return result

View File

@ -2210,13 +2210,21 @@ class TestKubernetes(base.TestCase):
@mock.patch.object(client.AppsV1Api, 'patch_namespaced_deployment_scale')
@mock.patch.object(client.AppsV1Api, 'read_namespaced_deployment_scale')
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(objects.VnfResourceList, "get_by_vnf_instance_id")
def test_scale_in_deployment(self, mock_vnf_resource_list,
mock_vnf_instance_get_by_id,
mock_read_namespaced_deployment_scale,
mock_patch_namespaced_deployment_scale):
policy = fakes.get_scale_policy(type='in')
mock_vnf_resource_list.return_value = \
fakes.get_vnf_resource_list(kind='Deployment')
scale_status = objects.ScaleInfo(
aspect_id='vdu1_aspect', scale_level=1)
mock_vnf_instance_get_by_id.return_value = (
vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED,
scale_status=scale_status))
mock_read_namespaced_deployment_scale.return_value = \
client.V1Scale(spec=client.V1ScaleSpec(replicas=2),
status=client.V1ScaleStatus(replicas=2))
@ -2232,13 +2240,21 @@ class TestKubernetes(base.TestCase):
@mock.patch.object(client.AppsV1Api, 'patch_namespaced_stateful_set_scale')
@mock.patch.object(client.AppsV1Api, 'read_namespaced_stateful_set_scale')
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(objects.VnfResourceList, "get_by_vnf_instance_id")
def test_scale_in_stateful_set(self, mock_vnf_resource_list,
mock_vnf_instance_get_by_id,
mock_read_namespaced_stateful_set_scale,
mock_patch_namespaced_stateful_set_scale):
policy = fakes.get_scale_policy(type='in')
mock_vnf_resource_list.return_value = \
fakes.get_vnf_resource_list(kind='StatefulSet')
scale_status = objects.ScaleInfo(
aspect_id='vdu1_aspect', scale_level=1)
mock_vnf_instance_get_by_id.return_value = (
vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED,
scale_status=scale_status))
mock_read_namespaced_stateful_set_scale.return_value = \
client.V1Scale(spec=client.V1ScaleSpec(replicas=2),
status=client.V1ScaleStatus(replicas=2))
@ -2254,13 +2270,21 @@ class TestKubernetes(base.TestCase):
@mock.patch.object(client.AppsV1Api, 'patch_namespaced_replica_set_scale')
@mock.patch.object(client.AppsV1Api, 'read_namespaced_replica_set_scale')
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(objects.VnfResourceList, "get_by_vnf_instance_id")
def test_scale_in_replica_set(self, mock_vnf_resource_list,
mock_vnf_instance_get_by_id,
mock_read_namespaced_replica_set_scale,
mock_patch_namespaced_replica_set_scale):
policy = fakes.get_scale_policy(type='in')
mock_vnf_resource_list.return_value = \
fakes.get_vnf_resource_list(kind='ReplicaSet')
scale_status = objects.ScaleInfo(
aspect_id='vdu1_aspect', scale_level=1)
mock_vnf_instance_get_by_id.return_value = (
vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED,
scale_status=scale_status))
mock_read_namespaced_replica_set_scale.return_value = \
client.V1Scale(spec=client.V1ScaleSpec(replicas=2),
status=client.V1ScaleStatus(replicas=2))
@ -2276,13 +2300,21 @@ class TestKubernetes(base.TestCase):
@mock.patch.object(client.AppsV1Api, 'patch_namespaced_deployment_scale')
@mock.patch.object(client.AppsV1Api, 'read_namespaced_deployment_scale')
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(objects.VnfResourceList, "get_by_vnf_instance_id")
def test_scale_out(self, mock_vnf_resource_list,
mock_vnf_instance_get_by_id,
mock_read_namespaced_deployment_scale,
mock_patch_namespaced_deployment_scale):
policy = fakes.get_scale_policy(type='out')
mock_vnf_resource_list.return_value = \
fakes.get_vnf_resource_list(kind='Deployment')
scale_status = objects.ScaleInfo(
aspect_id='vdu1_aspect', scale_level=1)
mock_vnf_instance_get_by_id.return_value = (
vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED,
scale_status=scale_status))
mock_read_namespaced_deployment_scale.return_value = \
client.V1Scale(spec=client.V1ScaleSpec(replicas=1),
status=client.V1ScaleStatus(replicas=1))
@ -2296,33 +2328,51 @@ class TestKubernetes(base.TestCase):
mock_read_namespaced_deployment_scale.assert_called_once()
mock_patch_namespaced_deployment_scale.assert_called_once()
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(objects.VnfResourceList, "get_by_vnf_instance_id")
def test_scale_target_not_found(self, mock_vnf_resource_list):
def test_scale_target_not_found(self, mock_vnf_resource_list,
mock_vnf_instance_get_by_id):
policy = fakes.get_scale_policy(type='in')
mock_vnf_resource_list.return_value = \
fakes.get_vnf_resource_list(kind='Depoyment', name='other_name')
mock_vnf_instance_get_by_id.return_value = (
vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED))
self.assertRaises(vnfm.CNFScaleFailed,
self.kubernetes.scale,
self.context, None,
utils.get_vim_auth_obj(), policy, None)
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(objects.VnfResourceList, "get_by_vnf_instance_id")
def test_scale_out_of_target_kind(self, mock_vnf_resource_list):
def test_scale_out_of_target_kind(self, mock_vnf_resource_list,
mock_vnf_instance_get_by_id):
policy = fakes.get_scale_policy(type='in')
mock_vnf_resource_list.return_value = \
fakes.get_vnf_resource_list(kind='Pod')
mock_vnf_instance_get_by_id.return_value = (
vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED))
self.assertRaises(vnfm.CNFScaleFailed,
self.kubernetes.scale,
self.context, None,
utils.get_vim_auth_obj(), policy, None)
@mock.patch.object(client.AppsV1Api, 'read_namespaced_deployment_scale')
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(objects.VnfResourceList, "get_by_vnf_instance_id")
def test_scale_in_less_than_min_replicas(self, mock_vnf_resource_list,
mock_vnf_instance_get_by_id,
mock_read_namespaced_deployment_scale):
policy = fakes.get_scale_policy(type='in')
mock_vnf_resource_list.return_value = \
fakes.get_vnf_resource_list(kind='Deployment')
scale_status = objects.ScaleInfo(
aspect_id='vdu1_aspect', scale_level=1)
mock_vnf_instance_get_by_id.return_value = (
vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED,
scale_status=scale_status))
mock_read_namespaced_deployment_scale.return_value = \
client.V1Scale(spec=client.V1ScaleSpec(replicas=1),
status=client.V1ScaleStatus(replicas=1))
@ -2332,12 +2382,20 @@ class TestKubernetes(base.TestCase):
utils.get_vim_auth_obj(), policy, None)
@mock.patch.object(client.AppsV1Api, 'read_namespaced_deployment_scale')
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(objects.VnfResourceList, "get_by_vnf_instance_id")
def test_scale_out_over_max_replicas(self, mock_vnf_resource_list,
mock_vnf_instance_get_by_id,
mock_read_namespaced_deployment_scale):
policy = fakes.get_scale_policy(type='out')
mock_vnf_resource_list.return_value = \
fakes.get_vnf_resource_list(kind='Deployment')
scale_status = objects.ScaleInfo(
aspect_id='vdu1_aspect', scale_level=1)
mock_vnf_instance_get_by_id.return_value = (
vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED,
scale_status=scale_status))
mock_read_namespaced_deployment_scale.return_value = \
client.V1Scale(spec=client.V1ScaleSpec(replicas=3),
status=client.V1ScaleStatus(replicas=3))

View File

@ -18,9 +18,9 @@ import paramiko
from ddt import ddt
from kubernetes import client
from oslo_serialization import jsonutils
from tacker.common import exceptions
from tacker import context
from tacker.db.db_sqlalchemy import models
from tacker.extensions import common_services as cs
from tacker.extensions import vnfm
from tacker import objects
from tacker.tests.unit import base
@ -41,17 +41,18 @@ class FakeRemoteCommandExecutor(mock.Mock):
class FakeCommander(mock.Mock):
def config(self, is_success, errmsg=None):
def config(self, is_success, errmsg=None, stdout=''):
self.is_success = is_success
self.errmsg = errmsg
self.stdout = stdout
def execute_command(self, *args, **kwargs):
is_success = self.is_success
fake_result = FakeCmdResult()
stderr = ''
stdout = ''
stdout = self.stdout
return_code = (0) if is_success else (1)
stderr, stdout = ('', '') if is_success else ('err', '')
stderr, stdout = ('', stdout) if is_success else ('err', stdout)
if self.errmsg:
stderr = [self.errmsg]
fake_result.set_std(stderr, stdout, return_code)
@ -145,19 +146,43 @@ class TestKubernetesHelm(base.TestCase):
self.helm_client._execute_command,
ssh_command, timeout, retry)
def test_pre_instantiation_vnf_helm(self):
@mock.patch.object(eventlet, 'monkey_patch')
def test_helmclient_get_value_nested_param(self, mock_monkey_patch):
stdout = ['{"foo":{"bar":1}}']
self.helm_client.commander.config(True, stdout=stdout)
res = self.helm_client.get_value('fake_release_name', '', 'foo.bar')
self.assertEqual(res, 1)
@mock.patch.object(eventlet, 'monkey_patch')
def test_helmclient_get_value_missing_param(self, mock_monkey_patch):
stdout = ['{"foo":1}']
self.helm_client.commander.config(True, stdout=stdout)
self.assertRaises(vnfm.HelmClientMissingParamsError,
self.helm_client.get_value,
'fake_release_name', '', 'foo.bar')
@mock.patch.object(objects.VnfPackageVnfd, "get_by_id")
@mock.patch('tacker.vnflcm.utils._get_vnfd_dict')
def test_pre_instantiation_vnf_helm(self, mock_vnfd_dict,
mock_vnf_package_vnfd_get_by_id):
vnf_instance = fd_utils.get_vnf_instance_object()
vim_connection_info = fakes.fake_vim_connection_info_with_extra()
vnf_software_images = None
vnf_package_path = self.package_path
instantiate_vnf_req = fakes.fake_inst_vnf_req_for_helmchart()
mock_vnfd_dict.return_value = vnflcm_fakes.vnfd_dict_cnf()
mock_vnf_package_vnfd_get_by_id.return_value = (
vnflcm_fakes.return_vnf_package_vnfd())
vnf_resources = self.kubernetes.pre_instantiation_vnf(
self.context, vnf_instance, vim_connection_info,
vnf_software_images,
instantiate_vnf_req, vnf_package_path)
self.assertEqual(vnf_resources, {})
def test_pre_helm_install_with_bool_param(self):
@mock.patch.object(objects.VnfPackageVnfd, "get_by_id")
@mock.patch('tacker.vnflcm.utils._get_vnfd_dict')
def test_pre_helm_install_with_bool_param(self, mock_vnfd_dict,
mock_vnf_package_vnfd_get_by_id):
vnf_instance = fd_utils.get_vnf_instance_object()
vim_connection_info = fakes.fake_vim_connection_info_with_extra()
vnf_software_images = None
@ -168,6 +193,9 @@ class TestKubernetesHelm(base.TestCase):
'using_helm_install_param']
using_helm_inst_params[0]['exthelmchart'] = True
using_helm_inst_params[1]['exthelmchart'] = False
mock_vnfd_dict.return_value = vnflcm_fakes.vnfd_dict_cnf()
mock_vnf_package_vnfd_get_by_id.return_value = (
vnflcm_fakes.return_vnf_package_vnfd())
vnf_resources = self.kubernetes.pre_instantiation_vnf(
self.context, vnf_instance, vim_connection_info,
vnf_software_images,
@ -175,12 +203,14 @@ class TestKubernetesHelm(base.TestCase):
self.assertEqual(vnf_resources, {})
def test_pre_helm_install_invaid_vimconnectioninfo_no_helm_info(self):
vnf_instance = fd_utils.get_vnf_instance_object()
vim_connection_info = fakes.fake_vim_connection_info_with_extra()
del vim_connection_info.extra['helm_info']
vnf_package_path = self.package_path
instantiate_vnf_req = fakes.fake_inst_vnf_req_for_helmchart()
exc = self.assertRaises(vnfm.InvalidVimConnectionInfo,
self.kubernetes._pre_helm_install,
self.context, vnf_instance,
vim_connection_info, instantiate_vnf_req,
vnf_package_path)
msg = ("Invalid vim_connection_info: "
@ -188,12 +218,14 @@ class TestKubernetesHelm(base.TestCase):
self.assertEqual(msg, exc.format_message())
def test_pre_helm_install_invaid_vimconnectioninfo_no_masternode_ip(self):
vnf_instance = fd_utils.get_vnf_instance_object()
vim_connection_info = fakes.fake_vim_connection_info_with_extra(
del_field='masternode_ip')
vnf_package_path = self.package_path
instantiate_vnf_req = fakes.fake_inst_vnf_req_for_helmchart()
exc = self.assertRaises(vnfm.InvalidVimConnectionInfo,
self.kubernetes._pre_helm_install,
self.context, vnf_instance,
vim_connection_info, instantiate_vnf_req,
vnf_package_path)
msg = ("Invalid vim_connection_info: "
@ -201,6 +233,7 @@ class TestKubernetesHelm(base.TestCase):
self.assertEqual(msg, exc.format_message())
def test_pre_helm_install_invalid_helm_param(self):
vnf_instance = fd_utils.get_vnf_instance_object()
vim_connection_info = fakes.fake_vim_connection_info_with_extra()
vnf_package_path = self.package_path
instantiate_vnf_req = fakes.fake_inst_vnf_req_for_helmchart(
@ -208,8 +241,9 @@ class TestKubernetesHelm(base.TestCase):
using_helm_inst_params = instantiate_vnf_req.additional_params[
'using_helm_install_param']
del using_helm_inst_params[0]['exthelmchart']
exc = self.assertRaises(cs.InputValuesMissing,
exc = self.assertRaises(exceptions.InvalidInput,
self.kubernetes._pre_helm_install,
self.context, vnf_instance,
vim_connection_info, instantiate_vnf_req,
vnf_package_path)
msg = ("Parameter input values missing for the key '{param}'".format(
@ -217,12 +251,14 @@ class TestKubernetesHelm(base.TestCase):
self.assertEqual(msg, exc.format_message())
def test_pre_helm_install_empty_helm_param(self):
vnf_instance = fd_utils.get_vnf_instance_object()
vim_connection_info = fakes.fake_vim_connection_info_with_extra()
vnf_package_path = self.package_path
instantiate_vnf_req = fakes.fake_inst_vnf_req_for_helmchart(
external=False, local=False)
exc = self.assertRaises(cs.InputValuesMissing,
exc = self.assertRaises(exceptions.InvalidInput,
self.kubernetes._pre_helm_install,
self.context, vnf_instance,
vim_connection_info, instantiate_vnf_req,
vnf_package_path)
msg = ("Parameter input values missing for the key '{param}'".format(
@ -230,6 +266,7 @@ class TestKubernetesHelm(base.TestCase):
self.assertEqual(msg, exc.format_message())
def test_pre_helm_install_invalid_chartfile_path(self):
vnf_instance = fd_utils.get_vnf_instance_object()
vim_connection_info = fakes.fake_vim_connection_info_with_extra()
vnf_package_path = self.package_path
instantiate_vnf_req = fakes.fake_inst_vnf_req_for_helmchart(
@ -239,6 +276,7 @@ class TestKubernetesHelm(base.TestCase):
using_helm_inst_params[0]['helmchartfile_path'] = 'invalid_path'
exc = self.assertRaises(vnfm.CnfDefinitionNotFound,
self.kubernetes._pre_helm_install,
self.context, vnf_instance,
vim_connection_info, instantiate_vnf_req,
vnf_package_path)
msg = _("CNF definition file with path {path} is not found "
@ -246,6 +284,28 @@ class TestKubernetesHelm(base.TestCase):
path=using_helm_inst_params[0]['helmchartfile_path'])
self.assertEqual(msg, exc.format_message())
@mock.patch.object(objects.VnfPackageVnfd, "get_by_id")
@mock.patch('tacker.vnflcm.utils._get_vnfd_dict')
def test_pre_helm_install_missing_replica_values(
self, mock_vnfd_dict, mock_vnf_package_vnfd_get_by_id):
vnf_instance = fd_utils.get_vnf_instance_object()
vim_connection_info = fakes.fake_vim_connection_info_with_extra()
vnf_package_path = self.package_path
instantiate_vnf_req = fakes.fake_inst_vnf_req_for_helmchart(
external=False)
instantiate_vnf_req.additional_params['helm_replica_values'] = {}
mock_vnfd_dict.return_value = vnflcm_fakes.vnfd_dict_cnf()
mock_vnf_package_vnfd_get_by_id.return_value = (
vnflcm_fakes.return_vnf_package_vnfd())
exc = self.assertRaises(exceptions.InvalidInput,
self.kubernetes._pre_helm_install,
self.context, vnf_instance,
vim_connection_info, instantiate_vnf_req,
vnf_package_path)
aspect_id = 'vdu1_aspect'
msg = f"Replica value for aspectId '{aspect_id}' is missing"
self.assertEqual(msg, exc.format_message())
@mock.patch.object(objects.VnfResource, 'create')
@mock.patch.object(paramiko.Transport, 'close')
@mock.patch.object(paramiko.SFTPClient, 'put')
@ -507,3 +567,109 @@ class TestKubernetesHelm(base.TestCase):
region_name=None,
vnf_instance=vnf_instance)
self.assertEqual(mock_read_namespaced_deployment.call_count, 0)
@mock.patch.object(helm_client.HelmClient, '_execute_command')
@mock.patch.object(vim_client.VimClient, 'get_vim')
@mock.patch.object(objects.VnfInstance, "get_by_id")
def test_scale_in_with_local_helmchart(self, mock_vnf_instance_get_by_id,
mock_get_vim, mock_command):
policy = fakes.get_scale_policy(type='in', aspect_id='vdu1_aspect',
vdu_name='myrelease-ext-mychart-ext')
scale_status = objects.ScaleInfo(
aspect_id='vdu1_aspect', scale_level=1)
mock_get_vim.return_value = fakes.fake_k8s_vim_obj()
vim_connection_info = fakes.fake_vim_connection_info_with_extra()
instantiate_vnf_req = fakes.fake_inst_vnf_req_for_helmchart(
local=False)
vnf_instance = copy.deepcopy(self.vnf_instance)
vnf_instance.vim_connection_info = [vim_connection_info]
vnf_instance.scale_status = [scale_status]
vnf_instance.instantiated_vnf_info.additional_params = \
instantiate_vnf_req.additional_params
mock_vnf_instance_get_by_id.return_value = vnf_instance
mock_command.side_effect = fakes.execute_cmd_helm_client
self.kubernetes.scale(context=self.context, plugin=None,
auth_attr=utils.get_vim_auth_obj(),
policy=policy,
region_name=None)
@mock.patch.object(helm_client.HelmClient, '_execute_command')
@mock.patch.object(vim_client.VimClient, 'get_vim')
@mock.patch.object(objects.VnfInstance, "get_by_id")
def test_scale_out_with_ext_helmchart(self, mock_vnf_instance_get_by_id,
mock_get_vim, mock_command):
policy = fakes.get_scale_policy(type='out', aspect_id='vdu1_aspect',
vdu_name='myrelease-local-localhelm')
scale_status = objects.ScaleInfo(
aspect_id='vdu1_aspect', scale_level=1)
mock_get_vim.return_value = fakes.fake_k8s_vim_obj()
vim_connection_info = fakes.fake_vim_connection_info_with_extra()
instantiate_vnf_req = fakes.fake_inst_vnf_req_for_helmchart(
external=False, namespace='dummy_namespace')
vnf_instance = copy.deepcopy(self.vnf_instance)
vnf_instance.vim_connection_info = [vim_connection_info]
vnf_instance.scale_status = [scale_status]
vnf_instance.instantiated_vnf_info.additional_params = \
instantiate_vnf_req.additional_params
mock_vnf_instance_get_by_id.return_value = vnf_instance
mock_command.side_effect = fakes.execute_cmd_helm_client
self.kubernetes.scale(context=self.context, plugin=None,
auth_attr=utils.get_vim_auth_obj(),
policy=policy,
region_name=None)
@mock.patch.object(helm_client.HelmClient, '_execute_command')
@mock.patch.object(vim_client.VimClient, 'get_vim')
@mock.patch.object(objects.VnfInstance, "get_by_id")
def test_scale_in_less_than_min_replicas(self, mock_vnf_instance_get_by_id,
mock_get_vim, mock_command):
policy = fakes.get_scale_policy(type='in', aspect_id='vdu1_aspect',
vdu_name='myrelease-ext-mychart-ext')
scale_status = objects.ScaleInfo(
aspect_id='vdu1_aspect', scale_level=1)
mock_get_vim.return_value = fakes.fake_k8s_vim_obj()
vim_connection_info = fakes.fake_vim_connection_info_with_extra()
instantiate_vnf_req = fakes.fake_inst_vnf_req_for_helmchart(
local=False)
vnf_instance = copy.deepcopy(self.vnf_instance)
vnf_instance.vim_connection_info = [vim_connection_info]
vnf_instance.scale_status = [scale_status]
vnf_instance.instantiated_vnf_info.additional_params = \
instantiate_vnf_req.additional_params
mock_vnf_instance_get_by_id.return_value = vnf_instance
mock_command.side_effect = [['{"replicaCount":1}'], '']
exc = self.assertRaises(vnfm.CNFScaleFailed,
self.kubernetes.scale,
self.context, None, utils.get_vim_auth_obj(),
policy, None)
msg = ("CNF Scale Failed with reason: The number of target replicas "
"after scaling [0] is out of range")
self.assertEqual(msg, exc.format_message())
@mock.patch.object(helm_client.HelmClient, '_execute_command')
@mock.patch.object(vim_client.VimClient, 'get_vim')
@mock.patch.object(objects.VnfInstance, "get_by_id")
def test_scale_out_over_max_replicas(self, mock_vnf_instance_get_by_id,
mock_get_vim, mock_command):
policy = fakes.get_scale_policy(type='out', aspect_id='vdu1_aspect',
vdu_name='myrelease-local-localhelm')
scale_status = objects.ScaleInfo(
aspect_id='vdu1_aspect', scale_level=1)
mock_get_vim.return_value = fakes.fake_k8s_vim_obj()
vim_connection_info = fakes.fake_vim_connection_info_with_extra()
instantiate_vnf_req = fakes.fake_inst_vnf_req_for_helmchart(
external=False)
vnf_instance = copy.deepcopy(self.vnf_instance)
vnf_instance.vim_connection_info = [vim_connection_info]
vnf_instance.scale_status = [scale_status]
vnf_instance.instantiated_vnf_info.additional_params = \
instantiate_vnf_req.additional_params
mock_vnf_instance_get_by_id.return_value = vnf_instance
mock_command.side_effect = [['{"replicaCount":3}'], '']
exc = self.assertRaises(vnfm.CNFScaleFailed,
self.kubernetes.scale,
self.context, None, utils.get_vim_auth_obj(),
policy, None)
msg = ("CNF Scale Failed with reason: The number of target replicas "
"after scaling [4] is out of range")
self.assertEqual(msg, exc.format_message())

View File

@ -18,6 +18,7 @@ import time
import eventlet
from oslo_log import log as logging
from oslo_serialization import jsonutils
import paramiko
from tacker.common import cmd_executer
@ -26,11 +27,13 @@ from tacker.extensions import vnfm
LOG = logging.getLogger(__name__)
HELM_CMD_TIMEOUT = 30
HELM_INSTALL_TIMEOUT = 120
HELM_UPGRADE_TIMEOUT = 120
VALUE_SPLIT_CHARACTER = '.'
TRANSPORT_RETRIES = 2
TRANSPORT_WAIT = 15
class HelmClient(object):
class HelmClient():
"""Helm client for hosting containerized vnfs"""
def __init__(self, ip, username, password):
@ -52,8 +55,8 @@ class HelmClient(object):
ssh_command, input_data=None)
break
except eventlet.timeout.Timeout:
error_message = ('It is time out, When execute command: {}.'
.format(ssh_command))
error_message = ('It is time out, When execute command: '
f'{ssh_command}.')
LOG.debug(error_message)
retry -= 1
if retry < 0:
@ -71,12 +74,12 @@ class HelmClient(object):
def add_repository(self, repo_name, repo_url):
# execute helm repo add command
ssh_command = "helm repo add {} {}".format(repo_name, repo_url)
ssh_command = f"helm repo add {repo_name} {repo_url}"
self._execute_command(ssh_command)
def remove_repository(self, repo_name):
# execute helm repo remove command
ssh_command = "helm repo remove {}".format(repo_name)
ssh_command = f"helm repo remove {repo_name}"
self._execute_command(ssh_command)
def _transport_helmchart(self, source_path, target_path):
@ -102,10 +105,9 @@ class HelmClient(object):
def put_helmchart(self, source_path, target_dir):
# create helm chart directory and change permission
ssh_command = ("if [ ! -d {target_dir} ]; then "
"`sudo mkdir -p {target_dir}; "
"sudo chown -R {username} {target_dir};`; fi").format(
target_dir=target_dir, username=self.username)
ssh_command = (f"if [ ! -d {target_dir} ]; then "
f"`sudo mkdir -p {target_dir}; "
f"sudo chown -R {self.username} {target_dir};`; fi")
self._execute_command(ssh_command)
# get helm chart name and target path
chartfile_name = source_path[source_path.rfind(os.sep) + 1:]
@ -113,40 +115,75 @@ class HelmClient(object):
# transport helm chart file
self._transport_helmchart(source_path, target_path)
# decompress helm chart file
ssh_command = "tar -zxf {} -C {}".format(target_path, target_dir)
ssh_command = f"tar -zxf {target_path} -C {target_dir}"
self._execute_command(ssh_command)
def delete_helmchart(self, target_path):
# delete helm chart folder
ssh_command = "sudo rm -rf {}".format(target_path)
ssh_command = f"sudo rm -rf {target_path}"
self._execute_command(ssh_command)
def install(self, release_name, chart_name, namespace, parameters):
# execute helm install command
ssh_command = "helm install {} {}".format(release_name, chart_name)
ssh_command = f"helm install {release_name} {chart_name}"
if namespace:
ssh_command += " --namespace {}".format(namespace)
ssh_command = f"{ssh_command} --namespace {namespace}"
if parameters:
for param in parameters:
ssh_command += " --set {}".format(param)
ssh_command = f"{ssh_command} --set {param}"
self._execute_command(ssh_command, timeout=HELM_INSTALL_TIMEOUT)
def uninstall(self, release_name, namespace):
# execute helm uninstall command
ssh_command = "helm uninstall {}".format(release_name)
ssh_command = f"helm uninstall {release_name}"
if namespace:
ssh_command += " --namespace {}".format(namespace)
ssh_command = f"{ssh_command} --namespace {namespace}"
self._execute_command(ssh_command, timeout=HELM_INSTALL_TIMEOUT)
def get_manifest(self, release_name, namespace):
# execute helm get manifest command
ssh_command = "helm get manifest {}".format(release_name)
ssh_command = f"helm get manifest {release_name}"
if namespace:
ssh_command += " --namespace {}".format(namespace)
ssh_command = f"{ssh_command} --namespace {namespace}"
result = self._execute_command(ssh_command)
# convert manifest to text format
mf_content = ''.join(result)
return mf_content
def _get_values(self, release_name, namespace):
# execute helm get values command
ssh_command = f"helm get values {release_name} --all --output json"
if namespace:
ssh_command = f"{ssh_command} --namespace {namespace}"
result = self._execute_command(ssh_command)
# get values (json format) from result and convert to dict
values = jsonutils.loads(result[0])
return values
def get_value(self, release_name, namespace, value):
res = self._get_values(release_name, namespace)
# get specified value (loop for nested value: e.g. "foo.bar")
for val in value.split(VALUE_SPLIT_CHARACTER):
if isinstance(res, dict):
res = res.get(val)
else:
# if not of type dict, target value is not found
res = None
if res is None:
self.close_session()
LOG.error(f"{value} is not found in retrieved values.")
raise vnfm.HelmClientMissingParamsError(value=value)
return res
def upgrade_values(self, release_name, chart_name, namespace, parameters):
# execute helm upgrade command
ssh_command = (f"helm upgrade {release_name} {chart_name}"
" --reuse-values")
if namespace:
ssh_command = f"{ssh_command} --namespace {namespace}"
for param_key, param_val in parameters.items():
ssh_command = f"{ssh_command} --set {param_key}={param_val}"
self._execute_command(ssh_command, timeout=HELM_UPGRADE_TIMEOUT)
def close_session(self):
self.commander.close_session()

View File

@ -31,7 +31,6 @@ from tacker.common.container import kubernetes_utils
from tacker.common import exceptions
from tacker.common import log
from tacker.common import utils
from tacker.extensions import common_services as cs
from tacker.extensions import vnfm
from tacker import objects
from tacker.objects.fields import ErrorPoint as EP
@ -810,10 +809,13 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
def _get_helm_info(self, vim_connection_info):
# replace single quote to double quote
helm_info = vim_connection_info.extra.get('helm_info')
helm_info_dq = helm_info.replace("'", '"')
helm_info_dict = jsonutils.loads(helm_info_dq)
return helm_info_dict
helm_info = jsonutils.loads(
vim_connection_info.extra.get('helm_info')
.replace("'", '"'))
ips = helm_info.get('masternode_ip', [])
username = helm_info.get('masternode_username', '')
password = helm_info.get('masternode_password', '')
return ips, username, password
def _helm_uninstall(self, context, vnf_instance):
inst_vnf_info = vnf_instance.instantiated_vnf_info
@ -825,13 +827,10 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
vnf_instance.vim_connection_info)
vim_connection_info = objects.VimConnectionInfo.obj_from_primitive(
vim_info, context)
helm_info = self._get_helm_info(vim_connection_info)
ip_list = helm_info.get('masternode_ip')
username = helm_info.get('masternode_username')
password = helm_info.get('masternode_password')
ips, username, password = self._get_helm_info(vim_connection_info)
k8s_objs = []
# initialize HelmClient
helmclient = helm_client.HelmClient(ip_list[0], username, password)
helmclient = helm_client.HelmClient(ips[0], username, password)
for helm_inst_params in helm_inst_param_list:
release_name = helm_inst_params.get('helmreleasename')
# execute `helm uninstall` command
@ -1011,12 +1010,9 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
vnf_instance.vim_connection_info)
vim_connection_info = objects.VimConnectionInfo.obj_from_primitive(
vim_info, context)
helm_info = self._get_helm_info(vim_connection_info)
ip_list = helm_info.get('masternode_ip')
username = helm_info.get('masternode_username')
password = helm_info.get('masternode_password')
ips, username, password = self._get_helm_info(vim_connection_info)
del_dir = os.path.join(HELM_CHART_DIR_BASE, vnf_instance.id)
for ip in ip_list:
for ip in ips:
local_helm_del_flag = False
# initialize HelmClient
helmclient = helm_client.HelmClient(ip, username, password)
@ -1191,6 +1187,76 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
return response
def _helm_scale(self, context, vnf_instance, policy):
aspect_id = policy['name']
vdu_defs = policy['vdu_defs']
inst_additional_params = (vnf_instance.instantiated_vnf_info
.additional_params)
namespace = inst_additional_params.get('namespace', '')
helm_install_params = inst_additional_params.get(
'using_helm_install_param', [])
# Get releasename and chartname from Helm install params in Instantiate
# request parameter by using VDU properties name.
found_flag = False
for vdu_def in vdu_defs.values():
vdu_properties = vdu_def.get('properties')
for helm_install_param in helm_install_params:
if self._is_exthelmchart(helm_install_param):
chart_name = helm_install_param.get('helmchartname')
upgrade_chart_name = "/".join(
[helm_install_param.get('helmrepositoryname'),
chart_name])
else:
chartfile_path = helm_install_param.get(
'helmchartfile_path')
chartfile_name = chartfile_path[
chartfile_path.rfind(os.sep) + 1:]
chart_name = "-".join(chartfile_name.split("-")[:-1])
upgrade_chart_name = ("/var/tacker/helm/"
f"{vnf_instance.id}/{chart_name}")
release_name = helm_install_param.get('helmreleasename')
resource_name = "-".join([release_name, chart_name])
if resource_name == vdu_properties.get('name'):
found_flag = True
break
if found_flag:
break
# Prepare for scale operation
helm_replica_values = inst_additional_params.get('helm_replica_values')
replica_param = helm_replica_values.get(aspect_id)
vim_info = vnflcm_utils._get_vim(context,
vnf_instance.vim_connection_info)
vim_connection_info = objects.VimConnectionInfo.obj_from_primitive(
vim_info, context)
ips, username, password = self._get_helm_info(vim_connection_info)
# initialize HelmClient
helmclient = helm_client.HelmClient(ips[0], username, password)
# execute `helm get values` command to get current replicas
current_replicas = helmclient.get_value(
release_name, namespace, value=replica_param)
vdu_profile = vdu_properties.get('vdu_profile')
if policy['action'] == 'out':
scale_replicas = current_replicas + policy['delta_num']
elif policy['action'] == 'in':
scale_replicas = current_replicas - policy['delta_num']
# check if replica count is in min and man range defined in VNFD
max_replicas = vdu_profile.get('max_number_of_instances')
min_replicas = vdu_profile.get('min_number_of_instances')
if (scale_replicas < min_replicas) or (scale_replicas > max_replicas):
error_reason = ("The number of target replicas after"
f" scaling [{scale_replicas}] is out of range")
LOG.error(error_reason)
raise vnfm.CNFScaleFailed(reason=error_reason)
# execute scale processing (`helm upgrade` command)
upgrade_values = {replica_param: scale_replicas}
helmclient.upgrade_values(release_name, upgrade_chart_name,
namespace, parameters=upgrade_values)
helmclient.close_session()
return
@log.log
def scale(self, context, plugin, auth_attr, policy, region_name):
"""Scale function
@ -1206,6 +1272,13 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
# execute legacy scale method
self._scale_legacy(policy, auth_cred)
else:
vnf_instance = objects.VnfInstance.get_by_id(
context, policy['vnf_instance_id'])
# check use_helm flag
inst_vnf_info = vnf_instance.instantiated_vnf_info
if self._is_use_helm_flag(inst_vnf_info.additional_params):
self._helm_scale(context, vnf_instance, policy)
return
vnf_resources = objects.VnfResourceList.get_by_vnf_instance_id(
context, policy['vnf_instance_id'])
app_v1_api_client = self.kubernetes.get_app_v1_api_client(
@ -1498,24 +1571,24 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
return exthelmchart.lower() == 'true'
return bool(exthelmchart)
def _pre_helm_install(self, vim_connection_info,
def _pre_helm_install(self, context, vnf_instance, vim_connection_info,
instantiate_vnf_req, vnf_package_path):
def _check_param_exists(params_dict, check_param):
if check_param not in params_dict.keys():
LOG.error("{check_param} is not found".format(
check_param=check_param))
raise cs.InputValuesMissing(key=check_param)
raise exceptions.InvalidInput(missing_key_err_msg %
{"key": check_param})
missing_key_err_msg = ("Parameter input values missing for"
" the key '%(key)s'")
# check helm info in vim_connection_info
if 'helm_info' not in vim_connection_info.extra.keys():
reason = "helm_info is missing in vim_connection_info.extra."
LOG.error(reason)
raise vnfm.InvalidVimConnectionInfo(reason=reason)
helm_info = self._get_helm_info(vim_connection_info)
ip_list = helm_info.get('masternode_ip', [])
username = helm_info.get('masternode_username', '')
password = helm_info.get('masternode_username', '')
if not (ip_list and username and password):
ips, username, password = self._get_helm_info(vim_connection_info)
if not (ips and username and password):
reason = "content of helm_info is invalid."
LOG.error(reason)
raise vnfm.InvalidVimConnectionInfo(reason=reason)
@ -1527,7 +1600,8 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
'using_helm_install_param', [])
if not helm_install_param_list:
LOG.error("using_helm_install_param is empty.")
raise cs.InputValuesMissing(key='using_helm_install_param')
raise exceptions.InvalidInput(missing_key_err_msg %
{"key": "using_helm_install_param"})
for helm_install_params in helm_install_param_list:
# common parameter check
_check_param_exists(helm_install_params, 'exthelmchart')
@ -1548,6 +1622,18 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
path=chartfile_path))
raise vnfm.CnfDefinitionNotFound(path=chartfile_path)
# check parameters for scale operation
vnfd = vnflcm_utils.get_vnfd_dict(context, vnf_instance.vnfd_id,
instantiate_vnf_req.flavour_id)
tosca = tosca_template.ToscaTemplate(parsed_params={}, a_file=False,
yaml_dict_tpl=vnfd)
extract_policy_infos = vnflcm_utils.get_extract_policy_infos(tosca)
helm_replica_values = additional_params.get('helm_replica_values', {})
for aspect_id in extract_policy_infos['aspect_id_dict'].keys():
if aspect_id not in helm_replica_values.keys():
raise exceptions.InvalidInput(
f"Replica value for aspectId '{aspect_id}' is missing")
def _get_target_k8s_files(self, instantiate_vnf_req):
if instantiate_vnf_req.additional_params and\
CNF_TARGET_FILES_KEY in\
@ -1585,8 +1671,8 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
# check use_helm flag
if self._is_use_helm_flag(instantiate_vnf_req.additional_params):
# parameter check
self._pre_helm_install(
vim_connection_info, instantiate_vnf_req, vnf_package_path)
self._pre_helm_install(context, vnf_instance, vim_connection_info,
instantiate_vnf_req, vnf_package_path)
# NOTE: In case of using helm, vnf_resources is created
# after `helm install` command is executed.
return {}
@ -1657,13 +1743,10 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
namespace = additional_params.get('namespace', '')
helm_inst_param_list = additional_params.get(
'using_helm_install_param')
helm_info = self._get_helm_info(vim_connection_info)
ip_list = helm_info.get('masternode_ip')
username = helm_info.get('masternode_username')
password = helm_info.get('masternode_password')
ips, username, password = self._get_helm_info(vim_connection_info)
vnf_resources = []
k8s_objs = []
for ip_idx, ip in enumerate(ip_list):
for ip_idx, ip in enumerate(ips):
# initialize HelmClient
helmclient = helm_client.HelmClient(ip, username, password)
for inst_params in helm_inst_param_list:
@ -1770,13 +1853,10 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
namespace = additional_params.get('namespace', '')
helm_inst_param_list = additional_params.get(
'using_helm_install_param')
helm_info = self._get_helm_info(vim_connection_info)
ip_list = helm_info.get('masternode_ip')
username = helm_info.get('masternode_username')
password = helm_info.get('masternode_password')
ips, username, password = self._get_helm_info(vim_connection_info)
k8s_objs = []
# initialize HelmClient
helmclient = helm_client.HelmClient(ip_list[0], username, password)
helmclient = helm_client.HelmClient(ips[0], username, password)
for helm_inst_params in helm_inst_param_list:
release_name = helm_inst_params.get('helmreleasename')
# get manifest by using `helm get manifest` command