Merge "Database sync between TackerDB and Kubernetes"
This commit is contained in:
commit
3738e0f5fa
@ -0,0 +1,13 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
This patch adds the ability to periodically synchronize resources
|
||||
in K8s VIM and Tacker DB.
|
||||
There is no interface on the K8s to notify Tacker when the auto-scale or
|
||||
auto-heal of a pod is running and the pod information is updated.
|
||||
This can lead to inconsistencies between the Pod information in Tacker
|
||||
database and the Pod information running on the actual K8s.
|
||||
This function periodically checks the pod information in the Tacker
|
||||
database and the pod information in the K8s, and updates the information
|
||||
in the Tacker database according to the K8s side if there is
|
||||
any inconsistency.
|
@ -22,6 +22,7 @@ import oslo_messaging
|
||||
import requests
|
||||
import shutil
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import traceback
|
||||
import yaml
|
||||
@ -30,6 +31,7 @@ from glance_store import exceptions as store_exceptions
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_service import loopingcall
|
||||
from oslo_service import periodic_task
|
||||
from oslo_service import service
|
||||
from oslo_utils import encodeutils
|
||||
@ -75,6 +77,10 @@ from tacker.vnfm import nfvo_client
|
||||
from tacker.vnfm import plugin
|
||||
|
||||
CONF = tacker.conf.CONF
|
||||
# NOTE(fengyi): After the conductor is started, since the start-up process
|
||||
# is being executed, it should take a while to start the actual DB
|
||||
# synchronization periodical process.
|
||||
DB_SYNC_INITIAL_DELAY = 60
|
||||
|
||||
# NOTE(tpatil): keystone_authtoken opts registered explicitly as conductor
|
||||
# service doesn't use the keystonemiddleware.authtoken middleware as it's
|
||||
@ -292,6 +298,14 @@ def grant_error_common(function):
|
||||
return decorated_function
|
||||
|
||||
|
||||
def async_call(func):
|
||||
def inner(*args, **kwargs):
|
||||
th = threading.Thread(target=func, args=args,
|
||||
kwargs=kwargs, daemon=True)
|
||||
th.start()
|
||||
return inner
|
||||
|
||||
|
||||
class Conductor(manager.Manager, v2_hook.ConductorV2Hook):
|
||||
def __init__(self, host, conf=None):
|
||||
if conf:
|
||||
@ -304,6 +318,15 @@ class Conductor(manager.Manager, v2_hook.ConductorV2Hook):
|
||||
self.vnf_manager = driver_manager.DriverManager(
|
||||
'tacker.tacker.vnfm.drivers',
|
||||
cfg.CONF.tacker.infra_driver)
|
||||
self._periodic_call()
|
||||
|
||||
@async_call
|
||||
def _periodic_call(self):
|
||||
self.periodic = loopingcall.FixedIntervalLoopingCall(
|
||||
self._sync_db)
|
||||
self.periodic.start(
|
||||
interval=CONF.db_synchronization_interval,
|
||||
initial_delay=DB_SYNC_INITIAL_DELAY)
|
||||
|
||||
def start(self):
|
||||
coordination.COORDINATOR.start()
|
||||
@ -2506,6 +2529,13 @@ class Conductor(manager.Manager, v2_hook.ConductorV2Hook):
|
||||
error_point=vnf_dict['current_error_point']
|
||||
)
|
||||
|
||||
def _sync_db(self):
|
||||
"""Periodic database update invocation method"""
|
||||
LOG.debug("Starting _sync_db")
|
||||
context = t_context.get_admin_context()
|
||||
self.vnflcm_driver.sync_db(context)
|
||||
LOG.debug("Ended _sync_db")
|
||||
|
||||
|
||||
def init(args, **kwargs):
|
||||
CONF(args=args, project='tacker',
|
||||
|
@ -23,6 +23,10 @@ interval_opts = [
|
||||
default=1800,
|
||||
help=_('Seconds between running periodic tasks '
|
||||
'to cleanup residues of deleted vnf packages')),
|
||||
cfg.IntOpt('db_synchronization_interval',
|
||||
default=300,
|
||||
help=_('Interval time in sec for DB sync between '
|
||||
'Tacker and Kubernetes VIMs')),
|
||||
]
|
||||
|
||||
|
||||
|
@ -37,6 +37,7 @@ from tacker.db import db_base
|
||||
from tacker.db.db_sqlalchemy import models
|
||||
from tacker.db import model_base
|
||||
from tacker.db import models_v1
|
||||
from tacker.db.nfvo import nfvo_db # noqa: F401
|
||||
from tacker.db.nfvo import ns_db
|
||||
from tacker.db import types
|
||||
from tacker.extensions import vnfm
|
||||
|
@ -139,8 +139,10 @@ class VnfInstanceTaskState(BaseTackerEnum):
|
||||
TERMINATING = 'TERMINATING'
|
||||
SCALING = 'SCALING'
|
||||
ERROR = 'ERROR'
|
||||
DB_SYNCHRONIZING = 'DB_SYNCHRONIZING'
|
||||
|
||||
ALL = (INSTANTIATING, HEALING, TERMINATING, SCALING, ERROR)
|
||||
ALL = (INSTANTIATING, HEALING, TERMINATING,
|
||||
SCALING, ERROR, DB_SYNCHRONIZING)
|
||||
|
||||
|
||||
class VnfInstanceTaskStateField(BaseEnumField):
|
||||
|
@ -347,6 +347,14 @@ class InvalidPagingMarker(SolHttpError400):
|
||||
message = _("Paging marker value %(marker)s is invalid.")
|
||||
|
||||
|
||||
class DbSyncNoDiff(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class DbSyncFailed(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class K8sOperationFailed(SolHttpError422):
|
||||
# title and detail are set in the code from kubernetes operation
|
||||
pass
|
||||
|
@ -459,3 +459,14 @@ class Vnfd(object):
|
||||
|
||||
mani_artifact_files.extend(meta_artifacts_files)
|
||||
return mani_artifact_files
|
||||
|
||||
def get_initial_delta(self, flavour_id, vdu_id):
|
||||
initial_deltas = self.get_policy_values_by_type(flavour_id,
|
||||
'tosca.policies.nfv.VduInitialDelta')
|
||||
for aspect in initial_deltas:
|
||||
if vdu_id in aspect.get('targets', []):
|
||||
initial_delta = (aspect.get('properties', {})
|
||||
.get('initial_delta', {})
|
||||
.get('number_of_instances'))
|
||||
break
|
||||
return initial_delta
|
||||
|
@ -13,7 +13,11 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import threading
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import encodeutils
|
||||
|
||||
from tacker.common import log
|
||||
from tacker import context as tacker_context
|
||||
@ -35,6 +39,18 @@ from tacker.sol_refactored.objects.v2 import fields
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = config.CONF
|
||||
# NOTE(fengyi): After the conductor is started, since the start-up process
|
||||
# is being executed, it should take a while to start the actual DB
|
||||
# synchronization periodical process.
|
||||
DB_SYNC_INITIAL_DELAY = 60
|
||||
|
||||
|
||||
def async_call(func):
|
||||
def inner(*args, **kwargs):
|
||||
th = threading.Thread(target=func, args=args,
|
||||
kwargs=kwargs, daemon=True)
|
||||
th.start()
|
||||
return inner
|
||||
|
||||
|
||||
class ConductorV2(object):
|
||||
@ -49,6 +65,15 @@ class ConductorV2(object):
|
||||
self.sn_driver = sdrv.ServerNotificationDriver.instance()
|
||||
self._change_lcm_op_state()
|
||||
|
||||
self._periodic_call()
|
||||
|
||||
@async_call
|
||||
def _periodic_call(self):
|
||||
self.periodic = loopingcall.FixedIntervalLoopingCall(
|
||||
self._sync_db)
|
||||
self.periodic.start(interval=CONF.db_synchronization_interval,
|
||||
initial_delay=DB_SYNC_INITIAL_DELAY)
|
||||
|
||||
def _change_lcm_op_state(self):
|
||||
# NOTE: If the conductor down during processing and
|
||||
# the LcmOperationState STARTING/PROCESSING/ROLLING_BACK remain,
|
||||
@ -321,6 +346,37 @@ class ConductorV2(object):
|
||||
self.nfvo_client.send_lcmocc_notification(context, lcmocc, inst,
|
||||
self.endpoint)
|
||||
|
||||
def _sync_db(self):
|
||||
"""Periodic database update invocation method(v2 api)"""
|
||||
LOG.debug("Starting _sync_db")
|
||||
context = tacker_context.get_admin_context()
|
||||
|
||||
vnf_instances = objects.VnfInstanceV2.get_by_filter(
|
||||
context, instantiationState='INSTANTIATED')
|
||||
for inst in vnf_instances:
|
||||
try:
|
||||
vim_info = inst_utils.select_vim_info(inst.vimConnectionInfo)
|
||||
self.vnflcm_driver.diff_check_inst(inst, vim_info)
|
||||
self._sync_inst(context, inst, vim_info)
|
||||
except sol_ex.DbSyncNoDiff:
|
||||
continue
|
||||
except sol_ex.DbSyncFailed as e:
|
||||
LOG.error("%s: %s", e.__class__.__name__, e.args[0])
|
||||
except sol_ex.OtherOperationInProgress:
|
||||
LOG.info("There is an LCM operation in progress, so "
|
||||
f"skip this DB synchronization. vnf: {inst.id}.")
|
||||
except Exception as e:
|
||||
LOG.error(f"Failed to synchronize database vnf: {inst.id} "
|
||||
f"Error: {encodeutils.exception_to_unicode(e)}")
|
||||
LOG.debug("Ended _sync_db")
|
||||
|
||||
@coordinate.lock_vnf_instance('{inst.id}')
|
||||
def _sync_inst(self, context, inst, vim_info):
|
||||
vnf_inst = inst_utils.get_inst(context, inst.id)
|
||||
self.vnflcm_driver.sync_db(
|
||||
context, vnf_inst, vim_info)
|
||||
vnf_inst.update(context)
|
||||
|
||||
def store_alarm_info(self, context, alarm):
|
||||
self.vnffm_driver.store_alarm_info(context, alarm)
|
||||
|
||||
|
@ -1129,3 +1129,29 @@ class VnfLcmDriverV2(object):
|
||||
else:
|
||||
# should not occur
|
||||
raise sol_ex.SolException(sol_detail='not support vim type')
|
||||
|
||||
def sync_db(self, context, vnf_inst, vim_info):
|
||||
# Database synchronization works only when
|
||||
# the vimType is kubernetes or ETSINFV.HELM.V_3
|
||||
if vim_info.vimType == 'kubernetes':
|
||||
driver = kubernetes.Kubernetes()
|
||||
driver.sync_db(context, vnf_inst, vim_info)
|
||||
elif vim_info.vimType == 'ETSINFV.HELM.V_3':
|
||||
driver = helm.Helm()
|
||||
driver.sync_db(context, vnf_inst, vim_info)
|
||||
else:
|
||||
# Only support CNF
|
||||
raise sol_ex.DbSyncNoDiff(
|
||||
"There are no differences in Vnfc resources.")
|
||||
|
||||
def diff_check_inst(self, vnf_inst, vim_info):
|
||||
if vim_info.vimType == 'kubernetes':
|
||||
driver = kubernetes.Kubernetes()
|
||||
driver.diff_check_inst(vnf_inst, vim_info)
|
||||
elif vim_info.vimType == 'ETSINFV.HELM.V_3':
|
||||
driver = helm.Helm()
|
||||
driver.diff_check_inst(vnf_inst, vim_info)
|
||||
else:
|
||||
# Only support CNF
|
||||
raise sol_ex.DbSyncNoDiff(
|
||||
"There are no differences in Vnfc resources.")
|
||||
|
@ -13,8 +13,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import re
|
||||
|
||||
from kubernetes import client
|
||||
from oslo_log import log as logging
|
||||
|
||||
@ -305,27 +303,4 @@ class Kubernetes(kubernetes_common.KubernetesCommon):
|
||||
)
|
||||
|
||||
def _is_match_pod_naming_rule(self, rsc_kind, rsc_name, pod_name):
|
||||
match_result = None
|
||||
if rsc_kind == 'Pod':
|
||||
# Expected example: name
|
||||
if rsc_name == pod_name:
|
||||
return True
|
||||
elif rsc_kind == 'Deployment':
|
||||
# Expected example: name-012789abef-019az
|
||||
# NOTE(horie): The naming rule of Pod in deployment is
|
||||
# "(deployment name)-(pod template hash)-(5 charactors)".
|
||||
# The "pod template hash" string is generated from 32 bit hash.
|
||||
# This may be from 1 to 10 caracters but not sure the lower limit
|
||||
# from the source code of Kubernetes.
|
||||
match_result = re.match(
|
||||
rsc_name + '-([0-9a-f]{1,10})-([0-9a-z]{5})+$', pod_name)
|
||||
elif rsc_kind in ('ReplicaSet', 'DaemonSet'):
|
||||
# Expected example: name-019az
|
||||
match_result = re.match(rsc_name + '-([0-9a-z]{5})+$', pod_name)
|
||||
elif rsc_kind == 'StatefulSet':
|
||||
# Expected example: name-0
|
||||
match_result = re.match(rsc_name + '-[0-9]+$', pod_name)
|
||||
if match_result:
|
||||
return True
|
||||
|
||||
return False
|
||||
return self.is_match_pod_naming(rsc_kind, rsc_name, pod_name)
|
||||
|
@ -13,6 +13,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import operator
|
||||
import re
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import loopingcall
|
||||
|
||||
@ -21,6 +25,7 @@ from tacker.sol_refactored.common import exceptions as sol_ex
|
||||
from tacker.sol_refactored.common import vnf_instance_utils as inst_utils
|
||||
from tacker.sol_refactored.infra_drivers.kubernetes import kubernetes_resource
|
||||
from tacker.sol_refactored.infra_drivers.kubernetes import kubernetes_utils
|
||||
from tacker.sol_refactored.nfvo import nfvo_client
|
||||
from tacker.sol_refactored import objects
|
||||
|
||||
|
||||
@ -214,3 +219,175 @@ class KubernetesCommon(object):
|
||||
check_reses = set(k8s_reses)
|
||||
self._check_status(_check_updated, check_reses, k8s_api_client,
|
||||
namespace, old_pods_names)
|
||||
|
||||
def diff_check_inst(self, inst, vim_info):
|
||||
inst_tmp = copy.deepcopy(inst)
|
||||
self.diff_check_and_update_vnfc(inst_tmp, vim_info)
|
||||
|
||||
def diff_check_and_update_vnfc(self, vnf_instance, vim_info):
|
||||
with kubernetes_utils.AuthContextManager(vim_info) as acm:
|
||||
k8s_api_client = acm.init_k8s_api_client()
|
||||
old_pods_names = {
|
||||
vnfc.computeResource.resourceId for vnfc in
|
||||
vnf_instance.instantiatedVnfInfo.vnfcResourceInfo}
|
||||
self._update_vnfc_info(vnf_instance, k8s_api_client)
|
||||
new_pods_names = {
|
||||
vnfc.computeResource.resourceId for vnfc in
|
||||
vnf_instance.instantiatedVnfInfo.vnfcResourceInfo}
|
||||
if operator.eq(old_pods_names, new_pods_names):
|
||||
raise sol_ex.DbSyncNoDiff(
|
||||
"There are no differences in Vnfc resources.")
|
||||
|
||||
def sync_db(self, context, vnf_instance, vim_info):
|
||||
self.diff_check_and_update_vnfc(vnf_instance, vim_info)
|
||||
|
||||
vdu_id_list = self._get_vdu_list(vnf_instance)
|
||||
for vdu_id in vdu_id_list:
|
||||
resource_type = ""
|
||||
resource_name = ""
|
||||
# get pod information
|
||||
for vnfc in vnf_instance.instantiatedVnfInfo.vnfcResourceInfo:
|
||||
if vnfc.vduId == vdu_id:
|
||||
resource_type = (vnfc.computeResource
|
||||
.vimLevelResourceType)
|
||||
resource_name = self._get_resource_name(
|
||||
vnfc.computeResource.resourceId, resource_type)
|
||||
break
|
||||
pod_resources_from_k8s = self._get_pod_information(
|
||||
resource_name, resource_type, vnf_instance, vim_info)
|
||||
|
||||
vnfcs = [vnfc for vnfc in
|
||||
vnf_instance.instantiatedVnfInfo.vnfcResourceInfo if
|
||||
vnfc.vduId == vdu_id]
|
||||
replicas = vnf_instance.instantiatedVnfInfo.metadata[
|
||||
'vdu_reses'][vdu_id]['spec'].get('replicas')
|
||||
if replicas is not None:
|
||||
vnf_instance.instantiatedVnfInfo.metadata[
|
||||
'vdu_reses'][vdu_id]['spec']['replicas'] = len(vnfcs)
|
||||
|
||||
self._calc_scale_level(
|
||||
context, vnf_instance, vdu_id,
|
||||
len(pod_resources_from_k8s))
|
||||
|
||||
def _calc_scale_level(self, context, vnf_instance,
|
||||
vdu_id, current_pod_num):
|
||||
"""calc scale_level and set"""
|
||||
aspect_id = ""
|
||||
flavour_id = vnf_instance.instantiatedVnfInfo.flavourId
|
||||
client = nfvo_client.NfvoClient()
|
||||
vnfd = client.get_vnfd(context, vnf_instance.vnfdId)
|
||||
for aspect_delta in vnfd.get_policy_values_by_type(flavour_id,
|
||||
'tosca.policies.nfv.VduScalingAspectDeltas'):
|
||||
if vdu_id in aspect_delta.get('targets', []):
|
||||
aspect_id = aspect_delta.get('properties', {}).get('aspect')
|
||||
break
|
||||
if not aspect_id:
|
||||
return
|
||||
|
||||
delta = vnfd.get_scale_vdu_and_num(flavour_id, aspect_id).get(vdu_id)
|
||||
initial_delta = vnfd.get_initial_delta(flavour_id, vdu_id)
|
||||
|
||||
if (current_pod_num - initial_delta) % delta != 0:
|
||||
raise sol_ex.DbSyncFailed(
|
||||
"Error computing 'scale_level'. current Pod num: "
|
||||
f"{current_pod_num} delta: {delta}. vnf: {vnf_instance.id} "
|
||||
f"vdu: {vdu_id}")
|
||||
|
||||
scale_level = (current_pod_num - initial_delta) // delta
|
||||
|
||||
for inst_vnf in vnf_instance.instantiatedVnfInfo.scaleStatus:
|
||||
if inst_vnf.aspectId == aspect_id:
|
||||
inst_vnf.scaleLevel = scale_level
|
||||
break
|
||||
|
||||
self._check_pod_range(
|
||||
vnf_instance, vnfd, vdu_id, flavour_id, current_pod_num)
|
||||
|
||||
def _check_pod_range(self, vnf_instance, vnfd, vdu_id,
|
||||
flavour_id, current_pod_num):
|
||||
"""Check the range of the maximum or minimum number of pods.
|
||||
|
||||
If it finds out of range, output a error log and do not update
|
||||
database.
|
||||
"""
|
||||
vdu_profile = (vnfd.get_vdu_nodes(flavour_id)
|
||||
.get(vdu_id, {})
|
||||
.get("properties", {})
|
||||
.get("vdu_profile", {}))
|
||||
max_number_of_instances = vdu_profile.get("max_number_of_instances")
|
||||
min_number_of_instances = vdu_profile.get("min_number_of_instances")
|
||||
|
||||
if (current_pod_num > max_number_of_instances
|
||||
or current_pod_num < min_number_of_instances):
|
||||
raise sol_ex.DbSyncFailed(
|
||||
f"Failed to update database vnf {vnf_instance.id} "
|
||||
f"vdu: {vdu_id}. Pod num is out of range. "
|
||||
f"pod_num: {current_pod_num}")
|
||||
|
||||
def _get_pod_information(self, resource_name,
|
||||
resource_type, vnf_instance, vim_connection_info):
|
||||
"""Extract a Pod starting with the specified 'resource_name' name"""
|
||||
namespace = vnf_instance.metadata.get('namespace')
|
||||
if not namespace:
|
||||
namespace = "default"
|
||||
|
||||
with kubernetes_utils.AuthContextManager(vim_connection_info) as acm:
|
||||
k8s_api_client = acm.init_k8s_api_client()
|
||||
all_pods = kubernetes_utils.list_namespaced_pods(
|
||||
k8s_api_client, namespace)
|
||||
|
||||
resource_pods = {}
|
||||
|
||||
for pod in all_pods:
|
||||
if self.is_match_pod_naming(resource_type,
|
||||
resource_name, pod.metadata.name):
|
||||
resource_pods[pod.metadata.name] = pod.metadata.to_dict()
|
||||
return resource_pods
|
||||
|
||||
def _get_vdu_list(self, vnf_instance):
|
||||
"""get vdu_list"""
|
||||
vdu_id_list = set()
|
||||
|
||||
for vnfc in vnf_instance.instantiatedVnfInfo.vnfcResourceInfo:
|
||||
vdu_id_list.add(vnfc.vduId)
|
||||
return vdu_id_list
|
||||
|
||||
def _get_resource_name(self, resource_id, resource_type):
|
||||
"""get resource name"""
|
||||
if resource_type == 'Pod':
|
||||
resource_name = resource_id
|
||||
else:
|
||||
name_list = resource_id.split("-")
|
||||
if resource_type == 'Deployment':
|
||||
del name_list[-2:]
|
||||
elif resource_type in ('ReplicaSet', 'DaemonSet', 'StatefulSet'):
|
||||
del name_list[-1]
|
||||
resource_name = '-'.join(name_list)
|
||||
|
||||
return resource_name
|
||||
|
||||
def is_match_pod_naming(self, rsc_kind, rsc_name, pod_name):
|
||||
match_result = None
|
||||
if rsc_kind == 'Pod':
|
||||
# Expected example: name
|
||||
if rsc_name == pod_name:
|
||||
return True
|
||||
elif rsc_kind == 'Deployment':
|
||||
# Expected example: name-012789abef-019az
|
||||
# NOTE(horie): The naming rule of Pod in deployment is
|
||||
# "(deployment name)-(pod template hash)-(5 charactors)".
|
||||
# The "pod template hash" string is generated from 32 bit hash.
|
||||
# This may be from 1 to 10 caracters but not sure the lower limit
|
||||
# from the source code of Kubernetes.
|
||||
match_result = re.match(
|
||||
rsc_name + '-([0-9a-f]{1,10})-([0-9a-z]{5})+$', pod_name)
|
||||
elif rsc_kind in ('ReplicaSet', 'DaemonSet'):
|
||||
# Expected example: name-019az
|
||||
match_result = re.match(rsc_name + '-([0-9a-z]{5})+$', pod_name)
|
||||
elif rsc_kind == 'StatefulSet':
|
||||
# Expected example: name-0
|
||||
match_result = re.match(rsc_name + '-[0-9]+$', pod_name)
|
||||
if match_result:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
@ -3987,3 +3987,7 @@ class TestConductor(SqlTestCase, unit_base.FixturedTestCase):
|
||||
mock_change_vnf_status.assert_not_called()
|
||||
self.assertEqual(mock_change_ext_conn_grant.call_count, 0)
|
||||
mock_update_vnf_attributes.assert_called_once()
|
||||
|
||||
def test_sync_db(self):
|
||||
self.conductor._sync_db()
|
||||
self.vnflcm_driver.sync_db.assert_called_once()
|
||||
|
@ -17,7 +17,10 @@ from datetime import datetime
|
||||
from unittest import mock
|
||||
|
||||
import ddt
|
||||
from kubernetes import client
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import uuidutils
|
||||
from tooz.drivers import file
|
||||
|
||||
from tacker import context
|
||||
from tacker.sol_refactored.common import exceptions as sol_ex
|
||||
@ -28,6 +31,11 @@ from tacker.sol_refactored.nfvo import nfvo_client
|
||||
from tacker.sol_refactored import objects
|
||||
from tacker.sol_refactored.objects.v2 import fields
|
||||
from tacker.tests.unit.db import base as db_base
|
||||
from tacker.tests.unit.sol_refactored.infra_drivers.kubernetes import fakes
|
||||
from tacker.vnfm.infra_drivers.kubernetes import kubernetes_driver
|
||||
|
||||
|
||||
CNF_SAMPLE_VNFD_ID = "b1bb0ce7-ebca-4fa7-95ed-4840d70a1177"
|
||||
|
||||
|
||||
@ddt.ddt
|
||||
@ -629,3 +637,97 @@ class TestConductorV2(db_base.SqlTestCase):
|
||||
|
||||
result = self.conductor.modify_vnfinfo(self.context, lcmocc.id)
|
||||
self.assertEqual(None, result)
|
||||
|
||||
@mock.patch.object(vnflcm_driver_v2.VnfLcmDriverV2,
|
||||
'sync_db')
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes,
|
||||
'_check_pod_information')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
@mock.patch.object(objects.base.TackerPersistentObject, "get_by_filter")
|
||||
def test_sync_db(
|
||||
self, mock_db_sync, mock_get_by_filters,
|
||||
mock_list_namespaced_pod, mock_check_pod_information):
|
||||
vnf_instance_obj = fakes.fake_vnf_instance()
|
||||
vnf_instance_obj.id = CNF_SAMPLE_VNFD_ID
|
||||
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Pod')
|
||||
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
|
||||
vnfc_rsc_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
|
||||
|
||||
mock_get_by_filters.return_value = [
|
||||
vnf_instance_obj, vnf_instance_obj]
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[fakes.get_fake_pod_info(kind='Pod')])
|
||||
mock_check_pod_information.return_value = True
|
||||
self.conductor._sync_db()
|
||||
mock_db_sync.assert_called_once()
|
||||
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes,
|
||||
'_check_pod_information')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
@mock.patch.object(objects.base.TackerPersistentObject, "get_by_filter")
|
||||
def test_sync_db_exception(
|
||||
self, mock_get_by_filters, mock_list_namespaced_pod,
|
||||
mock_check_pod_information):
|
||||
vnf_instance_obj = fakes.fake_vnf_instance()
|
||||
vnf_instance_obj.id = CNF_SAMPLE_VNFD_ID
|
||||
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Pod')
|
||||
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
|
||||
vnfc_rsc_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
|
||||
|
||||
mock_get_by_filters.return_value = [
|
||||
vnf_instance_obj, vnf_instance_obj]
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[fakes.get_fake_pod_info(kind='Pod')])
|
||||
mock_check_pod_information.return_value = True
|
||||
|
||||
log_name = "tacker.sol_refactored.conductor.conductor_v2"
|
||||
with self.assertLogs(logger=log_name, level=logging.DEBUG) as cm:
|
||||
self.conductor._sync_db()
|
||||
|
||||
msg = (f'ERROR:{log_name}:Failed to synchronize database vnf: '
|
||||
f'{vnf_instance_obj.id} Error: ')
|
||||
self.assertIn(f'{msg}', cm.output[1])
|
||||
|
||||
@mock.patch.object(file.FileLock, 'acquire')
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes,
|
||||
'_check_pod_information')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
@mock.patch.object(objects.base.TackerPersistentObject, "get_by_filter")
|
||||
def test_sync_db_sol_ex(
|
||||
self, mock_get_by_filters, mock_list_namespaced_pod,
|
||||
mock_check_pod_information, mock_acquire):
|
||||
vnf_instance_obj = fakes.fake_vnf_instance()
|
||||
# vnf_instance_obj.id = CNF_SAMPLE_VNFD_ID
|
||||
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Pod')
|
||||
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
|
||||
vnfc_rsc_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
|
||||
|
||||
vnf_instance_obj1 = fakes.fake_vnf_instance()
|
||||
vnf_instance_obj1.vimConnectionInfo['vim1'] = vim_connection_object
|
||||
mock_get_by_filters.return_value = [
|
||||
vnf_instance_obj]
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[fakes.get_fake_pod_info(kind='Pod')])
|
||||
mock_check_pod_information.return_value = True
|
||||
mock_acquire.return_value = False
|
||||
|
||||
log_name = "tacker.sol_refactored.conductor.conductor_v2"
|
||||
with self.assertLogs(logger=log_name, level=logging.DEBUG) as cm:
|
||||
self.conductor._sync_db()
|
||||
|
||||
msg = (f'INFO:{log_name}:There is an LCM operation in progress, '
|
||||
f'so skip this DB synchronization. '
|
||||
f'vnf: {vnf_instance_obj.id}.')
|
||||
self.assertIn(f'{msg}', cm.output)
|
||||
|
@ -17,6 +17,7 @@ from datetime import datetime
|
||||
import os
|
||||
from unittest import mock
|
||||
|
||||
from kubernetes import client
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from tacker import context
|
||||
@ -31,6 +32,7 @@ from tacker.sol_refactored.nfvo import nfvo_client
|
||||
from tacker.sol_refactored import objects
|
||||
from tacker.sol_refactored.objects.v2 import fields
|
||||
from tacker.tests import base
|
||||
from tacker.tests.unit.sol_refactored.infra_drivers.kubernetes import fakes
|
||||
|
||||
|
||||
CNF_SAMPLE_VNFD_ID = "b1bb0ce7-ebca-4fa7-95ed-4840d70a1177"
|
||||
@ -3276,3 +3278,111 @@ class TestVnfLcmDriverV2(base.BaseTestCase):
|
||||
self.assertRaises(
|
||||
sol_ex.SolException, self.driver.change_vnfpkg_rollback,
|
||||
self.context, lcmocc, inst, grant_req, grant, self.vnfd_1)
|
||||
|
||||
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_sync_db_kubernetes(
|
||||
self, mock_list_namespaced_pod, mock_get_vnfd):
|
||||
vnf_instance_obj = fakes.fake_vnf_instance()
|
||||
|
||||
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name="vdu1-1234567890-abcd", rsc_name="vdu1")
|
||||
|
||||
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
|
||||
vnfc_rsc_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
|
||||
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd1"),
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd2")])
|
||||
mock_get_vnfd.return_value = self.vnfd_1
|
||||
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
|
||||
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
|
||||
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
|
||||
]
|
||||
|
||||
self.driver.sync_db(
|
||||
context=self.context, vnf_inst=vnf_instance_obj,
|
||||
vim_info=vim_connection_object)
|
||||
|
||||
self.assertEqual(
|
||||
2, vnf_instance_obj.instantiatedVnfInfo.metadata[
|
||||
'vdu_reses']['VDU1']['spec']['replicas'])
|
||||
|
||||
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_sync_db_helm(
|
||||
self, mock_list_namespaced_pod, mock_get_vnfd):
|
||||
vnf_instance_obj = fakes.fake_vnf_instance()
|
||||
|
||||
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name="vdu1-1234567890-abcd", rsc_name="vdu1")
|
||||
|
||||
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
|
||||
vnfc_rsc_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info(
|
||||
vim_type='ETSINFV.HELM.V_3')
|
||||
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
|
||||
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd1"),
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd2")])
|
||||
mock_get_vnfd.return_value = self.vnfd_1
|
||||
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
|
||||
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
|
||||
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
|
||||
]
|
||||
|
||||
self.driver.sync_db(
|
||||
context=self.context, vnf_inst=vnf_instance_obj,
|
||||
vim_info=vim_connection_object)
|
||||
|
||||
self.assertEqual(
|
||||
2, vnf_instance_obj.instantiatedVnfInfo.metadata[
|
||||
'vdu_reses']['VDU1']['spec']['replicas'])
|
||||
|
||||
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_sync_db_not_support(
|
||||
self, mock_list_namespaced_pod, mock_get_vnfd):
|
||||
vnf_instance_obj = fakes.fake_vnf_instance()
|
||||
|
||||
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name="vdu1-1234567890-abcd", rsc_name="vdu1")
|
||||
|
||||
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
|
||||
vnfc_rsc_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info(
|
||||
vim_type="openstack")
|
||||
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
|
||||
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd1"),
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd2")])
|
||||
mock_get_vnfd.return_value = self.vnfd_1
|
||||
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
|
||||
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
|
||||
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
|
||||
]
|
||||
|
||||
ex = self.assertRaises(
|
||||
sol_ex.DbSyncNoDiff, self.driver.sync_db,
|
||||
self.context, vnf_instance_obj, vim_connection_object)
|
||||
self.assertEqual(
|
||||
"There are no differences in Vnfc resources.", ex.args[0])
|
||||
|
@ -13,7 +13,12 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import datetime
|
||||
|
||||
from kubernetes import client
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from tacker.sol_refactored import objects
|
||||
|
||||
|
||||
def fake_namespace():
|
||||
@ -547,3 +552,114 @@ def fake_node(type='Ready', status='True'):
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def get_fake_pod_info(kind, name='fake_name', pod_status='Running',
|
||||
pod_name=None):
|
||||
if not pod_name:
|
||||
if kind == 'Deployment':
|
||||
pod_name = _('{name}-1234567890-abcde').format(name=name)
|
||||
elif kind == 'ReplicaSet' or kind == 'DaemonSet':
|
||||
pod_name = _('{name}-12345').format(name=name)
|
||||
elif kind == 'StatefulSet':
|
||||
pod_name = _('{name}-1').format(name=name)
|
||||
elif kind == 'Pod':
|
||||
pod_name = name
|
||||
return client.V1Pod(
|
||||
metadata=client.V1ObjectMeta(
|
||||
name=pod_name,
|
||||
creation_timestamp=datetime.datetime.now().isoformat('T')),
|
||||
status=client.V1PodStatus(phase=pod_status))
|
||||
|
||||
|
||||
def fake_vim_connection_info(vim_type="kubernetes"):
|
||||
return objects.VimConnectionInfo.from_dict({
|
||||
'vimId': 'a56258df-9853-4437-9fdb-7d470bc0b162',
|
||||
'vimType': vim_type,
|
||||
'interfaceInfo': {
|
||||
'endpoint': 'https://127.0.0.1:6443'
|
||||
},
|
||||
'accessInfo': {
|
||||
'bearer_token': 'secret_token',
|
||||
'username': 'test',
|
||||
'password': 'test',
|
||||
'region': 'RegionOne'
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
def fake_vnfc_resource_info(vdu_id='VDU1', rsc_kind='Deployment',
|
||||
rsc_name='fake_name', pod_name=None,
|
||||
namespace='default'):
|
||||
if not pod_name:
|
||||
v1_pod = get_fake_pod_info(rsc_kind, rsc_name)
|
||||
pod_name = v1_pod.metadata.name
|
||||
id = uuidutils.generate_uuid()
|
||||
vnfc_rsc_info = {
|
||||
'id': id,
|
||||
'vduId': vdu_id,
|
||||
'computeResource': {
|
||||
'resourceId': pod_name,
|
||||
'vimLevelResourceType': rsc_kind
|
||||
},
|
||||
'metadata': {
|
||||
'Deployment': {
|
||||
'name': rsc_name,
|
||||
'namespace': namespace
|
||||
}
|
||||
}
|
||||
}
|
||||
vnfc_rsc_info_obj = objects.VnfcResourceInfoV2.from_dict(vnfc_rsc_info)
|
||||
|
||||
vnfc_info = {
|
||||
'id': vdu_id + "-" + id,
|
||||
'vduId': vdu_id,
|
||||
'vnfcResourceInfoId': id,
|
||||
'vnfcState': 'STARTED'
|
||||
}
|
||||
vnfc_info_obj = objects.VnfcInfoV2.from_dict(vnfc_info)
|
||||
|
||||
return vnfc_rsc_info_obj, vnfc_info_obj
|
||||
|
||||
|
||||
def fake_vnf_instance(instantiated_state='INSTANTIATED'):
|
||||
return objects.VnfInstanceV2.from_dict({
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'vimConnectionInfo': {},
|
||||
'instantiationState': instantiated_state,
|
||||
'instantiatedVnfInfo': {
|
||||
'flavourId': 'simple',
|
||||
'vnfState': 'STARTED',
|
||||
'vnfcResourceInfo': [],
|
||||
'vnfcInfo': [],
|
||||
'metadata': {
|
||||
'namespace': 'default',
|
||||
'lcm-kubernetes-def-files': [
|
||||
'Files/kubernetes/deployment.yaml'],
|
||||
'vdu_reses': {
|
||||
'VDU1': {
|
||||
'kind': 'Deployment',
|
||||
'metadata': {
|
||||
'name': 'vdu1',
|
||||
'namespace': 'default',
|
||||
},
|
||||
'spec': {
|
||||
'replicas': 0
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
'metadata': {
|
||||
'lcm-kubernetes-def-files': ['Files/kubernetes/deployment.yaml']
|
||||
}
|
||||
})
|
||||
|
||||
|
||||
def fake_scale_status(aspect_id='vdu1_aspect',
|
||||
vnfd_id=uuidutils.generate_uuid(), scale_level=1):
|
||||
return objects.ScaleInfoV2.from_dict({
|
||||
'aspectId': aspect_id,
|
||||
'vnfdId': vnfd_id,
|
||||
'scaleLevel': scale_level
|
||||
})
|
||||
|
@ -14,14 +14,19 @@
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
|
||||
from kubernetes import client
|
||||
from oslo_utils import uuidutils
|
||||
from unittest import mock
|
||||
|
||||
from tacker import context
|
||||
from tacker.sol_refactored.common import exceptions as sol_ex
|
||||
from tacker.sol_refactored.common import vnfd_utils
|
||||
from tacker.sol_refactored.infra_drivers.kubernetes import kubernetes
|
||||
from tacker.sol_refactored.nfvo import nfvo_client
|
||||
from tacker.sol_refactored import objects
|
||||
from tacker.tests.unit import base
|
||||
|
||||
from tacker.tests.unit.sol_refactored.infra_drivers.kubernetes import fakes
|
||||
|
||||
CNF_SAMPLE_VNFD_ID = "b1bb0ce7-ebca-4fa7-95ed-4840d70a1177"
|
||||
|
||||
@ -32,6 +37,7 @@ class TestKubernetes(base.TestCase):
|
||||
super(TestKubernetes, self).setUp()
|
||||
objects.register_all()
|
||||
self.driver = kubernetes.Kubernetes()
|
||||
self.context = context.get_admin_context()
|
||||
|
||||
cur_dir = os.path.dirname(__file__)
|
||||
sample_dir = os.path.join(cur_dir, "../..", "samples")
|
||||
@ -104,3 +110,167 @@ class TestKubernetes(base.TestCase):
|
||||
|
||||
# maybe 3 but possible 2
|
||||
self.assertTrue(res1.is_ready.call_count >= 2)
|
||||
|
||||
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_sync_db(
|
||||
self, mock_list_namespaced_pod, mock_get_vnfd):
|
||||
vnf_instance_obj = fakes.fake_vnf_instance()
|
||||
|
||||
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name="vdu1-1234567890-abcd", rsc_name="vdu1")
|
||||
|
||||
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
|
||||
vnfc_rsc_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
|
||||
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd1"),
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd2")])
|
||||
mock_get_vnfd.return_value = self.vnfd_1
|
||||
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
|
||||
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
|
||||
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
|
||||
]
|
||||
|
||||
self.driver.sync_db(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
vim_info=vim_connection_object)
|
||||
|
||||
self.assertEqual(
|
||||
2, vnf_instance_obj.instantiatedVnfInfo.metadata[
|
||||
'vdu_reses']['VDU1']['spec']['replicas'])
|
||||
|
||||
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_sync_db_no_diff(
|
||||
self, mock_list_namespaced_pod, mock_get_vnfd):
|
||||
vnf_instance_obj = fakes.fake_vnf_instance()
|
||||
|
||||
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name="vdu1-1234567890-abcd1", rsc_name="vdu1")
|
||||
|
||||
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
|
||||
vnfc_rsc_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
|
||||
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd1")])
|
||||
mock_get_vnfd.return_value = self.vnfd_1
|
||||
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
|
||||
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
|
||||
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
|
||||
]
|
||||
|
||||
ex = self.assertRaises(
|
||||
sol_ex.DbSyncNoDiff, self.driver.sync_db,
|
||||
self.context, vnf_instance_obj, vim_connection_object)
|
||||
self.assertEqual(
|
||||
"There are no differences in Vnfc resources.", ex.args[0])
|
||||
|
||||
@mock.patch.object(vnfd_utils.Vnfd, 'get_scale_vdu_and_num')
|
||||
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_sync_db_scale_level(
|
||||
self, mock_list_namespaced_pod, mock_get_vnfd,
|
||||
mock_scale_vdu_and_num):
|
||||
vnf_instance_obj = fakes.fake_vnf_instance()
|
||||
|
||||
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name="vdu1-1234567890-abcd1", rsc_name="vdu1")
|
||||
|
||||
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
|
||||
vnfc_rsc_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
|
||||
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd1"),
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd2"),
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd3"),
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd4")
|
||||
])
|
||||
mock_get_vnfd.return_value = self.vnfd_1
|
||||
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
|
||||
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
|
||||
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
|
||||
]
|
||||
delta = 2
|
||||
mock_scale_vdu_and_num.return_value = {'VDU1': delta}
|
||||
current_pod_num = 4
|
||||
vdu_id = "VDU1"
|
||||
|
||||
ex = self.assertRaises(
|
||||
sol_ex.DbSyncFailed, self.driver.sync_db,
|
||||
self.context, vnf_instance_obj, vim_connection_object)
|
||||
self.assertEqual(
|
||||
"Error computing 'scale_level'. current Pod num: "
|
||||
f"{current_pod_num} delta: {delta}. vnf: {vnf_instance_obj.id} "
|
||||
f"vdu: {vdu_id}", ex.args[0])
|
||||
|
||||
@mock.patch.object(vnfd_utils.Vnfd, 'get_scale_vdu_and_num')
|
||||
@mock.patch.object(nfvo_client.NfvoClient, 'get_vnfd')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_sync_db_pod_range(
|
||||
self, mock_list_namespaced_pod, mock_get_vnfd,
|
||||
mock_scale_vdu_and_num):
|
||||
vnf_instance_obj = fakes.fake_vnf_instance()
|
||||
|
||||
vnfc_rsc_info_obj1, vnfc_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name="vdu1-1234567890-abcd1", rsc_name="vdu1")
|
||||
|
||||
vnf_instance_obj.instantiatedVnfInfo.vnfcResourceInfo = [
|
||||
vnfc_rsc_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
vnf_instance_obj.vimConnectionInfo['vim1'] = vim_connection_object
|
||||
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd1"),
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd2"),
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd3"),
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd4"),
|
||||
fakes.get_fake_pod_info(
|
||||
kind='Deployment', pod_name="vdu1-1234567890-abcd5")
|
||||
])
|
||||
mock_get_vnfd.return_value = self.vnfd_1
|
||||
vnf_instance_obj.vnfdId = uuidutils.generate_uuid()
|
||||
vnf_instance_obj.instantiatedVnfInfo.scaleStatus = [
|
||||
fakes.fake_scale_status(vnfd_id=vnf_instance_obj.vnfdId)
|
||||
]
|
||||
delta = 2
|
||||
mock_scale_vdu_and_num.return_value = {'VDU1': delta}
|
||||
current_pod_num = 5
|
||||
vdu_id = "VDU1"
|
||||
|
||||
ex = self.assertRaises(
|
||||
sol_ex.DbSyncFailed, self.driver.sync_db,
|
||||
self.context, vnf_instance_obj, vim_connection_object)
|
||||
self.assertEqual(
|
||||
f"Failed to update database vnf {vnf_instance_obj.id} "
|
||||
f"vdu: {vdu_id}. Pod num is out of range. "
|
||||
f"pod_num: {current_pod_num}", ex.args[0])
|
||||
|
@ -20,6 +20,7 @@ from unittest import mock
|
||||
import yaml
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
@ -38,6 +39,7 @@ from tacker.objects import vim_connection
|
||||
from tacker.tests.unit.db import base as db_base
|
||||
from tacker.tests.unit.nfvo.test_nfvo_plugin import FakeVNFMPlugin
|
||||
from tacker.tests.unit.vnflcm import fakes
|
||||
from tacker.tests.unit.vnfm.infra_drivers.kubernetes import fakes as k8s_fakes
|
||||
from tacker.tests import utils as test_utils
|
||||
from tacker.tests import uuidsentinel
|
||||
from tacker.vnflcm import vnflcm_driver
|
||||
@ -3538,3 +3540,74 @@ class TestVnflcmDriver(db_base.SqlTestCase):
|
||||
"connectivity vnf '%s' is completed successfully")
|
||||
mock_log.info.assert_called_with(expected_msg,
|
||||
vnf_instance.id)
|
||||
|
||||
@mock.patch.object(TackerManager, 'get_service_plugins',
|
||||
return_value={'VNFM': FakeVNFMPlugin()})
|
||||
@mock.patch.object(VnfLcmDriver, '_init_mgmt_driver_hash')
|
||||
@mock.patch.object(yaml, "safe_load")
|
||||
@mock.patch.object(objects.VnfInstanceList, 'get_by_filters')
|
||||
@mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive")
|
||||
def test_sync_db(self, mock_vim, mock_get_by_filter,
|
||||
mock_yaml_safe_load, mock_init_hash,
|
||||
mock_get_service_plugins):
|
||||
mock_init_hash.return_value = {
|
||||
"vnflcm_noop": "ffea638bfdbde3fb01f191bbe75b031859"
|
||||
"b18d663b127100eb72b19eecd7ed51"
|
||||
}
|
||||
mock_yaml_safe_load.return_value = fakes.vnfd_dict_cnf()
|
||||
vnf_instance_obj = fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj = k8s_fakes.fake_vnfc_resource_info(
|
||||
namespace="default")
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = (
|
||||
[vnfc_resource_info_obj])
|
||||
|
||||
vim_connection_info = vim_connection.VimConnectionInfo(
|
||||
vim_type="kubernetes", id="8a3adb69-0784-43c7-833e-aab0b6ab4470",
|
||||
vim_id="67e5de95-1ad2-4627-89f8-597dec1683fa")
|
||||
|
||||
vnf_instance_obj.vim_connection_info.append(vim_connection_info)
|
||||
mock_get_by_filter.return_value = [vnf_instance_obj]
|
||||
mock_vim.return_value = vim_connection_info
|
||||
|
||||
self._mock_vnf_manager()
|
||||
driver = vnflcm_driver.VnfLcmDriver()
|
||||
driver.sync_db(self.context)
|
||||
mock_get_by_filter.assert_called_once()
|
||||
self._vnf_manager.invoke.assert_called_once()
|
||||
|
||||
@mock.patch.object(TackerManager, 'get_service_plugins',
|
||||
return_value={'VNFM': FakeVNFMPlugin()})
|
||||
@mock.patch.object(VnfLcmDriver, '_init_mgmt_driver_hash')
|
||||
@mock.patch.object(yaml, "safe_load")
|
||||
@mock.patch.object(objects.VnfInstanceList, 'get_by_filters')
|
||||
@mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive")
|
||||
def test_sync_db_exception(
|
||||
self, mock_vim, mock_get_by_filter, mock_yaml_safe_load,
|
||||
mock_init_hash, mock_get_service_plugins):
|
||||
mock_init_hash.return_value = {
|
||||
"vnflcm_noop": "ffea638bfdbde3fb01f191bbe75b031859"
|
||||
"b18d663b127100eb72b19eecd7ed51"
|
||||
}
|
||||
mock_yaml_safe_load.return_value = fakes.vnfd_dict_cnf()
|
||||
vnf_instance_obj = fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj = k8s_fakes.fake_vnfc_resource_info(
|
||||
namespace="default")
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = (
|
||||
[vnfc_resource_info_obj])
|
||||
|
||||
mock_get_by_filter.return_value = [vnf_instance_obj]
|
||||
mock_vim.return_value = None
|
||||
|
||||
driver = vnflcm_driver.VnfLcmDriver()
|
||||
log_name = "tacker.vnflcm.vnflcm_driver"
|
||||
with self.assertLogs(logger=log_name, level=logging.ERROR) as cm:
|
||||
driver.sync_db(self.context)
|
||||
|
||||
self.assertIn(
|
||||
f"ERROR:{log_name}:Error is occoured vnf {vnf_instance_obj.id} "
|
||||
f"Error: ", cm.output[0])
|
||||
mock_get_by_filter.assert_called_once()
|
||||
|
@ -18,6 +18,7 @@ import ddt
|
||||
import os
|
||||
|
||||
from kubernetes import client
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from tacker.common.container import kubernetes_utils
|
||||
from tacker.common import exceptions
|
||||
@ -36,6 +37,7 @@ from tacker.tests.unit.vnflcm import fakes as vnflcm_fakes
|
||||
from tacker.tests.unit.vnfm.infra_drivers.kubernetes import fakes
|
||||
from tacker.tests.unit.vnfm.infra_drivers.openstack.fixture_data import \
|
||||
fixture_data_utils as fd_utils
|
||||
from tacker.vnflcm import utils as vnflcm_utils
|
||||
from tacker.vnfm.infra_drivers.kubernetes.k8s import tosca_kube_object
|
||||
from tacker.vnfm.infra_drivers.kubernetes.k8s import translate_outputs
|
||||
from tacker.vnfm.infra_drivers.kubernetes import kubernetes_driver
|
||||
@ -3855,3 +3857,525 @@ class TestKubernetes(base.TestCase):
|
||||
vim_connection_info=vim_connection_object,
|
||||
heal_vnf_request=heal_request_data_obj)
|
||||
self.assertEqual(mock_list_namespaced_pod.call_count, 0)
|
||||
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes,
|
||||
"_sync_vnfc_resource_and_pod_resource")
|
||||
@mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive")
|
||||
@mock.patch.object(vnflcm_utils, "get_vim")
|
||||
@mock.patch.object(VnfInstance, "save")
|
||||
@mock.patch.object(objects.VnfInstance, "get_by_id")
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_sync_db(
|
||||
self, mock_list_namespaced_pod, mock_check_pod_information,
|
||||
mock_get_by_id, mock_save, mock_get_vim, mock_vim,
|
||||
mock_sync_vnfc):
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[fakes.get_fake_pod_info(kind='Deployment')])
|
||||
mock_check_pod_information.return_value = True
|
||||
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Pod')
|
||||
vnfc_resource_info_obj2 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU2', rsc_kind='Deployment')
|
||||
vnfc_resource_info_obj3 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU3', rsc_kind='ReplicaSet')
|
||||
vnfc_resource_info_obj4 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU4', rsc_kind='DaemonSet')
|
||||
vnfc_resource_info_obj5 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU5', rsc_kind='StatefulSet')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1,
|
||||
vnfc_resource_info_obj2,
|
||||
vnfc_resource_info_obj3,
|
||||
vnfc_resource_info_obj4,
|
||||
vnfc_resource_info_obj5
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
|
||||
mock_get_by_id.return_value = vnf_instance_obj
|
||||
mock_vim.return_value = vim_connection_object
|
||||
mock_sync_vnfc.return_value = True
|
||||
|
||||
log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver"
|
||||
with self.assertLogs(logger=log_name, level=logging.INFO) as cm:
|
||||
self.kubernetes.sync_db(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
vim_info=vim_connection_object)
|
||||
self.assertCountEqual([
|
||||
f'INFO:{log_name}:Database synchronization succeeded. '
|
||||
f'vnf: {vnf_instance_obj.id} '
|
||||
f'vdu: {vnfc_resource_info_obj1.vdu_id}',
|
||||
f'INFO:{log_name}:Database synchronization succeeded. '
|
||||
f'vnf: {vnf_instance_obj.id} '
|
||||
f'vdu: {vnfc_resource_info_obj2.vdu_id}',
|
||||
f'INFO:{log_name}:Database synchronization succeeded. '
|
||||
f'vnf: {vnf_instance_obj.id} '
|
||||
f'vdu: {vnfc_resource_info_obj3.vdu_id}',
|
||||
f'INFO:{log_name}:Database synchronization succeeded. '
|
||||
f'vnf: {vnf_instance_obj.id} '
|
||||
f'vdu: {vnfc_resource_info_obj4.vdu_id}',
|
||||
f'INFO:{log_name}:Database synchronization succeeded. '
|
||||
f'vnf: {vnf_instance_obj.id} '
|
||||
f'vdu: {vnfc_resource_info_obj5.vdu_id}',
|
||||
], cm.output)
|
||||
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_sync_db_vnf_conflict(
|
||||
self, mock_list_namespaced_pod, mock_check_pod_information):
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[fakes.get_fake_pod_info(kind='Deployment')])
|
||||
mock_check_pod_information.return_value = True
|
||||
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.NOT_INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Pod')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver"
|
||||
|
||||
with self.assertLogs(logger=log_name, level=logging.INFO) as cm:
|
||||
self.kubernetes.sync_db(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
vim_info=vim_connection_object)
|
||||
self.assertEqual(
|
||||
[f'INFO:{log_name}:There is an LCM operation in progress, '
|
||||
'so skip this DB synchronization. '
|
||||
f'vnf: {vnf_instance_obj.id}'], cm.output)
|
||||
|
||||
def test_sync_db_exception(self):
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.NOT_INSTANTIATED)
|
||||
vnf_instance_obj.instantiated_vnf_info = None
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver"
|
||||
|
||||
with self.assertLogs(logger=log_name, level=logging.INFO) as cm:
|
||||
self.kubernetes.sync_db(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
vim_info=vim_connection_object)
|
||||
self.assertIn(
|
||||
f"Failed to synchronize database vnf: "
|
||||
f"{vnf_instance_obj.id}", cm.output[0])
|
||||
|
||||
@mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive")
|
||||
@mock.patch.object(vnflcm_utils, "get_vim")
|
||||
@mock.patch.object(VnfInstance, "save")
|
||||
@mock.patch.object(objects.VnfInstance, "get_by_id")
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_sync_db_check_pod_false(
|
||||
self, mock_list_namespaced_pod, mock_check_pod_information,
|
||||
mock_get_by_id, mock_save, mock_get_vim, mock_vim):
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[fakes.get_fake_pod_info(kind='Pod')])
|
||||
mock_check_pod_information.side_effect = [True, False]
|
||||
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Pod')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
|
||||
mock_get_by_id.return_value = vnf_instance_obj
|
||||
mock_vim.return_value = vim_connection_object
|
||||
|
||||
self.kubernetes.sync_db(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
vim_info=vim_connection_object)
|
||||
|
||||
self.assertEqual(2, mock_check_pod_information.call_count)
|
||||
self.assertEqual(2, mock_save.call_count)
|
||||
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes,
|
||||
"_sync_vnfc_resource_and_pod_resource")
|
||||
@mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive")
|
||||
@mock.patch.object(vnflcm_utils, "get_vim")
|
||||
@mock.patch.object(VnfInstance, "save")
|
||||
@mock.patch.object(objects.VnfInstance, "get_by_id")
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_sync_db_not_succeeded(
|
||||
self, mock_list_namespaced_pod, mock_check_pod_information,
|
||||
mock_get_by_id, mock_save, mock_get_vim, mock_vim,
|
||||
mock_sync_vnfc):
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[fakes.get_fake_pod_info(kind='Pod')])
|
||||
mock_check_pod_information.return_value = True
|
||||
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Pod')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
|
||||
mock_get_by_id.return_value = vnf_instance_obj
|
||||
mock_vim.return_value = vim_connection_object
|
||||
mock_sync_vnfc.return_value = False
|
||||
|
||||
self.kubernetes.sync_db(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
vim_info=vim_connection_object)
|
||||
self.assertEqual(1, mock_sync_vnfc.call_count)
|
||||
|
||||
@mock.patch.object(objects.VimConnectionInfo, "obj_from_primitive")
|
||||
@mock.patch.object(vnflcm_utils, "get_vim")
|
||||
@mock.patch.object(VnfInstance, "save")
|
||||
@mock.patch.object(objects.VnfInstance, "get_by_id")
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes, '_check_pod_information')
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_sync_db_failed_update_db(
|
||||
self, mock_list_namespaced_pod, mock_check_pod_information,
|
||||
mock_get_by_id, mock_save, mock_get_vim, mock_vim):
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(
|
||||
items=[fakes.get_fake_pod_info(kind='Deployment')])
|
||||
mock_check_pod_information.return_value = True
|
||||
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Pod')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
|
||||
mock_get_by_id.return_value = vnf_instance_obj
|
||||
mock_vim.return_value = vim_connection_object
|
||||
|
||||
log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver"
|
||||
with self.assertLogs(logger=log_name, level=logging.ERROR) as cm:
|
||||
self.kubernetes.sync_db(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
vim_info=vim_connection_object)
|
||||
self.assertIn(
|
||||
f'ERROR:{log_name}:Failed to update database vnf '
|
||||
f'{vnf_instance_obj.id} Error: ', cm.output[0])
|
||||
|
||||
@mock.patch.object(client.CoreV1Api, 'list_namespaced_pod')
|
||||
def test_get_pod_information_no_namespace(self, mock_list_namespaced_pod):
|
||||
mock_list_namespaced_pod.return_value = client.V1PodList(items=[])
|
||||
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = ""
|
||||
vim_connection_object = fakes.fake_vim_connection_info()
|
||||
|
||||
result = self.kubernetes._get_pod_information(
|
||||
resource_name=mock.ANY, resource_type=mock.ANY,
|
||||
vnf_instance=vnf_instance_obj,
|
||||
vim_connection_info=vim_connection_object)
|
||||
|
||||
self.assertEqual({}, result)
|
||||
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes, '_sync_resource_id')
|
||||
def test_sync_vnfc_resource_and_pod_resource_eq(
|
||||
self, mock_sync_resource_id):
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Pod')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
pod_resources_from_k8s = {
|
||||
'fake_name-1234567890-abcde': {
|
||||
'name': 'fake_name-1234567890-abcde',
|
||||
'namespace': 'default'
|
||||
}
|
||||
}
|
||||
result = self.kubernetes._sync_vnfc_resource_and_pod_resource(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
pod_resources_from_k8s=pod_resources_from_k8s, vdu_id='VDU1')
|
||||
self.assertTrue(result)
|
||||
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes, '_delete_vnfc_resource')
|
||||
def test_sync_vnfc_resource_and_pod_resource_gt(
|
||||
self, mock_delete_vnfc_resource):
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Pod')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
pod_resources_from_k8s = {}
|
||||
mock_delete_vnfc_resource.return_value = False
|
||||
result = self.kubernetes._sync_vnfc_resource_and_pod_resource(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
pod_resources_from_k8s=pod_resources_from_k8s, vdu_id='VDU1')
|
||||
self.assertEqual(1, mock_delete_vnfc_resource.call_count)
|
||||
self.assertFalse(result)
|
||||
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes, '_add_vnfc_resource')
|
||||
def test_sync_vnfc_resource_and_pod_resource_lt(
|
||||
self, mock_add_vnfc_resource):
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
pod_resources_from_k8s = {
|
||||
'fake_name-1234567890-abcde': {
|
||||
'name': 'fake_name-1234567890-abcde',
|
||||
'namespace': 'default'
|
||||
}
|
||||
}
|
||||
mock_add_vnfc_resource.return_value = True
|
||||
result = self.kubernetes._sync_vnfc_resource_and_pod_resource(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
pod_resources_from_k8s=pod_resources_from_k8s, vdu_id='VDU1')
|
||||
self.assertEqual(1, mock_add_vnfc_resource.call_count)
|
||||
self.assertTrue(result)
|
||||
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes, '_calc_scale_level')
|
||||
def test_delete_vnfc_resource(self, mock_calc_scale_level):
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Pod', pod_name='pod')
|
||||
vnfc_resource_info_obj2 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU2', rsc_kind='Deployment',
|
||||
pod_name='deploy-1234567890-fghij')
|
||||
vnfc_resource_info_obj3 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU2', rsc_kind='Deployment',
|
||||
pod_name='deploy-1234567890-abcde')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1, vnfc_resource_info_obj2,
|
||||
vnfc_resource_info_obj3
|
||||
]
|
||||
pod_resources_from_k8s = {
|
||||
'deploy-1234567890-abcde': {
|
||||
'name': 'deploy-1234567890-abcde',
|
||||
'namespace': 'default'
|
||||
}
|
||||
}
|
||||
mock_calc_scale_level.return_value = True
|
||||
result = self.kubernetes._delete_vnfc_resource(
|
||||
context=self.context, vnf_instance=vnf_instance_obj, vdu_id='VDU2',
|
||||
pod_resources_from_k8s=pod_resources_from_k8s, vnfc_count=2)
|
||||
self.assertEqual(1, mock_calc_scale_level.call_count)
|
||||
self.assertTrue(result)
|
||||
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes, '_calc_scale_level')
|
||||
def test_delete_vnfc_resource_rsc_not_same(self, mock_calc_scale_level):
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Pod', pod_name='pod')
|
||||
vnfc_resource_info_obj2 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU2', rsc_kind='Deployment',
|
||||
pod_name='deploy-1234567890-fghij')
|
||||
vnfc_resource_info_obj3 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU2', rsc_kind='Deployment',
|
||||
pod_name='deploy-1234567890-abcde')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1, vnfc_resource_info_obj2,
|
||||
vnfc_resource_info_obj3
|
||||
]
|
||||
pod_resources_from_k8s = {
|
||||
'deploy-1234567890-klmno': {
|
||||
'name': 'deploy-1234567890-klmno',
|
||||
'namespace': 'default'
|
||||
}
|
||||
}
|
||||
mock_calc_scale_level.return_value = False
|
||||
result = self.kubernetes._delete_vnfc_resource(
|
||||
context=self.context, vnf_instance=vnf_instance_obj, vdu_id='VDU2',
|
||||
pod_resources_from_k8s=pod_resources_from_k8s, vnfc_count=2)
|
||||
self.assertEqual(1, mock_calc_scale_level.call_count)
|
||||
self.assertFalse(result)
|
||||
|
||||
@mock.patch.object(kubernetes_driver.Kubernetes, '_calc_scale_level')
|
||||
def test_add_vnfc_resource(self, mock_calc_scale_level):
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU2', rsc_kind='Deployment',
|
||||
pod_name='deploy-1234567890-fghij')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
pod_resources_from_k8s = {
|
||||
'deploy-1234567890-klmno': {
|
||||
'name': 'deploy-1234567890-klmno',
|
||||
'namespace': 'default'
|
||||
},
|
||||
'deploy-1234567890-abcde': {
|
||||
'name': 'deploy-1234567890-abcde',
|
||||
'namespace': 'default'
|
||||
}
|
||||
}
|
||||
mock_calc_scale_level.return_value = False
|
||||
result = self.kubernetes._add_vnfc_resource(
|
||||
context=self.context, vnf_instance=vnf_instance_obj, vdu_id='VDU2',
|
||||
pod_resources_from_k8s=pod_resources_from_k8s, vnfc_count=1)
|
||||
self.assertEqual(1, mock_calc_scale_level.call_count)
|
||||
self.assertFalse(result)
|
||||
|
||||
@mock.patch('tacker.vnflcm.utils._get_vnfd_dict')
|
||||
def test_calc_scale_level(self, mock_get_vnfd_dict):
|
||||
mock_get_vnfd_dict.return_value = vnflcm_fakes.vnfd_dict_cnf()
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED, 'scale_status')
|
||||
vnf_instance_obj.instantiated_vnf_info.scale_status[0].aspect_id = (
|
||||
'vdu1_aspect')
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name='deploy-1234567890-fghij')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
result = self.kubernetes._calc_scale_level(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
vdu_id='VDU1', current_pod_num=1)
|
||||
self.assertTrue(result)
|
||||
|
||||
@mock.patch('tacker.vnflcm.utils._get_vnfd_dict')
|
||||
def test_calc_scale_level_error(self, mock_get_vnfd_dict):
|
||||
delta = 2
|
||||
vnfd_obj = vnflcm_fakes.vnfd_dict_cnf()
|
||||
for policy in vnfd_obj['topology_template']['policies']:
|
||||
if policy.get('vdu1_scaling_aspect_deltas'):
|
||||
policy['vdu1_scaling_aspect_deltas']['properties'][
|
||||
'deltas']['delta_1']['number_of_instances'] = delta
|
||||
break
|
||||
mock_get_vnfd_dict.return_value = vnfd_obj
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED, 'scale_status')
|
||||
vnf_instance_obj.instantiated_vnf_info.scale_status[0].aspect_id = (
|
||||
'vdu1_aspect')
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name='deploy-1234567890-fghij')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
current_pod_num = 5
|
||||
|
||||
log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver"
|
||||
with self.assertLogs(logger=log_name, level=logging.ERROR) as cm:
|
||||
result = self.kubernetes._calc_scale_level(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
vdu_id='VDU1', current_pod_num=current_pod_num)
|
||||
self.assertEqual(
|
||||
[f"ERROR:{log_name}:Error computing 'scale_level'. current Pod "
|
||||
f"num: {current_pod_num} delta: {delta}. "
|
||||
f"vnf: {vnf_instance_obj.id} vdu: VDU1"], cm.output)
|
||||
self.assertFalse(result)
|
||||
|
||||
@mock.patch('tacker.vnflcm.utils._get_vnfd_dict')
|
||||
def test_calc_scale_level_pod_range_error(self, mock_get_vnfd_dict):
|
||||
delta = 2
|
||||
vnfd_obj = vnflcm_fakes.vnfd_dict_cnf()
|
||||
for policy in vnfd_obj['topology_template']['policies']:
|
||||
if policy.get('vdu1_scaling_aspect_deltas'):
|
||||
policy['vdu1_scaling_aspect_deltas']['properties'][
|
||||
'deltas']['delta_1']['number_of_instances'] = delta
|
||||
break
|
||||
mock_get_vnfd_dict.return_value = vnfd_obj
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED, 'scale_status')
|
||||
vnf_instance_obj.instantiated_vnf_info.scale_status[0].aspect_id = (
|
||||
'vdu1_aspect')
|
||||
vnf_instance_obj.vnf_metadata['namespace'] = "default"
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name='deploy-1234567890-fghij')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
current_pod_num = 4
|
||||
|
||||
log_name = "tacker.vnfm.infra_drivers.kubernetes.kubernetes_driver"
|
||||
with self.assertLogs(logger=log_name, level=logging.ERROR) as cm:
|
||||
result = self.kubernetes._calc_scale_level(
|
||||
context=self.context, vnf_instance=vnf_instance_obj,
|
||||
vdu_id='VDU1', current_pod_num=current_pod_num)
|
||||
self.assertEqual(
|
||||
[f"ERROR:{log_name}:Failed to update database vnf "
|
||||
f"{vnf_instance_obj.id} vdu: VDU1. Pod num is out of range. "
|
||||
f"pod_num: {current_pod_num}"], cm.output)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_check_pod_information_len(self):
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name='deploy-1234567890-fghij')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
pod_resources_from_k8s = {
|
||||
'deploy-1234567890-klmno': {
|
||||
'name': 'deploy-1234567890-klmno'
|
||||
},
|
||||
'deploy-1234567890-abcde': {
|
||||
'name': 'deploy-1234567890-abcde'
|
||||
}
|
||||
}
|
||||
result = self.kubernetes._check_pod_information(
|
||||
vnf_instance=vnf_instance_obj, vdu_id='VDU1',
|
||||
pod_resources_from_k8s=pod_resources_from_k8s)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_check_pod_information_set(self):
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name='deploy-1234567890-fghij')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
pod_resources_from_k8s = {
|
||||
'deploy-1234567890-klmno': {
|
||||
'name': 'deploy-1234567890-klmno'
|
||||
}
|
||||
}
|
||||
result = self.kubernetes._check_pod_information(
|
||||
vnf_instance=vnf_instance_obj, vdu_id='VDU1',
|
||||
pod_resources_from_k8s=pod_resources_from_k8s)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_check_pod_information_false(self):
|
||||
vnf_instance_obj = vnflcm_fakes.return_vnf_instance(
|
||||
fields.VnfInstanceState.INSTANTIATED)
|
||||
vnfc_resource_info_obj1 = fakes.fake_vnfc_resource_info(
|
||||
vdu_id='VDU1', rsc_kind='Deployment',
|
||||
pod_name='deploy-1234567890-fghij')
|
||||
vnf_instance_obj.instantiated_vnf_info.vnfc_resource_info = [
|
||||
vnfc_resource_info_obj1
|
||||
]
|
||||
pod_resources_from_k8s = {
|
||||
'deploy-1234567890-fghij': {
|
||||
'name': 'deploy-1234567890-fghij'
|
||||
}
|
||||
}
|
||||
result = self.kubernetes._check_pod_information(
|
||||
vnf_instance=vnf_instance_obj, vdu_id='VDU1',
|
||||
pod_resources_from_k8s=pod_resources_from_k8s)
|
||||
self.assertFalse(result)
|
||||
|
@ -1972,3 +1972,33 @@ class VnfLcmDriver(abstract_driver.VnfInstanceAbstractDriver):
|
||||
|
||||
LOG.info("Request received for changing external connectivity "
|
||||
"vnf '%s' is completed successfully", vnf_instance.id)
|
||||
|
||||
def sync_db(self, context):
|
||||
filters = {'model': 'VnfInstance',
|
||||
'field': 'instantiation_state',
|
||||
'value': 'INSTANTIATED',
|
||||
'op': '=='
|
||||
}
|
||||
vnf_instance_list = objects.VnfInstanceList.get_by_filters(
|
||||
context, filters)
|
||||
for vnf_instance in vnf_instance_list:
|
||||
vim_info = vnflcm_utils.get_vim(
|
||||
context, vnf_instance.vim_connection_info)
|
||||
vim_connection_info = (objects.VimConnectionInfo.
|
||||
obj_from_primitive(vim_info, context))
|
||||
|
||||
try:
|
||||
# Database synchronization works only when the
|
||||
# vimType is Kubernetes.
|
||||
if vim_connection_info.vim_type == 'kubernetes':
|
||||
self._vnf_manager.invoke(
|
||||
vim_connection_info.vim_type,
|
||||
'sync_db',
|
||||
context=context,
|
||||
vnf_instance=vnf_instance,
|
||||
vim_info=vim_connection_info
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
LOG.error(f"Error is occoured vnf {vnf_instance.id} "
|
||||
f"Error: {encodeutils.exception_to_unicode(e)}")
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
@ -23,16 +24,19 @@ from kubernetes import client
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_utils import encodeutils
|
||||
from oslo_utils import uuidutils
|
||||
from toscaparser import tosca_template
|
||||
|
||||
from tacker._i18n import _
|
||||
from tacker.api.vnflcm.v1.controller import check_vnf_state
|
||||
from tacker.common.container import kubernetes_utils
|
||||
from tacker.common import exceptions
|
||||
from tacker.common import log
|
||||
from tacker.common import utils
|
||||
from tacker.extensions import vnfm
|
||||
from tacker import objects
|
||||
from tacker.objects import fields
|
||||
from tacker.objects.fields import ErrorPoint as EP
|
||||
from tacker.objects import vnf_package as vnf_package_obj
|
||||
from tacker.objects import vnf_package_vnfd as vnfd_obj
|
||||
@ -2671,3 +2675,395 @@ class Kubernetes(abstract_driver.VnfAbstractDriver,
|
||||
return_name_list = []
|
||||
return_grp_id = None
|
||||
return return_id_list, return_name_list, return_grp_id
|
||||
|
||||
def sync_db(self, context, vnf_instance, vim_info):
|
||||
"""method for database synchronization"""
|
||||
try:
|
||||
vdu_id_list = self._get_vdu_list(vnf_instance)
|
||||
|
||||
for vdu_id in vdu_id_list:
|
||||
# get pod information
|
||||
for vnfc in (
|
||||
vnf_instance.instantiated_vnf_info.vnfc_resource_info
|
||||
):
|
||||
if vnfc.vdu_id == vdu_id:
|
||||
resource_type = (
|
||||
vnfc.compute_resource.vim_level_resource_type)
|
||||
resource_name = self._get_resource_name(
|
||||
vnfc.compute_resource.resource_id, resource_type)
|
||||
break
|
||||
pod_resources_from_k8s = self._get_pod_information(
|
||||
resource_name, resource_type,
|
||||
vnf_instance, vim_info
|
||||
)
|
||||
# check difference
|
||||
if not self._check_pod_information(vnf_instance, vdu_id,
|
||||
pod_resources_from_k8s):
|
||||
# no difference
|
||||
continue
|
||||
|
||||
# exec db sync
|
||||
try:
|
||||
if self._database_update(context, vnf_instance,
|
||||
vdu_id):
|
||||
LOG.info("Database synchronization succeeded. "
|
||||
f"vnf: {vnf_instance.id} vdu: {vdu_id}")
|
||||
|
||||
except exceptions.VnfInstanceConflictState:
|
||||
LOG.info("There is an LCM operation in progress, so "
|
||||
"skip this DB synchronization. "
|
||||
f"vnf: {vnf_instance.id}")
|
||||
|
||||
except Exception as e:
|
||||
LOG.error(f"Failed to synchronize database vnf: "
|
||||
f"{vnf_instance.id} Error: "
|
||||
f"{encodeutils.exception_to_unicode(e)}")
|
||||
|
||||
@check_vnf_state(action="db_sync",
|
||||
instantiation_state=[fields.VnfInstanceState.INSTANTIATED],
|
||||
task_state=[None])
|
||||
def _database_update(self, context, vnf_inst,
|
||||
vdu_id):
|
||||
"""Update tacker DB
|
||||
|
||||
True: succeeded database update
|
||||
False: failed database update
|
||||
"""
|
||||
# get target object
|
||||
vnf_instance = objects.VnfInstance.get_by_id(
|
||||
context, vnf_inst.id)
|
||||
if vnf_instance.instantiation_state != 'INSTANTIATED':
|
||||
return False
|
||||
# change task_state
|
||||
vnf_instance.task_state = fields.VnfInstanceTaskState.DB_SYNCHRONIZING
|
||||
vnf_instance.save()
|
||||
|
||||
backup_vnf_instance = copy.deepcopy(vnf_instance)
|
||||
|
||||
vim_info = vnflcm_utils.get_vim(
|
||||
context, vnf_instance.vim_connection_info)
|
||||
vim_connection_info = objects.VimConnectionInfo.obj_from_primitive(
|
||||
vim_info, context)
|
||||
|
||||
# get pod information
|
||||
try:
|
||||
for vnfc in vnf_instance.instantiated_vnf_info.vnfc_resource_info:
|
||||
if vnfc.vdu_id == vdu_id:
|
||||
resource_type = (
|
||||
vnfc.compute_resource.vim_level_resource_type)
|
||||
resource_name = self._get_resource_name(
|
||||
vnfc.compute_resource.resource_id, resource_type)
|
||||
break
|
||||
pod_resources_from_k8s = self._get_pod_information(
|
||||
resource_name, resource_type,
|
||||
vnf_instance, vim_connection_info
|
||||
)
|
||||
|
||||
# check difference
|
||||
if not self._check_pod_information(vnf_instance, vdu_id,
|
||||
pod_resources_from_k8s):
|
||||
vnf_instance.task_state = None
|
||||
vnf_instance.save()
|
||||
return False
|
||||
|
||||
if self._sync_vnfc_resource_and_pod_resource(context,
|
||||
vnf_instance, pod_resources_from_k8s, vdu_id):
|
||||
vnf_instance.task_state = None
|
||||
vnf_instance.save()
|
||||
return True
|
||||
|
||||
vnf_instance = copy.deepcopy(backup_vnf_instance)
|
||||
vnf_instance.task_state = None
|
||||
vnf_instance.save()
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
LOG.error(f"Failed to update database vnf {vnf_instance.id} "
|
||||
f"Error: {encodeutils.exception_to_unicode(e)}")
|
||||
vnf_instance = copy.deepcopy(backup_vnf_instance)
|
||||
vnf_instance.task_state = None
|
||||
vnf_instance.save()
|
||||
return False
|
||||
|
||||
def _sync_vnfc_resource_and_pod_resource(self, context, vnf_instance,
|
||||
pod_resources_from_k8s, vdu_id):
|
||||
"""Sync Pod name between k8s and tacker"""
|
||||
vnfc_count = 0
|
||||
for vnfc in vnf_instance.instantiated_vnf_info.vnfc_resource_info:
|
||||
if vnfc.vdu_id == vdu_id:
|
||||
vnfc_count += 1
|
||||
|
||||
result = False
|
||||
# number of pod is same as k8s
|
||||
if vnfc_count == len(pod_resources_from_k8s):
|
||||
self._sync_resource_id(vnf_instance.instantiated_vnf_info,
|
||||
vdu_id, pod_resources_from_k8s)
|
||||
result = True
|
||||
# the number of pod is greater than k8s
|
||||
elif vnfc_count > len(pod_resources_from_k8s):
|
||||
result = self._delete_vnfc_resource(context, vnf_instance, vdu_id,
|
||||
pod_resources_from_k8s, vnfc_count)
|
||||
# the number of pod is less than k8s
|
||||
elif vnfc_count < len(pod_resources_from_k8s):
|
||||
result = self._add_vnfc_resource(context, vnf_instance, vdu_id,
|
||||
pod_resources_from_k8s, vnfc_count)
|
||||
|
||||
return result
|
||||
|
||||
def _delete_vnfc_resource(self, context, vnf_instance, vdu_id,
|
||||
pod_resources_from_k8s, vnfc_count):
|
||||
"""Remove 'vnfcResourceInfo' with mismatched pod names"""
|
||||
delete_index = []
|
||||
for i, vnfc_resource_info in enumerate(
|
||||
vnf_instance.instantiated_vnf_info.vnfc_resource_info):
|
||||
if vnfc_resource_info.vdu_id != vdu_id:
|
||||
continue
|
||||
if vnfc_resource_info.compute_resource.resource_id not in (
|
||||
set(pod_resources_from_k8s.keys())):
|
||||
delete_index.append(i)
|
||||
|
||||
# For patterns where the number of pods is reduced and
|
||||
# the remaining pods are also renamed
|
||||
delete_cnt = vnfc_count - len(pod_resources_from_k8s)
|
||||
if delete_cnt != len(delete_index):
|
||||
for i in range(len(delete_index) - delete_cnt):
|
||||
del delete_index[-1]
|
||||
for idx in reversed(delete_index):
|
||||
del vnf_instance.instantiated_vnf_info.vnfc_resource_info[idx]
|
||||
|
||||
self._sync_resource_id(vnf_instance.instantiated_vnf_info,
|
||||
vdu_id, pod_resources_from_k8s)
|
||||
return self._calc_scale_level(context, vnf_instance, vdu_id,
|
||||
len(pod_resources_from_k8s))
|
||||
|
||||
def _add_vnfc_resource(self, context, vnf_instance, vdu_id,
|
||||
pod_resources_from_k8s, vnfc_count):
|
||||
"""Add vnfcResourceInfo"""
|
||||
stored_pod_names = {
|
||||
vnfc.compute_resource.resource_id
|
||||
for vnfc in (
|
||||
vnf_instance.instantiated_vnf_info.vnfc_resource_info)
|
||||
if vnfc.vdu_id == vdu_id}
|
||||
add_pod_names = {
|
||||
pod_name for pod_name in (
|
||||
set(pod_resources_from_k8s.keys()))
|
||||
if pod_name not in stored_pod_names}
|
||||
|
||||
# Determine the number of 'vnfc_resource_info' to add
|
||||
add_vnfc_resource_info_num = (
|
||||
len(pod_resources_from_k8s) - vnfc_count)
|
||||
for vnfc in vnf_instance.instantiated_vnf_info.vnfc_resource_info:
|
||||
if vnfc.vdu_id == vdu_id:
|
||||
resource_type = (
|
||||
vnfc.compute_resource.vim_level_resource_type)
|
||||
metadata = vnfc.metadata[resource_type]
|
||||
break
|
||||
for dummy in range(add_vnfc_resource_info_num):
|
||||
resource_id = add_pod_names.pop()
|
||||
vnfc_resource = objects.VnfcResourceInfo()
|
||||
vnfc_resource.id = uuidutils.generate_uuid()
|
||||
vnfc_resource.vdu_id = vdu_id
|
||||
resource = objects.ResourceHandle()
|
||||
resource.resource_id = resource_id
|
||||
resource.vim_level_resource_type = resource_type
|
||||
vnfc_resource.compute_resource = resource
|
||||
vnfc_resource.metadata = {}
|
||||
vnfc_resource.metadata["Pod"] = (
|
||||
jsonutils.dumps(pod_resources_from_k8s.get(resource_id))
|
||||
)
|
||||
vnfc_resource.metadata[resource_type] = metadata
|
||||
vnf_instance.instantiated_vnf_info.vnfc_resource_info.append(
|
||||
vnfc_resource)
|
||||
|
||||
self._sync_resource_id(vnf_instance.instantiated_vnf_info,
|
||||
vdu_id, pod_resources_from_k8s)
|
||||
return self._calc_scale_level(context, vnf_instance, vdu_id,
|
||||
len(pod_resources_from_k8s))
|
||||
|
||||
def _check_pod_information(self, vnf_instance,
|
||||
vdu_id, pod_resources_from_k8s):
|
||||
"""Compare 'instantiatedVnfInfo' and 'pod_resources_from_k8s' info
|
||||
|
||||
True: find difference
|
||||
False: No difference
|
||||
"""
|
||||
vnfc_rscs = vnf_instance.instantiated_vnf_info.vnfc_resource_info
|
||||
pod_names = {vnfc.compute_resource.resource_id for vnfc in vnfc_rscs
|
||||
if vnfc.vdu_id == vdu_id}
|
||||
|
||||
if len(pod_names) != len(pod_resources_from_k8s):
|
||||
return True
|
||||
|
||||
# pod name check
|
||||
if pod_names != set(pod_resources_from_k8s.keys()):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _sync_resource_id(self, instantiated_vnf_info,
|
||||
vdu_id, pod_resources_from_k8s):
|
||||
"""Set new resourceId into old resourceId"""
|
||||
match_pod_names = set()
|
||||
unmatch_pod_in_tacker = set()
|
||||
|
||||
for vnfc in instantiated_vnf_info.vnfc_resource_info:
|
||||
if vnfc.vdu_id != vdu_id:
|
||||
continue
|
||||
|
||||
if vnfc.compute_resource.resource_id in (
|
||||
set(pod_resources_from_k8s.keys())):
|
||||
match_pod_names.add(
|
||||
vnfc.compute_resource.resource_id)
|
||||
else:
|
||||
unmatch_pod_in_tacker.add(
|
||||
vnfc.compute_resource.resource_id)
|
||||
|
||||
# pickup new pod name
|
||||
new_pod_name = set(pod_resources_from_k8s.keys()) ^ match_pod_names
|
||||
|
||||
for unmatch_pod_name in unmatch_pod_in_tacker:
|
||||
for vnfc in instantiated_vnf_info.vnfc_resource_info:
|
||||
if vnfc.vdu_id != vdu_id:
|
||||
continue
|
||||
if vnfc.compute_resource.resource_id == (
|
||||
unmatch_pod_name):
|
||||
vnfc.compute_resource.resource_id = (
|
||||
new_pod_name.pop()
|
||||
)
|
||||
vnfc.metadata["Pod"] = (
|
||||
jsonutils.dumps(
|
||||
pod_resources_from_k8s.get(
|
||||
(vnfc.compute_resource.resource_id)
|
||||
)
|
||||
)
|
||||
)
|
||||
break
|
||||
|
||||
def _calc_scale_level(self, context, vnf_instance,
|
||||
vdu_id, current_pod_num):
|
||||
"""calc scale_level and set"""
|
||||
vnfd = vnflcm_utils.get_vnfd_dict(context,
|
||||
vnf_instance.vnfd_id,
|
||||
vnf_instance.instantiated_vnf_info.flavour_id)
|
||||
|
||||
policies = vnfd.get("topology_template", {}).get("policies", {})
|
||||
for policy in policies:
|
||||
for v in policy.values():
|
||||
if (v.get("type") == (
|
||||
"tosca.policies.nfv.VduScalingAspectDeltas")):
|
||||
if vdu_id in v.get("targets", []):
|
||||
aspect_id = v.get("properties", {}).get("aspect")
|
||||
break
|
||||
step_deltas = (self._get_scale_value(policies, aspect_id))
|
||||
|
||||
for policy in policies:
|
||||
for v in policy.values():
|
||||
if (v.get("type") == (
|
||||
"tosca.policies.nfv.VduScalingAspectDeltas")
|
||||
and step_deltas
|
||||
and vdu_id in v.get("targets", [])):
|
||||
delta = (v.get("properties", {})
|
||||
.get("deltas", {})
|
||||
.get(step_deltas, {})
|
||||
.get("number_of_instances"))
|
||||
elif (v.get("type") == (
|
||||
"tosca.policies.nfv.VduInitialDelta")
|
||||
and vdu_id in v.get("targets", [])):
|
||||
initial_delta = (v.get("properties", {})
|
||||
.get("initial_delta", {})
|
||||
.get("number_of_instances"))
|
||||
if (current_pod_num - initial_delta) % delta != 0:
|
||||
LOG.error("Error computing 'scale_level'. "
|
||||
f"current Pod num: {current_pod_num} delta: {delta}. "
|
||||
f"vnf: {vnf_instance.id} vdu: {vdu_id}")
|
||||
return False
|
||||
|
||||
scale_level = (current_pod_num - initial_delta) // delta
|
||||
|
||||
for inst_vnf in vnf_instance.instantiated_vnf_info.scale_status:
|
||||
if inst_vnf.aspect_id == aspect_id:
|
||||
inst_vnf.scale_level = scale_level
|
||||
break
|
||||
|
||||
return self._check_pod_range(vnf_instance, vnfd, vdu_id,
|
||||
current_pod_num)
|
||||
|
||||
def _get_scale_value(self, policies, aspect_id):
|
||||
"""get step_deltas from vnfd"""
|
||||
for policy in policies:
|
||||
for v in policy.values():
|
||||
if v.get("type") == "tosca.policies.nfv.ScalingAspects":
|
||||
step_deltas = (v.get("properties", {})
|
||||
.get("aspects", {})
|
||||
.get(aspect_id, {})
|
||||
.get("step_deltas", [])[0])
|
||||
break
|
||||
return step_deltas
|
||||
|
||||
def _check_pod_range(self, vnf_instance, vnfd, vdu_id,
|
||||
current_pod_num):
|
||||
"""Check the range of the maximum or minimum number of pods.
|
||||
|
||||
If it finds out of range, output a error log and do not update
|
||||
database.
|
||||
"""
|
||||
vdu_profile = (vnfd.get("topology_template", {})
|
||||
.get("node_templates", [])
|
||||
.get(vdu_id, {})
|
||||
.get("properties", {})
|
||||
.get("vdu_profile"))
|
||||
max_number_of_instances = vdu_profile.get("max_number_of_instances")
|
||||
min_number_of_instances = vdu_profile.get("min_number_of_instances")
|
||||
|
||||
if (current_pod_num > max_number_of_instances
|
||||
or current_pod_num < min_number_of_instances):
|
||||
LOG.error(f"Failed to update database vnf {vnf_instance.id} "
|
||||
f"vdu: {vdu_id}. Pod num is out of range. "
|
||||
f"pod_num: {current_pod_num}")
|
||||
return False
|
||||
return True
|
||||
|
||||
def _get_pod_information(self, resource_name,
|
||||
resource_type, vnf_instance, vim_connection_info):
|
||||
"""Extract a Pod starting with the specified 'resource_name' name"""
|
||||
namespace = vnf_instance.vnf_metadata.get('namespace')
|
||||
if not namespace:
|
||||
namespace = "default"
|
||||
auth_attr = vim_connection_info.access_info
|
||||
auth_cred, _ = self.get_auth_creds(auth_attr)
|
||||
|
||||
core_v1_api_client = self.kubernetes.get_core_v1_api_client(
|
||||
auth=auth_cred)
|
||||
resource_pods = {}
|
||||
all_pods = core_v1_api_client.list_namespaced_pod(
|
||||
namespace=namespace)
|
||||
|
||||
for pod in all_pods.items:
|
||||
if self.is_match_pod_naming_rule(resource_type,
|
||||
resource_name, pod.metadata.name):
|
||||
resource_pods[pod.metadata.name] = pod.metadata.to_dict()
|
||||
|
||||
return resource_pods
|
||||
|
||||
def _get_vdu_list(self, vnf_instance):
|
||||
"""get vdu_list"""
|
||||
vdu_id_list = set()
|
||||
|
||||
for vnfc in vnf_instance.instantiated_vnf_info.vnfc_resource_info:
|
||||
if vnfc.compute_resource.resource_id != "":
|
||||
vdu_id_list.add(vnfc.vdu_id)
|
||||
return vdu_id_list
|
||||
|
||||
def _get_resource_name(self, resource_id, resource_type):
|
||||
"""get resource name"""
|
||||
if resource_type == 'Pod':
|
||||
resource_name = resource_id
|
||||
else:
|
||||
name_list = resource_id.split("-")
|
||||
if resource_type == 'Deployment':
|
||||
del name_list[-2:]
|
||||
elif resource_type in ('ReplicaSet', 'DaemonSet', 'StatefulSet'):
|
||||
del name_list[-1]
|
||||
resource_name = '-'.join(name_list)
|
||||
|
||||
return resource_name
|
||||
|
Loading…
Reference in New Issue
Block a user