Database sync between TackerDB and Kubernetes

The patch supports synchronization of Pod information
(Number of Pods, Pod name) between TackerDB and Kubernetes.
If there are differences in the Pod information,
it will synchronize the TackerDB information with the
Kubernetes information.
This process is performed periodically, and its interval
is configurable.

Implements: blueprint database-synchronization
Change-Id: I7bb00d9cc0a6f110a68cdb4ecc2bc10665dcdacc
This commit is contained in:
Hideki Matsuda 2022-08-10 10:27:10 +00:00 committed by Yi Feng
parent a5c7189051
commit 0e2434d9c2
20 changed files with 1856 additions and 28 deletions

View File

@ -0,0 +1,13 @@
---
features:
- |
This patch adds the ability to periodically synchronize resources
in K8s VIM and Tacker DB.
There is no interface on the K8s to notify Tacker when the auto-scale or
auto-heal of a pod is running and the pod information is updated.
This can lead to inconsistencies between the Pod information in Tacker
database and the Pod information running on the actual K8s.
This function periodically checks the pod information in the Tacker
database and the pod information in the K8s, and updates the information
in the Tacker database according to the K8s side if there is
any inconsistency.

View File

@ -22,6 +22,7 @@ import oslo_messaging
import requests
import shutil
import sys
import threading
import time
import traceback
import yaml
@ -30,6 +31,7 @@ from glance_store import exceptions as store_exceptions
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_service import loopingcall
from oslo_service import periodic_task
from oslo_service import service
from oslo_utils import encodeutils
@ -75,6 +77,10 @@ from tacker.vnfm import nfvo_client
from tacker.vnfm import plugin
CONF = tacker.conf.CONF
# NOTE(fengyi): After the conductor is started, since the start-up process
# is being executed, it should take a while to start the actual DB
# synchronization periodical process.
DB_SYNC_INITIAL_DELAY = 60
# NOTE(tpatil): keystone_authtoken opts registered explicitly as conductor
# service doesn't use the keystonemiddleware.authtoken middleware as it's
@ -292,6 +298,14 @@ def grant_error_common(function):
return decorated_function
def async_call(func):
def inner(*args, **kwargs):
th = threading.Thread(target=func, args=args,
kwargs=kwargs, daemon=True)
th.start()
return inner
class Conductor(manager.Manager, v2_hook.ConductorV2Hook):
def __init__(self, host, conf=None):
if conf:
@ -304,6 +318,15 @@ class Conductor(manager.Manager, v2_hook.ConductorV2Hook):
self.vnf_manager = driver_manager.DriverManager(
'tacker.tacker.vnfm.drivers',
cfg.CONF.tacker.infra_driver)
self._periodic_call()
@async_call
def _periodic_call(self):
self.periodic = loopingcall.FixedIntervalLoopingCall(
self._sync_db)
self.periodic.start(
interval=CONF.db_synchronization_interval,
initial_delay=DB_SYNC_INITIAL_DELAY)
def start(self):
coordination.COORDINATOR.start()
@ -2506,6 +2529,13 @@ class Conductor(manager.Manager, v2_hook.ConductorV2Hook):
error_point=vnf_dict['current_error_point']
)
def _sync_db(self):
"""Periodic database update invocation method"""
LOG.debug("Starting _sync_db")
context = t_context.get_admin_context()
self.vnflcm_driver.sync_db(context)
LOG.debug("Ended _sync_db")
def init(args, **kwargs):
CONF(args=args, project='tacker',

View File

@ -23,6 +23,10 @@ interval_opts = [
default=1800,
help=_('Seconds between running periodic tasks '
'to cleanup residues of deleted vnf packages')),
cfg.IntOpt('db_synchronization_interval',
default=300,
help=_('Interval time in sec for DB sync between '
'Tacker and Kubernetes VIMs')),
]

View File

@ -37,6 +37,7 @@ from tacker.db import db_base
from tacker.db.db_sqlalchemy import models
from tacker.db import model_base
from tacker.db import models_v1
from tacker.db.nfvo import nfvo_db # noqa: F401
from tacker.db.nfvo import ns_db
from tacker.db import types
from tacker.extensions import vnfm

View File

@ -139,8 +139,10 @@ class VnfInstanceTaskState(BaseTackerEnum):
TERMINATING = 'TERMINATING'
SCALING = 'SCALING'
ERROR = 'ERROR'
DB_SYNCHRONIZING = 'DB_SYNCHRONIZING'
ALL = (INSTANTIATING, HEALING, TERMINATING, SCALING, ERROR)
ALL = (INSTANTIATING, HEALING, TERMINATING,
SCALING, ERROR, DB_SYNCHRONIZING)
class VnfInstanceTaskStateField(BaseEnumField):

View File

@ -347,6 +347,14 @@ class InvalidPagingMarker(SolHttpError400):
message = _("Paging marker value %(marker)s is invalid.")
class DbSyncNoDiff(Exception):
pass
class DbSyncFailed(Exception):
pass
class K8sOperationFailed(SolHttpError422):
# title and detail are set in the code from kubernetes operation
pass

View File

@ -459,3 +459,14 @@ class Vnfd(object):
mani_artifact_files.extend(meta_artifacts_files)
return mani_artifact_files
def get_initial_delta(self, flavour_id, vdu_id):
initial_deltas = self.get_policy_values_by_type(flavour_id,
'tosca.policies.nfv.VduInitialDelta')
for aspect in initial_deltas:
if vdu_id in aspect.get('targets', []):
initial_delta = (aspect.get('properties', {})
.get('initial_delta', {})
.get('number_of_instances'))
break
return initial_delta

View File

@ -13,7 +13,11 @@
# License for the specific language governing permissions and limitations
# under the License.
import threading
from oslo_log import log as logging
from oslo_service import loopingcall
from oslo_utils import encodeutils
from tacker.common import log
from tacker import context as tacker_context
@ -34,6 +38,18 @@ from tacker.sol_refactored.objects.v2 import fields
LOG = logging.getLogger(__name__)
CONF = config.CONF
# NOTE(fengyi): After the conductor is started, since the start-up process
# is being executed, it should take a while to start the actual DB
# synchronization periodical process.
DB_SYNC_INITIAL_DELAY = 60
def async_call(func):
def inner(*args, **kwargs):
th = threading.Thread(target=func, args=args,
kwargs=kwargs, daemon=True)
th.start()
return inner
class ConductorV2(object):
@ -48,6 +64,15 @@ class ConductorV2(object):
self._change_lcm_op_state()
self._periodic_call()
@async_call
def _periodic_call(self):
self.periodic = loopingcall.FixedIntervalLoopingCall(
self._sync_db)
self.periodic.start(interval=CONF.db_synchronization_interval,
initial_delay=DB_SYNC_INITIAL_DELAY)
def _change_lcm_op_state(self):
# NOTE: If the conductor down during processing and
# the LcmOperationState STARTING/PROCESSING/ROLLING_BACK remain,
@ -320,6 +345,37 @@ class ConductorV2(object):
self.nfvo_client.send_lcmocc_notification(context, lcmocc, inst,
self.endpoint)
def _sync_db(self):
"""Periodic database update invocation method(v2 api)"""
LOG.debug("Starting _sync_db")
context = tacker_context.get_admin_context()
vnf_instances = objects.VnfInstanceV2.get_by_filter(
context, instantiationState='INSTANTIATED')
for inst in vnf_instances:
try:
vim_info = inst_utils.select_vim_info(inst.vimConnectionInfo)
self.vnflcm_driver.diff_check_inst(inst, vim_info)
self._sync_inst(context, inst, vim_info)
except sol_ex.DbSyncNoDiff:
continue
except sol_ex.DbSyncFailed as e:
LOG.error("%s: %s", e.__class__.__name__, e.args[0])
except sol_ex.OtherOperationInProgress:
LOG.info("There is an LCM operation in progress, so "
f"skip this DB synchronization. vnf: {inst.id}.")
except Exception as e:
LOG.error(f"Failed to synchronize database vnf: {inst.id} "
f"Error: {encodeutils.exception_to_unicode(e)}")
LOG.debug("Ended _sync_db")
@coordinate.lock_vnf_instance('{inst.id}')
def _sync_inst(self, context, inst, vim_info):
vnf_inst = inst_utils.get_inst(context, inst.id)
self.vnflcm_driver.sync_db(
context, vnf_inst, vim_info)
vnf_inst.update(context)
def store_alarm_info(self, context, alarm):
self.vnffm_driver.store_alarm_info(context, alarm)

View File

@ -1118,3 +1118,29 @@ class VnfLcmDriverV2(object):
else:
# should not occur
raise sol_ex.SolException(sol_detail='not support vim type')
def sync_db(self, context, vnf_inst, vim_info):
# Database synchronization works only when
# the vimType is kubernetes or ETSINFV.HELM.V_3
if vim_info.vimType == 'kubernetes':
driver = kubernetes.Kubernetes()
driver.sync_db(context, vnf_inst, vim_info)
elif vim_info.vimType == 'ETSINFV.HELM.V_3':
driver = helm.Helm()
driver.sync_db(context, vnf_inst, vim_info)
else:
# Only support CNF
raise sol_ex.DbSyncNoDiff(
"There are no differences in Vnfc resources.")
def diff_check_inst(self, vnf_inst, vim_info):
if vim_info.vimType == 'kubernetes':
driver = kubernetes.Kubernetes()
driver.diff_check_inst(vnf_inst, vim_info)
elif vim_info.vimType == 'ETSINFV.HELM.V_3':
driver = helm.Helm()
driver.diff_check_inst(vnf_inst, vim_info)
else:
# Only support CNF
raise sol_ex.DbSyncNoDiff(
"There are no differences in Vnfc resources.")

View File

@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
from kubernetes import client
from oslo_log import log as logging
@ -305,27 +303,4 @@ class Kubernetes(kubernetes_common.KubernetesCommon):
)
def _is_match_pod_naming_rule(self, rsc_kind, rsc_name, pod_name):
match_result = None
if rsc_kind == 'Pod':
# Expected example: name
if rsc_name == pod_name:
return True
elif rsc_kind == 'Deployment':
# Expected example: name-012789abef-019az
# NOTE(horie): The naming rule of Pod in deployment is
# "(deployment name)-(pod template hash)-(5 charactors)".
# The "pod template hash" string is generated from 32 bit hash.
# This may be from 1 to 10 caracters but not sure the lower limit
# from the source code of Kubernetes.
match_result = re.match(
rsc_name + '-([0-9a-f]{1,10})-([0-9a-z]{5})+$', pod_name)
elif rsc_kind in ('ReplicaSet', 'DaemonSet'):
# Expected example: name-019az
match_result = re.match(rsc_name + '-([0-9a-z]{5})+$', pod_name)
elif rsc_kind == 'StatefulSet':
# Expected example: name-0
match_result = re.match(rsc_name + '-[0-9]+$', pod_name)
if match_result:
return True
return False
return self.is_match_pod_naming(rsc_kind, rsc_name, pod_name)

View File

@ -13,6 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import operator
import re
from oslo_log import log as logging
from oslo_service import loopingcall
@ -21,6 +25,7 @@ from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import vnf_instance_utils as inst_utils
from tacker.sol_refactored.infra_drivers.kubernetes import kubernetes_resource
from tacker.sol_refactored.infra_drivers.kubernetes import kubernetes_utils
from tacker.sol_refactored.nfvo import nfvo_client
from tacker.sol_refactored import objects
@ -214,3 +219,175 @@ class KubernetesCommon(object):
check_reses = set(k8s_reses)
self._check_status(_check_updated, check_reses, k8s_api_client,
namespace, old_pods_names)
def diff_check_inst(self, inst, vim_info):
inst_tmp = copy.deepcopy(inst)
self.diff_check_and_update_vnfc(inst_tmp, vim_info)
def diff_check_and_update_vnfc(self, vnf_instance, vim_info):
with kubernetes_utils.AuthContextManager(vim_info) as acm:
k8s_api_client = acm.init_k8s_api_client()
old_pods_names = {
vnfc.computeResource.resourceId for vnfc in
vnf_instance.instantiatedVnfInfo.vnfcResourceInfo}
self._update_vnfc_info(vnf_instance, k8s_api_client)
new_pods_names = {
vnfc.computeResource.resourceId for vnfc in
vnf_instance.instantiatedVnfInfo.vnfcResourceInfo}
if operator.eq(old_pods_names, new_pods_names):
raise sol_ex.DbSyncNoDiff(
"There are no differences in Vnfc resources.")
def sync_db(self, context, vnf_instance, vim_info):
self.diff_check_and_update_vnfc(vnf_instance, vim_info)
vdu_id_list = self._get_vdu_list(vnf_instance)
for vdu_id in vdu_id_list:
resource_type = ""
resource_name = ""
# get pod information
for vnfc in vnf_instance.instantiatedVnfInfo.vnfcResourceInfo:
if vnfc.vduId == vdu_id:
resource_type = (vnfc.computeResource
.vimLevelResourceType)
resource_name = self._get_resource_name(
vnfc.computeResource.resourceId, resource_type)
break
pod_resources_from_k8s = self._get_pod_information(
resource_name, resource_type, vnf_instance, vim_info)
vnfcs = [vnfc for vnfc in
vnf_instance.instantiatedVnfInfo.vnfcResourceInfo if
vnfc.vduId == vdu_id]
replicas = vnf_instance.instantiatedVnfInfo.metadata[
'vdu_reses'][vdu_id]['spec'].get('replicas')
if replicas is not None:
vnf_instance.instantiatedVnfInfo.metadata[
'vdu_reses'][vdu_id]['spec']['replicas'] = len(vnfcs)
self._calc_scale_level(
context, vnf_instance, vdu_id,
len(pod_resources_from_k8s))
def _calc_scale_level(self, context, vnf_instance,
vdu_id, current_pod_num):
"""calc scale_level and set"""
aspect_id = ""
flavour_id = vnf_instance.instantiatedVnfInfo.flavourId
client = nfvo_client.NfvoClient()
vnfd = client.get_vnfd(context, vnf_instance.vnfdId)
for aspect_delta in vnfd.get_policy_values_by_type(flavour_id,
'tosca.policies.nfv.VduScalingAspectDeltas'):
if vdu_id in aspect_delta.get('targets', []):
aspect_id = aspect_delta.get('properties', {}).get('aspect')
break
if not aspect_id:
return
delta = vnfd.get_scale_vdu_and_num(flavour_id, aspect_id).get(vdu_id)
initial_delta = vnfd.get_initial_delta(flavour_id, vdu_id)
if (current_pod_num - initial_delta) % delta != 0:
raise sol_ex.DbSyncFailed(
"Error computing 'scale_level'. current Pod num: "
f"{current_pod_num} delta: {delta}. vnf: {vnf_instance.id} "
f"vdu: {vdu_id}")
scale_level = (current_pod_num - initial_delta) // delta
for inst_vnf in vnf_instance.instantiatedVnfInfo.scaleStatus:
if inst_vnf.aspectId == aspect_id:
inst_vnf.scaleLevel = scale_level
break
self._check_pod_range(
vnf_instance, vnfd, vdu_id, flavour_id, current_pod_num)
def _check_pod_range(self, vnf_instance, vnfd, vdu_id,
flavour_id, current_pod_num):
"""Check the range of the maximum or minimum number of pods.
If it finds out of range, output a error log and do not update
database.
"""
vdu_profile = (vnfd.get_vdu_nodes(flavour_id)
.get(vdu_id, {})
.get("properties", {})
.get("vdu_profile", {}))
max_number_of_instances = vdu_profile.get("max_number_of_instances")
min_number_of_instances = vdu_profile.get("min_number_of_instances")
if (current_pod_num > max_number_of_instances
or current_pod_num < min_number_of_instances):
raise sol_ex.DbSyncFailed(
f"Failed to update database vnf {vnf_instance.id} "
f"vdu: {vdu_id}. Pod num is out of range. "
f"pod_num: {current_pod_num}")
def _get_pod_information(self, resource_name,
resource_type, vnf_instance, vim_connection_info):
"""Extract a Pod starting with the specified 'resource_name' name"""
namespace = vnf_instance.metadata.get('namespace')
if not namespace:
namespace = "default"
with kubernetes_utils.AuthContextManager(vim_connection_info) as acm:
k8s_api_client = acm.init_k8s_api_client()
all_pods = kubernetes_utils.list_namespaced_pods(
k8s_api_client, namespace)
resource_pods = {}
for pod in all_pods:
if self.is_match_pod_naming(resource_type,
resource_name, pod.metadata.name):
resource_pods[pod.metadata.name] = pod.metadata.to_dict()
return resource_pods
def _get_vdu_list(self, vnf_instance):
"""get vdu_list"""
vdu_id_list = set()
for vnfc in vnf_instance.instantiatedVnfInfo.vnfcResourceInfo:
vdu_id_list.add(vnfc.vduId)
return vdu_id_list
def _get_resource_name(self, resource_id, resource_type):
"""get resource name"""
if resource_type == 'Pod':
resource_name = resource_id
else:
name_list = resource_id.split("-")
if resource_type == 'Deployment':
del name_list[-2:]
elif resource_type in ('ReplicaSet', 'DaemonSet', 'StatefulSet'):
del name_list[-1]
resource_name = '-'.join(name_list)
return resource_name
def is_match_pod_naming(self, rsc_kind, rsc_name, pod_name):
match_result = None
if rsc_kind == 'Pod':
# Expected example: name
if rsc_name == pod_name:
return True
elif rsc_kind == 'Deployment':
# Expected example: name-012789abef-019az
# NOTE(horie): The naming rule of Pod in deployment is
# "(deployment name)-(pod template hash)-(5 charactors)".
# The "pod template hash" string is generated from 32 bit hash.
# This may be from 1 to 10 caracters but not sure the lower limit
# from the source code of Kubernetes.
match_result = re.match(
rsc_name + '-([0-9a-f]{1,10})-([0-9a-z]{5})+$', pod_name)
elif rsc_kind in ('ReplicaSet', 'DaemonSet'):
# Expected example: name-019az
match_result = re.match(rsc_name + '-([0-9a-z]{5})+$', pod_name)
elif rsc_kind == 'StatefulSet':
# Expected example: name-0
match_result = re.match(rsc_name + '-[0-9]+$', pod_name)
if match_result:
return True
return False

View File

@ -3987,3 +3987,7 @@ class TestConductor(SqlTestCase, unit_base.FixturedTestCase):
mock_change_vnf_status.assert_not_called()
self.assertEqual(mock_change_ext_conn_grant.call_count, 0)
mock_update_vnf_attributes.assert_called_once()
def test_sync_db(self):
self.conductor._sync_db()
self.vnflcm_driver.sync_db.assert_called_once()

View File

@ -17,7 +17,10 @@ from datetime import datetime
from unittest import mock
import ddt
from kubernetes import client
from oslo_log import log as logging
from oslo_utils import uuidutils
from tooz.drivers import file
from tacker import context
from tacker.sol_refactored.common import exceptions as sol_ex
@ -28,6 +31,11 @@ from tacker.sol_refactored.nfvo import nfvo_client
from tacker.sol_refactored import objects
from tacker.sol_refactored.objects.v2 import fields
from tacker.tests.unit.db import base as db_base
from tacker.tests.unit.sol_refactored.infra_drivers.kubernetes import fakes
from tacker.vnfm.infra_drivers.kubernetes import kubernetes_driver
CNF_SAMPLE_VNFD_ID = "b1bb0ce7-ebca-4fa7-95ed-4840d70a1177"
@ddt.ddt
@ -629,3 +637,97 @@ class TestConductorV2(db_base.SqlTestCase):
result = self.conductor.modify_vnfinfo(self.context, lcmocc.id)
self.assertEqual(None, result)
@mock.patch.object(vnflcm_driver_v2.VnfLcmDriverV2,
'sync_db')
@mock.patch.object(kubernetes_driver.Kubernetes,
'_check_pod_information')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
@mock.patch.object(objects.base.TackerPersistentObject, "get_by_filter")
def test_sync_db(
self, mock_db_sync, mock_get_by_filters,
mock_list_namespaced_pod, mock_check_pod_information):
vnf_instance_obj = fakes.fake_vnf_instance()
vnf_instance_obj.id = CNF_SAMPLE_VNFD_ID
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Pod')
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
vnfc_rsc_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info()
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
mock_get_by_filters.return_value = [
vnf_instance_obj, vnf_instance_obj]
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[fakes.get_fake_pod_info(kind='Pod')])
mock_check_pod_information.return_value = True
self.conductor._sync_db()
mock_db_sync.assert_called_once()
@mock.patch.object(kubernetes_driver.Kubernetes,
'_check_pod_information')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
@mock.patch.object(objects.base.TackerPersistentObject, "get_by_filter")
def test_sync_db_exception(
self, mock_get_by_filters, mock_list_namespaced_pod,
mock_check_pod_information):
vnf_instance_obj = fakes.fake_vnf_instance()
vnf_instance_obj.id = CNF_SAMPLE_VNFD_ID
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Pod')
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
vnfc_rsc_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info()
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
mock_get_by_filters.return_value = [
vnf_instance_obj, vnf_instance_obj]
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[fakes.get_fake_pod_info(kind='Pod')])
mock_check_pod_information.return_value = True
log_name = "tacker.sol_refactored.conductor.conductor_v2"
with self.assertLogs(logger=log_name, level=logging.DEBUG) as cm:
self.conductor._sync_db()
msg = (f'ERROR:{log_name}:Failed to synchronize database vnf: '
f'{vnf_instance_obj.id} Error: ')
self.assertIn(f'{msg}', cm.output[1])
@mock.patch.object(file.FileLock, 'acquire')
@mock.patch.object(kubernetes_driver.Kubernetes,
'_check_pod_information')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
@mock.patch.object(objects.base.TackerPersistentObject, "get_by_filter")
def test_sync_db_sol_ex(
self, mock_get_by_filters, mock_list_namespaced_pod,
mock_check_pod_information, mock_acquire):
vnf_instance_obj = fakes.fake_vnf_instance()
# vnf_instance_obj.id = CNF_SAMPLE_VNFD_ID
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Pod')
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
vnfc_rsc_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info()
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
vnf_instance_obj1 = fakes.fake_vnf_instance()
vnf_instance_obj1.vimConnectionInfo['vim1'] = vim_connection_object
mock_get_by_filters.return_value = [
vnf_instance_obj]
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[fakes.get_fake_pod_info(kind='Pod')])
mock_check_pod_information.return_value = True
mock_acquire.return_value = False
log_name = "tacker.sol_refactored.conductor.conductor_v2"
with self.assertLogs(logger=log_name, level=logging.DEBUG) as cm:
self.conductor._sync_db()
msg = (f'INFO:{log_name}:There is an LCM operation in progress, '
f'so skip this DB synchronization. '
f'vnf: {vnf_instance_obj.id}.')
self.assertIn(f'{msg}', cm.output)

View File

@ -17,6 +17,7 @@ from datetime import datetime
import os
from unittest import mock
from kubernetes import client
from oslo_utils import uuidutils
from tacker import context
@ -31,6 +32,7 @@ from tacker.sol_refactored.nfvo import nfvo_client
from tacker.sol_refactored import objects
from tacker.sol_refactored.objects.v2 import fields
from tacker.tests import base
from tacker.tests.unit.sol_refactored.infra_drivers.kubernetes import fakes
CNF_SAMPLE_VNFD_ID = "b1bb0ce7-ebca-4fa7-95ed-4840d70a1177"
@ -3276,3 +3278,111 @@ class TestVnfLcmDriverV2(base.BaseTestCase):
self.assertRaises(
sol_ex.SolException, self.driver.change_vnfpkg_rollback,
self.context, lcmocc, inst, grant_req, grant, self.vnfd_1)
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_sync_db_kubernetes(
self, mock_list_namespaced_pod, mock_get_vnfd):
vnf_instance_obj = fakes.fake_vnf_instance()
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name="vdu1-1234567890-abcd", rsc_name="vdu1")
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
vnfc_rsc_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info()
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd1"),
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd2")])
mock_get_vnfd.return_value = self.vnfd_1
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
]
self.driver.sync_db(
context=self.context, vnf_inst=vnf_instance_obj,
vim_info=vim_connection_object)
self.assertEqual(
2, vnf_instance_obj.instantiatedVnfInfo.metadata[
'vdu_reses']['VDU1']['spec']['replicas'])
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_sync_db_helm(
self, mock_list_namespaced_pod, mock_get_vnfd):
vnf_instance_obj = fakes.fake_vnf_instance()
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name="vdu1-1234567890-abcd", rsc_name="vdu1")
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
vnfc_rsc_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info(
vim_type='ETSINFV.HELM.V_3')
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd1"),
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd2")])
mock_get_vnfd.return_value = self.vnfd_1
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
]
self.driver.sync_db(
context=self.context, vnf_inst=vnf_instance_obj,
vim_info=vim_connection_object)
self.assertEqual(
2, vnf_instance_obj.instantiatedVnfInfo.metadata[
'vdu_reses']['VDU1']['spec']['replicas'])
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_sync_db_not_support(
self, mock_list_namespaced_pod, mock_get_vnfd):
vnf_instance_obj = fakes.fake_vnf_instance()
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name="vdu1-1234567890-abcd", rsc_name="vdu1")
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
vnfc_rsc_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info(
vim_type="openstack")
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd1"),
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd2")])
mock_get_vnfd.return_value = self.vnfd_1
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
]
ex = self.assertRaises(
sol_ex.DbSyncNoDiff, self.driver.sync_db,
self.context, vnf_instance_obj, vim_connection_object)
self.assertEqual(
"There are no differences in Vnfc resources.", ex.args[0])

View File

@ -13,7 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from kubernetes import client
from oslo_utils import uuidutils
from tacker.sol_refactored import objects
def fake_namespace():
@ -547,3 +552,114 @@ def fake_node(type='Ready', status='True'):
]
)
)
def get_fake_pod_info(kind, name='fake_name', pod_status='Running',
pod_name=None):
if not pod_name:
if kind == 'Deployment':
pod_name = _('{name}-1234567890-abcde').format(name=name)
elif kind == 'ReplicaSet' or kind == 'DaemonSet':
pod_name = _('{name}-12345').format(name=name)
elif kind == 'StatefulSet':
pod_name = _('{name}-1').format(name=name)
elif kind == 'Pod':
pod_name = name
return client.V1Pod(
metadata=client.V1ObjectMeta(
name=pod_name,
creation_timestamp=datetime.datetime.now().isoformat('T')),
status=client.V1PodStatus(phase=pod_status))
def fake_vim_connection_info(vim_type="kubernetes"):
return objects.VimConnectionInfo.from_dict({
'vimId': 'a56258df-9853-4437-9fdb-7d470bc0b162',
'vimType': vim_type,
'interfaceInfo': {
'endpoint': 'https://127.0.0.1:6443'
},
'accessInfo': {
'bearer_token': 'secret_token',
'username': 'test',
'password': 'test',
'region': 'RegionOne'
}
})
def fake_vnfc_resource_info(vdu_id='VDU1', rsc_kind='Deployment',
rsc_name='fake_name', pod_name=None,
namespace='default'):
if not pod_name:
v1_pod = get_fake_pod_info(rsc_kind, rsc_name)
pod_name = v1_pod.metadata.name
id = uuidutils.generate_uuid()
vnfc_rsc_info = {
'id': id,
'vduId': vdu_id,
'computeResource': {
'resourceId': pod_name,
'vimLevelResourceType': rsc_kind
},
'metadata': {
'Deployment': {
'name': rsc_name,
'namespace': namespace
}
}
}
vnfc_rsc_info_obj = objects.VnfcResourceInfoV2.from_dict(vnfc_rsc_info)
vnfc_info = {
'id': vdu_id + "-" + id,
'vduId': vdu_id,
'vnfcResourceInfoId': id,
'vnfcState': 'STARTED'
}
vnfc_info_obj = objects.VnfcInfoV2.from_dict(vnfc_info)
return vnfc_rsc_info_obj, vnfc_info_obj
def fake_vnf_instance(instantiated_state='INSTANTIATED'):
return objects.VnfInstanceV2.from_dict({
'id': uuidutils.generate_uuid(),
'vimConnectionInfo': {},
'instantiationState': instantiated_state,
'instantiatedVnfInfo': {
'flavourId': 'simple',
'vnfState': 'STARTED',
'vnfcResourceInfo': [],
'vnfcInfo': [],
'metadata': {
'namespace': 'default',
'lcm-kubernetes-def-files': [
'Files/kubernetes/deployment.yaml'],
'vdu_reses': {
'VDU1': {
'kind': 'Deployment',
'metadata': {
'name': 'vdu1',
'namespace': 'default',
},
'spec': {
'replicas': 0
}
}
}
}
},
'metadata': {
'lcm-kubernetes-def-files': ['Files/kubernetes/deployment.yaml']
}
})
def fake_scale_status(aspect_id='vdu1_aspect',
vnfd_id=uuidutils.generate_uuid(), scale_level=1):
return objects.ScaleInfoV2.from_dict({
'aspectId': aspect_id,
'vnfdId': vnfd_id,
'scaleLevel': scale_level
})

View File

@ -14,14 +14,19 @@
# under the License.
import os
from kubernetes import client
from oslo_utils import uuidutils
from unittest import mock
from tacker import context
from tacker.sol_refactored.common import exceptions as sol_ex
from tacker.sol_refactored.common import vnfd_utils
from tacker.sol_refactored.infra_drivers.kubernetes import kubernetes
from tacker.sol_refactored.nfvo import nfvo_client
from tacker.sol_refactored import objects
from tacker.tests.unit import base
from tacker.tests.unit.sol_refactored.infra_drivers.kubernetes import fakes
CNF_SAMPLE_VNFD_ID = "b1bb0ce7-ebca-4fa7-95ed-4840d70a1177"
@ -32,6 +37,7 @@ class TestKubernetes(base.TestCase):
super(TestKubernetes, self).setUp()
objects.register_all()
self.driver = kubernetes.Kubernetes()
self.context = context.get_admin_context()
cur_dir = os.path.dirname(__file__)
sample_dir = os.path.join(cur_dir, "../..", "samples")
@ -104,3 +110,167 @@ class TestKubernetes(base.TestCase):
# maybe 3 but possible 2
self.assertTrue(res1.is_ready.call_count >= 2)
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_sync_db(
self, mock_list_namespaced_pod, mock_get_vnfd):
vnf_instance_obj = fakes.fake_vnf_instance()
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name="vdu1-1234567890-abcd", rsc_name="vdu1")
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
vnfc_rsc_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info()
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd1"),
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd2")])
mock_get_vnfd.return_value = self.vnfd_1
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
]
self.driver.sync_db(
context=self.context, vnf_instance=vnf_instance_obj,
vim_info=vim_connection_object)
self.assertEqual(
2, vnf_instance_obj.instantiatedVnfInfo.metadata[
'vdu_reses']['VDU1']['spec']['replicas'])
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_sync_db_no_diff(
self, mock_list_namespaced_pod, mock_get_vnfd):
vnf_instance_obj = fakes.fake_vnf_instance()
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name="vdu1-1234567890-abcd1", rsc_name="vdu1")
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
vnfc_rsc_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info()
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd1")])
mock_get_vnfd.return_value = self.vnfd_1
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
]
ex = self.assertRaises(
sol_ex.DbSyncNoDiff, self.driver.sync_db,
self.context, vnf_instance_obj, vim_connection_object)
self.assertEqual(
"There are no differences in Vnfc resources.", ex.args[0])
@mock.patch.object(vnfd_utils.Vnfd, 'get_scale_vdu_and_num')
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_sync_db_scale_level(
self, mock_list_namespaced_pod, mock_get_vnfd,
mock_scale_vdu_and_num):
vnf_instance_obj = fakes.fake_vnf_instance()
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name="vdu1-1234567890-abcd1", rsc_name="vdu1")
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
vnfc_rsc_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info()
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd1"),
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd2"),
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd3"),
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd4")
])
mock_get_vnfd.return_value = self.vnfd_1
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
]
delta = 2
mock_scale_vdu_and_num.return_value = {'VDU1': delta}
current_pod_num = 4
vdu_id = "VDU1"
ex = self.assertRaises(
sol_ex.DbSyncFailed, self.driver.sync_db,
self.context, vnf_instance_obj, vim_connection_object)
self.assertEqual(
"Error computing 'scale_level'. current Pod num: "
f"{current_pod_num} delta: {delta}. vnf: {vnf_instance_obj.id} "
f"vdu: {vdu_id}", ex.args[0])
@mock.patch.object(vnfd_utils.Vnfd, 'get_scale_vdu_and_num')
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_sync_db_pod_range(
self, mock_list_namespaced_pod, mock_get_vnfd,
mock_scale_vdu_and_num):
vnf_instance_obj = fakes.fake_vnf_instance()
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name="vdu1-1234567890-abcd1", rsc_name="vdu1")
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
vnfc_rsc_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info()
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd1"),
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd2"),
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd3"),
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd4"),
fakes.get_fake_pod_info(
kind='Deployment', pod_name="vdu1-1234567890-abcd5")
])
mock_get_vnfd.return_value = self.vnfd_1
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
]
delta = 2
mock_scale_vdu_and_num.return_value = {'VDU1': delta}
current_pod_num = 5
vdu_id = "VDU1"
ex = self.assertRaises(
sol_ex.DbSyncFailed, self.driver.sync_db,
self.context, vnf_instance_obj, vim_connection_object)
self.assertEqual(
f"Failed to update database vnf {vnf_instance_obj.id} "
f"vdu: {vdu_id}. Pod num is out of range. "
f"pod_num: {current_pod_num}", ex.args[0])

View File

@ -20,6 +20,7 @@ from unittest import mock
import yaml
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
@ -38,6 +39,7 @@ from tacker.objects import vim_connection
from tacker.tests.unit.db import base as db_base
from tacker.tests.unit.nfvo.test_nfvo_plugin import FakeVNFMPlugin
from tacker.tests.unit.vnflcm import fakes
from tacker.tests.unit.vnfm.infra_drivers.kubernetes import fakes as k8s_fakes
from tacker.tests import utils as test_utils
from tacker.tests import uuidsentinel
from tacker.vnflcm import vnflcm_driver
@ -3538,3 +3540,74 @@ class TestVnflcmDriver(db_base.SqlTestCase):
"connectivity vnf '%s' is completed successfully")
mock_log.info.assert_called_with(expected_msg,
vnf_instance.id)
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM': FakeVNFMPlugin()})
@mock.patch.object(VnfLcmDriver, '_init_mgmt_driver_hash')
@mock.patch.object(yaml, "safe_load")
@mock.patch.object(objects.VnfInstanceList, 'get_by_filters')
@mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive")
def test_sync_db(self, mock_vim, mock_get_by_filter,
mock_yaml_safe_load, mock_init_hash,
mock_get_service_plugins):
mock_init_hash.return_value = {
"vnflcm_noop": "ffea638bfdbde3fb01f191bbe75b031859"
"b18d663b127100eb72b19eecd7ed51"
}
mock_yaml_safe_load.return_value = fakes.vnfd_dict_cnf()
vnf_instance_obj = fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj = k8s_fakes.fake_vnfc_resource_info(
namespace="default")
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = (
[vnfc_resource_info_obj])
vim_connection_info = vim_connection.VimConnectionInfo(
vim_type="kubernetes", id="8a3adb69-0784-43c7-833e-aab0b6ab4470",
vim_id="67e5de95-1ad2-4627-89f8-597dec1683fa")
vnf_instance_obj.vim_connection_info.append(vim_connection_info)
mock_get_by_filter.return_value = [vnf_instance_obj]
mock_vim.return_value = vim_connection_info
self._mock_vnf_manager()
driver = vnflcm_driver.VnfLcmDriver()
driver.sync_db(self.context)
mock_get_by_filter.assert_called_once()
self._vnf_manager.invoke.assert_called_once()
@mock.patch.object(TackerManager, 'get_service_plugins',
return_value={'VNFM': FakeVNFMPlugin()})
@mock.patch.object(VnfLcmDriver, '_init_mgmt_driver_hash')
@mock.patch.object(yaml, "safe_load")
@mock.patch.object(objects.VnfInstanceList, 'get_by_filters')
@mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive")
def test_sync_db_exception(
self, mock_vim, mock_get_by_filter, mock_yaml_safe_load,
mock_init_hash, mock_get_service_plugins):
mock_init_hash.return_value = {
"vnflcm_noop": "ffea638bfdbde3fb01f191bbe75b031859"
"b18d663b127100eb72b19eecd7ed51"
}
mock_yaml_safe_load.return_value = fakes.vnfd_dict_cnf()
vnf_instance_obj = fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj = k8s_fakes.fake_vnfc_resource_info(
namespace="default")
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = (
[vnfc_resource_info_obj])
mock_get_by_filter.return_value = [vnf_instance_obj]
mock_vim.return_value = None
driver = vnflcm_driver.VnfLcmDriver()
log_name = "tacker.vnflcm.vnflcm_driver"
with self.assertLogs(logger=log_name, level=logging.ERROR) as cm:
driver.sync_db(self.context)
self.assertIn(
f"ERROR:{log_name}:Error is occoured vnf {vnf_instance_obj.id} "
f"Error: ", cm.output[0])
mock_get_by_filter.assert_called_once()

View File

@ -18,6 +18,7 @@ import ddt
import os
from kubernetes import client
from oslo_log import log as logging
from oslo_serialization import jsonutils
from tacker.common.container import kubernetes_utils
from tacker.common import exceptions
@ -36,6 +37,7 @@ from tacker.tests.unit.vnflcm import fakes as vnflcm_fakes
from tacker.tests.unit.vnfm.infra_drivers.kubernetes import fakes
from tacker.tests.unit.vnfm.infra_drivers.openstack.fixture_data import \
fixture_data_utils as fd_utils
from tacker.vnflcm import utils as vnflcm_utils
from tacker.vnfm.infra_drivers.kubernetes.k8s import tosca_kube_object
from tacker.vnfm.infra_drivers.kubernetes.k8s import translate_outputs
from tacker.vnfm.infra_drivers.kubernetes import kubernetes_driver
@ -3855,3 +3857,525 @@ class TestKubernetes(base.TestCase):
vim_connection_info=vim_connection_object,
heal_vnf_request=heal_request_data_obj)
self.assertEqual(mock_list_namespaced_pod.call_count, 0)
@mock.patch.object(kubernetes_driver.Kubernetes,
"_sync_vnfc_resource_and_pod_resource")
@mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive")
@mock.patch.object(vnflcm_utils, "get_vim")
@mock.patch.object(VnfInstance, "save")
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_sync_db(
self, mock_list_namespaced_pod, mock_check_pod_information,
mock_get_by_id, mock_save, mock_get_vim, mock_vim,
mock_sync_vnfc):
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[fakes.get_fake_pod_info(kind='Deployment')])
mock_check_pod_information.return_value = True
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Pod')
vnfc_resource_info_obj2 = fakes.fake_vnfc_resource_info(
vdu_id='VDU2', rsc_kind='Deployment')
vnfc_resource_info_obj3 = fakes.fake_vnfc_resource_info(
vdu_id='VDU3', rsc_kind='ReplicaSet')
vnfc_resource_info_obj4 = fakes.fake_vnfc_resource_info(
vdu_id='VDU4', rsc_kind='DaemonSet')
vnfc_resource_info_obj5 = fakes.fake_vnfc_resource_info(
vdu_id='VDU5', rsc_kind='StatefulSet')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1,
vnfc_resource_info_obj2,
vnfc_resource_info_obj3,
vnfc_resource_info_obj4,
vnfc_resource_info_obj5
]
vim_connection_object = fakes.fake_vim_connection_info()
mock_get_by_id.return_value = vnf_instance_obj
mock_vim.return_value = vim_connection_object
mock_sync_vnfc.return_value = True
log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver"
with self.assertLogs(logger=log_name, level=logging.INFO) as cm:
self.kubernetes.sync_db(
context=self.context, vnf_instance=vnf_instance_obj,
vim_info=vim_connection_object)
self.assertCountEqual([
f'INFO:{log_name}:Database synchronization succeeded. '
f'vnf: {vnf_instance_obj.id} '
f'vdu: {vnfc_resource_info_obj1.vdu_id}',
f'INFO:{log_name}:Database synchronization succeeded. '
f'vnf: {vnf_instance_obj.id} '
f'vdu: {vnfc_resource_info_obj2.vdu_id}',
f'INFO:{log_name}:Database synchronization succeeded. '
f'vnf: {vnf_instance_obj.id} '
f'vdu: {vnfc_resource_info_obj3.vdu_id}',
f'INFO:{log_name}:Database synchronization succeeded. '
f'vnf: {vnf_instance_obj.id} '
f'vdu: {vnfc_resource_info_obj4.vdu_id}',
f'INFO:{log_name}:Database synchronization succeeded. '
f'vnf: {vnf_instance_obj.id} '
f'vdu: {vnfc_resource_info_obj5.vdu_id}',
], cm.output)
@mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_sync_db_vnf_conflict(
self, mock_list_namespaced_pod, mock_check_pod_information):
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[fakes.get_fake_pod_info(kind='Deployment')])
mock_check_pod_information.return_value = True
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.NOT_INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Pod')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info()
log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver"
with self.assertLogs(logger=log_name, level=logging.INFO) as cm:
self.kubernetes.sync_db(
context=self.context, vnf_instance=vnf_instance_obj,
vim_info=vim_connection_object)
self.assertEqual(
[f'INFO:{log_name}:There is an LCM operation in progress, '
'so skip this DB synchronization. '
f'vnf: {vnf_instance_obj.id}'], cm.output)
def test_sync_db_exception(self):
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.NOT_INSTANTIATED)
vnf_instance_obj.instantiated_vnf_info = None
vim_connection_object = fakes.fake_vim_connection_info()
log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver"
with self.assertLogs(logger=log_name, level=logging.INFO) as cm:
self.kubernetes.sync_db(
context=self.context, vnf_instance=vnf_instance_obj,
vim_info=vim_connection_object)
self.assertIn(
f"Failed to synchronize database vnf: "
f"{vnf_instance_obj.id}", cm.output[0])
@mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive")
@mock.patch.object(vnflcm_utils, "get_vim")
@mock.patch.object(VnfInstance, "save")
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_sync_db_check_pod_false(
self, mock_list_namespaced_pod, mock_check_pod_information,
mock_get_by_id, mock_save, mock_get_vim, mock_vim):
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[fakes.get_fake_pod_info(kind='Pod')])
mock_check_pod_information.side_effect = [True, False]
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Pod')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info()
mock_get_by_id.return_value = vnf_instance_obj
mock_vim.return_value = vim_connection_object
self.kubernetes.sync_db(
context=self.context, vnf_instance=vnf_instance_obj,
vim_info=vim_connection_object)
self.assertEqual(2, mock_check_pod_information.call_count)
self.assertEqual(2, mock_save.call_count)
@mock.patch.object(kubernetes_driver.Kubernetes,
"_sync_vnfc_resource_and_pod_resource")
@mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive")
@mock.patch.object(vnflcm_utils, "get_vim")
@mock.patch.object(VnfInstance, "save")
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_sync_db_not_succeeded(
self, mock_list_namespaced_pod, mock_check_pod_information,
mock_get_by_id, mock_save, mock_get_vim, mock_vim,
mock_sync_vnfc):
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[fakes.get_fake_pod_info(kind='Pod')])
mock_check_pod_information.return_value = True
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Pod')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info()
mock_get_by_id.return_value = vnf_instance_obj
mock_vim.return_value = vim_connection_object
mock_sync_vnfc.return_value = False
self.kubernetes.sync_db(
context=self.context, vnf_instance=vnf_instance_obj,
vim_info=vim_connection_object)
self.assertEqual(1, mock_sync_vnfc.call_count)
@mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive")
@mock.patch.object(vnflcm_utils, "get_vim")
@mock.patch.object(VnfInstance, "save")
@mock.patch.object(objects.VnfInstance, "get_by_id")
@mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information')
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_sync_db_failed_update_db(
self, mock_list_namespaced_pod, mock_check_pod_information,
mock_get_by_id, mock_save, mock_get_vim, mock_vim):
mock_list_namespaced_pod.return_value = client.V1PodList(
items=[fakes.get_fake_pod_info(kind='Deployment')])
mock_check_pod_information.return_value = True
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Pod')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
vim_connection_object = fakes.fake_vim_connection_info()
mock_get_by_id.return_value = vnf_instance_obj
mock_vim.return_value = vim_connection_object
log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver"
with self.assertLogs(logger=log_name, level=logging.ERROR) as cm:
self.kubernetes.sync_db(
context=self.context, vnf_instance=vnf_instance_obj,
vim_info=vim_connection_object)
self.assertIn(
f'ERROR:{log_name}:Failed to update database vnf '
f'{vnf_instance_obj.id} Error: ', cm.output[0])
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
def test_get_pod_information_no_namespace(self, mock_list_namespaced_pod):
mock_list_namespaced_pod.return_value = client.V1PodList(items=[])
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = ""
vim_connection_object = fakes.fake_vim_connection_info()
result = self.kubernetes._get_pod_information(
resource_name=mock.ANY, resource_type=mock.ANY,
vnf_instance=vnf_instance_obj,
vim_connection_info=vim_connection_object)
self.assertEqual({}, result)
@mock.patch.object(kubernetes_driver.Kubernetes, '_sync_resource_id')
def test_sync_vnfc_resource_and_pod_resource_eq(
self, mock_sync_resource_id):
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Pod')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
pod_resources_from_k8s = {
'fake_name-1234567890-abcde': {
'name': 'fake_name-1234567890-abcde',
'namespace': 'default'
}
}
result = self.kubernetes._sync_vnfc_resource_and_pod_resource(
context=self.context, vnf_instance=vnf_instance_obj,
pod_resources_from_k8s=pod_resources_from_k8s, vdu_id='VDU1')
self.assertTrue(result)
@mock.patch.object(kubernetes_driver.Kubernetes, '_delete_vnfc_resource')
def test_sync_vnfc_resource_and_pod_resource_gt(
self, mock_delete_vnfc_resource):
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Pod')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
pod_resources_from_k8s = {}
mock_delete_vnfc_resource.return_value = False
result = self.kubernetes._sync_vnfc_resource_and_pod_resource(
context=self.context, vnf_instance=vnf_instance_obj,
pod_resources_from_k8s=pod_resources_from_k8s, vdu_id='VDU1')
self.assertEqual(1, mock_delete_vnfc_resource.call_count)
self.assertFalse(result)
@mock.patch.object(kubernetes_driver.Kubernetes, '_add_vnfc_resource')
def test_sync_vnfc_resource_and_pod_resource_lt(
self, mock_add_vnfc_resource):
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
pod_resources_from_k8s = {
'fake_name-1234567890-abcde': {
'name': 'fake_name-1234567890-abcde',
'namespace': 'default'
}
}
mock_add_vnfc_resource.return_value = True
result = self.kubernetes._sync_vnfc_resource_and_pod_resource(
context=self.context, vnf_instance=vnf_instance_obj,
pod_resources_from_k8s=pod_resources_from_k8s, vdu_id='VDU1')
self.assertEqual(1, mock_add_vnfc_resource.call_count)
self.assertTrue(result)
@mock.patch.object(kubernetes_driver.Kubernetes, '_calc_scale_level')
def test_delete_vnfc_resource(self, mock_calc_scale_level):
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Pod', pod_name='pod')
vnfc_resource_info_obj2 = fakes.fake_vnfc_resource_info(
vdu_id='VDU2', rsc_kind='Deployment',
pod_name='deploy-1234567890-fghij')
vnfc_resource_info_obj3 = fakes.fake_vnfc_resource_info(
vdu_id='VDU2', rsc_kind='Deployment',
pod_name='deploy-1234567890-abcde')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1, vnfc_resource_info_obj2,
vnfc_resource_info_obj3
]
pod_resources_from_k8s = {
'deploy-1234567890-abcde': {
'name': 'deploy-1234567890-abcde',
'namespace': 'default'
}
}
mock_calc_scale_level.return_value = True
result = self.kubernetes._delete_vnfc_resource(
context=self.context, vnf_instance=vnf_instance_obj, vdu_id='VDU2',
pod_resources_from_k8s=pod_resources_from_k8s, vnfc_count=2)
self.assertEqual(1, mock_calc_scale_level.call_count)
self.assertTrue(result)
@mock.patch.object(kubernetes_driver.Kubernetes, '_calc_scale_level')
def test_delete_vnfc_resource_rsc_not_same(self, mock_calc_scale_level):
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Pod', pod_name='pod')
vnfc_resource_info_obj2 = fakes.fake_vnfc_resource_info(
vdu_id='VDU2', rsc_kind='Deployment',
pod_name='deploy-1234567890-fghij')
vnfc_resource_info_obj3 = fakes.fake_vnfc_resource_info(
vdu_id='VDU2', rsc_kind='Deployment',
pod_name='deploy-1234567890-abcde')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1, vnfc_resource_info_obj2,
vnfc_resource_info_obj3
]
pod_resources_from_k8s = {
'deploy-1234567890-klmno': {
'name': 'deploy-1234567890-klmno',
'namespace': 'default'
}
}
mock_calc_scale_level.return_value = False
result = self.kubernetes._delete_vnfc_resource(
context=self.context, vnf_instance=vnf_instance_obj, vdu_id='VDU2',
pod_resources_from_k8s=pod_resources_from_k8s, vnfc_count=2)
self.assertEqual(1, mock_calc_scale_level.call_count)
self.assertFalse(result)
@mock.patch.object(kubernetes_driver.Kubernetes, '_calc_scale_level')
def test_add_vnfc_resource(self, mock_calc_scale_level):
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU2', rsc_kind='Deployment',
pod_name='deploy-1234567890-fghij')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
pod_resources_from_k8s = {
'deploy-1234567890-klmno': {
'name': 'deploy-1234567890-klmno',
'namespace': 'default'
},
'deploy-1234567890-abcde': {
'name': 'deploy-1234567890-abcde',
'namespace': 'default'
}
}
mock_calc_scale_level.return_value = False
result = self.kubernetes._add_vnfc_resource(
context=self.context, vnf_instance=vnf_instance_obj, vdu_id='VDU2',
pod_resources_from_k8s=pod_resources_from_k8s, vnfc_count=1)
self.assertEqual(1, mock_calc_scale_level.call_count)
self.assertFalse(result)
@mock.patch('tacker.vnflcm.utils._get_vnfd_dict')
def test_calc_scale_level(self, mock_get_vnfd_dict):
mock_get_vnfd_dict.return_value = vnflcm_fakes.vnfd_dict_cnf()
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED, 'scale_status')
vnf_instance_obj.instantiated_vnf_info.scale_status[0].aspect_id = (
'vdu1_aspect')
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name='deploy-1234567890-fghij')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
result = self.kubernetes._calc_scale_level(
context=self.context, vnf_instance=vnf_instance_obj,
vdu_id='VDU1', current_pod_num=1)
self.assertTrue(result)
@mock.patch('tacker.vnflcm.utils._get_vnfd_dict')
def test_calc_scale_level_error(self, mock_get_vnfd_dict):
delta = 2
vnfd_obj = vnflcm_fakes.vnfd_dict_cnf()
for policy in vnfd_obj['topology_template']['policies']:
if policy.get('vdu1_scaling_aspect_deltas'):
policy['vdu1_scaling_aspect_deltas']['properties'][
'deltas']['delta_1']['number_of_instances'] = delta
break
mock_get_vnfd_dict.return_value = vnfd_obj
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED, 'scale_status')
vnf_instance_obj.instantiated_vnf_info.scale_status[0].aspect_id = (
'vdu1_aspect')
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name='deploy-1234567890-fghij')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
current_pod_num = 5
log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver"
with self.assertLogs(logger=log_name, level=logging.ERROR) as cm:
result = self.kubernetes._calc_scale_level(
context=self.context, vnf_instance=vnf_instance_obj,
vdu_id='VDU1', current_pod_num=current_pod_num)
self.assertEqual(
[f"ERROR:{log_name}:Error computing 'scale_level'. current Pod "
f"num: {current_pod_num} delta: {delta}. "
f"vnf: {vnf_instance_obj.id} vdu: VDU1"], cm.output)
self.assertFalse(result)
@mock.patch('tacker.vnflcm.utils._get_vnfd_dict')
def test_calc_scale_level_pod_range_error(self, mock_get_vnfd_dict):
delta = 2
vnfd_obj = vnflcm_fakes.vnfd_dict_cnf()
for policy in vnfd_obj['topology_template']['policies']:
if policy.get('vdu1_scaling_aspect_deltas'):
policy['vdu1_scaling_aspect_deltas']['properties'][
'deltas']['delta_1']['number_of_instances'] = delta
break
mock_get_vnfd_dict.return_value = vnfd_obj
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED, 'scale_status')
vnf_instance_obj.instantiated_vnf_info.scale_status[0].aspect_id = (
'vdu1_aspect')
vnf_instance_obj.vnf_metadata['namespace'] = "default"
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name='deploy-1234567890-fghij')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
current_pod_num = 4
log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver"
with self.assertLogs(logger=log_name, level=logging.ERROR) as cm:
result = self.kubernetes._calc_scale_level(
context=self.context, vnf_instance=vnf_instance_obj,
vdu_id='VDU1', current_pod_num=current_pod_num)
self.assertEqual(
[f"ERROR:{log_name}:Failed to update database vnf "
f"{vnf_instance_obj.id} vdu: VDU1. Pod num is out of range. "
f"pod_num: {current_pod_num}"], cm.output)
self.assertFalse(result)
def test_check_pod_information_len(self):
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name='deploy-1234567890-fghij')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
pod_resources_from_k8s = {
'deploy-1234567890-klmno': {
'name': 'deploy-1234567890-klmno'
},
'deploy-1234567890-abcde': {
'name': 'deploy-1234567890-abcde'
}
}
result = self.kubernetes._check_pod_information(
vnf_instance=vnf_instance_obj, vdu_id='VDU1',
pod_resources_from_k8s=pod_resources_from_k8s)
self.assertTrue(result)
def test_check_pod_information_set(self):
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name='deploy-1234567890-fghij')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
pod_resources_from_k8s = {
'deploy-1234567890-klmno': {
'name': 'deploy-1234567890-klmno'
}
}
result = self.kubernetes._check_pod_information(
vnf_instance=vnf_instance_obj, vdu_id='VDU1',
pod_resources_from_k8s=pod_resources_from_k8s)
self.assertTrue(result)
def test_check_pod_information_false(self):
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
fields.VnfInstanceState.INSTANTIATED)
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
vdu_id='VDU1', rsc_kind='Deployment',
pod_name='deploy-1234567890-fghij')
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
vnfc_resource_info_obj1
]
pod_resources_from_k8s = {
'deploy-1234567890-fghij': {
'name': 'deploy-1234567890-fghij'
}
}
result = self.kubernetes._check_pod_information(
vnf_instance=vnf_instance_obj, vdu_id='VDU1',
pod_resources_from_k8s=pod_resources_from_k8s)
self.assertFalse(result)

View File

@ -1972,3 +1972,33 @@ class VnfLcmDriver(abstract_driver.VnfInstanceAbstractDriver):
LOG.info("Request received for changing external connectivity "
"vnf '%s' is completed successfully", vnf_instance.id)
def sync_db(self, context):
filters = {'model': 'VnfInstance',
'field': 'instantiation_state',
'value': 'INSTANTIATED',
'op': '=='
}
vnf_instance_list = objects.VnfInstanceList.get_by_filters(
context, filters)
for vnf_instance in vnf_instance_list:
vim_info = vnflcm_utils.get_vim(
context, vnf_instance.vim_connection_info)
vim_connection_info = (objects.VimConnectionInfo.
obj_from_primitive(vim_info, context))
try:
# Database synchronization works only when the
# vimType is Kubernetes.
if vim_connection_info.vim_type == 'kubernetes':
self._vnf_manager.invoke(
vim_connection_info.vim_type,
'sync_db',
context=context,
vnf_instance=vnf_instance,
vim_info=vim_connection_info
)
except Exception as e:
LOG.error(f"Error is occoured vnf {vnf_instance.id} "
f"Error: {encodeutils.exception_to_unicode(e)}")

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import os
import re
import time
@ -23,16 +24,19 @@ from kubernetes import client
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_utils import encodeutils
from oslo_utils import uuidutils
from toscaparser import tosca_template
from tacker._i18n import _
from tacker.api.vnflcm.v1.controller import check_vnf_state
from tacker.common.container import kubernetes_utils
from tacker.common import exceptions
from tacker.common import log
from tacker.common import utils
from tacker.extensions import vnfm
from tacker import objects
from tacker.objects import fields
from tacker.objects.fields import ErrorPoint as EP
from tacker.objects import vnf_package as vnf_package_obj
from tacker.objects import vnf_package_vnfd as vnfd_obj
@ -2671,3 +2675,395 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
return_name_list = []
return_grp_id = None
return return_id_list, return_name_list, return_grp_id
def sync_db(self, context, vnf_instance, vim_info):
"""method for database synchronization"""
try:
vdu_id_list = self._get_vdu_list(vnf_instance)
for vdu_id in vdu_id_list:
# get pod information
for vnfc in (
vnf_instance.instantiated_vnf_info.vnfc_resource_info
):
if vnfc.vdu_id == vdu_id:
resource_type = (
vnfc.compute_resource.vim_level_resource_type)
resource_name = self._get_resource_name(
vnfc.compute_resource.resource_id, resource_type)
break
pod_resources_from_k8s = self._get_pod_information(
resource_name, resource_type,
vnf_instance, vim_info
)
# check difference
if not self._check_pod_information(vnf_instance, vdu_id,
pod_resources_from_k8s):
# no difference
continue
# exec db sync
try:
if self._database_update(context, vnf_instance,
vdu_id):
LOG.info("Database synchronization succeeded. "
f"vnf: {vnf_instance.id} vdu: {vdu_id}")
except exceptions.VnfInstanceConflictState:
LOG.info("There is an LCM operation in progress, so "
"skip this DB synchronization. "
f"vnf: {vnf_instance.id}")
except Exception as e:
LOG.error(f"Failed to synchronize database vnf: "
f"{vnf_instance.id} Error: "
f"{encodeutils.exception_to_unicode(e)}")
@check_vnf_state(action="db_sync",
instantiation_state=[fields.VnfInstanceState.INSTANTIATED],
task_state=[None])
def _database_update(self, context, vnf_inst,
vdu_id):
"""Update tacker DB
True: succeeded database update
False: failed database update
"""
# get target object
vnf_instance = objects.VnfInstance.get_by_id(
context, vnf_inst.id)
if vnf_instance.instantiation_state != 'INSTANTIATED':
return False
# change task_state
vnf_instance.task_state = fields.VnfInstanceTaskState.DB_SYNCHRONIZING
vnf_instance.save()
backup_vnf_instance = copy.deepcopy(vnf_instance)
vim_info = vnflcm_utils.get_vim(
context, vnf_instance.vim_connection_info)
vim_connection_info = objects.VimConnectionInfo.obj_from_primitive(
vim_info, context)
# get pod information
try:
for vnfc in vnf_instance.instantiated_vnf_info.vnfc_resource_info:
if vnfc.vdu_id == vdu_id:
resource_type = (
vnfc.compute_resource.vim_level_resource_type)
resource_name = self._get_resource_name(
vnfc.compute_resource.resource_id, resource_type)
break
pod_resources_from_k8s = self._get_pod_information(
resource_name, resource_type,
vnf_instance, vim_connection_info
)
# check difference
if not self._check_pod_information(vnf_instance, vdu_id,
pod_resources_from_k8s):
vnf_instance.task_state = None
vnf_instance.save()
return False
if self._sync_vnfc_resource_and_pod_resource(context,
vnf_instance, pod_resources_from_k8s, vdu_id):
vnf_instance.task_state = None
vnf_instance.save()
return True
vnf_instance = copy.deepcopy(backup_vnf_instance)
vnf_instance.task_state = None
vnf_instance.save()
return False
except Exception as e:
LOG.error(f"Failed to update database vnf {vnf_instance.id} "
f"Error: {encodeutils.exception_to_unicode(e)}")
vnf_instance = copy.deepcopy(backup_vnf_instance)
vnf_instance.task_state = None
vnf_instance.save()
return False
def _sync_vnfc_resource_and_pod_resource(self, context, vnf_instance,
pod_resources_from_k8s, vdu_id):
"""Sync Pod name between k8s and tacker"""
vnfc_count = 0
for vnfc in vnf_instance.instantiated_vnf_info.vnfc_resource_info:
if vnfc.vdu_id == vdu_id:
vnfc_count += 1
result = False
# number of pod is same as k8s
if vnfc_count == len(pod_resources_from_k8s):
self._sync_resource_id(vnf_instance.instantiated_vnf_info,
vdu_id, pod_resources_from_k8s)
result = True
# the number of pod is greater than k8s
elif vnfc_count > len(pod_resources_from_k8s):
result = self._delete_vnfc_resource(context, vnf_instance, vdu_id,
pod_resources_from_k8s, vnfc_count)
# the number of pod is less than k8s
elif vnfc_count < len(pod_resources_from_k8s):
result = self._add_vnfc_resource(context, vnf_instance, vdu_id,
pod_resources_from_k8s, vnfc_count)
return result
def _delete_vnfc_resource(self, context, vnf_instance, vdu_id,
pod_resources_from_k8s, vnfc_count):
"""Remove 'vnfcResourceInfo' with mismatched pod names"""
delete_index = []
for i, vnfc_resource_info in enumerate(
vnf_instance.instantiated_vnf_info.vnfc_resource_info):
if vnfc_resource_info.vdu_id != vdu_id:
continue
if vnfc_resource_info.compute_resource.resource_id not in (
set(pod_resources_from_k8s.keys())):
delete_index.append(i)
# For patterns where the number of pods is reduced and
# the remaining pods are also renamed
delete_cnt = vnfc_count - len(pod_resources_from_k8s)
if delete_cnt != len(delete_index):
for i in range(len(delete_index) - delete_cnt):
del delete_index[-1]
for idx in reversed(delete_index):
del vnf_instance.instantiated_vnf_info.vnfc_resource_info[idx]
self._sync_resource_id(vnf_instance.instantiated_vnf_info,
vdu_id, pod_resources_from_k8s)
return self._calc_scale_level(context, vnf_instance, vdu_id,
len(pod_resources_from_k8s))
def _add_vnfc_resource(self, context, vnf_instance, vdu_id,
pod_resources_from_k8s, vnfc_count):
"""Add vnfcResourceInfo"""
stored_pod_names = {
vnfc.compute_resource.resource_id
for vnfc in (
vnf_instance.instantiated_vnf_info.vnfc_resource_info)
if vnfc.vdu_id == vdu_id}
add_pod_names = {
pod_name for pod_name in (
set(pod_resources_from_k8s.keys()))
if pod_name not in stored_pod_names}
# Determine the number of 'vnfc_resource_info' to add
add_vnfc_resource_info_num = (
len(pod_resources_from_k8s) - vnfc_count)
for vnfc in vnf_instance.instantiated_vnf_info.vnfc_resource_info:
if vnfc.vdu_id == vdu_id:
resource_type = (
vnfc.compute_resource.vim_level_resource_type)
metadata = vnfc.metadata[resource_type]
break
for dummy in range(add_vnfc_resource_info_num):
resource_id = add_pod_names.pop()
vnfc_resource = objects.VnfcResourceInfo()
vnfc_resource.id = uuidutils.generate_uuid()
vnfc_resource.vdu_id = vdu_id
resource = objects.ResourceHandle()
resource.resource_id = resource_id
resource.vim_level_resource_type = resource_type
vnfc_resource.compute_resource = resource
vnfc_resource.metadata = {}
vnfc_resource.metadata["Pod"] = (
jsonutils.dumps(pod_resources_from_k8s.get(resource_id))
)
vnfc_resource.metadata[resource_type] = metadata
vnf_instance.instantiated_vnf_info.vnfc_resource_info.append(
vnfc_resource)
self._sync_resource_id(vnf_instance.instantiated_vnf_info,
vdu_id, pod_resources_from_k8s)
return self._calc_scale_level(context, vnf_instance, vdu_id,
len(pod_resources_from_k8s))
def _check_pod_information(self, vnf_instance,
vdu_id, pod_resources_from_k8s):
"""Compare 'instantiatedVnfInfo' and 'pod_resources_from_k8s' info
True: find difference
False: No difference
"""
vnfc_rscs = vnf_instance.instantiated_vnf_info.vnfc_resource_info
pod_names = {vnfc.compute_resource.resource_id for vnfc in vnfc_rscs
if vnfc.vdu_id == vdu_id}
if len(pod_names) != len(pod_resources_from_k8s):
return True
# pod name check
if pod_names != set(pod_resources_from_k8s.keys()):
return True
return False
def _sync_resource_id(self, instantiated_vnf_info,
vdu_id, pod_resources_from_k8s):
"""Set new resourceId into old resourceId"""
match_pod_names = set()
unmatch_pod_in_tacker = set()
for vnfc in instantiated_vnf_info.vnfc_resource_info:
if vnfc.vdu_id != vdu_id:
continue
if vnfc.compute_resource.resource_id in (
set(pod_resources_from_k8s.keys())):
match_pod_names.add(
vnfc.compute_resource.resource_id)
else:
unmatch_pod_in_tacker.add(
vnfc.compute_resource.resource_id)
# pickup new pod name
new_pod_name = set(pod_resources_from_k8s.keys()) ^ match_pod_names
for unmatch_pod_name in unmatch_pod_in_tacker:
for vnfc in instantiated_vnf_info.vnfc_resource_info:
if vnfc.vdu_id != vdu_id:
continue
if vnfc.compute_resource.resource_id == (
unmatch_pod_name):
vnfc.compute_resource.resource_id = (
new_pod_name.pop()
)
vnfc.metadata["Pod"] = (
jsonutils.dumps(
pod_resources_from_k8s.get(
(vnfc.compute_resource.resource_id)
)
)
)
break
def _calc_scale_level(self, context, vnf_instance,
vdu_id, current_pod_num):
"""calc scale_level and set"""
vnfd = vnflcm_utils.get_vnfd_dict(context,
vnf_instance.vnfd_id,
vnf_instance.instantiated_vnf_info.flavour_id)
policies = vnfd.get("topology_template", {}).get("policies", {})
for policy in policies:
for v in policy.values():
if (v.get("type") == (
"tosca.policies.nfv.VduScalingAspectDeltas")):
if vdu_id in v.get("targets", []):
aspect_id = v.get("properties", {}).get("aspect")
break
step_deltas = (self._get_scale_value(policies, aspect_id))
for policy in policies:
for v in policy.values():
if (v.get("type") == (
"tosca.policies.nfv.VduScalingAspectDeltas")
and step_deltas
and vdu_id in v.get("targets", [])):
delta = (v.get("properties", {})
.get("deltas", {})
.get(step_deltas, {})
.get("number_of_instances"))
elif (v.get("type") == (
"tosca.policies.nfv.VduInitialDelta")
and vdu_id in v.get("targets", [])):
initial_delta = (v.get("properties", {})
.get("initial_delta", {})
.get("number_of_instances"))
if (current_pod_num - initial_delta) % delta != 0:
LOG.error("Error computing 'scale_level'. "
f"current Pod num: {current_pod_num} delta: {delta}. "
f"vnf: {vnf_instance.id} vdu: {vdu_id}")
return False
scale_level = (current_pod_num - initial_delta) // delta
for inst_vnf in vnf_instance.instantiated_vnf_info.scale_status:
if inst_vnf.aspect_id == aspect_id:
inst_vnf.scale_level = scale_level
break
return self._check_pod_range(vnf_instance, vnfd, vdu_id,
current_pod_num)
def _get_scale_value(self, policies, aspect_id):
"""get step_deltas from vnfd"""
for policy in policies:
for v in policy.values():
if v.get("type") == "tosca.policies.nfv.ScalingAspects":
step_deltas = (v.get("properties", {})
.get("aspects", {})
.get(aspect_id, {})
.get("step_deltas", [])[0])
break
return step_deltas
def _check_pod_range(self, vnf_instance, vnfd, vdu_id,
current_pod_num):
"""Check the range of the maximum or minimum number of pods.
If it finds out of range, output a error log and do not update
database.
"""
vdu_profile = (vnfd.get("topology_template", {})
.get("node_templates", [])
.get(vdu_id, {})
.get("properties", {})
.get("vdu_profile"))
max_number_of_instances = vdu_profile.get("max_number_of_instances")
min_number_of_instances = vdu_profile.get("min_number_of_instances")
if (current_pod_num > max_number_of_instances
or current_pod_num < min_number_of_instances):
LOG.error(f"Failed to update database vnf {vnf_instance.id} "
f"vdu: {vdu_id}. Pod num is out of range. "
f"pod_num: {current_pod_num}")
return False
return True
def _get_pod_information(self, resource_name,
resource_type, vnf_instance, vim_connection_info):
"""Extract a Pod starting with the specified 'resource_name' name"""
namespace = vnf_instance.vnf_metadata.get('namespace')
if not namespace:
namespace = "default"
auth_attr = vim_connection_info.access_info
auth_cred, _ = self.get_auth_creds(auth_attr)
core_v1_api_client = self.kubernetes.get_core_v1_api_client(
auth=auth_cred)
resource_pods = {}
all_pods = core_v1_api_client.list_namespaced_pod(
namespace=namespace)
for pod in all_pods.items:
if self.is_match_pod_naming_rule(resource_type,
resource_name, pod.metadata.name):
resource_pods[pod.metadata.name] = pod.metadata.to_dict()
return resource_pods
def _get_vdu_list(self, vnf_instance):
"""get vdu_list"""
vdu_id_list = set()
for vnfc in vnf_instance.instantiated_vnf_info.vnfc_resource_info:
if vnfc.compute_resource.resource_id != "":
vdu_id_list.add(vnfc.vdu_id)
return vdu_id_list
def _get_resource_name(self, resource_id, resource_type):
"""get resource name"""
if resource_type == 'Pod':
resource_name = resource_id
else:
name_list = resource_id.split("-")
if resource_type == 'Deployment':
del name_list[-2:]
elif resource_type in ('ReplicaSet', 'DaemonSet', 'StatefulSet'):
del name_list[-1]
resource_name = '-'.join(name_list)
return resource_name