Refactor FT of sol-kubernetes job

As discussed at yoga-PTG [1], the sol-kubernetes job now has a high code
cloning rate and should be refactored for future maintenance.

This patch includes the following refactors:
* Move and refactor common functions to base.py that is newly created.
* Refactor test functions to simplify by using common functions.
* Change used VNF package folder name.
* Change VNF name and description appropriately.
* Remove unnecessary sleep.
* Fix some lint issues.

As a result, the cloning rate for code under the sol_kubernetes folder
decreased from 42.85% to 0%.

[1] https://etherpad.opendev.org/p/tacker-yoga-ptg#L140

Implements: blueprint reduce-ft-code-clones
Change-Id: I90115e6f1d3187a352fdd4c72a51c1a943446f59
This commit is contained in:
Ayumu Ueha 2021-12-28 02:45:50 +00:00
parent c6b0ca5fe3
commit 35aaba1aab
43 changed files with 934 additions and 2089 deletions

View File

@ -0,0 +1,31 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: Sample VNF
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
- helloworld3_types.yaml
- helloworld3_df_simple.yaml
topology_template:
inputs:
selected_flavour:
type: string
description: VNF deployment flavour selected by the consumer. It is provided in the API
node_templates:
VNF:
type: company.provider.VNF
properties:
flavour_id: { get_input: selected_flavour }
descriptor_id: b1bb0ce7-ebca-4fa7-95ed-4840d70a1177
provider: Company
product_name: Sample VNF
software_version: '1.0'
descriptor_version: '1.0'
vnfm_info:
- Tacker
requirements:
#- virtual_link_external # mapped in lower-level templates
#- virtual_link_internal # mapped in lower-level templates

View File

@ -0,0 +1,53 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: VNF type definition
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
node_types:
company.provider.VNF:
derived_from: tosca.nodes.nfv.VNF
properties:
descriptor_id:
type: string
constraints: [ valid_values: [ b1bb0ce7-ebca-4fa7-95ed-4840d70a1177 ] ]
default: b1bb0ce7-ebca-4fa7-95ed-4840d70a1177
descriptor_version:
type: string
constraints: [ valid_values: [ '1.0' ] ]
default: '1.0'
provider:
type: string
constraints: [ valid_values: [ 'Company' ] ]
default: 'Company'
product_name:
type: string
constraints: [ valid_values: [ 'Sample VNF' ] ]
default: 'Sample VNF'
software_version:
type: string
constraints: [ valid_values: [ '1.0' ] ]
default: '1.0'
vnfm_info:
type: list
entry_schema:
type: string
constraints: [ valid_values: [ Tacker ] ]
default: [ Tacker ]
flavour_id:
type: string
constraints: [ valid_values: [ simple ] ]
default: simple
flavour_description:
type: string
default: ""
requirements:
- virtual_link_external:
capability: tosca.capabilities.nfv.VirtualLinkable
- virtual_link_internal:
capability: tosca.capabilities.nfv.VirtualLinkable
interfaces:
Vnflcm:
type: tosca.interfaces.nfv.Vnflcm

View File

@ -0,0 +1,42 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: Simple deployment flavour for Sample VNF
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
- helloworld3_types.yaml
topology_template:
inputs:
descriptor_id:
type: string
descriptor_version:
type: string
provider:
type: string
product_name:
type: string
software_version:
type: string
vnfm_info:
type: list
entry_schema:
type: string
flavour_id:
type: string
flavour_description:
type: string
substitution_mappings:
node_type: company.provider.VNF
properties:
flavour_id: simple
requirements:
virtual_link_external: []
node_templates:
VNF:
type: company.provider.VNF
properties:
flavour_description: A simple flavour

View File

@ -0,0 +1,31 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: Sample VNF
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
- helloworld3_types.yaml
- helloworld3_df_simple.yaml
topology_template:
inputs:
selected_flavour:
type: string
description: VNF deployment flavour selected by the consumer. It is provided in the API
node_templates:
VNF:
type: company.provider.VNF
properties:
flavour_id: { get_input: selected_flavour }
descriptor_id: b1bb0ce7-ebca-4fa7-95ed-4840d70a1177
provider: Company
product_name: Sample VNF
software_version: '1.0'
descriptor_version: '1.0'
vnfm_info:
- Tacker
requirements:
#- virtual_link_external # mapped in lower-level templates
#- virtual_link_internal # mapped in lower-level templates

View File

@ -0,0 +1,53 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: VNF type definition
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
node_types:
company.provider.VNF:
derived_from: tosca.nodes.nfv.VNF
properties:
descriptor_id:
type: string
constraints: [ valid_values: [ b1bb0ce7-ebca-4fa7-95ed-4840d70a1177 ] ]
default: b1bb0ce7-ebca-4fa7-95ed-4840d70a1177
descriptor_version:
type: string
constraints: [ valid_values: [ '1.0' ] ]
default: '1.0'
provider:
type: string
constraints: [ valid_values: [ 'Company' ] ]
default: 'Company'
product_name:
type: string
constraints: [ valid_values: [ 'Sample VNF' ] ]
default: 'Sample VNF'
software_version:
type: string
constraints: [ valid_values: [ '1.0' ] ]
default: '1.0'
vnfm_info:
type: list
entry_schema:
type: string
constraints: [ valid_values: [ Tacker ] ]
default: [ Tacker ]
flavour_id:
type: string
constraints: [ valid_values: [ simple ] ]
default: simple
flavour_description:
type: string
default: ""
requirements:
- virtual_link_external:
capability: tosca.capabilities.nfv.VirtualLinkable
- virtual_link_internal:
capability: tosca.capabilities.nfv.VirtualLinkable
interfaces:
Vnflcm:
type: tosca.interfaces.nfv.Vnflcm

View File

@ -0,0 +1,417 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import time
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from sqlalchemy import desc
from sqlalchemy.orm import joinedload
from tacker.common import exceptions
from tacker import context
from tacker.db import api as db_api
from tacker.db.db_sqlalchemy import api
from tacker.db.db_sqlalchemy import models
from tacker.objects import fields
from tacker.objects import vnf_lcm_op_occs
from tacker.tests.functional import base
from tacker.tests import utils
VNF_PACKAGE_UPLOAD_TIMEOUT = 300
VNF_INSTANTIATE_TIMEOUT = 600
VNF_TERMINATE_TIMEOUT = 600
VNF_HEAL_SOL002_TIMEOUT = 600
VNF_HEAL_SOL003_TIMEOUT = 1200
VNF_SCALE_TIMEOUT = 600
RETRY_WAIT_TIME = 5
WAIT_TIMEOUT_ERR_MSG = ("Failed to %(action)s, process could not be completed"
" within %(timeout)s seconds")
class BaseVnfLcmKubernetesTest(base.BaseTackerTest):
@classmethod
def setUpClass(cls):
super(BaseVnfLcmKubernetesTest, cls).setUpClass()
cls.tacker_client = base.BaseTackerTest.tacker_http_client()
cls.base_vnf_package_url = "/vnfpkgm/v1/vnf_packages"
cls.base_vnf_instances_url = "/vnflcm/v1/vnf_instances"
cls.base_vnf_lcm_op_occs_url = "/vnflcm/v1/vnf_lcm_op_occs"
cls.vnf_package_ids = []
@classmethod
def tearDownClass(cls):
# Update vnf package operational state to DISABLED and delete
for package_id in cls.vnf_package_ids:
cls._disable_and_delete_vnf_package(package_id)
super(BaseVnfLcmKubernetesTest, cls).tearDownClass()
def setUp(self):
super(BaseVnfLcmKubernetesTest, self).setUp()
self.context = context.get_admin_context()
vim_list = self.client.list_vims()
if not vim_list:
self.skipTest("Vims are not configured")
vim_name = 'vim-kubernetes'
vim = self.get_vim(vim_list, vim_name)
if not vim:
self.skipTest(f"Kubernetes VIM '{vim_name}' is missing")
self.vim_id = vim['id']
def _create_and_upload_vnf_package(self, tacker_client, csar_package_name,
user_defined_data):
# create vnf package
body = jsonutils.dumps({"userDefinedData": user_defined_data})
_, vnf_package = tacker_client.do_request(
self.base_vnf_package_url, "POST", body=body)
vnf_pkg_id = vnf_package['id']
# upload vnf package
csar_package_path = ("../../../etc/samples/etsi/nfv/"
f"{csar_package_name}")
file_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
csar_package_path))
# Generating unique vnfd id. This is required when multiple workers
# are running concurrently. The call below creates a new temporary
# CSAR with unique vnfd id.
file_path, _ = utils.create_csar_with_unique_vnfd_id(file_path)
with open(file_path, 'rb') as file_object:
tacker_client.do_request(
f"{self.base_vnf_package_url}/{vnf_pkg_id}/package_content",
"PUT", body=file_object, content_type='application/zip')
# wait for onboard
start_time = int(time.time())
show_url = os.path.join(self.base_vnf_package_url, vnf_pkg_id)
vnfd_id = None
while True:
_, body = tacker_client.do_request(show_url, "GET")
if body['onboardingState'] == "ONBOARDED":
vnfd_id = body['vnfdId']
break
if (int(time.time()) - start_time) > VNF_PACKAGE_UPLOAD_TIMEOUT:
raise Exception(WAIT_TIMEOUT_ERR_MSG %
{"action": "onboard vnf package",
"timeout": VNF_PACKAGE_UPLOAD_TIMEOUT})
time.sleep(RETRY_WAIT_TIME)
# remove temporarily created CSAR file
os.remove(file_path)
return vnf_package['id'], vnfd_id
@classmethod
def _disable_and_delete_vnf_package(cls, package_id):
# Update vnf package operational state to DISABLED
update_req_body = jsonutils.dumps({
"operationalState": "DISABLED"})
cls.tacker_client.do_request(
f'{cls.base_vnf_package_url}/{package_id}',
"PATCH", content_type='application/json',
body=update_req_body)
# Delete vnf package
url = f'{cls.base_vnf_package_url}/{package_id}'
cls.tacker_client.do_request(url, "DELETE")
@classmethod
def _instantiate_vnf_instance_request(
cls, flavour_id, vim_id=None, additional_param=None):
request_body = {"flavourId": flavour_id}
if vim_id:
request_body["vimConnectionInfo"] = [
{"id": uuidutils.generate_uuid(),
"vimId": vim_id,
"vimType": "kubernetes"}]
if additional_param:
request_body["additionalParams"] = additional_param
return request_body
def _create_vnf_instance(self, vnfd_id, vnf_instance_name=None,
vnf_instance_description=None):
request_body = {'vnfdId': vnfd_id}
if vnf_instance_name:
request_body['vnfInstanceName'] = vnf_instance_name
if vnf_instance_description:
request_body['vnfInstanceDescription'] = vnf_instance_description
resp, response_body = self.http_client.do_request(
self.base_vnf_instances_url, "POST",
body=jsonutils.dumps(request_body))
return resp, response_body
def _delete_wait_vnf_instance(self, id):
url = os.path.join(self.base_vnf_instances_url, id)
start_time = int(time.time())
while True:
resp, _ = self.tacker_client.do_request(url, "DELETE")
if 204 == resp.status_code:
break
if (int(time.time()) - start_time) > VNF_TERMINATE_TIMEOUT:
raise Exception(WAIT_TIMEOUT_ERR_MSG %
{"action": "delete vnf instance",
"timeout": VNF_TERMINATE_TIMEOUT})
time.sleep(RETRY_WAIT_TIME)
def _show_vnf_instance(self, id):
show_url = os.path.join(self.base_vnf_instances_url, id)
_, vnf_instance = self.tacker_client.do_request(show_url, "GET")
return vnf_instance
def _vnf_instance_wait(
self, id,
instantiation_state=fields.VnfInstanceState.INSTANTIATED,
timeout=VNF_INSTANTIATE_TIMEOUT):
show_url = os.path.join(self.base_vnf_instances_url, id)
start_time = int(time.time())
while True:
_, body = self.tacker_client.do_request(show_url, "GET")
if body['instantiationState'] == instantiation_state:
break
if (int(time.time()) - start_time) > timeout:
raise Exception(WAIT_TIMEOUT_ERR_MSG %
{"action": "wait vnf instance", "timeout": timeout})
time.sleep(RETRY_WAIT_TIME)
def _instantiate_vnf_instance(self, id, request_body,
wait_state="COMPLETED"):
url = os.path.join(self.base_vnf_instances_url, id, "instantiate")
resp, _ = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
if wait_state == "COMPLETED":
self._vnf_instance_wait(id)
# wait vnflcm_op_occs.operation_state become wait_state
self._wait_vnflcm_op_occs(self.context, id, VNF_INSTANTIATE_TIMEOUT,
wait_state)
def _create_and_instantiate_vnf_instance(self, vnfd_id, flavour_id,
inst_name, inst_desc,
additional_params):
# create vnf instance
_, vnf_instance = self._create_vnf_instance(
vnfd_id, vnf_instance_name=inst_name,
vnf_instance_description=inst_desc)
# instantiate vnf instance
additional_param = additional_params
request_body = self._instantiate_vnf_instance_request(
flavour_id, vim_id=self.vim_id, additional_param=additional_param)
self._instantiate_vnf_instance(vnf_instance['id'], request_body)
vnf_instance = self._show_vnf_instance(vnf_instance['id'])
return vnf_instance
def _terminate_vnf_instance(self, id, request_body=None):
if request_body is None:
# Terminate vnf forcefully
request_body = {
"terminationType": fields.VnfInstanceTerminationType.FORCEFUL,
}
url = os.path.join(self.base_vnf_instances_url, id, "terminate")
resp, _ = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
timeout = request_body.get('gracefulTerminationTimeout', None)
start_time = int(time.time())
self._vnf_instance_wait(
id, instantiation_state=fields.VnfInstanceState.NOT_INSTANTIATED,
timeout=VNF_TERMINATE_TIMEOUT)
# If gracefulTerminationTimeout is set, check whether vnf
# instantiation_state is set to NOT_INSTANTIATED after
# gracefulTerminationTimeout seconds.
if timeout and int(time.time()) - start_time < timeout:
self.fail("Vnf is terminated before graceful termination "
"timeout period")
def _delete_vnf_instance(self, id):
self._delete_wait_vnf_instance(id)
# verify vnf instance is deleted
url = os.path.join(self.base_vnf_instances_url, id)
resp, _ = self.http_client.do_request(url, "GET")
self.assertEqual(404, resp.status_code)
def _scale_vnf_instance(self, id, type, aspect_id,
number_of_steps=1):
url = os.path.join(self.base_vnf_instances_url, id, "scale")
# generate body
request_body = {
"type": type,
"aspectId": aspect_id,
"numberOfSteps": number_of_steps}
resp, _ = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
def _heal_vnf_instance(self, id, vnfc_instance_id):
url = os.path.join(self.base_vnf_instances_url, id, "heal")
# generate body
request_body = {
"vnfcInstanceId": vnfc_instance_id}
resp, _ = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
@classmethod
@db_api.context_manager.reader
def _get_vnflcm_op_occs_by_id(cls, context, vnf_instance_id,
columns_to_join=None):
query = api.model_query(
context, models.VnfLcmOpOccs,
read_deleted="no", project_only=True).filter_by(
vnf_instance_id=vnf_instance_id).order_by(
desc("created_at"))
if columns_to_join:
for column in columns_to_join:
query = query.options(joinedload(column))
db_vnflcm_op_occ = query.first()
if not db_vnflcm_op_occ:
raise exceptions.VnfInstanceNotFound(id=vnf_instance_id)
vnflcm_op_occ = vnf_lcm_op_occs.VnfLcmOpOcc.obj_from_db_obj(
context, db_vnflcm_op_occ)
return vnflcm_op_occ
def _wait_vnflcm_op_occs(
self, context, vnf_instance_id, timeout,
operation_state='COMPLETED'):
start_time = int(time.time())
while True:
vnflcm_op_occ = self._get_vnflcm_op_occs_by_id(
context, vnf_instance_id)
if vnflcm_op_occ.operation_state == operation_state:
break
if (int(time.time()) - start_time) > timeout:
raise Exception("Timeout waiting for transition to"
f" {operation_state} state.")
time.sleep(RETRY_WAIT_TIME)
@classmethod
def _get_vnfc_resource_info(cls, vnf_instance):
inst_vnf_info = vnf_instance['instantiatedVnfInfo']
vnfc_resource_info = inst_vnf_info['vnfcResourceInfo']
return vnfc_resource_info
def _get_scale_level_by_aspect_id(self, vnf_instance, aspect_id):
scale_status = vnf_instance['instantiatedVnfInfo']['scaleStatus']
self.assertTrue(len(scale_status) > 0)
for status in scale_status:
self.assertIsNotNone(status.get('aspectId'))
self.assertIsNotNone(status.get('scaleLevel'))
if status.get('aspectId') == aspect_id:
scale_level = status.get('scaleLevel')
break
else:
raise Exception(f"aspectId {aspect_id} is not found.")
return scale_level
def _test_scale(self, id, type, aspect_id, previous_level,
number_of_steps=1, error=False):
# scale operation
self._scale_vnf_instance(id, type, aspect_id, number_of_steps)
wait_state = "COMPLETED"
if error:
expected_level = previous_level
wait_state = "FAILED_TEMP"
elif type == 'SCALE_OUT':
expected_level = previous_level + number_of_steps
else:
expected_level = previous_level - number_of_steps
# wait vnflcm_op_occs.operation_state become COMPLETE/FAILED_TEMP
self._wait_vnflcm_op_occs(
self.context, id, VNF_SCALE_TIMEOUT, wait_state)
# check scaleStatus after scale operation
vnf_instance = self._show_vnf_instance(id)
scale_level = self._get_scale_level_by_aspect_id(
vnf_instance, aspect_id)
self.assertEqual(scale_level, expected_level)
return scale_level
def _test_heal(self, vnf_instance, vnfc_instance_id):
before_vnfc_rscs = self._get_vnfc_resource_info(vnf_instance)
self._heal_vnf_instance(vnf_instance['id'], vnfc_instance_id)
# wait vnflcm_op_occs.operation_state become COMPLETE
if vnfc_instance_id:
timeout = VNF_HEAL_SOL002_TIMEOUT
else:
timeout = VNF_HEAL_SOL003_TIMEOUT
self._wait_vnflcm_op_occs(self.context, vnf_instance['id'], timeout)
# check vnfcResourceInfo after heal operation
vnf_instance = self._show_vnf_instance(vnf_instance['id'])
after_vnfc_rscs = self._get_vnfc_resource_info(vnf_instance)
self.assertEqual(len(before_vnfc_rscs), len(after_vnfc_rscs))
return after_vnfc_rscs
def _rollback_vnf_instance(self, vnf_lcm_op_occ_id):
url = os.path.join(
self.base_vnf_lcm_op_occs_url, vnf_lcm_op_occ_id, "rollback")
# generate body
resp, _ = self.http_client.do_request(url, "POST")
self.assertEqual(202, resp.status_code)
def _test_rollback_cnf_instantiate(self, id):
# get vnflcm_op_occ id for rollback
vnflcm_op_occ = self._get_vnflcm_op_occs_by_id(
self.context, id)
vnf_lcm_op_occ_id = vnflcm_op_occ.id
# rollback operation
self._rollback_vnf_instance(vnf_lcm_op_occ_id)
# wait vnflcm_op_occs.operation_state become ROLLED_BACK
self._wait_vnflcm_op_occs(self.context, id,
VNF_TERMINATE_TIMEOUT, "ROLLED_BACK")
def _test_rollback_cnf_scale(self, id, aspect_id, previous_level):
# get vnflcm_op_occ id for rollback
vnflcm_op_occ = self._get_vnflcm_op_occs_by_id(self.context, id)
vnf_lcm_op_occ_id = vnflcm_op_occ.id
# rollback operation
self._rollback_vnf_instance(vnf_lcm_op_occ_id)
# wait vnflcm_op_occs.operation_state become ROLLED_BACK
self._wait_vnflcm_op_occs(self.context, id, VNF_SCALE_TIMEOUT,
"ROLLED_BACK")
# check scaleStatus after scale operation
vnf_instance = self._show_vnf_instance(id)
expected_level = previous_level
scale_level = self._get_scale_level_by_aspect_id(
vnf_instance, aspect_id)
self.assertEqual(scale_level, expected_level)

View File

@ -13,293 +13,24 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import time
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from sqlalchemy import desc
from sqlalchemy.orm import joinedload
from tacker.common import exceptions
from tacker import context
from tacker.db import api as db_api
from tacker.db.db_sqlalchemy import api
from tacker.db.db_sqlalchemy import models
from tacker.objects import fields
from tacker.objects import vnf_lcm_op_occs
from tacker.tests.functional import base
from tacker.tests import utils
VNF_PACKAGE_UPLOAD_TIMEOUT = 300
VNF_INSTANTIATE_TIMEOUT = 600
VNF_TERMINATE_TIMEOUT = 600
VNF_HEAL_TIMEOUT = 600
RETRY_WAIT_TIME = 5
from tacker.tests.functional.sol_kubernetes.vnflcm import base as vnflcm_base
def _create_and_upload_vnf_package(tacker_client, csar_package_name,
user_defined_data):
# create vnf package
body = jsonutils.dumps({"userDefinedData": user_defined_data})
resp, vnf_package = tacker_client.do_request(
'/vnfpkgm/v1/vnf_packages', "POST", body=body)
# upload vnf package
csar_package_path = "../../../etc/samples/etsi/nfv/%s" % csar_package_name
file_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
csar_package_path))
# Generating unique vnfd id. This is required when multiple workers
# are running concurrently. The call below creates a new temporary
# CSAR with unique vnfd id.
file_path, uniqueid = utils.create_csar_with_unique_vnfd_id(file_path)
with open(file_path, 'rb') as file_object:
resp, resp_body = tacker_client.do_request(
'/vnfpkgm/v1/vnf_packages/{id}/package_content'.format(
id=vnf_package['id']),
"PUT", body=file_object, content_type='application/zip')
# wait for onboard
start_time = int(time.time())
show_url = os.path.join('/vnfpkgm/v1/vnf_packages', vnf_package['id'])
vnfd_id = None
while True:
resp, body = tacker_client.do_request(show_url, "GET")
if body['onboardingState'] == "ONBOARDED":
vnfd_id = body['vnfdId']
break
if ((int(time.time()) - start_time) > VNF_PACKAGE_UPLOAD_TIMEOUT):
raise Exception("Failed to onboard vnf package, process could not"
" be completed within %d seconds", VNF_PACKAGE_UPLOAD_TIMEOUT)
time.sleep(RETRY_WAIT_TIME)
# remove temporarily created CSAR file
os.remove(file_path)
return vnf_package['id'], vnfd_id
def _delete_wait_vnf_instance(tacker_client, id):
url = os.path.join("/vnflcm/v1/vnf_instances", id)
start_time = int(time.time())
while True:
resp, body = tacker_client.do_request(url, "DELETE")
if 204 == resp.status_code:
break
if ((int(time.time()) - start_time) > VNF_TERMINATE_TIMEOUT):
raise Exception("Failed to delete vnf instance, process could not"
" be completed within %d seconds", VNF_TERMINATE_TIMEOUT)
time.sleep(RETRY_WAIT_TIME)
def _show_vnf_instance(tacker_client, id):
show_url = os.path.join("/vnflcm/v1/vnf_instances", id)
resp, vnf_instance = tacker_client.do_request(show_url, "GET")
return vnf_instance
def _vnf_instance_wait(
tacker_client, id,
instantiation_state=fields.VnfInstanceState.INSTANTIATED,
timeout=VNF_INSTANTIATE_TIMEOUT):
show_url = os.path.join("/vnflcm/v1/vnf_instances", id)
start_time = int(time.time())
while True:
resp, body = tacker_client.do_request(show_url, "GET")
if body['instantiationState'] == instantiation_state:
break
if ((int(time.time()) - start_time) > timeout):
raise Exception("Failed to wait vnf instance, process could not"
" be completed within %d seconds", timeout)
time.sleep(RETRY_WAIT_TIME)
class VnfLcmKubernetesHealTest(base.BaseTackerTest):
class VnfLcmKubernetesHealTest(vnflcm_base.BaseVnfLcmKubernetesTest):
@classmethod
def setUpClass(cls):
cls.tacker_client = base.BaseTackerTest.tacker_http_client()
cls.vnf_package_resource, cls.vnfd_id_resource = \
_create_and_upload_vnf_package(
cls.tacker_client, "test_cnf_heal",
{"key": "sample_heal_functional"})
cls.vnf_instance_ids = []
super(VnfLcmKubernetesHealTest, cls).setUpClass()
vnf_package_id, cls.vnfd_id = \
cls._create_and_upload_vnf_package(
cls, cls.tacker_client, "test_cnf_heal",
{"key": "sample_heal_functional"})
cls.vnf_package_ids.append(vnf_package_id)
@classmethod
def tearDownClass(cls):
# Update vnf package operational state to DISABLED
update_req_body = jsonutils.dumps({
"operationalState": "DISABLED"})
base_path = "/vnfpkgm/v1/vnf_packages"
for package_id in [cls.vnf_package_resource]:
resp, resp_body = cls.tacker_client.do_request(
'{base_path}/{id}'.format(id=package_id,
base_path=base_path),
"PATCH", content_type='application/json',
body=update_req_body)
# Delete vnf package
url = '/vnfpkgm/v1/vnf_packages/%s' % package_id
cls.tacker_client.do_request(url, "DELETE")
super(VnfLcmKubernetesHealTest, cls).tearDownClass()
def setUp(self):
super(VnfLcmKubernetesHealTest, self).setUp()
self.base_vnf_instances_url = "/vnflcm/v1/vnf_instances"
self.base_vnf_lcm_op_occs_url = "/vnflcm/v1/vnf_lcm_op_occs"
self.context = context.get_admin_context()
vim_list = self.client.list_vims()
if not vim_list:
self.skipTest("Vims are not configured")
vim_id = 'vim-kubernetes'
vim = self.get_vim(vim_list, vim_id)
if not vim:
self.skipTest("Kubernetes VIM '%s' is missing" % vim_id)
self.vim_id = vim['id']
def _instantiate_vnf_instance_request(
self, flavour_id, vim_id=None, additional_param=None):
request_body = {"flavourId": flavour_id}
if vim_id:
request_body["vimConnectionInfo"] = [
{"id": uuidutils.generate_uuid(),
"vimId": vim_id,
"vimType": "kubernetes"}]
if additional_param:
request_body["additionalParams"] = additional_param
return request_body
def _create_vnf_instance(self, vnfd_id, vnf_instance_name=None,
vnf_instance_description=None):
request_body = {'vnfdId': vnfd_id}
if vnf_instance_name:
request_body['vnfInstanceName'] = vnf_instance_name
if vnf_instance_description:
request_body['vnfInstanceDescription'] = vnf_instance_description
resp, response_body = self.http_client.do_request(
self.base_vnf_instances_url, "POST",
body=jsonutils.dumps(request_body))
return resp, response_body
def _instantiate_vnf_instance(self, id, request_body):
url = os.path.join(self.base_vnf_instances_url, id, "instantiate")
resp, body = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
_vnf_instance_wait(self.tacker_client, id)
def _create_and_instantiate_vnf_instance(self, flavour_id,
additional_params):
# create vnf instance
vnf_instance_name = "test_vnf_instance_for_cnf_heal-%s" % \
uuidutils.generate_uuid()
vnf_instance_description = "vnf instance for cnf heal testing"
resp, vnf_instance = self._create_vnf_instance(
self.vnfd_id_resource, vnf_instance_name=vnf_instance_name,
vnf_instance_description=vnf_instance_description)
# instantiate vnf instance
additional_param = additional_params
request_body = self._instantiate_vnf_instance_request(
flavour_id, vim_id=self.vim_id, additional_param=additional_param)
self._instantiate_vnf_instance(vnf_instance['id'], request_body)
vnf_instance = _show_vnf_instance(
self.tacker_client, vnf_instance['id'])
self.vnf_instance_ids.append(vnf_instance['id'])
return vnf_instance
def _terminate_vnf_instance(self, id):
# Terminate vnf forcefully
request_body = {
"terminationType": fields.VnfInstanceTerminationType.FORCEFUL,
}
url = os.path.join(self.base_vnf_instances_url, id, "terminate")
resp, body = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
_vnf_instance_wait(
self.tacker_client, id,
instantiation_state=fields.VnfInstanceState.NOT_INSTANTIATED,
timeout=VNF_TERMINATE_TIMEOUT)
def _delete_vnf_instance(self, id):
_delete_wait_vnf_instance(self.tacker_client, id)
# verify vnf instance is deleted
url = os.path.join(self.base_vnf_instances_url, id)
resp, body = self.http_client.do_request(url, "GET")
self.assertEqual(404, resp.status_code)
def _heal_vnf_instance(self, id, vnfc_instance_id):
url = os.path.join(self.base_vnf_instances_url, id, "heal")
# generate body
request_body = {
"vnfcInstanceId": vnfc_instance_id}
resp, body = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
@db_api.context_manager.reader
def _vnf_notify_get_by_id(self, context, vnf_instance_id,
columns_to_join=None):
query = api.model_query(
context, models.VnfLcmOpOccs,
read_deleted="no", project_only=True).filter_by(
vnf_instance_id=vnf_instance_id).order_by(
desc("created_at"))
if columns_to_join:
for column in columns_to_join:
query = query.options(joinedload(column))
db_vnflcm_op_occ = query.first()
if not db_vnflcm_op_occ:
raise exceptions.VnfInstanceNotFound(id=vnf_instance_id)
vnflcm_op_occ = vnf_lcm_op_occs.VnfLcmOpOcc.obj_from_db_obj(
context, db_vnflcm_op_occ)
return vnflcm_op_occ
def _wait_vnflcm_op_occs(
self, context, vnf_instance_id,
operation_state='COMPLETED'):
start_time = int(time.time())
while True:
vnflcm_op_occ = self._vnf_notify_get_by_id(
context, vnf_instance_id)
if vnflcm_op_occ.operation_state == operation_state:
break
if ((int(time.time()) - start_time) > VNF_HEAL_TIMEOUT):
raise Exception("Failed to wait heal instance")
time.sleep(RETRY_WAIT_TIME)
def _get_vnfc_resource_info(self, vnf_instance):
inst_vnf_info = vnf_instance['instantiatedVnfInfo']
vnfc_resource_info = inst_vnf_info['vnfcResourceInfo']
return vnfc_resource_info
def test_heal_cnf_with_sol002(self):
"""Test heal as per SOL002 for CNF
@ -307,13 +38,16 @@ class VnfLcmKubernetesHealTest(base.BaseTackerTest):
i.e. with vnfcInstanceId, so that the specified vnfc instance is healed
which includes Kubernetes resources (Pod and Deployment).
"""
vnf_instance_name = "cnf_heal_with_sol002"
vnf_instance_description = "cnf heal with sol002"
# use def-files of singleton Pod and Deployment (replicas=2)
inst_additional_param = {
"lcm-kubernetes-def-files": [
"Files/kubernetes/deployment_heal_complex.yaml",
"Files/kubernetes/pod_heal.yaml"]}
vnf_instance = self._create_and_instantiate_vnf_instance(
"complex", inst_additional_param)
self.vnfd_id, "complex", vnf_instance_name,
vnf_instance_description, inst_additional_param)
before_vnfc_rscs = self._get_vnfc_resource_info(vnf_instance)
# get vnfc_instance_id of heal target
@ -334,14 +68,7 @@ class VnfLcmKubernetesHealTest(base.BaseTackerTest):
# test heal SOL-002 (partial heal)
vnfc_instance_id = \
[pod_target_vnfc['id'], deployment_target_vnfc['id']]
self._heal_vnf_instance(vnf_instance['id'], vnfc_instance_id)
# wait vnflcm_op_occs.operation_state become COMPLETE
self._wait_vnflcm_op_occs(self.context, vnf_instance['id'])
# check vnfcResourceInfo after heal operation
vnf_instance = _show_vnf_instance(
self.tacker_client, vnf_instance['id'])
after_vnfc_rscs = self._get_vnfc_resource_info(vnf_instance)
self.assertEqual(len(before_vnfc_rscs), len(after_vnfc_rscs))
after_vnfc_rscs = self._test_heal(vnf_instance, vnfc_instance_id)
for vnfc_rsc in after_vnfc_rscs:
after_pod_name = vnfc_rsc['computeResource']['resourceId']
if vnfc_rsc['id'] == pod_target_vnfc['id']:
@ -372,23 +99,20 @@ class VnfLcmKubernetesHealTest(base.BaseTackerTest):
i.e. without passing vnfcInstanceId, so that the entire vnf is healed
which includes Kubernetes resource (Deployment).
"""
vnf_instance_name = "cnf_heal_with_sol003"
vnf_instance_description = "cnf heal with sol003"
# use def-files of Deployment (replicas=2)
inst_additional_param = {
"lcm-kubernetes-def-files": [
"Files/kubernetes/deployment_heal_simple.yaml"]}
vnf_instance = self._create_and_instantiate_vnf_instance(
"simple", inst_additional_param)
self.vnfd_id, "simple", vnf_instance_name,
vnf_instance_description, inst_additional_param)
before_vnfc_rscs = self._get_vnfc_resource_info(vnf_instance)
# test heal SOL-003 (entire heal)
vnfc_instance_id = []
self._heal_vnf_instance(vnf_instance['id'], vnfc_instance_id)
# wait vnflcm_op_occs.operation_state become COMPLETE
self._wait_vnflcm_op_occs(self.context, vnf_instance['id'])
# check vnfcResourceInfo after heal operation
vnf_instance = _show_vnf_instance(
self.tacker_client, vnf_instance['id'])
after_vnfc_rscs = self._get_vnfc_resource_info(vnf_instance)
after_vnfc_rscs = self._test_heal(vnf_instance, vnfc_instance_id)
self.assertEqual(len(before_vnfc_rscs), len(after_vnfc_rscs))
# check id and pod name (as computeResource.resourceId) is changed
for before_vnfc_rsc in before_vnfc_rscs:

View File

@ -12,347 +12,36 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import time
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from sqlalchemy import desc
from sqlalchemy.orm import joinedload
from tacker.common import exceptions
from tacker import context
from tacker.db import api as db_api
from tacker.db.db_sqlalchemy import api
from tacker.db.db_sqlalchemy import models
from tacker.objects import fields
from tacker.objects import vnf_lcm_op_occs
from tacker.tests.functional import base
from tacker.tests import utils
VNF_PACKAGE_UPLOAD_TIMEOUT = 300
VNF_INSTANTIATE_TIMEOUT = 600
VNF_TERMINATE_TIMEOUT = 600
VNF_HEAL_SOL002_TIMEOUT = 600
VNF_HEAL_SOL003_TIMEOUT = 1200
VNF_SCALE_TIMEOUT = 600
RETRY_WAIT_TIME = 5
from tacker.tests.functional.sol_kubernetes.vnflcm import base as vnflcm_base
def _create_and_upload_vnf_package(tacker_client, csar_package_name,
user_defined_data):
# create vnf package
body = jsonutils.dumps({"userDefinedData": user_defined_data})
resp, vnf_package = tacker_client.do_request(
'/vnfpkgm/v1/vnf_packages', "POST", body=body)
# upload vnf package
csar_package_path = "../../../etc/samples/etsi/nfv/{}".format(
csar_package_name)
file_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
csar_package_path))
# Generating unique vnfd id. This is required when multiple workers
# are running concurrently. The call below creates a new temporary
# CSAR with unique vnfd id.
file_path, uniqueid = utils.create_csar_with_unique_vnfd_id(file_path)
with open(file_path, 'rb') as file_object:
resp, resp_body = tacker_client.do_request(
'/vnfpkgm/v1/vnf_packages/{}/package_content'.format(
vnf_package['id']),
"PUT", body=file_object, content_type='application/zip')
# wait for onboard
start_time = int(time.time())
show_url = os.path.join('/vnfpkgm/v1/vnf_packages', vnf_package['id'])
vnfd_id = None
while True:
resp, body = tacker_client.do_request(show_url, "GET")
if body['onboardingState'] == "ONBOARDED":
vnfd_id = body['vnfdId']
break
if ((int(time.time()) - start_time) > VNF_PACKAGE_UPLOAD_TIMEOUT):
raise Exception("Failed to onboard vnf package, process could not"
" be completed within {} seconds".format(
VNF_PACKAGE_UPLOAD_TIMEOUT))
time.sleep(RETRY_WAIT_TIME)
# remove temporarily created CSAR file
os.remove(file_path)
return vnf_package['id'], vnfd_id
class VnfLcmKubernetesHelmTest(base.BaseTackerTest):
class VnfLcmKubernetesHelmTest(vnflcm_base.BaseVnfLcmKubernetesTest):
@classmethod
def setUpClass(cls):
cls.tacker_client = base.BaseTackerTest.tacker_http_client()
cls.vnf_package_resource, cls.vnfd_id_resource = \
_create_and_upload_vnf_package(
cls.tacker_client, "test_cnf_helmchart",
{"key": "sample_helmchart_functional"})
cls.vnf_instance_ids = []
super(VnfLcmKubernetesHelmTest, cls).setUpClass()
vnf_package_id, cls.vnfd_id = \
cls._create_and_upload_vnf_package(
cls, cls.tacker_client, "test_cnf_helmchart",
{"key": "sample_helmchart_functional"})
cls.vnf_package_ids.append(vnf_package_id)
@classmethod
def tearDownClass(cls):
# Update vnf package operational state to DISABLED
update_req_body = jsonutils.dumps({
"operationalState": "DISABLED"})
base_path = "/vnfpkgm/v1/vnf_packages"
for package_id in [cls.vnf_package_resource]:
resp, resp_body = cls.tacker_client.do_request(
'{base_path}/{id}'.format(id=package_id,
base_path=base_path),
"PATCH", content_type='application/json',
body=update_req_body)
# Delete vnf package
url = '/vnfpkgm/v1/vnf_packages/{}'.format(package_id)
cls.tacker_client.do_request(url, "DELETE")
super(VnfLcmKubernetesHelmTest, cls).tearDownClass()
def setUp(self):
super(VnfLcmKubernetesHelmTest, self).setUp()
self.base_vnf_instances_url = "/vnflcm/v1/vnf_instances"
self.base_vnf_lcm_op_occs_url = "/vnflcm/v1/vnf_lcm_op_occs"
self.context = context.get_admin_context()
vim_list = self.client.list_vims()
if not vim_list:
self.skipTest("Vims are not configured")
vim_id = 'vim-kubernetes'
vim = self.get_vim(vim_list, vim_id)
if not vim:
self.skipTest("Kubernetes VIM '{}' is missing".format(vim_id))
self.vim_id = vim['id']
def _instantiate_vnf_instance_request(
self, flavour_id, vim_id=None, additional_param=None):
request_body = {"flavourId": flavour_id}
if vim_id:
request_body["vimConnectionInfo"] = [
{"id": uuidutils.generate_uuid(),
"vimId": vim_id,
"vimType": "kubernetes"}]
if additional_param:
request_body["additionalParams"] = additional_param
return request_body
def _create_vnf_instance(self, vnfd_id, vnf_instance_name=None,
vnf_instance_description=None):
request_body = {'vnfdId': vnfd_id}
if vnf_instance_name:
request_body['vnfInstanceName'] = vnf_instance_name
if vnf_instance_description:
request_body['vnfInstanceDescription'] = vnf_instance_description
resp, response_body = self.http_client.do_request(
self.base_vnf_instances_url, "POST",
body=jsonutils.dumps(request_body))
return resp, response_body
def _delete_wait_vnf_instance(self, id):
url = os.path.join("/vnflcm/v1/vnf_instances", id)
start_time = int(time.time())
while True:
resp, body = self.tacker_client.do_request(url, "DELETE")
if 204 == resp.status_code:
break
if ((int(time.time()) - start_time) > VNF_TERMINATE_TIMEOUT):
raise Exception("Failed to delete vnf instance, process could"
" not be completed within {} seconds".format(
VNF_TERMINATE_TIMEOUT))
time.sleep(RETRY_WAIT_TIME)
def _show_vnf_instance(self, id):
show_url = os.path.join("/vnflcm/v1/vnf_instances", id)
resp, vnf_instance = self.tacker_client.do_request(show_url, "GET")
return vnf_instance
def _vnf_instance_wait(
self, id,
instantiation_state=fields.VnfInstanceState.INSTANTIATED,
timeout=VNF_INSTANTIATE_TIMEOUT):
show_url = os.path.join("/vnflcm/v1/vnf_instances", id)
start_time = int(time.time())
while True:
resp, body = self.tacker_client.do_request(show_url, "GET")
if body['instantiationState'] == instantiation_state:
break
if ((int(time.time()) - start_time) > timeout):
raise Exception("Failed to wait vnf instance, process could"
" not be completed within {} seconds".format(timeout))
time.sleep(RETRY_WAIT_TIME)
def _instantiate_vnf_instance(self, id, request_body):
url = os.path.join(self.base_vnf_instances_url, id, "instantiate")
resp, body = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
self._vnf_instance_wait(id)
def _create_and_instantiate_vnf_instance(self, flavour_id,
additional_params):
# create vnf instance
vnf_instance_name = "test_vnf_instance_for_cnf_heal-{}".format(
uuidutils.generate_uuid())
vnf_instance_description = "vnf instance for cnf heal testing"
resp, vnf_instance = self._create_vnf_instance(
self.vnfd_id_resource, vnf_instance_name=vnf_instance_name,
vnf_instance_description=vnf_instance_description)
# instantiate vnf instance
additional_param = additional_params
request_body = self._instantiate_vnf_instance_request(
flavour_id, vim_id=self.vim_id, additional_param=additional_param)
self._instantiate_vnf_instance(vnf_instance['id'], request_body)
vnf_instance = self._show_vnf_instance(vnf_instance['id'])
self.vnf_instance_ids.append(vnf_instance['id'])
return vnf_instance
def _terminate_vnf_instance(self, id):
# Terminate vnf forcefully
request_body = {
"terminationType": fields.VnfInstanceTerminationType.FORCEFUL,
}
url = os.path.join(self.base_vnf_instances_url, id, "terminate")
resp, body = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
self._vnf_instance_wait(
id,
instantiation_state=fields.VnfInstanceState.NOT_INSTANTIATED,
timeout=VNF_TERMINATE_TIMEOUT)
def _delete_vnf_instance(self, id):
self._delete_wait_vnf_instance(id)
# verify vnf instance is deleted
url = os.path.join(self.base_vnf_instances_url, id)
resp, body = self.http_client.do_request(url, "GET")
self.assertEqual(404, resp.status_code)
def _scale_vnf_instance(self, id, type, aspect_id,
number_of_steps=1):
url = os.path.join(self.base_vnf_instances_url, id, "scale")
# generate body
request_body = {
"type": type,
"aspectId": aspect_id,
"numberOfSteps": number_of_steps}
resp, body = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
def _heal_vnf_instance(self, id, vnfc_instance_id):
url = os.path.join(self.base_vnf_instances_url, id, "heal")
# generate body
request_body = {
"vnfcInstanceId": vnfc_instance_id}
resp, body = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
@db_api.context_manager.reader
def _vnf_notify_get_by_id(self, context, vnf_instance_id,
columns_to_join=None):
query = api.model_query(
context, models.VnfLcmOpOccs,
read_deleted="no", project_only=True).filter_by(
vnf_instance_id=vnf_instance_id).order_by(
desc("created_at"))
if columns_to_join:
for column in columns_to_join:
query = query.options(joinedload(column))
db_vnflcm_op_occ = query.first()
if not db_vnflcm_op_occ:
raise exceptions.VnfInstanceNotFound(id=vnf_instance_id)
vnflcm_op_occ = vnf_lcm_op_occs.VnfLcmOpOcc.obj_from_db_obj(
context, db_vnflcm_op_occ)
return vnflcm_op_occ
def _wait_vnflcm_op_occs(
self, context, vnf_instance_id, timeout,
operation_state='COMPLETED'):
start_time = int(time.time())
while True:
vnflcm_op_occ = self._vnf_notify_get_by_id(
context, vnf_instance_id)
if vnflcm_op_occ.operation_state == operation_state:
break
if ((int(time.time()) - start_time) > timeout):
raise Exception("Timeout waiting for transition to"
" {} state.".format(operation_state))
time.sleep(RETRY_WAIT_TIME)
def _get_vnfc_resource_info(self, vnf_instance):
inst_vnf_info = vnf_instance['instantiatedVnfInfo']
vnfc_resource_info = inst_vnf_info['vnfcResourceInfo']
return vnfc_resource_info
def _test_scale_cnf(self, vnf_instance):
def _test_scale_cnf(self, vnf_instance, aspect_id):
"""Test scale in/out CNF"""
def _test_scale(id, type, aspect_id, previous_level,
delta_num=1, number_of_steps=1):
# scale operation
self._scale_vnf_instance(id, type, aspect_id, number_of_steps)
# wait vnflcm_op_occs.operation_state become COMPLETE
self._wait_vnflcm_op_occs(self.context, id, VNF_SCALE_TIMEOUT)
# check scaleStatus after scale operation
vnf_instance = self._show_vnf_instance(id)
scale_status_after = \
vnf_instance['instantiatedVnfInfo']['scaleStatus']
if type == 'SCALE_OUT':
expected_level = previous_level + number_of_steps
else:
expected_level = previous_level - number_of_steps
for status in scale_status_after:
if status.get('aspectId') == aspect_id:
self.assertEqual(status.get('scaleLevel'), expected_level)
previous_level = status.get('scaleLevel')
return previous_level
aspect_id = "vdu1_aspect"
scale_status_initial = \
vnf_instance['instantiatedVnfInfo']['scaleStatus']
self.assertTrue(len(scale_status_initial) > 0)
for status in scale_status_initial:
self.assertIsNotNone(status.get('aspectId'))
self.assertIsNotNone(status.get('scaleLevel'))
if status.get('aspectId') == aspect_id:
previous_level = status.get('scaleLevel')
scale_level = self._get_scale_level_by_aspect_id(
vnf_instance, aspect_id)
# test scale out
previous_level = _test_scale(
vnf_instance['id'], 'SCALE_OUT', aspect_id, previous_level)
scale_level = self._test_scale(
vnf_instance['id'], 'SCALE_OUT', aspect_id, scale_level)
# test scale in
previous_level = _test_scale(
vnf_instance['id'], 'SCALE_IN', aspect_id, previous_level)
scale_level = self._test_scale(
vnf_instance['id'], 'SCALE_IN', aspect_id, scale_level)
def _test_heal_cnf_with_sol002(self, vnf_instance):
"""Test heal as per SOL002 for CNF"""
@ -360,8 +49,8 @@ class VnfLcmKubernetesHelmTest(base.BaseTackerTest):
before_vnfc_rscs = self._get_vnfc_resource_info(vnf_instance)
# get vnfc_instance_id of heal target
before_pod_name = dict()
vnfc_instance_id = list()
before_pod_name = {}
vnfc_instance_id = []
for vnfc_rsc in before_vnfc_rscs:
if vnfc_rsc['vduId'] == "vdu1":
before_pod_name['vdu1'] = \
@ -372,14 +61,7 @@ class VnfLcmKubernetesHelmTest(base.BaseTackerTest):
vnfc_instance_id.append(vnfc_rsc['id'])
# test heal SOL-002 (partial heal)
self._heal_vnf_instance(vnf_instance['id'], vnfc_instance_id)
# wait vnflcm_op_occs.operation_state become COMPLETE
self._wait_vnflcm_op_occs(self.context, vnf_instance['id'],
VNF_HEAL_SOL002_TIMEOUT)
# check vnfcResourceInfo after heal operation
vnf_instance = self._show_vnf_instance(vnf_instance['id'])
after_vnfc_rscs = self._get_vnfc_resource_info(vnf_instance)
self.assertEqual(len(before_vnfc_rscs), len(after_vnfc_rscs))
after_vnfc_rscs = self._test_heal(vnf_instance, vnfc_instance_id)
for vnfc_rsc in after_vnfc_rscs:
after_pod_name = vnfc_rsc['computeResource']['resourceId']
if vnfc_rsc['vduId'] == "vdu1":
@ -403,11 +85,10 @@ class VnfLcmKubernetesHelmTest(base.BaseTackerTest):
self._heal_vnf_instance(vnf_instance['id'], vnfc_instance_id)
# wait vnflcm_op_occs.operation_state become COMPLETE
self._wait_vnflcm_op_occs(self.context, vnf_instance['id'],
VNF_HEAL_SOL003_TIMEOUT)
vnflcm_base.VNF_HEAL_SOL003_TIMEOUT)
# check vnfcResourceInfo after heal operation
vnf_instance = self._show_vnf_instance(vnf_instance['id'])
after_vnfc_rscs = self._get_vnfc_resource_info(vnf_instance)
self.assertEqual(len(before_vnfc_rscs), len(after_vnfc_rscs))
after_vnfc_rscs = self._test_heal(vnf_instance, vnfc_instance_id)
# check id and pod name (as computeResource.resourceId) is changed
for before_vnfc_rsc in before_vnfc_rscs:
for after_vnfc_rsc in after_vnfc_rscs:
@ -418,7 +99,13 @@ class VnfLcmKubernetesHelmTest(base.BaseTackerTest):
after_vnfc_rsc['computeResource']['resourceId'])
def test_vnflcm_with_helmchart(self):
# use def-files of singleton Pod and Deployment (replicas=2)
"""Test LCM using Helm chart
This test will instantiate, scale, heal, terminate cnf by using
local and external Helm charts.
"""
vnf_instance_name = "cnf_with_helmchart"
vnf_instance_description = "cnf with helmchart"
helmchartfile_path = "Files/kubernetes/localhelm-0.1.0.tgz"
inst_additional_param = {
"namespace": "default",
@ -442,8 +129,10 @@ class VnfLcmKubernetesHelmTest(base.BaseTackerTest):
]
}
vnf_instance = self._create_and_instantiate_vnf_instance(
"helmchart", inst_additional_param)
self._test_scale_cnf(vnf_instance)
self.vnfd_id, "helmchart", vnf_instance_name,
vnf_instance_description, inst_additional_param)
self._test_scale_cnf(vnf_instance, aspect_id="vdu1_aspect")
self._test_heal_cnf_with_sol002(vnf_instance)
self._test_heal_cnf_with_sol003(vnf_instance)

View File

@ -12,473 +12,119 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import time
import unittest
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from sqlalchemy import desc
from sqlalchemy.orm import joinedload
from tacker.common import exceptions
from tacker import context
from tacker.db import api as db_api
from tacker.db.db_sqlalchemy import api
from tacker.db.db_sqlalchemy import models
from tacker.objects import fields
from tacker.objects import vnf_lcm_op_occs
from tacker.tests.functional import base
from tacker.tests import utils
VNF_PACKAGE_UPLOAD_TIMEOUT = 300
VNF_INSTANTIATE_TIMEOUT = 600
VNF_TERMINATE_TIMEOUT = 600
VNF_SCALE_TIMEOUT = 600
RETRY_WAIT_TIME = 5
from tacker.tests.functional.sol_kubernetes.vnflcm import base as vnflcm_base
def _create_and_upload_vnf_package(tacker_client, csar_package_name,
user_defined_data):
# create vnf package
body = jsonutils.dumps({"userDefinedData": user_defined_data})
resp, vnf_package = tacker_client.do_request(
'/vnfpkgm/v1/vnf_packages', "POST", body=body)
# upload vnf package
csar_package_path = "../../../etc/samples/etsi/nfv/%s" % csar_package_name
file_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
csar_package_path))
# Generating unique vnfd id. This is required when multiple workers
# are running concurrently. The call below creates a new temporary
# CSAR with unique vnfd id.
file_path, uniqueid = utils.create_csar_with_unique_vnfd_id(file_path)
with open(file_path, 'rb') as file_object:
resp, resp_body = tacker_client.do_request(
'/vnfpkgm/v1/vnf_packages/{id}/package_content'.format(
id=vnf_package['id']),
"PUT", body=file_object, content_type='application/zip')
# wait for onboard
timeout = VNF_PACKAGE_UPLOAD_TIMEOUT
start_time = int(time.time())
show_url = os.path.join('/vnfpkgm/v1/vnf_packages', vnf_package['id'])
vnfd_id = None
while True:
resp, body = tacker_client.do_request(show_url, "GET")
if body['onboardingState'] == "ONBOARDED":
vnfd_id = body['vnfdId']
break
if ((int(time.time()) - start_time) > timeout):
raise Exception("Failed to onboard vnf package")
time.sleep(1)
# remove temporarily created CSAR file
os.remove(file_path)
return vnf_package['id'], vnfd_id
def _delete_wait_vnf_instance(tacker_client, id):
timeout = VNF_TERMINATE_TIMEOUT
url = os.path.join("/vnflcm/v1/vnf_instances", id)
start_time = int(time.time())
while True:
resp, body = tacker_client.do_request(url, "DELETE")
if 204 == resp.status_code:
break
if ((int(time.time()) - start_time) > timeout):
raise Exception("Failed to delete vnf instance")
time.sleep(RETRY_WAIT_TIME)
def _delete_vnf_instance(tacker_client, id):
_delete_wait_vnf_instance(tacker_client, id)
# verify vnf instance is deleted
url = os.path.join("/vnflcm/v1/vnf_instances", id)
resp, body = tacker_client.do_request(url, "GET")
def _show_vnf_instance(tacker_client, id):
show_url = os.path.join("/vnflcm/v1/vnf_instances", id)
resp, vnf_instance = tacker_client.do_request(show_url, "GET")
return vnf_instance
def _vnf_instance_wait(
tacker_client, id,
instantiation_state=fields.VnfInstanceState.INSTANTIATED,
timeout=VNF_INSTANTIATE_TIMEOUT):
show_url = os.path.join("/vnflcm/v1/vnf_instances", id)
start_time = int(time.time())
while True:
resp, body = tacker_client.do_request(show_url, "GET")
if body['instantiationState'] == instantiation_state:
break
if ((int(time.time()) - start_time) > timeout):
raise Exception("Failed to wait vnf instance")
time.sleep(RETRY_WAIT_TIME)
def _terminate_vnf_instance(tacker_client, id, request_body):
url = os.path.join("/vnflcm/v1/vnf_instances", id, "terminate")
resp, body = tacker_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
timeout = request_body.get('gracefulTerminationTimeout')
start_time = int(time.time())
_vnf_instance_wait(
tacker_client, id,
instantiation_state=fields.VnfInstanceState.NOT_INSTANTIATED,
timeout=VNF_TERMINATE_TIMEOUT)
# If gracefulTerminationTimeout is set, check whether vnf
# instantiation_state is set to NOT_INSTANTIATED after
# gracefulTerminationTimeout seconds.
if timeout and int(time.time()) - start_time < timeout:
raise Exception("Failed to terminate vnf instance")
class VnfLcmKubernetesScaleTest(base.BaseTackerTest):
class VnfLcmKubernetesScaleTest(vnflcm_base.BaseVnfLcmKubernetesTest):
@classmethod
def setUpClass(cls):
cls.tacker_client = base.BaseTackerTest.tacker_http_client()
cls.vnf_package_resource, cls.vnfd_id_resource = \
_create_and_upload_vnf_package(
cls.tacker_client, "test_cnf_scale",
{"key": "sample_scale_functional"})
cls.vnf_instance_ids = []
super(VnfLcmKubernetesScaleTest, cls).setUpClass()
vnf_package_id, cls.vnfd_id = \
cls._create_and_upload_vnf_package(
cls, cls.tacker_client, "test_cnf_scale",
{"key": "sample_scale_functional"})
cls.vnf_package_ids.append(vnf_package_id)
@classmethod
def tearDownClass(cls):
# Terminate vnf forcefully
terminate_req_body = {
"terminationType": fields.VnfInstanceTerminationType.FORCEFUL,
}
for id in cls.vnf_instance_ids:
_terminate_vnf_instance(cls.tacker_client, id,
terminate_req_body)
_delete_vnf_instance(cls.tacker_client, id)
# Update vnf package operational state to DISABLED
update_req_body = jsonutils.dumps({
"operationalState": "DISABLED"})
base_path = "/vnfpkgm/v1/vnf_packages"
for package_id in [cls.vnf_package_resource]:
resp, resp_body = cls.tacker_client.do_request(
'{base_path}/{id}'.format(id=package_id,
base_path=base_path),
"PATCH", content_type='application/json',
body=update_req_body)
# Delete vnf package
url = '/vnfpkgm/v1/vnf_packages/%s' % package_id
cls.tacker_client.do_request(url, "DELETE")
super(VnfLcmKubernetesScaleTest, cls).tearDownClass()
def setUp(self):
super(VnfLcmKubernetesScaleTest, self).setUp()
self.base_vnf_instances_url = "/vnflcm/v1/vnf_instances"
self.base_vnf_lcm_op_occs_url = "/vnflcm/v1/vnf_lcm_op_occs"
self.context = context.get_admin_context()
vim_list = self.client.list_vims()
if not vim_list:
self.skipTest("Vims are not configured")
def _test_cnf_scale(self, vnf_instance, aspect_id,
number_of_steps=1, error=False):
scale_level = self._get_scale_level_by_aspect_id(
vnf_instance, aspect_id)
vim_id = 'vim-kubernetes'
vim = self.get_vim(vim_list, vim_id)
if not vim:
self.skipTest("Kubernetes VIM '%s' is missing" % vim_id)
self.vim_id = vim['id']
# test scale out
scale_level = self._test_scale(
vnf_instance['id'], 'SCALE_OUT', aspect_id, scale_level,
number_of_steps, error)
if error:
return scale_level
def _instantiate_vnf_instance_request(
self, flavour_id, vim_id=None, additional_param=None):
request_body = {"flavourId": flavour_id}
# test scale in
scale_level = self._test_scale(
vnf_instance['id'], 'SCALE_IN', aspect_id, scale_level,
number_of_steps)
if vim_id:
request_body["vimConnectionInfo"] = [
{"id": uuidutils.generate_uuid(),
"vimId": vim_id,
"vimType": "kubernetes"}]
if additional_param:
request_body["additionalParams"] = additional_param
return request_body
def _create_vnf_instance(self, vnfd_id, vnf_instance_name=None,
vnf_instance_description=None):
request_body = {'vnfdId': vnfd_id}
if vnf_instance_name:
request_body['vnfInstanceName'] = vnf_instance_name
if vnf_instance_description:
request_body['vnfInstanceDescription'] = vnf_instance_description
resp, response_body = self.http_client.do_request(
self.base_vnf_instances_url, "POST",
body=jsonutils.dumps(request_body))
return resp, response_body
def _instantiate_vnf_instance(self, id, request_body):
url = os.path.join(self.base_vnf_instances_url, id, "instantiate")
resp, body = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
_vnf_instance_wait(self.tacker_client, id)
def _create_and_instantiate_vnf_instance(self, flavour_id,
additional_params):
# create vnf instance
vnf_instance_name = "test_vnf_instance_for_cnf_scale-%s" % \
uuidutils.generate_uuid()
vnf_instance_description = "vnf instance for cnf scale testing"
resp, vnf_instance = self._create_vnf_instance(
self.vnfd_id_resource, vnf_instance_name=vnf_instance_name,
vnf_instance_description=vnf_instance_description)
# instantiate vnf instance
additional_param = additional_params
request_body = self._instantiate_vnf_instance_request(
flavour_id, vim_id=self.vim_id, additional_param=additional_param)
self._instantiate_vnf_instance(vnf_instance['id'], request_body)
vnf_instance = _show_vnf_instance(
self.tacker_client, vnf_instance['id'])
self.vnf_instance_ids.append(vnf_instance['id'])
return vnf_instance
def _scale_vnf_instance(self, id, type, aspect_id,
number_of_steps=1):
url = os.path.join(self.base_vnf_instances_url, id, "scale")
# generate body
request_body = {
"type": type,
"aspectId": aspect_id,
"numberOfSteps": number_of_steps}
resp, body = self.http_client.do_request(
url, "POST", body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
def _test_scale_cnf(self, id, type, aspect_id, previous_level,
delta_num=1, number_of_steps=1):
# scale operation
self._scale_vnf_instance(id, type, aspect_id, number_of_steps)
# wait vnflcm_op_occs.operation_state become COMPLETE
self._wait_vnflcm_op_occs(self.context, id)
# check scaleStatus after scale operation
vnf_instance = _show_vnf_instance(
self.tacker_client, id)
scale_status_after = \
vnf_instance['instantiatedVnfInfo']['scaleStatus']
if type == 'SCALE_OUT':
expected_level = previous_level + number_of_steps
else:
expected_level = previous_level - number_of_steps
for status in scale_status_after:
if status.get('aspectId') == aspect_id:
self.assertEqual(status.get('scaleLevel'), expected_level)
previous_level = status.get('scaleLevel')
return previous_level
def _test_scale_cnf_fail(self, id, type, aspect_id, previous_level,
delta_num=1, number_of_steps=1):
# scale operation
self._scale_vnf_instance(id, type, aspect_id, number_of_steps)
# wait vnflcm_op_occs.operation_state become FAILED_TEMP
self._wait_vnflcm_op_occs(self.context, id, "FAILED_TEMP")
# check scaleStatus after scale operation
vnf_instance = _show_vnf_instance(
self.tacker_client, id)
scale_status_after = \
vnf_instance['instantiatedVnfInfo']['scaleStatus']
expected_level = previous_level
for status in scale_status_after:
if status.get('aspectId') == aspect_id:
self.assertEqual(status.get('scaleLevel'), expected_level)
previous_level = status.get('scaleLevel')
return previous_level
def _rollback_vnf_instance(self, vnf_lcm_op_occ_id):
url = os.path.join(
self.base_vnf_lcm_op_occs_url, vnf_lcm_op_occ_id, "rollback")
# generate body
resp, body = self.http_client.do_request(url, "POST")
self.assertEqual(202, resp.status_code)
def _test_rollback_cnf(self, id, aspect_id, previous_level,
delta_num=1, number_of_steps=1):
# get vnflcm_op_occ id for rollback
vnflcm_op_occ = self._vnf_notify_get_by_id(self.context, id)
vnf_lcm_op_occ_id = vnflcm_op_occ.id
# rollback operation
self._rollback_vnf_instance(vnf_lcm_op_occ_id)
# wait vnflcm_op_occs.operation_state become ROLLED_BACK
self._wait_vnflcm_op_occs(self.context, id, "ROLLED_BACK")
# check scaleStatus after scale operation
vnf_instance = _show_vnf_instance(
self.tacker_client, id)
scale_status_after = \
vnf_instance['instantiatedVnfInfo']['scaleStatus']
expected_level = previous_level
for status in scale_status_after:
if status.get('aspectId') == aspect_id:
self.assertEqual(status.get('scaleLevel'), expected_level)
previous_level = status.get('scaleLevel')
@db_api.context_manager.reader
def _vnf_notify_get_by_id(self, context, vnf_instance_id,
columns_to_join=None):
query = api.model_query(
context, models.VnfLcmOpOccs,
read_deleted="no", project_only=True).filter_by(
vnf_instance_id=vnf_instance_id).order_by(
desc("created_at"))
if columns_to_join:
for column in columns_to_join:
query = query.options(joinedload(column))
db_vnflcm_op_occ = query.first()
if not db_vnflcm_op_occ:
raise exceptions.VnfInstanceNotFound(id=vnf_instance_id)
vnflcm_op_occ = vnf_lcm_op_occs.VnfLcmOpOcc.obj_from_db_obj(
context, db_vnflcm_op_occ)
return vnflcm_op_occ
def _wait_vnflcm_op_occs(
self, context, vnf_instance_id,
operation_state='COMPLETED'):
timeout = VNF_SCALE_TIMEOUT
start_time = int(time.time())
while True:
vnflcm_op_occ = self._vnf_notify_get_by_id(
context, vnf_instance_id)
if vnflcm_op_occ.operation_state == operation_state:
break
if ((int(time.time()) - start_time) > timeout):
raise Exception("Failed to wait scale instance")
time.sleep(RETRY_WAIT_TIME)
return scale_level
def test_scale_cnf_with_statefulset(self):
"""Test scale for CNF (StatefulSet)
This test will instantiate cnf with StatefulSet and scale replicas.
"""
vnf_instance_name = "cnf_scale_with_statefulset"
vnf_instance_description = "cnf scale with statefulset"
inst_additional_param = {
"lcm-kubernetes-def-files": [
"Files/kubernetes/statefulset_scale.yaml"]}
vnf_instance = self._create_and_instantiate_vnf_instance(
"simple", inst_additional_param)
aspect_id = "vdu1_aspect"
scale_status_initial = \
vnf_instance['instantiatedVnfInfo']['scaleStatus']
self.assertTrue(len(scale_status_initial) > 0)
for status in scale_status_initial:
self.assertIsNotNone(status.get('aspectId'))
self.assertIsNotNone(status.get('scaleLevel'))
if status.get('aspectId') == aspect_id:
previous_level = status.get('scaleLevel')
# test scale out
previous_level = self._test_scale_cnf(
vnf_instance['id'], 'SCALE_OUT', aspect_id, previous_level)
# test scale in
previous_level = self._test_scale_cnf(
vnf_instance['id'], 'SCALE_IN', aspect_id, previous_level)
self.vnfd_id, "simple", vnf_instance_name,
vnf_instance_description, inst_additional_param)
self._test_cnf_scale(vnf_instance, "vdu1_aspect")
self._terminate_vnf_instance(vnf_instance['id'])
self._delete_vnf_instance(vnf_instance['id'])
def test_scale_cnf_with_replicaset(self):
"""Test scale for CNF (ReplicaSet)
This test will instantiate cnf with ReplicaSet and scale replicas.
"""
vnf_instance_name = "cnf_scale_with_replicaset"
vnf_instance_description = "cnf scale with replicaset"
inst_additional_param = {
"lcm-kubernetes-def-files": [
"Files/kubernetes/replicaset_scale.yaml"]}
vnf_instance = self._create_and_instantiate_vnf_instance(
"simple", inst_additional_param)
aspect_id = "vdu1_aspect"
scale_status_initial = \
vnf_instance['instantiatedVnfInfo']['scaleStatus']
self.assertTrue(len(scale_status_initial) > 0)
for status in scale_status_initial:
self.assertIsNotNone(status.get('aspectId'))
self.assertIsNotNone(status.get('scaleLevel'))
if status.get('aspectId') == aspect_id:
previous_level = status.get('scaleLevel')
# test scale out
previous_level = self._test_scale_cnf(
vnf_instance['id'], 'SCALE_OUT', aspect_id, previous_level)
# test scale in
previous_level = self._test_scale_cnf(
vnf_instance['id'], 'SCALE_IN', aspect_id, previous_level)
self.vnfd_id, "simple", vnf_instance_name,
vnf_instance_description, inst_additional_param)
self._test_cnf_scale(vnf_instance, "vdu1_aspect")
self._terminate_vnf_instance(vnf_instance['id'])
self._delete_vnf_instance(vnf_instance['id'])
def test_scale_cnf_deployment_with_scaling_and_delta_two(self):
"""Test scale for CNF (Deployment)
This test will instantiate cnf with Deployment and scale replicas.
And scaling steps of ScaleVnfRequest set to two and scaling deltas that
defined in VNFD set to two.
"""
vnf_instance_name = "cnf_scale_with_scaling_and_delta_two"
vnf_instance_description = "cnf scale with scaling and delta two"
inst_additional_param = {
"lcm-kubernetes-def-files": [
"Files/kubernetes/deployment_scale.yaml"]}
# Use flavour_id scalingsteps that is set to delta_num=2
vnf_instance = self._create_and_instantiate_vnf_instance(
"scalingsteps", inst_additional_param)
aspect_id = "vdu1_aspect"
scale_status_initial = \
vnf_instance['instantiatedVnfInfo']['scaleStatus']
self.assertTrue(len(scale_status_initial) > 0)
for status in scale_status_initial:
self.assertIsNotNone(status.get('aspectId'))
self.assertIsNotNone(status.get('scaleLevel'))
if status.get('aspectId') == aspect_id:
previous_level = status.get('scaleLevel')
# test scale out (test for delta_num=2 and number_of_steps=2)
previous_level = self._test_scale_cnf(
vnf_instance['id'], 'SCALE_OUT', aspect_id, previous_level,
delta_num=2, number_of_steps=2)
# test scale in (test for delta_num=2 and number_of_steps=2)
previous_level = self._test_scale_cnf(
vnf_instance['id'], 'SCALE_IN', aspect_id, previous_level,
delta_num=2, number_of_steps=2)
self.vnfd_id, "scalingsteps", vnf_instance_name,
vnf_instance_description, inst_additional_param)
# Use flavour_id scalingsteps that is set to delta_num=2
self._test_cnf_scale(vnf_instance, "vdu1_aspect", number_of_steps=2)
self._terminate_vnf_instance(vnf_instance['id'])
self._delete_vnf_instance(vnf_instance['id'])
@unittest.skip("Reduce test time")
def test_scale_out_cnf_rollback(self):
"""Test rollback after scaling failure for CNF
This test will rollback after failing scale out operation.
"""
vnf_instance_name = "cnf_rollback_after_scale_out_fail"
vnf_instance_description = "cnf rollback after scale out fail"
inst_additional_param = {
"lcm-kubernetes-def-files": [
"Files/kubernetes/statefulset_scale.yaml"]}
vnf_instance = self._create_and_instantiate_vnf_instance(
"simple", inst_additional_param)
aspect_id = "vdu1_aspect"
scale_status_initial = \
vnf_instance['instantiatedVnfInfo']['scaleStatus']
self.assertTrue(len(scale_status_initial) > 0)
for status in scale_status_initial:
self.assertIsNotNone(status.get('aspectId'))
self.assertIsNotNone(status.get('scaleLevel'))
if status.get('aspectId') == aspect_id:
previous_level = status.get('scaleLevel')
self.vnfd_id, "simple", vnf_instance_name,
vnf_instance_description, inst_additional_param)
# fail scale out for rollback
previous_level = self._test_scale_cnf_fail(
vnf_instance['id'], 'SCALE_OUT', aspect_id, previous_level,
number_of_steps=2)
aspect_id = "vdu1_aspect"
previous_level = self._test_cnf_scale(vnf_instance, aspect_id,
number_of_steps=2, error=True)
# test rollback
self._test_rollback_cnf(vnf_instance['id'], aspect_id, previous_level)
self._test_rollback_cnf_scale(
vnf_instance['id'], aspect_id, previous_level)
self._terminate_vnf_instance(vnf_instance['id'])
self._delete_vnf_instance(vnf_instance['id'])

View File

@ -121,6 +121,9 @@ def create_csar_with_unique_vnfd_id(csar_dir):
for (dpath, _, fnames) in os.walk(common_dir):
if not fnames:
continue
if ('test_cnf' in csar_dir and
re.search('images|kubernetes|Scripts', dpath)):
continue
for fname in fnames:
src_file = os.path.join(dpath, fname)
dst_file = os.path.relpath(os.path.join(dpath, fname), common_dir)
@ -190,9 +193,6 @@ def create_csar_with_unique_artifact(csar_dir):
for (dpath, _, fnames) in os.walk(common_dir):
if not fnames:
continue
if ('vnf_instance' in csar_dir and 'kubernetes' in dpath) or \
('vnf_instance' in csar_dir and 'Scripts' in dpath):
continue
for fname in fnames:
src_file = os.path.join(dpath, fname)
dst_file = os.path.relpath(os.path.join(dpath, fname), common_dir)