diff --git a/releasenotes/notes/database_synchronaization-ed47b552227892cf.yaml b/releasenotes/notes/database_synchronaization-ed47b552227892cf.yaml new file mode 100644 index 000000000..c74c9c4cf --- /dev/null +++ b/releasenotes/notes/database_synchronaization-ed47b552227892cf.yaml @@ -0,0 +1,13 @@ +--- +features: + - | + This patch adds the ability to periodically synchronize resources + in K8s VIM and Tacker DB. + There is no interface on the K8s to notify Tacker when the auto-scale or + auto-heal of a pod is running and the pod information is updated. + This can lead to inconsistencies between the Pod information in Tacker + database and the Pod information running on the actual K8s. + This function periodically checks the pod information in the Tacker + database and the pod information in the K8s, and updates the information + in the Tacker database according to the K8s side if there is + any inconsistency. diff --git a/tacker/conductor/conductor_server.py b/tacker/conductor/conductor_server.py index 23c2b2fc7..ccb3af4d0 100644 --- a/tacker/conductor/conductor_server.py +++ b/tacker/conductor/conductor_server.py @@ -22,6 +22,7 @@ import oslo_messaging import requests import shutil import sys +import threading import time import traceback import yaml @@ -30,6 +31,7 @@ from glance_store import exceptions as store_exceptions from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils +from oslo_service import loopingcall from oslo_service import periodic_task from oslo_service import service from oslo_utils import encodeutils @@ -75,6 +77,10 @@ from tacker.vnfm import nfvo_client from tacker.vnfm import plugin CONF = tacker.conf.CONF +# NOTE(fengyi): After the conductor is started, since the start-up process +# is being executed, it should take a while to start the actual DB +# synchronization periodical process. +DB_SYNC_INITIAL_DELAY = 60 # NOTE(tpatil): keystone_authtoken opts registered explicitly as conductor # service doesn't use the keystonemiddleware.authtoken middleware as it's @@ -292,6 +298,14 @@ def grant_error_common(function): return decorated_function +def async_call(func): + def inner(*args, **kwargs): + th = threading.Thread(target=func, args=args, + kwargs=kwargs, daemon=True) + th.start() + return inner + + class Conductor(manager.Manager, v2_hook.ConductorV2Hook): def __init__(self, host, conf=None): if conf: @@ -304,6 +318,15 @@ class Conductor(manager.Manager, v2_hook.ConductorV2Hook): self.vnf_manager = driver_manager.DriverManager( 'tacker.tacker.vnfm.drivers', cfg.CONF.tacker.infra_driver) + self._periodic_call() + + @async_call + def _periodic_call(self): + self.periodic = loopingcall.FixedIntervalLoopingCall( + self._sync_db) + self.periodic.start( + interval=CONF.db_synchronization_interval, + initial_delay=DB_SYNC_INITIAL_DELAY) def start(self): coordination.COORDINATOR.start() @@ -2506,6 +2529,13 @@ class Conductor(manager.Manager, v2_hook.ConductorV2Hook): error_point=vnf_dict['current_error_point'] ) + def _sync_db(self): + """Periodic database update invocation method""" + LOG.debug("Starting _sync_db") + context = t_context.get_admin_context() + self.vnflcm_driver.sync_db(context) + LOG.debug("Ended _sync_db") + def init(args, **kwargs): CONF(args=args, project='tacker', diff --git a/tacker/conf/conductor.py b/tacker/conf/conductor.py index f581a8a3c..2b2364c2e 100644 --- a/tacker/conf/conductor.py +++ b/tacker/conf/conductor.py @@ -23,6 +23,10 @@ interval_opts = [ default=1800, help=_('Seconds between running periodic tasks ' 'to cleanup residues of deleted vnf packages')), + cfg.IntOpt('db_synchronization_interval', + default=300, + help=_('Interval time in sec for DB sync between ' + 'Tacker and Kubernetes VIMs')), ] diff --git a/tacker/db/vnfm/vnfm_db.py b/tacker/db/vnfm/vnfm_db.py index 7878f22fd..4e42f11d3 100644 --- a/tacker/db/vnfm/vnfm_db.py +++ b/tacker/db/vnfm/vnfm_db.py @@ -37,6 +37,7 @@ from tacker.db import db_base from tacker.db.db_sqlalchemy import models from tacker.db import model_base from tacker.db import models_v1 +from tacker.db.nfvo import nfvo_db # noqa: F401 from tacker.db.nfvo import ns_db from tacker.db import types from tacker.extensions import vnfm diff --git a/tacker/objects/fields.py b/tacker/objects/fields.py index a8f0908c1..fdfa33f28 100644 --- a/tacker/objects/fields.py +++ b/tacker/objects/fields.py @@ -139,8 +139,10 @@ class VnfInstanceTaskState(BaseTackerEnum): TERMINATING = 'TERMINATING' SCALING = 'SCALING' ERROR = 'ERROR' + DB_SYNCHRONIZING = 'DB_SYNCHRONIZING' - ALL = (INSTANTIATING, HEALING, TERMINATING, SCALING, ERROR) + ALL = (INSTANTIATING, HEALING, TERMINATING, + SCALING, ERROR, DB_SYNCHRONIZING) class VnfInstanceTaskStateField(BaseEnumField): diff --git a/tacker/sol_refactored/common/exceptions.py b/tacker/sol_refactored/common/exceptions.py index d51084604..c744b5172 100644 --- a/tacker/sol_refactored/common/exceptions.py +++ b/tacker/sol_refactored/common/exceptions.py @@ -347,6 +347,14 @@ class InvalidPagingMarker(SolHttpError400): message = _("Paging marker value %(marker)s is invalid.") +class DbSyncNoDiff(Exception): + pass + + +class DbSyncFailed(Exception): + pass + + class K8sOperationFailed(SolHttpError422): # title and detail are set in the code from kubernetes operation pass diff --git a/tacker/sol_refactored/common/vnfd_utils.py b/tacker/sol_refactored/common/vnfd_utils.py index fbcd9f418..303ebe230 100644 --- a/tacker/sol_refactored/common/vnfd_utils.py +++ b/tacker/sol_refactored/common/vnfd_utils.py @@ -459,3 +459,14 @@ class Vnfd(object): mani_artifact_files.extend(meta_artifacts_files) return mani_artifact_files + + def get_initial_delta(self, flavour_id, vdu_id): + initial_deltas = self.get_policy_values_by_type(flavour_id, + 'tosca.policies.nfv.VduInitialDelta') + for aspect in initial_deltas: + if vdu_id in aspect.get('targets', []): + initial_delta = (aspect.get('properties', {}) + .get('initial_delta', {}) + .get('number_of_instances')) + break + return initial_delta diff --git a/tacker/sol_refactored/conductor/conductor_v2.py b/tacker/sol_refactored/conductor/conductor_v2.py index fa1d178dc..46c00949b 100644 --- a/tacker/sol_refactored/conductor/conductor_v2.py +++ b/tacker/sol_refactored/conductor/conductor_v2.py @@ -13,7 +13,11 @@ # License for the specific language governing permissions and limitations # under the License. +import threading + from oslo_log import log as logging +from oslo_service import loopingcall +from oslo_utils import encodeutils from tacker.common import log from tacker import context as tacker_context @@ -35,6 +39,18 @@ from tacker.sol_refactored.objects.v2 import fields LOG = logging.getLogger(__name__) CONF = config.CONF +# NOTE(fengyi): After the conductor is started, since the start-up process +# is being executed, it should take a while to start the actual DB +# synchronization periodical process. +DB_SYNC_INITIAL_DELAY = 60 + + +def async_call(func): + def inner(*args, **kwargs): + th = threading.Thread(target=func, args=args, + kwargs=kwargs, daemon=True) + th.start() + return inner class ConductorV2(object): @@ -49,6 +65,15 @@ class ConductorV2(object): self.sn_driver = sdrv.ServerNotificationDriver.instance() self._change_lcm_op_state() + self._periodic_call() + + @async_call + def _periodic_call(self): + self.periodic = loopingcall.FixedIntervalLoopingCall( + self._sync_db) + self.periodic.start(interval=CONF.db_synchronization_interval, + initial_delay=DB_SYNC_INITIAL_DELAY) + def _change_lcm_op_state(self): # NOTE: If the conductor down during processing and # the LcmOperationState STARTING/PROCESSING/ROLLING_BACK remain, @@ -321,6 +346,37 @@ class ConductorV2(object): self.nfvo_client.send_lcmocc_notification(context, lcmocc, inst, self.endpoint) + def _sync_db(self): + """Periodic database update invocation method(v2 api)""" + LOG.debug("Starting _sync_db") + context = tacker_context.get_admin_context() + + vnf_instances = objects.VnfInstanceV2.get_by_filter( + context, instantiationState='INSTANTIATED') + for inst in vnf_instances: + try: + vim_info = inst_utils.select_vim_info(inst.vimConnectionInfo) + self.vnflcm_driver.diff_check_inst(inst, vim_info) + self._sync_inst(context, inst, vim_info) + except sol_ex.DbSyncNoDiff: + continue + except sol_ex.DbSyncFailed as e: + LOG.error("%s: %s", e.__class__.__name__, e.args[0]) + except sol_ex.OtherOperationInProgress: + LOG.info("There is an LCM operation in progress, so " + f"skip this DB synchronization. vnf: {inst.id}.") + except Exception as e: + LOG.error(f"Failed to synchronize database vnf: {inst.id} " + f"Error: {encodeutils.exception_to_unicode(e)}") + LOG.debug("Ended _sync_db") + + @coordinate.lock_vnf_instance('{inst.id}') + def _sync_inst(self, context, inst, vim_info): + vnf_inst = inst_utils.get_inst(context, inst.id) + self.vnflcm_driver.sync_db( + context, vnf_inst, vim_info) + vnf_inst.update(context) + def store_alarm_info(self, context, alarm): self.vnffm_driver.store_alarm_info(context, alarm) diff --git a/tacker/sol_refactored/conductor/vnflcm_driver_v2.py b/tacker/sol_refactored/conductor/vnflcm_driver_v2.py index 643b3d5b9..87d36e847 100644 --- a/tacker/sol_refactored/conductor/vnflcm_driver_v2.py +++ b/tacker/sol_refactored/conductor/vnflcm_driver_v2.py @@ -1129,3 +1129,29 @@ class VnfLcmDriverV2(object): else: # should not occur raise sol_ex.SolException(sol_detail='not support vim type') + + def sync_db(self, context, vnf_inst, vim_info): + # Database synchronization works only when + # the vimType is kubernetes or ETSINFV.HELM.V_3 + if vim_info.vimType == 'kubernetes': + driver = kubernetes.Kubernetes() + driver.sync_db(context, vnf_inst, vim_info) + elif vim_info.vimType == 'ETSINFV.HELM.V_3': + driver = helm.Helm() + driver.sync_db(context, vnf_inst, vim_info) + else: + # Only support CNF + raise sol_ex.DbSyncNoDiff( + "There are no differences in Vnfc resources.") + + def diff_check_inst(self, vnf_inst, vim_info): + if vim_info.vimType == 'kubernetes': + driver = kubernetes.Kubernetes() + driver.diff_check_inst(vnf_inst, vim_info) + elif vim_info.vimType == 'ETSINFV.HELM.V_3': + driver = helm.Helm() + driver.diff_check_inst(vnf_inst, vim_info) + else: + # Only support CNF + raise sol_ex.DbSyncNoDiff( + "There are no differences in Vnfc resources.") diff --git a/tacker/sol_refactored/infra_drivers/kubernetes/kubernetes.py b/tacker/sol_refactored/infra_drivers/kubernetes/kubernetes.py index 5b5b6f22d..104c88af2 100644 --- a/tacker/sol_refactored/infra_drivers/kubernetes/kubernetes.py +++ b/tacker/sol_refactored/infra_drivers/kubernetes/kubernetes.py @@ -13,8 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import re - from kubernetes import client from oslo_log import log as logging @@ -305,27 +303,4 @@ class Kubernetes(kubernetes_common.KubernetesCommon): ) def _is_match_pod_naming_rule(self, rsc_kind, rsc_name, pod_name): - match_result = None - if rsc_kind == 'Pod': - # Expected example: name - if rsc_name == pod_name: - return True - elif rsc_kind == 'Deployment': - # Expected example: name-012789abef-019az - # NOTE(horie): The naming rule of Pod in deployment is - # "(deployment name)-(pod template hash)-(5 charactors)". - # The "pod template hash" string is generated from 32 bit hash. - # This may be from 1 to 10 caracters but not sure the lower limit - # from the source code of Kubernetes. - match_result = re.match( - rsc_name + '-([0-9a-f]{1,10})-([0-9a-z]{5})+$', pod_name) - elif rsc_kind in ('ReplicaSet', 'DaemonSet'): - # Expected example: name-019az - match_result = re.match(rsc_name + '-([0-9a-z]{5})+$', pod_name) - elif rsc_kind == 'StatefulSet': - # Expected example: name-0 - match_result = re.match(rsc_name + '-[0-9]+$', pod_name) - if match_result: - return True - - return False + return self.is_match_pod_naming(rsc_kind, rsc_name, pod_name) diff --git a/tacker/sol_refactored/infra_drivers/kubernetes/kubernetes_common.py b/tacker/sol_refactored/infra_drivers/kubernetes/kubernetes_common.py index 03ed73286..22e01a4bb 100644 --- a/tacker/sol_refactored/infra_drivers/kubernetes/kubernetes_common.py +++ b/tacker/sol_refactored/infra_drivers/kubernetes/kubernetes_common.py @@ -13,6 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +import copy +import operator +import re + from oslo_log import log as logging from oslo_service import loopingcall @@ -21,6 +25,7 @@ from tacker.sol_refactored.common import exceptions as sol_ex from tacker.sol_refactored.common import vnf_instance_utils as inst_utils from tacker.sol_refactored.infra_drivers.kubernetes import kubernetes_resource from tacker.sol_refactored.infra_drivers.kubernetes import kubernetes_utils +from tacker.sol_refactored.nfvo import nfvo_client from tacker.sol_refactored import objects @@ -214,3 +219,175 @@ class KubernetesCommon(object): check_reses = set(k8s_reses) self._check_status(_check_updated, check_reses, k8s_api_client, namespace, old_pods_names) + + def diff_check_inst(self, inst, vim_info): + inst_tmp = copy.deepcopy(inst) + self.diff_check_and_update_vnfc(inst_tmp, vim_info) + + def diff_check_and_update_vnfc(self, vnf_instance, vim_info): + with kubernetes_utils.AuthContextManager(vim_info) as acm: + k8s_api_client = acm.init_k8s_api_client() + old_pods_names = { + vnfc.computeResource.resourceId for vnfc in + vnf_instance.instantiatedVnfInfo.vnfcResourceInfo} + self._update_vnfc_info(vnf_instance, k8s_api_client) + new_pods_names = { + vnfc.computeResource.resourceId for vnfc in + vnf_instance.instantiatedVnfInfo.vnfcResourceInfo} + if operator.eq(old_pods_names, new_pods_names): + raise sol_ex.DbSyncNoDiff( + "There are no differences in Vnfc resources.") + + def sync_db(self, context, vnf_instance, vim_info): + self.diff_check_and_update_vnfc(vnf_instance, vim_info) + + vdu_id_list = self._get_vdu_list(vnf_instance) + for vdu_id in vdu_id_list: + resource_type = "" + resource_name = "" + # get pod information + for vnfc in vnf_instance.instantiatedVnfInfo.vnfcResourceInfo: + if vnfc.vduId == vdu_id: + resource_type = (vnfc.computeResource + .vimLevelResourceType) + resource_name = self._get_resource_name( + vnfc.computeResource.resourceId, resource_type) + break + pod_resources_from_k8s = self._get_pod_information( + resource_name, resource_type, vnf_instance, vim_info) + + vnfcs = [vnfc for vnfc in + vnf_instance.instantiatedVnfInfo.vnfcResourceInfo if + vnfc.vduId == vdu_id] + replicas = vnf_instance.instantiatedVnfInfo.metadata[ + 'vdu_reses'][vdu_id]['spec'].get('replicas') + if replicas is not None: + vnf_instance.instantiatedVnfInfo.metadata[ + 'vdu_reses'][vdu_id]['spec']['replicas'] = len(vnfcs) + + self._calc_scale_level( + context, vnf_instance, vdu_id, + len(pod_resources_from_k8s)) + + def _calc_scale_level(self, context, vnf_instance, + vdu_id, current_pod_num): + """calc scale_level and set""" + aspect_id = "" + flavour_id = vnf_instance.instantiatedVnfInfo.flavourId + client = nfvo_client.NfvoClient() + vnfd = client.get_vnfd(context, vnf_instance.vnfdId) + for aspect_delta in vnfd.get_policy_values_by_type(flavour_id, + 'tosca.policies.nfv.VduScalingAspectDeltas'): + if vdu_id in aspect_delta.get('targets', []): + aspect_id = aspect_delta.get('properties', {}).get('aspect') + break + if not aspect_id: + return + + delta = vnfd.get_scale_vdu_and_num(flavour_id, aspect_id).get(vdu_id) + initial_delta = vnfd.get_initial_delta(flavour_id, vdu_id) + + if (current_pod_num - initial_delta) % delta != 0: + raise sol_ex.DbSyncFailed( + "Error computing 'scale_level'. current Pod num: " + f"{current_pod_num} delta: {delta}. vnf: {vnf_instance.id} " + f"vdu: {vdu_id}") + + scale_level = (current_pod_num - initial_delta) // delta + + for inst_vnf in vnf_instance.instantiatedVnfInfo.scaleStatus: + if inst_vnf.aspectId == aspect_id: + inst_vnf.scaleLevel = scale_level + break + + self._check_pod_range( + vnf_instance, vnfd, vdu_id, flavour_id, current_pod_num) + + def _check_pod_range(self, vnf_instance, vnfd, vdu_id, + flavour_id, current_pod_num): + """Check the range of the maximum or minimum number of pods. + + If it finds out of range, output a error log and do not update + database. + """ + vdu_profile = (vnfd.get_vdu_nodes(flavour_id) + .get(vdu_id, {}) + .get("properties", {}) + .get("vdu_profile", {})) + max_number_of_instances = vdu_profile.get("max_number_of_instances") + min_number_of_instances = vdu_profile.get("min_number_of_instances") + + if (current_pod_num > max_number_of_instances + or current_pod_num < min_number_of_instances): + raise sol_ex.DbSyncFailed( + f"Failed to update database vnf {vnf_instance.id} " + f"vdu: {vdu_id}. Pod num is out of range. " + f"pod_num: {current_pod_num}") + + def _get_pod_information(self, resource_name, + resource_type, vnf_instance, vim_connection_info): + """Extract a Pod starting with the specified 'resource_name' name""" + namespace = vnf_instance.metadata.get('namespace') + if not namespace: + namespace = "default" + + with kubernetes_utils.AuthContextManager(vim_connection_info) as acm: + k8s_api_client = acm.init_k8s_api_client() + all_pods = kubernetes_utils.list_namespaced_pods( + k8s_api_client, namespace) + + resource_pods = {} + + for pod in all_pods: + if self.is_match_pod_naming(resource_type, + resource_name, pod.metadata.name): + resource_pods[pod.metadata.name] = pod.metadata.to_dict() + return resource_pods + + def _get_vdu_list(self, vnf_instance): + """get vdu_list""" + vdu_id_list = set() + + for vnfc in vnf_instance.instantiatedVnfInfo.vnfcResourceInfo: + vdu_id_list.add(vnfc.vduId) + return vdu_id_list + + def _get_resource_name(self, resource_id, resource_type): + """get resource name""" + if resource_type == 'Pod': + resource_name = resource_id + else: + name_list = resource_id.split("-") + if resource_type == 'Deployment': + del name_list[-2:] + elif resource_type in ('ReplicaSet', 'DaemonSet', 'StatefulSet'): + del name_list[-1] + resource_name = '-'.join(name_list) + + return resource_name + + def is_match_pod_naming(self, rsc_kind, rsc_name, pod_name): + match_result = None + if rsc_kind == 'Pod': + # Expected example: name + if rsc_name == pod_name: + return True + elif rsc_kind == 'Deployment': + # Expected example: name-012789abef-019az + # NOTE(horie): The naming rule of Pod in deployment is + # "(deployment name)-(pod template hash)-(5 charactors)". + # The "pod template hash" string is generated from 32 bit hash. + # This may be from 1 to 10 caracters but not sure the lower limit + # from the source code of Kubernetes. + match_result = re.match( + rsc_name + '-([0-9a-f]{1,10})-([0-9a-z]{5})+$', pod_name) + elif rsc_kind in ('ReplicaSet', 'DaemonSet'): + # Expected example: name-019az + match_result = re.match(rsc_name + '-([0-9a-z]{5})+$', pod_name) + elif rsc_kind == 'StatefulSet': + # Expected example: name-0 + match_result = re.match(rsc_name + '-[0-9]+$', pod_name) + if match_result: + return True + + return False diff --git a/tacker/tests/unit/conductor/test_conductor_server.py b/tacker/tests/unit/conductor/test_conductor_server.py index 551d0a5f1..4fcbcc7c7 100644 --- a/tacker/tests/unit/conductor/test_conductor_server.py +++ b/tacker/tests/unit/conductor/test_conductor_server.py @@ -3987,3 +3987,7 @@ class TestConductor(SqlTestCase, unit_base.FixturedTestCase): mock_change_vnf_status.assert_not_called() self.assertEqual(mock_change_ext_conn_grant.call_count, 0) mock_update_vnf_attributes.assert_called_once() + + def test_sync_db(self): + self.conductor._sync_db() + self.vnflcm_driver.sync_db.assert_called_once() diff --git a/tacker/tests/unit/sol_refactored/conductor/test_conductor_v2.py b/tacker/tests/unit/sol_refactored/conductor/test_conductor_v2.py index 87886a7da..7bae7e0a1 100644 --- a/tacker/tests/unit/sol_refactored/conductor/test_conductor_v2.py +++ b/tacker/tests/unit/sol_refactored/conductor/test_conductor_v2.py @@ -17,7 +17,10 @@ from datetime import datetime from unittest import mock import ddt +from kubernetes import client +from oslo_log import log as logging from oslo_utils import uuidutils +from tooz.drivers import file from tacker import context from tacker.sol_refactored.common import exceptions as sol_ex @@ -28,6 +31,11 @@ from tacker.sol_refactored.nfvo import nfvo_client from tacker.sol_refactored import objects from tacker.sol_refactored.objects.v2 import fields from tacker.tests.unit.db import base as db_base +from tacker.tests.unit.sol_refactored.infra_drivers.kubernetes import fakes +from tacker.vnfm.infra_drivers.kubernetes import kubernetes_driver + + +CNF_SAMPLE_VNFD_ID = "b1bb0ce7-ebca-4fa7-95ed-4840d70a1177" @ddt.ddt @@ -629,3 +637,97 @@ class TestConductorV2(db_base.SqlTestCase): result = self.conductor.modify_vnfinfo(self.context, lcmocc.id) self.assertEqual(None, result) + + @mock.patch.object(vnflcm_driver_v2.VnfLcmDriverV2, + 'sync_db') + @mock.patch.object(kubernetes_driver.Kubernetes, + '_check_pod_information') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + @mock.patch.object(objects.base.TackerPersistentObject, "get_by_filter") + def test_sync_db( + self, mock_db_sync, mock_get_by_filters, + mock_list_namespaced_pod, mock_check_pod_information): + vnf_instance_obj = fakes.fake_vnf_instance() + vnf_instance_obj.id = CNF_SAMPLE_VNFD_ID + vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Pod') + vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [ + vnfc_rsc_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info() + vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object + + mock_get_by_filters.return_value = [ + vnf_instance_obj, vnf_instance_obj] + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[fakes.get_fake_pod_info(kind='Pod')]) + mock_check_pod_information.return_value = True + self.conductor._sync_db() + mock_db_sync.assert_called_once() + + @mock.patch.object(kubernetes_driver.Kubernetes, + '_check_pod_information') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + @mock.patch.object(objects.base.TackerPersistentObject, "get_by_filter") + def test_sync_db_exception( + self, mock_get_by_filters, mock_list_namespaced_pod, + mock_check_pod_information): + vnf_instance_obj = fakes.fake_vnf_instance() + vnf_instance_obj.id = CNF_SAMPLE_VNFD_ID + vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Pod') + vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [ + vnfc_rsc_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info() + vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object + + mock_get_by_filters.return_value = [ + vnf_instance_obj, vnf_instance_obj] + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[fakes.get_fake_pod_info(kind='Pod')]) + mock_check_pod_information.return_value = True + + log_name = "tacker.sol_refactored.conductor.conductor_v2" + with self.assertLogs(logger=log_name, level=logging.DEBUG) as cm: + self.conductor._sync_db() + + msg = (f'ERROR:{log_name}:Failed to synchronize database vnf: ' + f'{vnf_instance_obj.id} Error: ') + self.assertIn(f'{msg}', cm.output[1]) + + @mock.patch.object(file.FileLock, 'acquire') + @mock.patch.object(kubernetes_driver.Kubernetes, + '_check_pod_information') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + @mock.patch.object(objects.base.TackerPersistentObject, "get_by_filter") + def test_sync_db_sol_ex( + self, mock_get_by_filters, mock_list_namespaced_pod, + mock_check_pod_information, mock_acquire): + vnf_instance_obj = fakes.fake_vnf_instance() + # vnf_instance_obj.id = CNF_SAMPLE_VNFD_ID + vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Pod') + vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [ + vnfc_rsc_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info() + vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object + + vnf_instance_obj1 = fakes.fake_vnf_instance() + vnf_instance_obj1.vimConnectionInfo['vim1'] = vim_connection_object + mock_get_by_filters.return_value = [ + vnf_instance_obj] + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[fakes.get_fake_pod_info(kind='Pod')]) + mock_check_pod_information.return_value = True + mock_acquire.return_value = False + + log_name = "tacker.sol_refactored.conductor.conductor_v2" + with self.assertLogs(logger=log_name, level=logging.DEBUG) as cm: + self.conductor._sync_db() + + msg = (f'INFO:{log_name}:There is an LCM operation in progress, ' + f'so skip this DB synchronization. ' + f'vnf: {vnf_instance_obj.id}.') + self.assertIn(f'{msg}', cm.output) diff --git a/tacker/tests/unit/sol_refactored/conductor/test_vnflcm_driver_v2.py b/tacker/tests/unit/sol_refactored/conductor/test_vnflcm_driver_v2.py index 6a0972244..4097e0fac 100644 --- a/tacker/tests/unit/sol_refactored/conductor/test_vnflcm_driver_v2.py +++ b/tacker/tests/unit/sol_refactored/conductor/test_vnflcm_driver_v2.py @@ -17,6 +17,7 @@ from datetime import datetime import os from unittest import mock +from kubernetes import client from oslo_utils import uuidutils from tacker import context @@ -31,6 +32,7 @@ from tacker.sol_refactored.nfvo import nfvo_client from tacker.sol_refactored import objects from tacker.sol_refactored.objects.v2 import fields from tacker.tests import base +from tacker.tests.unit.sol_refactored.infra_drivers.kubernetes import fakes CNF_SAMPLE_VNFD_ID = "b1bb0ce7-ebca-4fa7-95ed-4840d70a1177" @@ -3276,3 +3278,111 @@ class TestVnfLcmDriverV2(base.BaseTestCase): self.assertRaises( sol_ex.SolException, self.driver.change_vnfpkg_rollback, self.context, lcmocc, inst, grant_req, grant, self.vnfd_1) + + @mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_sync_db_kubernetes( + self, mock_list_namespaced_pod, mock_get_vnfd): + vnf_instance_obj = fakes.fake_vnf_instance() + + vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name="vdu1-1234567890-abcd", rsc_name="vdu1") + + vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [ + vnfc_rsc_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info() + vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object + + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[ + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd1"), + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd2")]) + mock_get_vnfd.return_value = self.vnfd_1 + vnf_instance_obj.vnfdId = uuidutils.generate_uuid() + vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [ + fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId) + ] + + self.driver.sync_db( + context=self.context, vnf_inst=vnf_instance_obj, + vim_info=vim_connection_object) + + self.assertEqual( + 2, vnf_instance_obj.instantiatedVnfInfo.metadata[ + 'vdu_reses']['VDU1']['spec']['replicas']) + + @mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_sync_db_helm( + self, mock_list_namespaced_pod, mock_get_vnfd): + vnf_instance_obj = fakes.fake_vnf_instance() + + vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name="vdu1-1234567890-abcd", rsc_name="vdu1") + + vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [ + vnfc_rsc_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info( + vim_type='ETSINFV.HELM.V_3') + vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object + + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[ + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd1"), + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd2")]) + mock_get_vnfd.return_value = self.vnfd_1 + vnf_instance_obj.vnfdId = uuidutils.generate_uuid() + vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [ + fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId) + ] + + self.driver.sync_db( + context=self.context, vnf_inst=vnf_instance_obj, + vim_info=vim_connection_object) + + self.assertEqual( + 2, vnf_instance_obj.instantiatedVnfInfo.metadata[ + 'vdu_reses']['VDU1']['spec']['replicas']) + + @mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_sync_db_not_support( + self, mock_list_namespaced_pod, mock_get_vnfd): + vnf_instance_obj = fakes.fake_vnf_instance() + + vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name="vdu1-1234567890-abcd", rsc_name="vdu1") + + vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [ + vnfc_rsc_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info( + vim_type="openstack") + vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object + + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[ + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd1"), + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd2")]) + mock_get_vnfd.return_value = self.vnfd_1 + vnf_instance_obj.vnfdId = uuidutils.generate_uuid() + vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [ + fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId) + ] + + ex = self.assertRaises( + sol_ex.DbSyncNoDiff, self.driver.sync_db, + self.context, vnf_instance_obj, vim_connection_object) + self.assertEqual( + "There are no differences in Vnfc resources.", ex.args[0]) diff --git a/tacker/tests/unit/sol_refactored/infra_drivers/kubernetes/fakes.py b/tacker/tests/unit/sol_refactored/infra_drivers/kubernetes/fakes.py index 7978a6edf..1a6bbbf39 100644 --- a/tacker/tests/unit/sol_refactored/infra_drivers/kubernetes/fakes.py +++ b/tacker/tests/unit/sol_refactored/infra_drivers/kubernetes/fakes.py @@ -13,7 +13,12 @@ # License for the specific language governing permissions and limitations # under the License. +import datetime + from kubernetes import client +from oslo_utils import uuidutils + +from tacker.sol_refactored import objects def fake_namespace(): @@ -547,3 +552,114 @@ def fake_node(type='Ready', status='True'): ] ) ) + + +def get_fake_pod_info(kind, name='fake_name', pod_status='Running', + pod_name=None): + if not pod_name: + if kind == 'Deployment': + pod_name = _('{name}-1234567890-abcde').format(name=name) + elif kind == 'ReplicaSet' or kind == 'DaemonSet': + pod_name = _('{name}-12345').format(name=name) + elif kind == 'StatefulSet': + pod_name = _('{name}-1').format(name=name) + elif kind == 'Pod': + pod_name = name + return client.V1Pod( + metadata=client.V1ObjectMeta( + name=pod_name, + creation_timestamp=datetime.datetime.now().isoformat('T')), + status=client.V1PodStatus(phase=pod_status)) + + +def fake_vim_connection_info(vim_type="kubernetes"): + return objects.VimConnectionInfo.from_dict({ + 'vimId': 'a56258df-9853-4437-9fdb-7d470bc0b162', + 'vimType': vim_type, + 'interfaceInfo': { + 'endpoint': 'https://127.0.0.1:6443' + }, + 'accessInfo': { + 'bearer_token': 'secret_token', + 'username': 'test', + 'password': 'test', + 'region': 'RegionOne' + } + }) + + +def fake_vnfc_resource_info(vdu_id='VDU1', rsc_kind='Deployment', + rsc_name='fake_name', pod_name=None, + namespace='default'): + if not pod_name: + v1_pod = get_fake_pod_info(rsc_kind, rsc_name) + pod_name = v1_pod.metadata.name + id = uuidutils.generate_uuid() + vnfc_rsc_info = { + 'id': id, + 'vduId': vdu_id, + 'computeResource': { + 'resourceId': pod_name, + 'vimLevelResourceType': rsc_kind + }, + 'metadata': { + 'Deployment': { + 'name': rsc_name, + 'namespace': namespace + } + } + } + vnfc_rsc_info_obj = objects.VnfcResourceInfoV2.from_dict(vnfc_rsc_info) + + vnfc_info = { + 'id': vdu_id + "-" + id, + 'vduId': vdu_id, + 'vnfcResourceInfoId': id, + 'vnfcState': 'STARTED' + } + vnfc_info_obj = objects.VnfcInfoV2.from_dict(vnfc_info) + + return vnfc_rsc_info_obj, vnfc_info_obj + + +def fake_vnf_instance(instantiated_state='INSTANTIATED'): + return objects.VnfInstanceV2.from_dict({ + 'id': uuidutils.generate_uuid(), + 'vimConnectionInfo': {}, + 'instantiationState': instantiated_state, + 'instantiatedVnfInfo': { + 'flavourId': 'simple', + 'vnfState': 'STARTED', + 'vnfcResourceInfo': [], + 'vnfcInfo': [], + 'metadata': { + 'namespace': 'default', + 'lcm-kubernetes-def-files': [ + 'Files/kubernetes/deployment.yaml'], + 'vdu_reses': { + 'VDU1': { + 'kind': 'Deployment', + 'metadata': { + 'name': 'vdu1', + 'namespace': 'default', + }, + 'spec': { + 'replicas': 0 + } + } + } + } + }, + 'metadata': { + 'lcm-kubernetes-def-files': ['Files/kubernetes/deployment.yaml'] + } + }) + + +def fake_scale_status(aspect_id='vdu1_aspect', + vnfd_id=uuidutils.generate_uuid(), scale_level=1): + return objects.ScaleInfoV2.from_dict({ + 'aspectId': aspect_id, + 'vnfdId': vnfd_id, + 'scaleLevel': scale_level + }) diff --git a/tacker/tests/unit/sol_refactored/infra_drivers/kubernetes/test_kubernetes.py b/tacker/tests/unit/sol_refactored/infra_drivers/kubernetes/test_kubernetes.py index d386e592a..e35d20216 100644 --- a/tacker/tests/unit/sol_refactored/infra_drivers/kubernetes/test_kubernetes.py +++ b/tacker/tests/unit/sol_refactored/infra_drivers/kubernetes/test_kubernetes.py @@ -14,14 +14,19 @@ # under the License. import os + +from kubernetes import client +from oslo_utils import uuidutils from unittest import mock +from tacker import context from tacker.sol_refactored.common import exceptions as sol_ex from tacker.sol_refactored.common import vnfd_utils from tacker.sol_refactored.infra_drivers.kubernetes import kubernetes +from tacker.sol_refactored.nfvo import nfvo_client from tacker.sol_refactored import objects from tacker.tests.unit import base - +from tacker.tests.unit.sol_refactored.infra_drivers.kubernetes import fakes CNF_SAMPLE_VNFD_ID = "b1bb0ce7-ebca-4fa7-95ed-4840d70a1177" @@ -32,6 +37,7 @@ class TestKubernetes(base.TestCase): super(TestKubernetes, self).setUp() objects.register_all() self.driver = kubernetes.Kubernetes() + self.context = context.get_admin_context() cur_dir = os.path.dirname(__file__) sample_dir = os.path.join(cur_dir, "../..", "samples") @@ -104,3 +110,167 @@ class TestKubernetes(base.TestCase): # maybe 3 but possible 2 self.assertTrue(res1.is_ready.call_count >= 2) + + @mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_sync_db( + self, mock_list_namespaced_pod, mock_get_vnfd): + vnf_instance_obj = fakes.fake_vnf_instance() + + vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name="vdu1-1234567890-abcd", rsc_name="vdu1") + + vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [ + vnfc_rsc_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info() + vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object + + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[ + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd1"), + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd2")]) + mock_get_vnfd.return_value = self.vnfd_1 + vnf_instance_obj.vnfdId = uuidutils.generate_uuid() + vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [ + fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId) + ] + + self.driver.sync_db( + context=self.context, vnf_instance=vnf_instance_obj, + vim_info=vim_connection_object) + + self.assertEqual( + 2, vnf_instance_obj.instantiatedVnfInfo.metadata[ + 'vdu_reses']['VDU1']['spec']['replicas']) + + @mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_sync_db_no_diff( + self, mock_list_namespaced_pod, mock_get_vnfd): + vnf_instance_obj = fakes.fake_vnf_instance() + + vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name="vdu1-1234567890-abcd1", rsc_name="vdu1") + + vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [ + vnfc_rsc_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info() + vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object + + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[ + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd1")]) + mock_get_vnfd.return_value = self.vnfd_1 + vnf_instance_obj.vnfdId = uuidutils.generate_uuid() + vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [ + fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId) + ] + + ex = self.assertRaises( + sol_ex.DbSyncNoDiff, self.driver.sync_db, + self.context, vnf_instance_obj, vim_connection_object) + self.assertEqual( + "There are no differences in Vnfc resources.", ex.args[0]) + + @mock.patch.object(vnfd_utils.Vnfd, 'get_scale_vdu_and_num') + @mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_sync_db_scale_level( + self, mock_list_namespaced_pod, mock_get_vnfd, + mock_scale_vdu_and_num): + vnf_instance_obj = fakes.fake_vnf_instance() + + vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name="vdu1-1234567890-abcd1", rsc_name="vdu1") + + vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [ + vnfc_rsc_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info() + vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object + + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[ + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd1"), + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd2"), + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd3"), + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd4") + ]) + mock_get_vnfd.return_value = self.vnfd_1 + vnf_instance_obj.vnfdId = uuidutils.generate_uuid() + vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [ + fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId) + ] + delta = 2 + mock_scale_vdu_and_num.return_value = {'VDU1': delta} + current_pod_num = 4 + vdu_id = "VDU1" + + ex = self.assertRaises( + sol_ex.DbSyncFailed, self.driver.sync_db, + self.context, vnf_instance_obj, vim_connection_object) + self.assertEqual( + "Error computing 'scale_level'. current Pod num: " + f"{current_pod_num} delta: {delta}. vnf: {vnf_instance_obj.id} " + f"vdu: {vdu_id}", ex.args[0]) + + @mock.patch.object(vnfd_utils.Vnfd, 'get_scale_vdu_and_num') + @mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_sync_db_pod_range( + self, mock_list_namespaced_pod, mock_get_vnfd, + mock_scale_vdu_and_num): + vnf_instance_obj = fakes.fake_vnf_instance() + + vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name="vdu1-1234567890-abcd1", rsc_name="vdu1") + + vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [ + vnfc_rsc_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info() + vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object + + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[ + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd1"), + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd2"), + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd3"), + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd4"), + fakes.get_fake_pod_info( + kind='Deployment', pod_name="vdu1-1234567890-abcd5") + ]) + mock_get_vnfd.return_value = self.vnfd_1 + vnf_instance_obj.vnfdId = uuidutils.generate_uuid() + vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [ + fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId) + ] + delta = 2 + mock_scale_vdu_and_num.return_value = {'VDU1': delta} + current_pod_num = 5 + vdu_id = "VDU1" + + ex = self.assertRaises( + sol_ex.DbSyncFailed, self.driver.sync_db, + self.context, vnf_instance_obj, vim_connection_object) + self.assertEqual( + f"Failed to update database vnf {vnf_instance_obj.id} " + f"vdu: {vdu_id}. Pod num is out of range. " + f"pod_num: {current_pod_num}", ex.args[0]) diff --git a/tacker/tests/unit/vnflcm/test_vnflcm_driver.py b/tacker/tests/unit/vnflcm/test_vnflcm_driver.py index 5e3133ca4..785469a2f 100644 --- a/tacker/tests/unit/vnflcm/test_vnflcm_driver.py +++ b/tacker/tests/unit/vnflcm/test_vnflcm_driver.py @@ -20,6 +20,7 @@ from unittest import mock import yaml from oslo_config import cfg +from oslo_log import log as logging from oslo_serialization import jsonutils from oslo_utils import uuidutils @@ -38,6 +39,7 @@ from tacker.objects import vim_connection from tacker.tests.unit.db import base as db_base from tacker.tests.unit.nfvo.test_nfvo_plugin import FakeVNFMPlugin from tacker.tests.unit.vnflcm import fakes +from tacker.tests.unit.vnfm.infra_drivers.kubernetes import fakes as k8s_fakes from tacker.tests import utils as test_utils from tacker.tests import uuidsentinel from tacker.vnflcm import vnflcm_driver @@ -3538,3 +3540,74 @@ class TestVnflcmDriver(db_base.SqlTestCase): "connectivity vnf '%s' is completed successfully") mock_log.info.assert_called_with(expected_msg, vnf_instance.id) + + @mock.patch.object(TackerManager, 'get_service_plugins', + return_value={'VNFM': FakeVNFMPlugin()}) + @mock.patch.object(VnfLcmDriver, '_init_mgmt_driver_hash') + @mock.patch.object(yaml, "safe_load") + @mock.patch.object(objects.VnfInstanceList, 'get_by_filters') + @mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive") + def test_sync_db(self, mock_vim, mock_get_by_filter, + mock_yaml_safe_load, mock_init_hash, + mock_get_service_plugins): + mock_init_hash.return_value = { + "vnflcm_noop": "ffea638bfdbde3fb01f191bbe75b031859" + "b18d663b127100eb72b19eecd7ed51" + } + mock_yaml_safe_load.return_value = fakes.vnfd_dict_cnf() + vnf_instance_obj = fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj = k8s_fakes.fake_vnfc_resource_info( + namespace="default") + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = ( + [vnfc_resource_info_obj]) + + vim_connection_info = vim_connection.VimConnectionInfo( + vim_type="kubernetes", id="8a3adb69-0784-43c7-833e-aab0b6ab4470", + vim_id="67e5de95-1ad2-4627-89f8-597dec1683fa") + + vnf_instance_obj.vim_connection_info.append(vim_connection_info) + mock_get_by_filter.return_value = [vnf_instance_obj] + mock_vim.return_value = vim_connection_info + + self._mock_vnf_manager() + driver = vnflcm_driver.VnfLcmDriver() + driver.sync_db(self.context) + mock_get_by_filter.assert_called_once() + self._vnf_manager.invoke.assert_called_once() + + @mock.patch.object(TackerManager, 'get_service_plugins', + return_value={'VNFM': FakeVNFMPlugin()}) + @mock.patch.object(VnfLcmDriver, '_init_mgmt_driver_hash') + @mock.patch.object(yaml, "safe_load") + @mock.patch.object(objects.VnfInstanceList, 'get_by_filters') + @mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive") + def test_sync_db_exception( + self, mock_vim, mock_get_by_filter, mock_yaml_safe_load, + mock_init_hash, mock_get_service_plugins): + mock_init_hash.return_value = { + "vnflcm_noop": "ffea638bfdbde3fb01f191bbe75b031859" + "b18d663b127100eb72b19eecd7ed51" + } + mock_yaml_safe_load.return_value = fakes.vnfd_dict_cnf() + vnf_instance_obj = fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj = k8s_fakes.fake_vnfc_resource_info( + namespace="default") + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = ( + [vnfc_resource_info_obj]) + + mock_get_by_filter.return_value = [vnf_instance_obj] + mock_vim.return_value = None + + driver = vnflcm_driver.VnfLcmDriver() + log_name = "tacker.vnflcm.vnflcm_driver" + with self.assertLogs(logger=log_name, level=logging.ERROR) as cm: + driver.sync_db(self.context) + + self.assertIn( + f"ERROR:{log_name}:Error is occoured vnf {vnf_instance_obj.id} " + f"Error: ", cm.output[0]) + mock_get_by_filter.assert_called_once() diff --git a/tacker/tests/unit/vnfm/infra_drivers/kubernetes/test_kubernetes_driver.py b/tacker/tests/unit/vnfm/infra_drivers/kubernetes/test_kubernetes_driver.py index 92a576b50..3fc4a80bd 100644 --- a/tacker/tests/unit/vnfm/infra_drivers/kubernetes/test_kubernetes_driver.py +++ b/tacker/tests/unit/vnfm/infra_drivers/kubernetes/test_kubernetes_driver.py @@ -18,6 +18,7 @@ import ddt import os from kubernetes import client +from oslo_log import log as logging from oslo_serialization import jsonutils from tacker.common.container import kubernetes_utils from tacker.common import exceptions @@ -36,6 +37,7 @@ from tacker.tests.unit.vnflcm import fakes as vnflcm_fakes from tacker.tests.unit.vnfm.infra_drivers.kubernetes import fakes from tacker.tests.unit.vnfm.infra_drivers.openstack.fixture_data import \ fixture_data_utils as fd_utils +from tacker.vnflcm import utils as vnflcm_utils from tacker.vnfm.infra_drivers.kubernetes.k8s import tosca_kube_object from tacker.vnfm.infra_drivers.kubernetes.k8s import translate_outputs from tacker.vnfm.infra_drivers.kubernetes import kubernetes_driver @@ -3855,3 +3857,525 @@ class TestKubernetes(base.TestCase): vim_connection_info=vim_connection_object, heal_vnf_request=heal_request_data_obj) self.assertEqual(mock_list_namespaced_pod.call_count, 0) + + @mock.patch.object(kubernetes_driver.Kubernetes, + "_sync_vnfc_resource_and_pod_resource") + @mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive") + @mock.patch.object(vnflcm_utils, "get_vim") + @mock.patch.object(VnfInstance, "save") + @mock.patch.object(objects.VnfInstance, "get_by_id") + @mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_sync_db( + self, mock_list_namespaced_pod, mock_check_pod_information, + mock_get_by_id, mock_save, mock_get_vim, mock_vim, + mock_sync_vnfc): + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[fakes.get_fake_pod_info(kind='Deployment')]) + mock_check_pod_information.return_value = True + + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Pod') + vnfc_resource_info_obj2 = fakes.fake_vnfc_resource_info( + vdu_id='VDU2', rsc_kind='Deployment') + vnfc_resource_info_obj3 = fakes.fake_vnfc_resource_info( + vdu_id='VDU3', rsc_kind='ReplicaSet') + vnfc_resource_info_obj4 = fakes.fake_vnfc_resource_info( + vdu_id='VDU4', rsc_kind='DaemonSet') + vnfc_resource_info_obj5 = fakes.fake_vnfc_resource_info( + vdu_id='VDU5', rsc_kind='StatefulSet') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1, + vnfc_resource_info_obj2, + vnfc_resource_info_obj3, + vnfc_resource_info_obj4, + vnfc_resource_info_obj5 + ] + vim_connection_object = fakes.fake_vim_connection_info() + + mock_get_by_id.return_value = vnf_instance_obj + mock_vim.return_value = vim_connection_object + mock_sync_vnfc.return_value = True + + log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver" + with self.assertLogs(logger=log_name, level=logging.INFO) as cm: + self.kubernetes.sync_db( + context=self.context, vnf_instance=vnf_instance_obj, + vim_info=vim_connection_object) + self.assertCountEqual([ + f'INFO:{log_name}:Database synchronization succeeded. ' + f'vnf: {vnf_instance_obj.id} ' + f'vdu: {vnfc_resource_info_obj1.vdu_id}', + f'INFO:{log_name}:Database synchronization succeeded. ' + f'vnf: {vnf_instance_obj.id} ' + f'vdu: {vnfc_resource_info_obj2.vdu_id}', + f'INFO:{log_name}:Database synchronization succeeded. ' + f'vnf: {vnf_instance_obj.id} ' + f'vdu: {vnfc_resource_info_obj3.vdu_id}', + f'INFO:{log_name}:Database synchronization succeeded. ' + f'vnf: {vnf_instance_obj.id} ' + f'vdu: {vnfc_resource_info_obj4.vdu_id}', + f'INFO:{log_name}:Database synchronization succeeded. ' + f'vnf: {vnf_instance_obj.id} ' + f'vdu: {vnfc_resource_info_obj5.vdu_id}', + ], cm.output) + + @mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_sync_db_vnf_conflict( + self, mock_list_namespaced_pod, mock_check_pod_information): + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[fakes.get_fake_pod_info(kind='Deployment')]) + mock_check_pod_information.return_value = True + + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.NOT_INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Pod') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info() + log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver" + + with self.assertLogs(logger=log_name, level=logging.INFO) as cm: + self.kubernetes.sync_db( + context=self.context, vnf_instance=vnf_instance_obj, + vim_info=vim_connection_object) + self.assertEqual( + [f'INFO:{log_name}:There is an LCM operation in progress, ' + 'so skip this DB synchronization. ' + f'vnf: {vnf_instance_obj.id}'], cm.output) + + def test_sync_db_exception(self): + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.NOT_INSTANTIATED) + vnf_instance_obj.instantiated_vnf_info = None + vim_connection_object = fakes.fake_vim_connection_info() + log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver" + + with self.assertLogs(logger=log_name, level=logging.INFO) as cm: + self.kubernetes.sync_db( + context=self.context, vnf_instance=vnf_instance_obj, + vim_info=vim_connection_object) + self.assertIn( + f"Failed to synchronize database vnf: " + f"{vnf_instance_obj.id}", cm.output[0]) + + @mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive") + @mock.patch.object(vnflcm_utils, "get_vim") + @mock.patch.object(VnfInstance, "save") + @mock.patch.object(objects.VnfInstance, "get_by_id") + @mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_sync_db_check_pod_false( + self, mock_list_namespaced_pod, mock_check_pod_information, + mock_get_by_id, mock_save, mock_get_vim, mock_vim): + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[fakes.get_fake_pod_info(kind='Pod')]) + mock_check_pod_information.side_effect = [True, False] + + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Pod') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info() + + mock_get_by_id.return_value = vnf_instance_obj + mock_vim.return_value = vim_connection_object + + self.kubernetes.sync_db( + context=self.context, vnf_instance=vnf_instance_obj, + vim_info=vim_connection_object) + + self.assertEqual(2, mock_check_pod_information.call_count) + self.assertEqual(2, mock_save.call_count) + + @mock.patch.object(kubernetes_driver.Kubernetes, + "_sync_vnfc_resource_and_pod_resource") + @mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive") + @mock.patch.object(vnflcm_utils, "get_vim") + @mock.patch.object(VnfInstance, "save") + @mock.patch.object(objects.VnfInstance, "get_by_id") + @mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_sync_db_not_succeeded( + self, mock_list_namespaced_pod, mock_check_pod_information, + mock_get_by_id, mock_save, mock_get_vim, mock_vim, + mock_sync_vnfc): + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[fakes.get_fake_pod_info(kind='Pod')]) + mock_check_pod_information.return_value = True + + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Pod') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info() + + mock_get_by_id.return_value = vnf_instance_obj + mock_vim.return_value = vim_connection_object + mock_sync_vnfc.return_value = False + + self.kubernetes.sync_db( + context=self.context, vnf_instance=vnf_instance_obj, + vim_info=vim_connection_object) + self.assertEqual(1, mock_sync_vnfc.call_count) + + @mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive") + @mock.patch.object(vnflcm_utils, "get_vim") + @mock.patch.object(VnfInstance, "save") + @mock.patch.object(objects.VnfInstance, "get_by_id") + @mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information') + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_sync_db_failed_update_db( + self, mock_list_namespaced_pod, mock_check_pod_information, + mock_get_by_id, mock_save, mock_get_vim, mock_vim): + mock_list_namespaced_pod.return_value = client.V1PodList( + items=[fakes.get_fake_pod_info(kind='Deployment')]) + mock_check_pod_information.return_value = True + + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Pod') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + vim_connection_object = fakes.fake_vim_connection_info() + + mock_get_by_id.return_value = vnf_instance_obj + mock_vim.return_value = vim_connection_object + + log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver" + with self.assertLogs(logger=log_name, level=logging.ERROR) as cm: + self.kubernetes.sync_db( + context=self.context, vnf_instance=vnf_instance_obj, + vim_info=vim_connection_object) + self.assertIn( + f'ERROR:{log_name}:Failed to update database vnf ' + f'{vnf_instance_obj.id} Error: ', cm.output[0]) + + @mock.patch.object(client.CoreV1Api, 'list_namespaced_pod') + def test_get_pod_information_no_namespace(self, mock_list_namespaced_pod): + mock_list_namespaced_pod.return_value = client.V1PodList(items=[]) + + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "" + vim_connection_object = fakes.fake_vim_connection_info() + + result = self.kubernetes._get_pod_information( + resource_name=mock.ANY, resource_type=mock.ANY, + vnf_instance=vnf_instance_obj, + vim_connection_info=vim_connection_object) + + self.assertEqual({}, result) + + @mock.patch.object(kubernetes_driver.Kubernetes, '_sync_resource_id') + def test_sync_vnfc_resource_and_pod_resource_eq( + self, mock_sync_resource_id): + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Pod') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + pod_resources_from_k8s = { + 'fake_name-1234567890-abcde': { + 'name': 'fake_name-1234567890-abcde', + 'namespace': 'default' + } + } + result = self.kubernetes._sync_vnfc_resource_and_pod_resource( + context=self.context, vnf_instance=vnf_instance_obj, + pod_resources_from_k8s=pod_resources_from_k8s, vdu_id='VDU1') + self.assertTrue(result) + + @mock.patch.object(kubernetes_driver.Kubernetes, '_delete_vnfc_resource') + def test_sync_vnfc_resource_and_pod_resource_gt( + self, mock_delete_vnfc_resource): + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Pod') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + pod_resources_from_k8s = {} + mock_delete_vnfc_resource.return_value = False + result = self.kubernetes._sync_vnfc_resource_and_pod_resource( + context=self.context, vnf_instance=vnf_instance_obj, + pod_resources_from_k8s=pod_resources_from_k8s, vdu_id='VDU1') + self.assertEqual(1, mock_delete_vnfc_resource.call_count) + self.assertFalse(result) + + @mock.patch.object(kubernetes_driver.Kubernetes, '_add_vnfc_resource') + def test_sync_vnfc_resource_and_pod_resource_lt( + self, mock_add_vnfc_resource): + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + pod_resources_from_k8s = { + 'fake_name-1234567890-abcde': { + 'name': 'fake_name-1234567890-abcde', + 'namespace': 'default' + } + } + mock_add_vnfc_resource.return_value = True + result = self.kubernetes._sync_vnfc_resource_and_pod_resource( + context=self.context, vnf_instance=vnf_instance_obj, + pod_resources_from_k8s=pod_resources_from_k8s, vdu_id='VDU1') + self.assertEqual(1, mock_add_vnfc_resource.call_count) + self.assertTrue(result) + + @mock.patch.object(kubernetes_driver.Kubernetes, '_calc_scale_level') + def test_delete_vnfc_resource(self, mock_calc_scale_level): + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Pod', pod_name='pod') + vnfc_resource_info_obj2 = fakes.fake_vnfc_resource_info( + vdu_id='VDU2', rsc_kind='Deployment', + pod_name='deploy-1234567890-fghij') + vnfc_resource_info_obj3 = fakes.fake_vnfc_resource_info( + vdu_id='VDU2', rsc_kind='Deployment', + pod_name='deploy-1234567890-abcde') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1, vnfc_resource_info_obj2, + vnfc_resource_info_obj3 + ] + pod_resources_from_k8s = { + 'deploy-1234567890-abcde': { + 'name': 'deploy-1234567890-abcde', + 'namespace': 'default' + } + } + mock_calc_scale_level.return_value = True + result = self.kubernetes._delete_vnfc_resource( + context=self.context, vnf_instance=vnf_instance_obj, vdu_id='VDU2', + pod_resources_from_k8s=pod_resources_from_k8s, vnfc_count=2) + self.assertEqual(1, mock_calc_scale_level.call_count) + self.assertTrue(result) + + @mock.patch.object(kubernetes_driver.Kubernetes, '_calc_scale_level') + def test_delete_vnfc_resource_rsc_not_same(self, mock_calc_scale_level): + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Pod', pod_name='pod') + vnfc_resource_info_obj2 = fakes.fake_vnfc_resource_info( + vdu_id='VDU2', rsc_kind='Deployment', + pod_name='deploy-1234567890-fghij') + vnfc_resource_info_obj3 = fakes.fake_vnfc_resource_info( + vdu_id='VDU2', rsc_kind='Deployment', + pod_name='deploy-1234567890-abcde') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1, vnfc_resource_info_obj2, + vnfc_resource_info_obj3 + ] + pod_resources_from_k8s = { + 'deploy-1234567890-klmno': { + 'name': 'deploy-1234567890-klmno', + 'namespace': 'default' + } + } + mock_calc_scale_level.return_value = False + result = self.kubernetes._delete_vnfc_resource( + context=self.context, vnf_instance=vnf_instance_obj, vdu_id='VDU2', + pod_resources_from_k8s=pod_resources_from_k8s, vnfc_count=2) + self.assertEqual(1, mock_calc_scale_level.call_count) + self.assertFalse(result) + + @mock.patch.object(kubernetes_driver.Kubernetes, '_calc_scale_level') + def test_add_vnfc_resource(self, mock_calc_scale_level): + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU2', rsc_kind='Deployment', + pod_name='deploy-1234567890-fghij') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + pod_resources_from_k8s = { + 'deploy-1234567890-klmno': { + 'name': 'deploy-1234567890-klmno', + 'namespace': 'default' + }, + 'deploy-1234567890-abcde': { + 'name': 'deploy-1234567890-abcde', + 'namespace': 'default' + } + } + mock_calc_scale_level.return_value = False + result = self.kubernetes._add_vnfc_resource( + context=self.context, vnf_instance=vnf_instance_obj, vdu_id='VDU2', + pod_resources_from_k8s=pod_resources_from_k8s, vnfc_count=1) + self.assertEqual(1, mock_calc_scale_level.call_count) + self.assertFalse(result) + + @mock.patch('tacker.vnflcm.utils._get_vnfd_dict') + def test_calc_scale_level(self, mock_get_vnfd_dict): + mock_get_vnfd_dict.return_value = vnflcm_fakes.vnfd_dict_cnf() + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED, 'scale_status') + vnf_instance_obj.instantiated_vnf_info.scale_status[0].aspect_id = ( + 'vdu1_aspect') + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name='deploy-1234567890-fghij') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + result = self.kubernetes._calc_scale_level( + context=self.context, vnf_instance=vnf_instance_obj, + vdu_id='VDU1', current_pod_num=1) + self.assertTrue(result) + + @mock.patch('tacker.vnflcm.utils._get_vnfd_dict') + def test_calc_scale_level_error(self, mock_get_vnfd_dict): + delta = 2 + vnfd_obj = vnflcm_fakes.vnfd_dict_cnf() + for policy in vnfd_obj['topology_template']['policies']: + if policy.get('vdu1_scaling_aspect_deltas'): + policy['vdu1_scaling_aspect_deltas']['properties'][ + 'deltas']['delta_1']['number_of_instances'] = delta + break + mock_get_vnfd_dict.return_value = vnfd_obj + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED, 'scale_status') + vnf_instance_obj.instantiated_vnf_info.scale_status[0].aspect_id = ( + 'vdu1_aspect') + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name='deploy-1234567890-fghij') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + current_pod_num = 5 + + log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver" + with self.assertLogs(logger=log_name, level=logging.ERROR) as cm: + result = self.kubernetes._calc_scale_level( + context=self.context, vnf_instance=vnf_instance_obj, + vdu_id='VDU1', current_pod_num=current_pod_num) + self.assertEqual( + [f"ERROR:{log_name}:Error computing 'scale_level'. current Pod " + f"num: {current_pod_num} delta: {delta}. " + f"vnf: {vnf_instance_obj.id} vdu: VDU1"], cm.output) + self.assertFalse(result) + + @mock.patch('tacker.vnflcm.utils._get_vnfd_dict') + def test_calc_scale_level_pod_range_error(self, mock_get_vnfd_dict): + delta = 2 + vnfd_obj = vnflcm_fakes.vnfd_dict_cnf() + for policy in vnfd_obj['topology_template']['policies']: + if policy.get('vdu1_scaling_aspect_deltas'): + policy['vdu1_scaling_aspect_deltas']['properties'][ + 'deltas']['delta_1']['number_of_instances'] = delta + break + mock_get_vnfd_dict.return_value = vnfd_obj + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED, 'scale_status') + vnf_instance_obj.instantiated_vnf_info.scale_status[0].aspect_id = ( + 'vdu1_aspect') + vnf_instance_obj.vnf_metadata['namespace'] = "default" + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name='deploy-1234567890-fghij') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + current_pod_num = 4 + + log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver" + with self.assertLogs(logger=log_name, level=logging.ERROR) as cm: + result = self.kubernetes._calc_scale_level( + context=self.context, vnf_instance=vnf_instance_obj, + vdu_id='VDU1', current_pod_num=current_pod_num) + self.assertEqual( + [f"ERROR:{log_name}:Failed to update database vnf " + f"{vnf_instance_obj.id} vdu: VDU1. Pod num is out of range. " + f"pod_num: {current_pod_num}"], cm.output) + self.assertFalse(result) + + def test_check_pod_information_len(self): + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name='deploy-1234567890-fghij') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + pod_resources_from_k8s = { + 'deploy-1234567890-klmno': { + 'name': 'deploy-1234567890-klmno' + }, + 'deploy-1234567890-abcde': { + 'name': 'deploy-1234567890-abcde' + } + } + result = self.kubernetes._check_pod_information( + vnf_instance=vnf_instance_obj, vdu_id='VDU1', + pod_resources_from_k8s=pod_resources_from_k8s) + self.assertTrue(result) + + def test_check_pod_information_set(self): + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name='deploy-1234567890-fghij') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + pod_resources_from_k8s = { + 'deploy-1234567890-klmno': { + 'name': 'deploy-1234567890-klmno' + } + } + result = self.kubernetes._check_pod_information( + vnf_instance=vnf_instance_obj, vdu_id='VDU1', + pod_resources_from_k8s=pod_resources_from_k8s) + self.assertTrue(result) + + def test_check_pod_information_false(self): + vnf_instance_obj = vnflcm_fakes.return_vnf_instance( + fields.VnfInstanceState.INSTANTIATED) + vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info( + vdu_id='VDU1', rsc_kind='Deployment', + pod_name='deploy-1234567890-fghij') + vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [ + vnfc_resource_info_obj1 + ] + pod_resources_from_k8s = { + 'deploy-1234567890-fghij': { + 'name': 'deploy-1234567890-fghij' + } + } + result = self.kubernetes._check_pod_information( + vnf_instance=vnf_instance_obj, vdu_id='VDU1', + pod_resources_from_k8s=pod_resources_from_k8s) + self.assertFalse(result) diff --git a/tacker/vnflcm/vnflcm_driver.py b/tacker/vnflcm/vnflcm_driver.py index b79a22b5a..591f3e2ee 100644 --- a/tacker/vnflcm/vnflcm_driver.py +++ b/tacker/vnflcm/vnflcm_driver.py @@ -1972,3 +1972,33 @@ class VnfLcmDriver(abstract_driver.VnfInstanceAbstractDriver): LOG.info("Request received for changing external connectivity " "vnf '%s' is completed successfully", vnf_instance.id) + + def sync_db(self, context): + filters = {'model': 'VnfInstance', + 'field': 'instantiation_state', + 'value': 'INSTANTIATED', + 'op': '==' + } + vnf_instance_list = objects.VnfInstanceList.get_by_filters( + context, filters) + for vnf_instance in vnf_instance_list: + vim_info = vnflcm_utils.get_vim( + context, vnf_instance.vim_connection_info) + vim_connection_info = (objects.VimConnectionInfo. + obj_from_primitive(vim_info, context)) + + try: + # Database synchronization works only when the + # vimType is Kubernetes. + if vim_connection_info.vim_type == 'kubernetes': + self._vnf_manager.invoke( + vim_connection_info.vim_type, + 'sync_db', + context=context, + vnf_instance=vnf_instance, + vim_info=vim_connection_info + ) + + except Exception as e: + LOG.error(f"Error is occoured vnf {vnf_instance.id} " + f"Error: {encodeutils.exception_to_unicode(e)}") diff --git a/tacker/vnfm/infra_drivers/kubernetes/kubernetes_driver.py b/tacker/vnfm/infra_drivers/kubernetes/kubernetes_driver.py index ec29ee073..38b6764fb 100644 --- a/tacker/vnfm/infra_drivers/kubernetes/kubernetes_driver.py +++ b/tacker/vnfm/infra_drivers/kubernetes/kubernetes_driver.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import copy import os import re import time @@ -23,16 +24,19 @@ from kubernetes import client from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils +from oslo_utils import encodeutils from oslo_utils import uuidutils from toscaparser import tosca_template from tacker._i18n import _ +from tacker.api.vnflcm.v1.controller import check_vnf_state from tacker.common.container import kubernetes_utils from tacker.common import exceptions from tacker.common import log from tacker.common import utils from tacker.extensions import vnfm from tacker import objects +from tacker.objects import fields from tacker.objects.fields import ErrorPoint as EP from tacker.objects import vnf_package as vnf_package_obj from tacker.objects import vnf_package_vnfd as vnfd_obj @@ -2671,3 +2675,395 @@ class Kubernetes(abstract_driver.VnfAbstractDriver, return_name_list = [] return_grp_id = None return return_id_list, return_name_list, return_grp_id + + def sync_db(self, context, vnf_instance, vim_info): + """method for database synchronization""" + try: + vdu_id_list = self._get_vdu_list(vnf_instance) + + for vdu_id in vdu_id_list: + # get pod information + for vnfc in ( + vnf_instance.instantiated_vnf_info.vnfc_resource_info + ): + if vnfc.vdu_id == vdu_id: + resource_type = ( + vnfc.compute_resource.vim_level_resource_type) + resource_name = self._get_resource_name( + vnfc.compute_resource.resource_id, resource_type) + break + pod_resources_from_k8s = self._get_pod_information( + resource_name, resource_type, + vnf_instance, vim_info + ) + # check difference + if not self._check_pod_information(vnf_instance, vdu_id, + pod_resources_from_k8s): + # no difference + continue + + # exec db sync + try: + if self._database_update(context, vnf_instance, + vdu_id): + LOG.info("Database synchronization succeeded. " + f"vnf: {vnf_instance.id} vdu: {vdu_id}") + + except exceptions.VnfInstanceConflictState: + LOG.info("There is an LCM operation in progress, so " + "skip this DB synchronization. " + f"vnf: {vnf_instance.id}") + + except Exception as e: + LOG.error(f"Failed to synchronize database vnf: " + f"{vnf_instance.id} Error: " + f"{encodeutils.exception_to_unicode(e)}") + + @check_vnf_state(action="db_sync", + instantiation_state=[fields.VnfInstanceState.INSTANTIATED], + task_state=[None]) + def _database_update(self, context, vnf_inst, + vdu_id): + """Update tacker DB + + True: succeeded database update + False: failed database update + """ + # get target object + vnf_instance = objects.VnfInstance.get_by_id( + context, vnf_inst.id) + if vnf_instance.instantiation_state != 'INSTANTIATED': + return False + # change task_state + vnf_instance.task_state = fields.VnfInstanceTaskState.DB_SYNCHRONIZING + vnf_instance.save() + + backup_vnf_instance = copy.deepcopy(vnf_instance) + + vim_info = vnflcm_utils.get_vim( + context, vnf_instance.vim_connection_info) + vim_connection_info = objects.VimConnectionInfo.obj_from_primitive( + vim_info, context) + + # get pod information + try: + for vnfc in vnf_instance.instantiated_vnf_info.vnfc_resource_info: + if vnfc.vdu_id == vdu_id: + resource_type = ( + vnfc.compute_resource.vim_level_resource_type) + resource_name = self._get_resource_name( + vnfc.compute_resource.resource_id, resource_type) + break + pod_resources_from_k8s = self._get_pod_information( + resource_name, resource_type, + vnf_instance, vim_connection_info + ) + + # check difference + if not self._check_pod_information(vnf_instance, vdu_id, + pod_resources_from_k8s): + vnf_instance.task_state = None + vnf_instance.save() + return False + + if self._sync_vnfc_resource_and_pod_resource(context, + vnf_instance, pod_resources_from_k8s, vdu_id): + vnf_instance.task_state = None + vnf_instance.save() + return True + + vnf_instance = copy.deepcopy(backup_vnf_instance) + vnf_instance.task_state = None + vnf_instance.save() + return False + + except Exception as e: + LOG.error(f"Failed to update database vnf {vnf_instance.id} " + f"Error: {encodeutils.exception_to_unicode(e)}") + vnf_instance = copy.deepcopy(backup_vnf_instance) + vnf_instance.task_state = None + vnf_instance.save() + return False + + def _sync_vnfc_resource_and_pod_resource(self, context, vnf_instance, + pod_resources_from_k8s, vdu_id): + """Sync Pod name between k8s and tacker""" + vnfc_count = 0 + for vnfc in vnf_instance.instantiated_vnf_info.vnfc_resource_info: + if vnfc.vdu_id == vdu_id: + vnfc_count += 1 + + result = False + # number of pod is same as k8s + if vnfc_count == len(pod_resources_from_k8s): + self._sync_resource_id(vnf_instance.instantiated_vnf_info, + vdu_id, pod_resources_from_k8s) + result = True + # the number of pod is greater than k8s + elif vnfc_count > len(pod_resources_from_k8s): + result = self._delete_vnfc_resource(context, vnf_instance, vdu_id, + pod_resources_from_k8s, vnfc_count) + # the number of pod is less than k8s + elif vnfc_count < len(pod_resources_from_k8s): + result = self._add_vnfc_resource(context, vnf_instance, vdu_id, + pod_resources_from_k8s, vnfc_count) + + return result + + def _delete_vnfc_resource(self, context, vnf_instance, vdu_id, + pod_resources_from_k8s, vnfc_count): + """Remove 'vnfcResourceInfo' with mismatched pod names""" + delete_index = [] + for i, vnfc_resource_info in enumerate( + vnf_instance.instantiated_vnf_info.vnfc_resource_info): + if vnfc_resource_info.vdu_id != vdu_id: + continue + if vnfc_resource_info.compute_resource.resource_id not in ( + set(pod_resources_from_k8s.keys())): + delete_index.append(i) + + # For patterns where the number of pods is reduced and + # the remaining pods are also renamed + delete_cnt = vnfc_count - len(pod_resources_from_k8s) + if delete_cnt != len(delete_index): + for i in range(len(delete_index) - delete_cnt): + del delete_index[-1] + for idx in reversed(delete_index): + del vnf_instance.instantiated_vnf_info.vnfc_resource_info[idx] + + self._sync_resource_id(vnf_instance.instantiated_vnf_info, + vdu_id, pod_resources_from_k8s) + return self._calc_scale_level(context, vnf_instance, vdu_id, + len(pod_resources_from_k8s)) + + def _add_vnfc_resource(self, context, vnf_instance, vdu_id, + pod_resources_from_k8s, vnfc_count): + """Add vnfcResourceInfo""" + stored_pod_names = { + vnfc.compute_resource.resource_id + for vnfc in ( + vnf_instance.instantiated_vnf_info.vnfc_resource_info) + if vnfc.vdu_id == vdu_id} + add_pod_names = { + pod_name for pod_name in ( + set(pod_resources_from_k8s.keys())) + if pod_name not in stored_pod_names} + + # Determine the number of 'vnfc_resource_info' to add + add_vnfc_resource_info_num = ( + len(pod_resources_from_k8s) - vnfc_count) + for vnfc in vnf_instance.instantiated_vnf_info.vnfc_resource_info: + if vnfc.vdu_id == vdu_id: + resource_type = ( + vnfc.compute_resource.vim_level_resource_type) + metadata = vnfc.metadata[resource_type] + break + for dummy in range(add_vnfc_resource_info_num): + resource_id = add_pod_names.pop() + vnfc_resource = objects.VnfcResourceInfo() + vnfc_resource.id = uuidutils.generate_uuid() + vnfc_resource.vdu_id = vdu_id + resource = objects.ResourceHandle() + resource.resource_id = resource_id + resource.vim_level_resource_type = resource_type + vnfc_resource.compute_resource = resource + vnfc_resource.metadata = {} + vnfc_resource.metadata["Pod"] = ( + jsonutils.dumps(pod_resources_from_k8s.get(resource_id)) + ) + vnfc_resource.metadata[resource_type] = metadata + vnf_instance.instantiated_vnf_info.vnfc_resource_info.append( + vnfc_resource) + + self._sync_resource_id(vnf_instance.instantiated_vnf_info, + vdu_id, pod_resources_from_k8s) + return self._calc_scale_level(context, vnf_instance, vdu_id, + len(pod_resources_from_k8s)) + + def _check_pod_information(self, vnf_instance, + vdu_id, pod_resources_from_k8s): + """Compare 'instantiatedVnfInfo' and 'pod_resources_from_k8s' info + + True: find difference + False: No difference + """ + vnfc_rscs = vnf_instance.instantiated_vnf_info.vnfc_resource_info + pod_names = {vnfc.compute_resource.resource_id for vnfc in vnfc_rscs + if vnfc.vdu_id == vdu_id} + + if len(pod_names) != len(pod_resources_from_k8s): + return True + + # pod name check + if pod_names != set(pod_resources_from_k8s.keys()): + return True + + return False + + def _sync_resource_id(self, instantiated_vnf_info, + vdu_id, pod_resources_from_k8s): + """Set new resourceId into old resourceId""" + match_pod_names = set() + unmatch_pod_in_tacker = set() + + for vnfc in instantiated_vnf_info.vnfc_resource_info: + if vnfc.vdu_id != vdu_id: + continue + + if vnfc.compute_resource.resource_id in ( + set(pod_resources_from_k8s.keys())): + match_pod_names.add( + vnfc.compute_resource.resource_id) + else: + unmatch_pod_in_tacker.add( + vnfc.compute_resource.resource_id) + + # pickup new pod name + new_pod_name = set(pod_resources_from_k8s.keys()) ^ match_pod_names + + for unmatch_pod_name in unmatch_pod_in_tacker: + for vnfc in instantiated_vnf_info.vnfc_resource_info: + if vnfc.vdu_id != vdu_id: + continue + if vnfc.compute_resource.resource_id == ( + unmatch_pod_name): + vnfc.compute_resource.resource_id = ( + new_pod_name.pop() + ) + vnfc.metadata["Pod"] = ( + jsonutils.dumps( + pod_resources_from_k8s.get( + (vnfc.compute_resource.resource_id) + ) + ) + ) + break + + def _calc_scale_level(self, context, vnf_instance, + vdu_id, current_pod_num): + """calc scale_level and set""" + vnfd = vnflcm_utils.get_vnfd_dict(context, + vnf_instance.vnfd_id, + vnf_instance.instantiated_vnf_info.flavour_id) + + policies = vnfd.get("topology_template", {}).get("policies", {}) + for policy in policies: + for v in policy.values(): + if (v.get("type") == ( + "tosca.policies.nfv.VduScalingAspectDeltas")): + if vdu_id in v.get("targets", []): + aspect_id = v.get("properties", {}).get("aspect") + break + step_deltas = (self._get_scale_value(policies, aspect_id)) + + for policy in policies: + for v in policy.values(): + if (v.get("type") == ( + "tosca.policies.nfv.VduScalingAspectDeltas") + and step_deltas + and vdu_id in v.get("targets", [])): + delta = (v.get("properties", {}) + .get("deltas", {}) + .get(step_deltas, {}) + .get("number_of_instances")) + elif (v.get("type") == ( + "tosca.policies.nfv.VduInitialDelta") + and vdu_id in v.get("targets", [])): + initial_delta = (v.get("properties", {}) + .get("initial_delta", {}) + .get("number_of_instances")) + if (current_pod_num - initial_delta) % delta != 0: + LOG.error("Error computing 'scale_level'. " + f"current Pod num: {current_pod_num} delta: {delta}. " + f"vnf: {vnf_instance.id} vdu: {vdu_id}") + return False + + scale_level = (current_pod_num - initial_delta) // delta + + for inst_vnf in vnf_instance.instantiated_vnf_info.scale_status: + if inst_vnf.aspect_id == aspect_id: + inst_vnf.scale_level = scale_level + break + + return self._check_pod_range(vnf_instance, vnfd, vdu_id, + current_pod_num) + + def _get_scale_value(self, policies, aspect_id): + """get step_deltas from vnfd""" + for policy in policies: + for v in policy.values(): + if v.get("type") == "tosca.policies.nfv.ScalingAspects": + step_deltas = (v.get("properties", {}) + .get("aspects", {}) + .get(aspect_id, {}) + .get("step_deltas", [])[0]) + break + return step_deltas + + def _check_pod_range(self, vnf_instance, vnfd, vdu_id, + current_pod_num): + """Check the range of the maximum or minimum number of pods. + + If it finds out of range, output a error log and do not update + database. + """ + vdu_profile = (vnfd.get("topology_template", {}) + .get("node_templates", []) + .get(vdu_id, {}) + .get("properties", {}) + .get("vdu_profile")) + max_number_of_instances = vdu_profile.get("max_number_of_instances") + min_number_of_instances = vdu_profile.get("min_number_of_instances") + + if (current_pod_num > max_number_of_instances + or current_pod_num < min_number_of_instances): + LOG.error(f"Failed to update database vnf {vnf_instance.id} " + f"vdu: {vdu_id}. Pod num is out of range. " + f"pod_num: {current_pod_num}") + return False + return True + + def _get_pod_information(self, resource_name, + resource_type, vnf_instance, vim_connection_info): + """Extract a Pod starting with the specified 'resource_name' name""" + namespace = vnf_instance.vnf_metadata.get('namespace') + if not namespace: + namespace = "default" + auth_attr = vim_connection_info.access_info + auth_cred, _ = self.get_auth_creds(auth_attr) + + core_v1_api_client = self.kubernetes.get_core_v1_api_client( + auth=auth_cred) + resource_pods = {} + all_pods = core_v1_api_client.list_namespaced_pod( + namespace=namespace) + + for pod in all_pods.items: + if self.is_match_pod_naming_rule(resource_type, + resource_name, pod.metadata.name): + resource_pods[pod.metadata.name] = pod.metadata.to_dict() + + return resource_pods + + def _get_vdu_list(self, vnf_instance): + """get vdu_list""" + vdu_id_list = set() + + for vnfc in vnf_instance.instantiated_vnf_info.vnfc_resource_info: + if vnfc.compute_resource.resource_id != "": + vdu_id_list.add(vnfc.vdu_id) + return vdu_id_list + + def _get_resource_name(self, resource_id, resource_type): + """get resource name""" + if resource_type == 'Pod': + resource_name = resource_id + else: + name_list = resource_id.split("-") + if resource_type == 'Deployment': + del name_list[-2:] + elif resource_type in ('ReplicaSet', 'DaemonSet', 'StatefulSet'): + del name_list[-1] + resource_name = '-'.join(name_list) + + return resource_name