Support Additional common function of FT

Added Common Functional tests for Tacker (Victoria).

Change-Id: Idbb4132e1f6576a0e678f2beafbaa9c4248e85a8
This commit is contained in:
Aldinson Esto 2020-09-11 15:52:55 +09:00 committed by Aldinson C. Esto
parent 59e166b62a
commit 172095279b
14 changed files with 2346 additions and 456 deletions

View File

@ -0,0 +1,98 @@
heat_template_version: 2013-05-23
description: 'Simple Base HOT for Sample VNF'
parameters:
nfv:
type: json
resources:
VDU1:
type: OS::Heat::AutoScalingGroup
properties:
min_size: 1
max_size: 3
desired_capacity: 1
resource:
type: VDU1.yaml
properties:
flavor: { get_param: [ nfv, VDU, VDU1, flavor ] }
image: { get_param: [ nfv, VDU, VirtualStorage, image ] }
zone: { get_param: [ nfv, vdu, VDU1, zone ] }
net1: { get_param: [ nfv, CP, VDU1_CP1, network ] }
net2: { get_param: [ nfv, CP, VDU1_CP2, network ] }
net3: { get_resource: extmanageNW_1 }
net4: { get_resource: extmanageNW_2 }
net5: { get_resource: internalNW_1 }
VDU1_scale_out:
type: OS::Heat::ScalingPolicy
properties:
scaling_adjustment: 1
auto_scaling_group_id:
get_resource: VDU1
adjustment_type: change_in_capacity
VDU1_scale_in:
type: OS::Heat::ScalingPolicy
properties:
scaling_adjustment: -1
auto_scaling_group_id:
get_resource: VDU1
adjustment_type: change_in_capacity
VDU2:
type: OS::Heat::AutoScalingGroup
properties:
min_size: 2
max_size: 2
desired_capacity: 2
resource:
type: VDU2.yaml
properties:
flavor: { get_param: [ nfv, VDU, VDU2, flavor ] }
image: { get_param: [ nfv, VDU, VDU2, image ] }
zone: { get_param: [ nfv, vdu, VDU2, zone ] }
net1: { get_param: [ nfv, CP, VDU2_CP1, network ] }
net2: { get_param: [ nfv, CP, VDU2_CP2, network ] }
net3: { get_resource: extmanageNW_1 }
net4: { get_resource: extmanageNW_2 }
net5: { get_resource: internalNW_1 }
VDU2_scale_out:
type: OS::Heat::ScalingPolicy
properties:
scaling_adjustment: 1
auto_scaling_group_id:
get_resource: VDU2
adjustment_type: change_in_capacity
VDU2_scale_in:
type: OS::Heat::ScalingPolicy
properties:
scaling_adjustment: -1
auto_scaling_group_id:
get_resource: VDU2
adjustment_type: change_in_capacity
extmanageNW_1:
type: OS::Neutron::Net
extmanageNW_2:
type: OS::Neutron::Net
internalNW_1:
type: OS::Neutron::Net
extmanageNW_1_subnet:
type: OS::Neutron::Subnet
properties:
ip_version: 4
network:
get_resource: extmanageNW_1
cidr: 192.168.3.0/24
extmanageNW_2_subnet:
type: OS::Neutron::Subnet
properties:
ip_version: 4
network:
get_resource: extmanageNW_2
cidr: 192.168.4.0/24
internalNW_1_subnet:
type: OS::Neutron::Subnet
properties:
ip_version: 4
network:
get_resource: internalNW_1
cidr: 192.168.5.0/24
outputs: {}

View File

@ -0,0 +1,72 @@
heat_template_version: 2013-05-23
description: 'VDU1 HOT for Sample VNF'
parameters:
flavor:
type: string
image:
type: string
zone:
type: string
net1:
type: string
net2:
type: string
net3:
type: string
net4:
type: string
net5:
type: string
resources:
VDU1:
type: OS::Nova::Server
properties:
flavor: { get_param: flavor }
name: VDU1
block_device_mapping_v2: [{"volume_id": { get_resource: VirtualStorage }}]
networks:
- port:
get_resource: VDU1_CP1
- port:
get_resource: VDU1_CP2
- port:
get_resource: VDU1_CP3
- port:
get_resource: VDU1_CP4
- port:
get_resource: VDU1_CP5
availability_zone: { get_param: zone }
VirtualStorage:
type: OS::Cinder::Volume
properties:
image: { get_param: image }
size: 1
volume_type: { get_resource: multi }
multi:
type: OS::Cinder::VolumeType
properties:
name: { get_resource: VDU1_CP1 }
metadata: { multiattach: "<is> True" }
VDU1_CP1:
type: OS::Neutron::Port
properties:
network: { get_param: net1 }
VDU1_CP2:
type: OS::Neutron::Port
properties:
network: { get_param: net2 }
VDU1_CP3:
type: OS::Neutron::Port
properties:
network: { get_param: net3 }
VDU1_CP4:
type: OS::Neutron::Port
properties:
network: { get_param: net4 }
VDU1_CP5:
type: OS::Neutron::Port
properties:
network: { get_param: net5 }

View File

@ -0,0 +1,61 @@
heat_template_version: 2013-05-23
description: 'VDU2 HOT for Sample VNF'
parameters:
flavor:
type: string
image:
type: string
zone:
type: string
net1:
type: string
net2:
type: string
net3:
type: string
net4:
type: string
net5:
type: string
resources:
VDU2:
type: OS::Nova::Server
properties:
flavor: { get_param: flavor }
name: VDU2
image: { get_param: image }
networks:
- port:
get_resource: VDU2_CP1
- port:
get_resource: VDU2_CP2
- port:
get_resource: VDU2_CP3
- port:
get_resource: VDU2_CP4
- port:
get_resource: VDU2_CP5
availability_zone: { get_param: zone }
VDU2_CP1:
type: OS::Neutron::Port
properties:
network: { get_param: net1 }
VDU2_CP2:
type: OS::Neutron::Port
properties:
network: { get_param: net2 }
VDU2_CP3:
type: OS::Neutron::Port
properties:
network: { get_param: net3 }
VDU2_CP4:
type: OS::Neutron::Port
properties:
network: { get_param: net4 }
VDU2_CP5:
type: OS::Neutron::Port
properties:
network: { get_param: net5 }

View File

@ -0,0 +1,403 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: Simple deployment flavour for Sample VNF
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
- helloworld3_types.yaml
topology_template:
inputs:
descriptor_id:
type: string
descriptor_version:
type: string
provider:
type: string
product_name:
type: string
software_version:
type: string
vnfm_info:
type: list
entry_schema:
type: string
flavour_id:
type: string
flavour_description:
type: string
substitution_mappings:
node_type: company.provider.VNF
properties:
flavour_id: simple
requirements:
virtual_link_external1_1: [ VDU1_CP1, virtual_link ]
virtual_link_external1_2: [ VDU2_CP1, virtual_link ]
virtual_link_external2_1: [ VDU1_CP2, virtual_link ]
virtual_link_external2_2: [ VDU2_CP2, virtual_link ]
node_templates:
VNF:
type: company.provider.VNF
properties:
flavour_description: A simple flavour
interfaces:
Vnflcm:
instantiate: []
instantiate_start: []
instantiate_end: []
terminate: []
terminate_start: []
terminate_end: []
modify_information: []
modify_information_start: []
modify_information_end: []
VDU1:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: VDU1
description: VDU1 compute node
vdu_profile:
min_number_of_instances: 1
max_number_of_instances: 3
capabilities:
virtual_compute:
properties:
requested_additional_capabilities:
properties:
requested_additional_capability_name: m1.tiny
support_mandatory: true
target_performance_parameters:
entry_schema: test
virtual_memory:
virtual_mem_size: 512 MB
virtual_cpu:
num_virtual_cpu: 1
virtual_local_storage:
- size_of_storage: 3 GB
requirements:
- virtual_storage: VirtualStorage
VDU2:
type: tosca.nodes.nfv.Vdu.Compute
properties:
name: VDU2
description: VDU2 compute node
vdu_profile:
min_number_of_instances: 2
max_number_of_instances: 2
sw_image_data:
name: cirros-0.4.0-x86_64-disk
version: '0.4.0'
checksum:
algorithm: sha-256
hash: a8dd75ecffd4cdd96072d60c2237b448e0c8b2bc94d57f10fdbc8c481d9005b8
container_format: bare
disk_format: qcow2
min_disk: 0 GB
min_ram: 256 MB
size: 12 GB
capabilities:
virtual_compute:
properties:
requested_additional_capabilities:
properties:
requested_additional_capability_name: m1.tiny
support_mandatory: true
target_performance_parameters:
entry_schema: test
virtual_memory:
virtual_mem_size: 512 MB
virtual_cpu:
num_virtual_cpu: 1
virtual_local_storage:
- size_of_storage: 3 GB
VirtualStorage:
type: tosca.nodes.nfv.Vdu.VirtualBlockStorage
properties:
virtual_block_storage_data:
size_of_storage: 1 GB
rdma_enabled: true
sw_image_data:
name: cirros-0.4.0-x86_64-disk
version: '0.4.0'
checksum:
algorithm: sha-256
hash: a8dd75ecffd4cdd96072d60c2237b448e0c8b2bc94d57f10fdbc8c481d9005b8
container_format: bare
disk_format: qcow2
min_disk: 0 GB
min_ram: 256 MB
size: 12 GB
VDU1_CP1:
type: tosca.nodes.nfv.VduCp
properties:
layer_protocols: [ ipv4 ]
order: 0
requirements:
- virtual_binding: VDU1
VDU1_CP2:
type: tosca.nodes.nfv.VduCp
properties:
layer_protocols: [ ipv4 ]
order: 1
requirements:
- virtual_binding: VDU1
VDU1_CP3:
type: tosca.nodes.nfv.VduCp
properties:
layer_protocols: [ ipv4 ]
order: 2
requirements:
- virtual_binding: VDU1
- virtual_link: internalVL1
VDU1_CP4:
type: tosca.nodes.nfv.VduCp
properties:
layer_protocols: [ ipv4 ]
order: 3
requirements:
- virtual_binding: VDU1
- virtual_link: internalVL2
VDU1_CP5:
type: tosca.nodes.nfv.VduCp
properties:
layer_protocols: [ ipv4 ]
order: 4
requirements:
- virtual_binding: VDU1
- virtual_link: internalVL3
VDU2_CP1:
type: tosca.nodes.nfv.VduCp
properties:
layer_protocols: [ ipv4 ]
order: 0
requirements:
- virtual_binding: VDU2
VDU2_CP2:
type: tosca.nodes.nfv.VduCp
properties:
layer_protocols: [ ipv4 ]
order: 1
requirements:
- virtual_binding: VDU2
VDU2_CP3:
type: tosca.nodes.nfv.VduCp
properties:
layer_protocols: [ ipv4 ]
order: 2
requirements:
- virtual_binding: VDU2
- virtual_link: internalVL1
VDU2_CP4:
type: tosca.nodes.nfv.VduCp
properties:
layer_protocols: [ ipv4 ]
order: 3
requirements:
- virtual_binding: VDU2
- virtual_link: internalVL2
VDU2_CP5:
type: tosca.nodes.nfv.VduCp
properties:
layer_protocols: [ ipv4 ]
order: 4
requirements:
- virtual_binding: VDU2
- virtual_link: internalVL3
internalVL1:
type: tosca.nodes.nfv.VnfVirtualLink
properties:
connectivity_type:
layer_protocols: [ ipv4 ]
description: External Managed Virtual link in the VNF
vl_profile:
max_bitrate_requirements:
root: 1048576
leaf: 1048576
min_bitrate_requirements:
root: 1048576
leaf: 1048576
virtual_link_protocol_data:
- associated_layer_protocol: ipv4
l3_protocol_data:
ip_version: ipv4
cidr: 33.33.0.0/24
internalVL2:
type: tosca.nodes.nfv.VnfVirtualLink
properties:
connectivity_type:
layer_protocols: [ ipv4 ]
description: External Managed Virtual link in the VNF
vl_profile:
max_bitrate_requirements:
root: 1048576
leaf: 1048576
min_bitrate_requirements:
root: 1048576
leaf: 1048576
virtual_link_protocol_data:
- associated_layer_protocol: ipv4
l3_protocol_data:
ip_version: ipv4
cidr: 33.34.0.0/24
internalVL3:
type: tosca.nodes.nfv.VnfVirtualLink
properties:
connectivity_type:
layer_protocols: [ ipv4 ]
description: Internal Virtual link in the VNF
vl_profile:
max_bitrate_requirements:
root: 1048576
leaf: 1048576
min_bitrate_requirements:
root: 1048576
leaf: 1048576
virtual_link_protocol_data:
- associated_layer_protocol: ipv4
l3_protocol_data:
ip_version: ipv4
cidr: 33.35.0.0/24
policies:
- scaling_aspects:
type: tosca.policies.nfv.ScalingAspects
properties:
aspects:
worker_instance:
name: worker_instance_aspect
description: worker_instance scaling aspect
max_scale_level: 2
step_deltas:
- delta_1
- VDU1_initial_delta:
type: tosca.policies.nfv.VduInitialDelta
properties:
initial_delta:
number_of_instances: 1
targets: [ VDU1 ]
- VDU2_initial_delta:
type: tosca.policies.nfv.VduInitialDelta
properties:
initial_delta:
number_of_instances: 2
targets: [ VDU2 ]
- VDU1_scaling_aspect_deltas:
type: tosca.policies.nfv.VduScalingAspectDeltas
properties:
aspect: worker_instance
deltas:
delta_1:
number_of_instances: 1
targets: [ VDU1 ]
- instantiation_levels:
type: tosca.policies.nfv.InstantiationLevels
properties:
levels:
instantiation_level_1:
description: Smallest size
scale_info:
worker_instance:
scale_level: 0
instantiation_level_2:
description: Largest size
scale_info:
worker_instance:
scale_level: 2
default_level: instantiation_level_1
- VDU1_instantiation_levels:
type: tosca.policies.nfv.VduInstantiationLevels
properties:
levels:
instantiation_level_1:
number_of_instances: 1
instantiation_level_2:
number_of_instances: 3
targets: [ VDU1 ]
- VDU2_instantiation_levels:
type: tosca.policies.nfv.VduInstantiationLevels
properties:
levels:
instantiation_level_1:
number_of_instances: 2
instantiation_level_2:
number_of_instances: 2
targets: [ VDU2 ]
- internalVL1_instantiation_levels:
type: tosca.policies.nfv.VirtualLinkInstantiationLevels
properties:
levels:
instantiation_level_1:
bitrate_requirements:
root: 1048576
leaf: 1048576
instantiation_level_2:
bitrate_requirements:
root: 1048576
leaf: 1048576
targets: [ internalVL1 ]
- internalVL2_instantiation_levels:
type: tosca.policies.nfv.VirtualLinkInstantiationLevels
properties:
levels:
instantiation_level_1:
bitrate_requirements:
root: 1048576
leaf: 1048576
instantiation_level_2:
bitrate_requirements:
root: 1048576
leaf: 1048576
targets: [ internalVL2 ]
- internalVL3_instantiation_levels:
type: tosca.policies.nfv.VirtualLinkInstantiationLevels
properties:
levels:
instantiation_level_1:
bitrate_requirements:
root: 1048576
leaf: 1048576
instantiation_level_2:
bitrate_requirements:
root: 1048576
leaf: 1048576
targets: [ internalVL3 ]
- policy_antiaffinity_vdu1:
type: tosca.policies.nfv.AntiAffinityRule
targets: [ VDU1 ]
properties:
scope: zone
- policy_antiaffinity_vdu2:
type: tosca.policies.nfv.AntiAffinityRule
targets: [ VDU2 ]
properties:
scope: zone

View File

@ -0,0 +1,31 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: Sample VNF
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
- helloworld3_types.yaml
- helloworld3_df_simple.yaml
topology_template:
inputs:
selected_flavour:
type: string
description: VNF deployment flavour selected by the consumer. It is provided in the API
node_templates:
VNF:
type: company.provider.VNF
properties:
flavour_id: { get_input: selected_flavour }
descriptor_id: b1bb0ce7-ebca-4fa7-95ed-4840d7000000
provider: Company
product_name: Sample VNF
software_version: '1.0'
descriptor_version: '1.0'
vnfm_info:
- Tacker
requirements:
#- virtual_link_external # mapped in lower-level templates
#- virtual_link_internal # mapped in lower-level templates

View File

@ -0,0 +1,55 @@
tosca_definitions_version: tosca_simple_yaml_1_2
description: VNF type definition
imports:
- etsi_nfv_sol001_common_types.yaml
- etsi_nfv_sol001_vnfd_types.yaml
node_types:
company.provider.VNF:
derived_from: tosca.nodes.nfv.VNF
properties:
descriptor_id:
type: string
constraints: [ valid_values: [ b1bb0ce7-ebca-4fa7-95ed-4840d7000000 ] ]
default: b1bb0ce7-ebca-4fa7-95ed-4840d7000000
descriptor_version:
type: string
constraints: [ valid_values: [ '1.0' ] ]
default: '1.0'
provider:
type: string
constraints: [ valid_values: [ 'Company' ] ]
default: 'Company'
product_name:
type: string
constraints: [ valid_values: [ 'Sample VNF' ] ]
default: 'Sample VNF'
software_version:
type: string
constraints: [ valid_values: [ '1.0' ] ]
default: '1.0'
vnfm_info:
type: list
entry_schema:
type: string
constraints: [ valid_values: [ Tacker ] ]
default: [ Tacker ]
flavour_id:
type: string
constraints: [ valid_values: [ simple ] ]
default: simple
flavour_description:
type: string
default: "falvour"
requirements:
- virtual_link_external1:
capability: tosca.capabilities.nfv.VirtualLinkable
- virtual_link_external2:
capability: tosca.capabilities.nfv.VirtualLinkable
- virtual_link_internal:
capability: tosca.capabilities.nfv.VirtualLinkable
interfaces:
Vnflcm:
type: tosca.interfaces.nfv.Vnflcm

View File

@ -0,0 +1,4 @@
TOSCA-Meta-File-Version: 1.0
CSAR-Version: 1.1
Created-by: Onboarding portal
Entry-Definitions: Definitions/helloworld3_top.vnfd.yaml

View File

@ -0,0 +1,35 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.vnfm.lcm_user_data.abstract_user_data import AbstractUserData
import tacker.vnfm.lcm_user_data.utils as UserDataUtil
class SampleUserData(AbstractUserData):
@staticmethod
def instantiate(base_hot_dict=None,
vnfd_dict=None,
inst_req_info=None,
grant_info=None):
api_param = UserDataUtil.get_diff_base_hot_param_from_api(
base_hot_dict, inst_req_info)
initial_param_dict = \
UserDataUtil.create_initial_param_server_port_dict(
base_hot_dict)
vdu_flavor_dict = \
UserDataUtil.create_vdu_flavor_capability_name_dict(vnfd_dict)
vdu_image_dict = UserDataUtil.create_sw_image_dict(vnfd_dict)
cpd_vl_dict = UserDataUtil.create_network_dict(
inst_req_info, initial_param_dict)
final_param_dict = UserDataUtil.create_final_param_dict(
initial_param_dict, vdu_flavor_dict, vdu_image_dict, cpd_vl_dict)
return {**final_param_dict, **api_param}

View File

@ -81,12 +81,15 @@ class SessionClient(adapter.Adapter):
class BaseTackerTest(base.BaseTestCase):
"""Base test case class for all Tacker API tests."""
# Class specific variables
tacker_config_file = '/etc/tacker/tacker.conf'
@classmethod
def setUpClass(cls):
super(BaseTackerTest, cls).setUpClass()
kwargs = {}
cfg.CONF(args=['--config-file', '/etc/tacker/tacker.conf'],
cfg.CONF(args=['--config-file', cls.tacker_config_file],
project='tacker',
version='%%prog %s' % version.version_info.release_string(),
**kwargs)
@ -94,6 +97,7 @@ class BaseTackerTest(base.BaseTestCase):
cls.client = cls.tackerclient()
cls.http_client = cls.tacker_http_client()
cls.h_client = cls.heatclient()
cls.glance_client = cls.glanceclient()
@classmethod
def get_credentials(cls):

View File

@ -0,0 +1,420 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from datetime import datetime as dt
import http.server
import inspect
import json
import os
import threading
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
class SingletonMixin:
"""Mixin class to make your class a Singleton class."""
_instance = None
_rlock = threading.RLock()
_inside_instance = False
@classmethod
def get_instance(cls, *args, **kwargs):
"""Get *the* instance of the class, constructed when needed using(kw)args.
Return the instance of the class. If it did not yet exist, create
it by calling the "constructor" with whatever arguments and keyword
arguments provided.
This routine is thread-safe. It uses the *double-checked locking*
design pattern ``https://en.wikipedia.org/wiki/Double-checked_locking``
for this.
:param args: Used for constructing the instance, when not performed
yet.
:param kwargs: Used for constructing the instance, when not
perfored yet.
:return: An instance of the class.
"""
if cls._instance is not None:
return cls._instance
with cls._rlock:
# re-check, perhaps it was created in the mean time...
if cls._instance is None:
cls._inside_instance = True
try:
cls._instance = cls(*args, **kwargs)
finally:
cls._inside_instance = False
return cls._instance
def __new__(cls, *args, **kwargs):
"""Raise Exception when not called from the :func:``instance``
Class method.
This method raises RuntimeError when not called from the
instance class method.
:param args: Arguments eventually passed to
:func:``__init__``_.
:param kwargs: Keyword arguments eventually passed to
:func:``__init__``_
:return: the created instance.
"""
if cls is SingletonMixin:
raise TypeError(
"Attempt to instantiate\
mixin class {}".format(cls.__qualname__)
)
if cls._instance is None:
with cls._rlock:
if cls._instance is None and cls._inside_instance:
return super().__new__(cls, *args, **kwargs)
raise RuntimeError(
"Attempt to create a {}\
instance outside of instance()".format(cls.__qualname__)
)
class DummyRequestHander(http.server.CGIHTTPRequestHandler):
"""HTTP request handler for dummy server."""
def __init__(self, request, client_address, server):
super().__init__(request, client_address, server)
return
def _is_match_with_list(self):
"""Return given path is listed in dictionary or not.
Return:
True/False
"""
manager = FakeServerManager.get_instance()
func_uri_list = manager._methods[self.command]
for objChkUrl in func_uri_list:
# Check which requested path is in our list.
LOG.debug('path for check:%s' % objChkUrl)
if(self.path.startswith(objChkUrl)):
return True
return False
def _returned_callback(self, mock_info):
"""Send responses to client. Called in do_* methods.
This method do not handle message when error is occured.
Args:
mock_info (tuple): callback informations from caller.
"""
request_headers = dict(self.headers._headers)
request_body = self._parse_request_body()
response_body_str = b''
(status_code, mock_headers, mock_body) = self._get_mock_info(
mock_info, request_headers, request_body)
self.send_response(status_code)
# Check what I should return to client ?
if mock_info.get('content') is not None:
response_body_str = open(mock_info.get('content'), 'rb').read()
elif len(mock_body) > 0:
response_body_str = json.dumps(mock_body).encode('utf-8')
mock_headers['Content-Length'] = str(len(response_body_str))
# Send custom header if exist
for key, val in mock_headers.items():
self.send_header(key, val)
self.end_headers()
if len(response_body_str) > 0:
self.wfile.write(response_body_str)
FakeServerManager.get_instance().add_history(self.path, RequestHistory(
status_code=status_code,
request_headers=request_headers,
request_body=request_body,
response_headers=copy.deepcopy(mock_headers),
response_body=copy.deepcopy(mock_body))
)
def _parse_request_body(self):
if 'content-length' not in self.headers:
return {}
request_content_len = int(self.headers.get('content-length'))
if request_content_len == 0:
return {}
decode_request_body = self.rfile.read(
request_content_len).decode('utf-8')
return json.loads(decode_request_body)
def _get_mock_info(self, mock_info, request_headers, request_body):
"""Call mock(callback) and get responses
This method is called from _returned_callback().
Args:
mock_info (tuple): callback informations from caller.
request_headers (dict): Request headers
request_body (dict): Request Bodies
Returns:
(tuple): status_code, response headers, response bodies.
response body will be converted into JSON string
with json.dumps().
"""
# Prepare response contents
func = mock_info.get('callback')
status_code = mock_info.get('status_code')
mock_headers = mock_info.get('response_headers')
mock_body = mock_info.get('response_body')
# Call function if callable.
if callable(func):
mock_body = func(request_headers, request_body)
return (status_code, mock_headers, mock_body)
def do_DELETE(self):
raise NotImplementedError
def do_GET(self):
"""Process GET request"""
LOG.debug(
'[Start] %s.%s()' %
(self.__class__.__name__,
inspect.currentframe().f_code.co_name))
# Check URI in request.
if self._is_match_with_list():
# Request is registered in our list.
self._returned_callback(
FakeServerManager.get_instance()._funcs_gets[self.path])
else:
# Unregistered URI is requested
LOG.debug('GET Recv. Unknown URL: "%s"' % self.path)
self.send_response(http.HTTPStatus.BAD_REQUEST)
self.end_headers()
LOG.debug('[ End ] %s.%s()' %
(self.__class__.__name__,
inspect.currentframe().f_code.co_name))
def do_POST(self):
"""Process POST request"""
LOG.debug(
'[Start] %s.%s()' %
(self.__class__.__name__,
inspect.currentframe().f_code.co_name))
# URI might have trailing uuid or not.
if self._is_match_with_list():
# Request is registered in our list.
self._returned_callback(
FakeServerManager.get_instance()._funcs_posts[self.path])
else:
# Unregistered URI is requested
LOG.debug('POST Recv. Unknown URL: "%s"' % self.path)
self.send_response(http.HTTPStatus.BAD_REQUEST)
self.end_headers()
LOG.debug(
'[ End ] %s.%s()' %
(self.__class__.__name__,
inspect.currentframe().f_code.co_name))
def do_PUT(self):
raise NotImplementedError
class RequestHistory:
"""Storage class for storing requested data(Maybe POSTed datas)."""
def __init__(
self,
status_code,
request_headers=None,
request_body=None,
response_headers=None,
response_body=None):
self.timestamp = dt.now()
self.status_code = status_code
self.request_headers = request_headers
self.request_body = request_body
self.response_headers = response_headers
self.response_body = response_body
class FakeServerManager(SingletonMixin):
"""Manager class to manage dummy server setting and control"""
SERVER_PORT = 9990
def __init__(self):
# Initialize class-specific variables.
# Storage for request header/body and response header/body
# history (dict) is updated using RequestHistory class.
self._history = {}
# Initialize function list for each request method.
# DELETE/PUT method is listed but not supported currently.
self._funcs_deletes = {}
self._funcs_gets = {}
self._funcs_posts = {}
self._funcs_puts = {}
self._methods = {
'DELETE': self._funcs_deletes,
'GET': self._funcs_gets,
'POST': self._funcs_posts,
'PUT': self._funcs_puts}
def set_callback(
self,
method,
uri,
status_code=None,
response_headers=None,
response_body=None,
content=None,
callback=None):
"""Set callback function and some stuff for specified URI.
ALL additional parameter is set default to None, so you have to
specify what your callback-function need. response_header and
response_body will be passed to callback.
Args:
method (str): Reqested method
uri (str): Requested URI
status_code (http.HTTPStatus): HTTP status code and
reason phrase.
response_headers (dict): Addtional response header.
response_body (dict): Response body. Must be Jason Bourne
content (str): File path that you want client to download.
callback (callable): Callback function. Must return serializable
object json.dumps() can handle.
"""
callbacks = self._methods[method]
callbacks[uri] = {
'status_code': status_code or http.HTTPStatus.OK,
'response_headers': response_headers or {},
'response_body': response_body or {},
'content': content,
'callback': callback,
}
self._methods[method].update(callbacks)
# Check file existence for content
if content is not None:
if not os.path.isfile(content):
raise FileNotFoundError
LOG.debug('Set callback for %s(%s): %s' %
(method, uri, callback))
def add_history(self, path, history):
"""Add Request/Response header/body to history.
This method maybe called in DummyRequestHandler._returned_callback()
only. This method should not be called from outside of This class.
Args:
path (str): URI path
history (RequestHistory): Storage container for each request.
"""
with self._rlock:
if path in self._history:
self._history[path].append(history)
else:
self._history[path] = [history]
def clear_history(self, path=None):
"""Clear Request/Response header/body of history.
Args:
path (str): URI path
"""
with self._rlock:
if not path:
self._history = {}
return
if path in self._history:
self._history.pop(path)
def get_history(self, path=None):
"""Get Request/Response header/body from history.
Args:
path (str): URI path
Returns:
history list(RequestHistory): Storage container for each request.
"""
history = copy.deepcopy(self._history)
if not path:
return history
return history.get(path) or []
def prepare_http_server(
self,
address="localhost",
port=SERVER_PORT):
"""Set up HTTPd server your behalf.
Args:
address (str): bind address for listen
port (int): por number for listen
"""
LOG.debug(
'[Start] %s.%s()' %
(self.__class__.__name__,
inspect.currentframe().f_code.co_name))
self.objHttpd = http.server.HTTPServer(
(address, port), DummyRequestHander)
LOG.debug(
'[ End ] %s.%s()' %
(self.__class__.__name__,
inspect.currentframe().f_code.co_name))
def start_server(self):
"""Start server in thread."""
LOG.debug('[START] %s()' % inspect.currentframe().f_code.co_name)
threading.Thread(None, self.run).start()
LOG.debug('[ END ] %s()' % inspect.currentframe().f_code.co_name)
def run(self):
"""HTTPd server runner"""
LOG.debug('[START] %s()' % inspect.currentframe().f_code.co_name)
try:
self.objHttpd.serve_forever()
except KeyboardInterrupt:
self.stop_server()
LOG.debug('[ END ] %s()' % inspect.currentframe().f_code.co_name)
def stop_server(self):
"""Stop HTTP Server"""
LOG.debug('[START] %s()' % inspect.currentframe().f_code.co_name)
self.objHttpd.shutdown()
LOG.debug('[ END ] %s()' % inspect.currentframe().f_code.co_name)

View File

@ -0,0 +1,871 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import tempfile
import time
from urllib.parse import urlparse
import yaml
import zipfile
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from tacker.objects import fields
from tacker.tests.functional import base
from tacker.tests.functional.common.fake_server import FakeServerManager
from tacker.tests import utils
from tacker.vnfm.infra_drivers.openstack import constants as infra_cnst
VNF_PACKAGE_UPLOAD_TIMEOUT = 60
VNF_INSTANTIATE_TIMEOUT = 60
VNF_TERMINATE_TIMEOUT = 60
VNF_SUBSCRIPTION_TIMEOUT = 60
VNF_INSTANTIATE_ERROR_WAIT = 80
VNF_DELETE_COMPLETION_WAIT = 60
VNF_HEAL_TIMEOUT = 600
VNF_LCM_DONE_TIMEOUT = 600
RETRY_WAIT_TIME = 5
FAKE_SERVER_MANAGER = FakeServerManager.get_instance()
MOCK_NOTIFY_CALLBACK_URL = '/notification/callback'
UUID_RE = r'\w{8}-\w{4}-\w{4}-\w{4}-\w{12}'
def _get_external_virtual_links(net0_id):
return [
{
"id": "net0",
"resourceId": net0_id,
"extCps": [{
"cpdId": "CP1",
"cpConfig": [{
"cpProtocolData": [{
"layerProtocol": "IP_OVER_ETHERNET",
}]
}]
}]
}
]
def _create_csar_user_data_common(csar_dir):
ud_common_dir = os.path.join(csar_dir, "../user_data_common/")
return _create_csar_with_unique_vnfd_id(
csar_dir, ud_common_dir)
def _create_csar_with_unique_vnfd_id(csar_dir, *include_dirs):
tempfd, tempname = tempfile.mkstemp(suffix=".zip",
dir=os.path.dirname(csar_dir))
os.close(tempfd)
common_dir = os.path.join(csar_dir, "../common/")
target_dirs = [csar_dir, common_dir]
target_dirs.extend(include_dirs)
unique_id = uuidutils.generate_uuid()
with zipfile.ZipFile(tempname, 'w') as zcsar:
_write_zipfile(zcsar, unique_id, target_dirs)
return tempname, unique_id
def _write_zipfile(zcsar, unique_id, target_dir_list):
for target_dir in target_dir_list:
for (dpath, _, fnames) in os.walk(target_dir):
if not fnames:
continue
for fname in fnames:
src_file = os.path.join(dpath, fname)
dst_file = os.path.relpath(
os.path.join(dpath, fname), target_dir)
if fname.endswith('.yaml') or fname.endswith('.yml'):
with open(src_file, 'rb') as yfile:
data = yaml.safe_load(yfile)
utils._update_unique_id_in_yaml(data, unique_id)
zcsar.writestr(dst_file, yaml.dump(
data, default_flow_style=False,
allow_unicode=True))
else:
zcsar.write(src_file, dst_file)
def _create_and_upload_vnf_package(
tacker_client,
user_defined_data,
temp_csar_path):
# create vnf package
body = jsonutils.dumps({"userDefinedData": user_defined_data})
resp, vnf_package = tacker_client.do_request(
'/vnfpkgm/v1/vnf_packages', "POST", body=body)
with open(temp_csar_path, 'rb') as file_object:
resp, resp_body = tacker_client.do_request(
'/vnfpkgm/v1/vnf_packages/{id}/package_content'.format(
id=vnf_package['id']),
"PUT", body=file_object, content_type='application/zip')
# wait for onboard
timeout = VNF_PACKAGE_UPLOAD_TIMEOUT
start_time = int(time.time())
show_url = os.path.join('/vnfpkgm/v1/vnf_packages', vnf_package['id'])
vnfd_id = None
while True:
resp, body = tacker_client.do_request(show_url, "GET")
if body['onboardingState'] == "ONBOARDED":
vnfd_id = body['vnfdId']
break
if ((int(time.time()) - start_time) > timeout):
raise Exception("Failed to onboard vnf package")
time.sleep(1)
# remove temporarily created CSAR file
os.remove(temp_csar_path)
return vnf_package['id'], vnfd_id
def _delete_vnf_package(tacker_client, vnf_package_id):
url = '/vnfpkgm/v1/vnf_packages/%s' % vnf_package_id
# Update vnf package before delete
req_body = jsonutils.dumps({"operationalState": "DISABLED"})
tacker_client.do_request(url, "PATCH", body=req_body)
# Delete vnf package before delete
tacker_client.do_request(url, "DELETE")
def _show_vnf_package(tacker_client, vnf_package_id):
# wait for onboard
timeout = VNF_PACKAGE_UPLOAD_TIMEOUT
start_time = int(time.time())
show_url = os.path.join('/vnfpkgm/v1/vnf_packages', vnf_package_id)
while True:
resp, body = tacker_client.do_request(show_url, "GET")
if resp.ok:
return resp, body
if ((int(time.time()) - start_time) > timeout):
raise Exception("Failed to onboard vnf package")
time.sleep(1)
def _list_vnf_package(tacker_client, **kwargs):
# wait for onboard
timeout = VNF_PACKAGE_UPLOAD_TIMEOUT
start_time = int(time.time())
while True:
resp, body = tacker_client.do_request(
'/vnfpkgm/v1/vnf_packages', "GET", **kwargs)
if resp.ok:
return resp, body
if ((int(time.time()) - start_time) > timeout):
raise Exception("Failed to onboard vnf package")
time.sleep(1)
def _create_instantiate_vnf_request_body(flavour_id,
instantiation_level_id=None, vim_id=None, ext_vl=None,
add_params=None):
request_body = {"flavourId": flavour_id}
if instantiation_level_id:
request_body["instantiationLevelId"] = instantiation_level_id
if ext_vl:
request_body["extVirtualLinks"] = ext_vl
if vim_id:
request_body["vimConnectionInfo"] = [
{"id": uuidutils.generate_uuid(),
"vimId": vim_id,
"vimType": "ETSINFV.OPENSTACK_KEYSTONE.v_2"}]
if add_params:
request_body["additionalParams"] = add_params
return request_body
class BaseVnfLcmTest(base.BaseTackerTest):
@classmethod
def setUpClass(cls):
'''Set up test class.
we set up fake NFVO server for test at here.
'''
super(BaseVnfLcmTest, cls).setUpClass()
FAKE_SERVER_MANAGER.prepare_http_server()
FAKE_SERVER_MANAGER.start_server()
FAKE_SERVER_MANAGER.set_callback(
'POST',
MOCK_NOTIFY_CALLBACK_URL,
status_code=204
)
@classmethod
def tearDownClass(cls):
super(BaseVnfLcmTest, cls).tearDownClass()
FAKE_SERVER_MANAGER.stop_server()
def setUp(self):
super(BaseVnfLcmTest, self).setUp()
self.tacker_client = base.BaseTackerTest.tacker_http_client()
self.base_vnf_instances_url = "/vnflcm/v1/vnf_instances"
self.base_subscriptions_url = "/vnflcm/v1/subscriptions"
self.base_vnf_lcm_op_occs_url = "/vnflcm/v1/vnf_lcm_op_occs"
vim_list = self.client.list_vims()
self.vim = self.get_vim(vim_list, 'VIM0')
if not self.vim:
assert False, "vim_list is Empty: Default VIM is missing"
# Create external external.
self.ext_networks = list()
# Create external managed networks
self.ext_mngd_networks = list() # Store ids for cleaning.
networks = self.neutronclient().list_networks()
for nw in networks.get('networks'):
if nw['name'] == 'net0':
self.ext_networks.append(nw['id'])
elif nw['name'] == 'net1':
self.ext_mngd_networks.append(nw['id'])
# create new network.
self.ext_networks.append(
self._create_network("external_net"))
self.ext_mngd_networks.append(
self._create_network("external_managed_internal_net"))
# Create external link ports in net0
self.ext_link_ports = list()
# Create external subnet in net1
self.ext_subnets = list() # Store ids for cleaning.
# Chack how many networks are created.
networks = self.neutronclient().list_networks()
for nw in networks.get('networks'):
if nw['name'] not in ['net0', 'external_net']:
continue
if self.vim['tenant_id'] != nw['tenant_id']:
continue
self.ext_networks.append(nw['id'])
self.ext_link_ports.append(self._create_port(nw['id']))
self.ext_subnets.append(self._create_subnet(nw))
@classmethod
def _list_glance_image(cls, filter_name='cirros-0.4.0-x86_64-disk'):
try:
images = cls.glance_client.images.list()
except Exception:
print("glance-image does not exists.")
return []
if filter_name is None:
return images
return list(filter(lambda image: image.name == filter_name, images))
@classmethod
def _get_glance_image(cls, image_id):
try:
image = cls.glance_client.images.get(image_id)
except Exception:
print("glance-image does not exists.")
return None
return image
@classmethod
def _create_glance_image(cls, image_data, file_url):
image = cls.glance_client.images.create(**image_data)
cls.glance_client.images.upload(image.id, file_url)
return image.id
def _get_glance_image_list_from_stack_resource(
self, stack_id, stack_resource_name):
image_id_list = []
for resource_name in stack_resource_name:
resource_details = self._get_heat_resource(stack_id, resource_name)
image = self._get_image_id_from_resource_attributes(
resource_details)
if image:
image_id_list.append(image.id)
return image_id_list
def _register_subscription(self, request_body):
resp, response_body = self.http_client.do_request(
self.base_subscriptions_url,
"POST",
body=jsonutils.dumps(request_body))
return resp, response_body
def _delete_subscription(self, subscription_id):
delete_url = os.path.join(self.base_subscriptions_url, subscription_id)
resp, body = self.tacker_client.do_request(delete_url, "DELETE")
return resp, body
def _show_subscription(self, subscription_id):
show_url = os.path.join(self.base_subscriptions_url, subscription_id)
resp, body = self.tacker_client.do_request(show_url, "GET")
return resp, body
def _list_subscription(self):
resp, body = self.tacker_client.do_request(
self.base_subscriptions_url, "GET")
return resp, body
def _create_vnf_instance(self, vnfd_id, vnf_instance_name=None,
vnf_instance_description=None):
request_body = {'vnfdId': vnfd_id}
if vnf_instance_name:
request_body['vnfInstanceName'] = vnf_instance_name
if vnf_instance_description:
request_body['vnfInstanceDescription'] = vnf_instance_description
return self._create_vnf_instance_from_body(request_body)
def _create_vnf_instance_from_body(self, request_body):
resp, response_body = self.http_client.do_request(
self.base_vnf_instances_url,
"POST",
body=jsonutils.dumps(request_body))
return resp, response_body
def _delete_vnf_instance(self, id):
url = os.path.join(self.base_vnf_instances_url, id)
resp, body = self.http_client.do_request(url, "DELETE")
return resp, body
def _show_vnf_instance(self, id):
show_url = os.path.join(self.base_vnf_instances_url, id)
resp, vnf_instance = self.http_client.do_request(show_url, "GET")
return resp, vnf_instance
def _list_vnf_instance(self, **kwargs):
resp, vnf_instances = self.http_client.do_request(
self.base_vnf_instances_url, "GET")
return resp, vnf_instances
def _wait_vnf_instance(self, id,
instantiation_state=fields.VnfInstanceState.INSTANTIATED,
timeout=VNF_INSTANTIATE_TIMEOUT):
start_time = int(time.time())
while True:
resp, body = self._show_vnf_instance(id)
if body['instantiationState'] == instantiation_state:
break
if ((int(time.time()) - start_time) > timeout):
error = ("Vnf instance %(id)s status is %(current)s, "
"expected status should be %(expected)s")
self.fail(error % {"id": id,
"current": body['instantiationState'],
"expected": instantiation_state})
time.sleep(5)
def _instantiate_vnf_instance(self, id, request_body):
url = os.path.join(self.base_vnf_instances_url, id, "instantiate")
resp, body = self.http_client.do_request(url, "POST",
body=jsonutils.dumps(request_body))
return resp, body
def _heal_vnf_instance(self, vnf_instance_id, request_body):
url = \
os.path.join(self.base_vnf_instances_url, vnf_instance_id, "heal")
resp, body = self.http_client.do_request(url, "POST",
body=jsonutils.dumps(request_body))
return resp, body
def _terminate_vnf_instance(self, id, request_body):
url = os.path.join(self.base_vnf_instances_url, id, "terminate")
resp, body = self.http_client.do_request(url, "POST",
body=jsonutils.dumps(request_body))
return resp, body
def _wait_terminate_vnf_instance(self, id, timeout=None):
start_time = int(time.time())
self._wait_vnf_instance(id,
instantiation_state=fields.VnfInstanceState.NOT_INSTANTIATED,
timeout=timeout)
# If gracefulTerminationTimeout is set, check whether vnf
# instantiation_state is set to NOT_INSTANTIATED after
# gracefulTerminationTimeout seconds.
if timeout and int(time.time()) - start_time < timeout:
self.fail("Vnf is terminated before graceful termination"
"timeout period")
else:
return
# wait for status completion
time.sleep(VNF_DELETE_COMPLETION_WAIT)
def _get_heat_stack(self, vnf_instance_id, prefix_id='vnflcm_'):
try:
stacks = self.h_client.stacks.list()
except Exception:
print("heat-stacks does not exists.")
return None
target_stack_name = prefix_id + vnf_instance_id
target_stakcs = list(
filter(
lambda x: x.stack_name == target_stack_name,
stacks))
if len(target_stakcs) == 0:
return None
return target_stakcs[0]
def _get_heat_resource_list(self, stack_id, nested_depth=0):
try:
resources = self.h_client.resources.list(
stack_id, nested_depth=nested_depth)
except Exception:
print("heat-stacks-resources does not exists.")
return None
return resources
def _get_heat_resource(self, stack_id, resource_name):
try:
resource = self.h_client.resources.get(
stack_id, resource_name)
except Exception:
print("heat-stacks-resource does not exists.")
return None
return resource
def _get_image_id_from_resource_attributes(self, stack_resource_details):
if stack_resource_details is None:
return None
if not hasattr(stack_resource_details, 'attributes'):
return None
return stack_resource_details.attributes.get('image', {}).get('id')
def _get_vnfc_instance_id_list(
self,
stack_id,
resource_type='OS::Nova::Server',
nested_depth=2,
limit=2):
resources = self._get_heat_resource_list(
stack_id, nested_depth=nested_depth)
if resources is None:
return None
return [r.physical_resource_id for r in resources[:limit]
if r.resource_type == resource_type]
def assert_http_header_location_for_create(self, response_header):
"""Validate URI in location header for CreateVNF
{apiRoot}/vnflcm/v1/vnf_instances/{vnfInstanceId}/instantiate
"""
location = response_header.get(
"Location") or response_header.get("location")
self.assertIsNotNone(location)
uri = urlparse(location)
self.assertIn(uri.scheme, ['http', 'https'])
self.assertRegex(
uri.path,
r'^/(?P<apiRoot>[^/]*?/)?vnflcm/v1/vnf_instances/' +
UUID_RE)
def assert_http_header_location_for_lcm_op_occs(self, response_header):
"""Validate URI in location header for various LCMs
{apiRoot}/vnflcm/v1/vnf_lcm_op_occs/{vnfLcmOpOccId}
"""
location = response_header.get(
"Location") or response_header.get("location")
self.assertIsNotNone(location)
uri = urlparse(location)
self.assertIn(uri.scheme, ['http', 'https'])
self.assertRegex(
uri.path,
r'^/(?P<apiRoot>[^/]*?/)?vnflcm/v1/vnf_lcm_op_occs/' +
UUID_RE)
def assert_http_header_location_for_subscription(self, response_header):
"""Validate URI in location header for Subscription
{apiRoot}/vnflcm/v1/subscriptions/{subscriptionId}
"""
location = response_header.get(
"Location") or response_header.get("location")
self.assertIsNotNone(location)
uri = urlparse(location)
self.assertIn(uri.scheme, ['http', 'https'])
self.assertRegex(
uri.path,
r'^/(?P<apiRoot>[^/]*?/)?vnflcm/v1/subscriptions/' +
UUID_RE)
def assert_instantiation_state(
self,
vnf_instance_body,
expected_instantiation_state=fields.VnfInstanceState.INSTANTIATED):
# FT-checkpoint: Instantiation state(VNF instance)
self.assertEqual(
expected_instantiation_state,
vnf_instance_body['instantiationState'])
def assert_vnf_state(
self,
vnf_instance_body,
expected_vnf_state=fields.VnfOperationalStateType.STARTED):
# FT-checkpoint: vnf_state
self.assertEqual(
expected_vnf_state,
vnf_instance_body['instantiatedVnfInfo']['vnfState'])
def assert_heat_stack_status(
self,
vnf_instance_id,
expected_stack_status=infra_cnst.STACK_CREATE_COMPLETE):
stack = self._get_heat_stack(vnf_instance_id)
self.assertEqual(
expected_stack_status,
stack.stack_status)
def assert_heat_resource_status(
self,
vnf_instance,
expected_glance_image=None,
expected_resource_status=None):
def assert_glance_image(stack_id, resource_name):
resource_details = self._get_heat_resource(stack_id, resource_name)
image = self._get_image_id_from_resource_attributes(
resource_details)
if image:
self.assertEqual(expected_glance_image, image.status)
stack = self._get_heat_stack(vnf_instance['id'])
resources = self._get_heat_resource_list(stack.id, 2)
self.assertIsNotNone(resources)
for resource in resources:
# FT-checkpoint: resource status
self.assertEqual(expected_resource_status,
resource.resource_status)
# FT-checkpoint: Glance-image
if expected_glance_image:
assert_glance_image(stack.id, resource.resource_name)
def assert_heat_resource_status_is_none(
self,
stack_id,
resources_name_list=None,
glance_image_id_list=None):
resources_name_list = resources_name_list or []
for resource_name in resources_name_list:
resource = self._get_heat_resource(stack_id, resource_name)
self.assertIsNone(resource)
glance_image_id_list = glance_image_id_list or []
for glance_image_id in glance_image_id_list:
image = self._get_glance_image(glance_image_id)
self.assertIsNone(image)
def _wait_lcm_done(self, expected_operation_status=None):
start_time = int(time.time())
while True:
actual_status = None
vnf_lcm_op_occ_id = None
notify_mock_responses = FAKE_SERVER_MANAGER.get_history(
MOCK_NOTIFY_CALLBACK_URL)
for res in notify_mock_responses:
if expected_operation_status is None:
return
actual_status = res.request_body.get('operationState', '')
vnf_lcm_op_occ_id = res.request_body.get('vnfLcmOpOccId', '')
if actual_status == expected_operation_status:
return
if ((int(time.time()) - start_time) > VNF_LCM_DONE_TIMEOUT):
if actual_status:
error = (
"LCM incomplete timeout, %s is %s," +
"expected status should be %s")
self.fail(
error % {
"vnf_lcm_op_occ_id": vnf_lcm_op_occ_id,
"expected": expected_operation_status,
"actual": actual_status})
else:
self.fail("LCM incomplete timeout")
time.sleep(RETRY_WAIT_TIME)
def _wait_stack_update(self, vnf_instance_id, expected_status):
timeout = VNF_HEAL_TIMEOUT
start_time = int(time.time())
while True:
stack = self._get_heat_stack(vnf_instance_id)
if stack.stack_status == expected_status:
break
if ((int(time.time()) - start_time) > timeout):
error = ("Stack %(id)s status is %(current)s, expected status "
"should be %(expected)s")
self.fail(error % {"vnf_instance_name": vnf_instance_id,
"current": stack.status,
"expected": expected_status})
time.sleep(RETRY_WAIT_TIME)
def assert_create_vnf(self, resp, vnf_instance):
self.assertEqual(201, resp.status_code)
self.assert_http_header_location_for_create(resp.headers)
self.assert_instantiation_state(
vnf_instance,
fields.VnfInstanceState.NOT_INSTANTIATED)
# FT-checkpoint: Notification
notify_mock_responses = FAKE_SERVER_MANAGER.get_history(
MOCK_NOTIFY_CALLBACK_URL)
FAKE_SERVER_MANAGER.clear_history(MOCK_NOTIFY_CALLBACK_URL)
self.assertEqual(1, len(notify_mock_responses))
self.assert_notification_mock_response(
notify_mock_responses[0],
'VnfIdentifierCreationNotification')
def assert_delete_vnf(self, resp, vnf_instance_id):
self.assertEqual(204, resp.status_code)
resp, _ = self._show_vnf_instance(vnf_instance_id)
self.assertEqual(404, resp.status_code)
# FT-checkpoint: Notification
notify_mock_responses = FAKE_SERVER_MANAGER.get_history(
MOCK_NOTIFY_CALLBACK_URL)
FAKE_SERVER_MANAGER.clear_history(MOCK_NOTIFY_CALLBACK_URL)
self.assertEqual(1, len(notify_mock_responses))
self.assert_notification_mock_response(
notify_mock_responses[0],
'VnfIdentifierDeletionNotification')
def assert_instantiate_vnf(
self,
resp,
vnf_instance_id):
self.assertEqual(202, resp.status_code)
resp, vnf_instance = self._show_vnf_instance(vnf_instance_id)
self.assert_vnf_state(vnf_instance)
self.assert_heat_stack_status(vnf_instance['id'])
self.assert_heat_resource_status(
vnf_instance,
expected_glance_image='active',
expected_resource_status='CREATE_COMPLETE')
# FT-checkpoint: Notification
notify_mock_responses = FAKE_SERVER_MANAGER.get_history(
MOCK_NOTIFY_CALLBACK_URL)
FAKE_SERVER_MANAGER.clear_history(MOCK_NOTIFY_CALLBACK_URL)
self.assertEqual(3, len(notify_mock_responses))
self.assert_notification_mock_response(
notify_mock_responses[0],
'VnfLcmOperationOccurrenceNotification',
'STARTING')
self.assert_notification_mock_response(
notify_mock_responses[1],
'VnfLcmOperationOccurrenceNotification',
'PROCESSING')
self.assert_notification_mock_response(
notify_mock_responses[2],
'VnfLcmOperationOccurrenceNotification',
'COMPLETED')
def assert_heal_vnf(
self,
resp,
vnf_instance_id,
expected_stack_status='UPDATE_COMPLETE'):
self.assertEqual(202, resp.status_code)
resp, vnf_instance = self._show_vnf_instance(vnf_instance_id)
self.assert_vnf_state(vnf_instance)
self.assert_instantiation_state(vnf_instance)
self.assert_heat_stack_status(
vnf_instance['id'],
expected_stack_status=expected_stack_status)
# FT-checkpoint: Notification
notify_mock_responses = FAKE_SERVER_MANAGER.get_history(
MOCK_NOTIFY_CALLBACK_URL)
FAKE_SERVER_MANAGER.clear_history(MOCK_NOTIFY_CALLBACK_URL)
self.assertEqual(3, len(notify_mock_responses))
self.assert_notification_mock_response(
notify_mock_responses[0],
'VnfLcmOperationOccurrenceNotification',
'STARTING')
self.assert_notification_mock_response(
notify_mock_responses[1],
'VnfLcmOperationOccurrenceNotification',
'PROCESSING')
self.assert_notification_mock_response(
notify_mock_responses[2],
'VnfLcmOperationOccurrenceNotification',
'COMPLETED')
def assert_terminate_vnf(
self,
resp,
vnf_instance_id,
stack_id,
resource_name_list,
glance_image_id_list):
self.assertEqual(202, resp.status_code)
resp, vnf_instance = self._show_vnf_instance(vnf_instance_id)
self.assert_instantiation_state(
vnf_instance,
fields.VnfInstanceState.NOT_INSTANTIATED)
# FT-checkpoint: Heat stack status.
stack = self._get_heat_stack(vnf_instance_id)
self.assertIsNone(stack)
self.assert_heat_resource_status_is_none(
stack_id,
resources_name_list=resource_name_list,
glance_image_id_list=glance_image_id_list)
# FT-checkpoint: Notification
notify_mock_responses = FAKE_SERVER_MANAGER.get_history(
MOCK_NOTIFY_CALLBACK_URL)
FAKE_SERVER_MANAGER.clear_history(MOCK_NOTIFY_CALLBACK_URL)
self.assertEqual(3, len(notify_mock_responses))
self.assert_notification_mock_response(
notify_mock_responses[0],
'VnfLcmOperationOccurrenceNotification',
'STARTING')
self.assert_notification_mock_response(
notify_mock_responses[1],
'VnfLcmOperationOccurrenceNotification',
'PROCESSING')
self.assert_notification_mock_response(
notify_mock_responses[2],
'VnfLcmOperationOccurrenceNotification',
'COMPLETED')
def assert_notification_mock_response(
self,
notify_mock_response,
expected_notify_types,
expected_operation_status=None):
self.assertEqual(204, notify_mock_response.status_code)
self.assertEqual(
expected_notify_types,
notify_mock_response.request_body['notificationType'])
if expected_operation_status:
self.assertEqual(
expected_operation_status,
notify_mock_response.request_body['operationState'])
def _create_network(self, name):
# First, we have to check network name passed by caller is
# already exists or not.
netlist = self.neutronclient().list_networks(name=name)
if netlist is not None:
print('%s is already exist' % name)
# OK, we can create this.
net = self.neutronclient().create_network({'network': {'name': name}})
net_id = net['network']['id']
self.addCleanup(self.neutronclient().delete_network, net_id)
return net_id
def _create_subnet(self, network):
body = {'subnet': {'network_id': network['id'],
'name': "subnet-%s" % uuidutils.generate_uuid(),
'cidr': "22.22.{}.0/24".format(str(len(self.ext_subnets) % 2)),
'ip_version': 4,
'gateway_ip': '22.22.0.1',
"enable_dhcp": True}}
subnet = self.neutronclient().create_subnet(body=body)["subnet"]
self.addCleanup(self.neutronclient().delete_subnet, subnet['id'])
return subnet['id']
def _create_port(self, network_id):
body = {'port': {'network_id': network_id}}
port = self.neutronclient().create_port(body=body)["port"]
self.addCleanup(self.neutronclient().delete_port, port['id'])
return port['id']
def assert_subscription_show(self, resp, response_body):
"""Assert that subscription informations has mandatory keys."""
self.assertEqual(200, resp.status_code)
self.assertIsNotNone(response_body.get('id'))
_filter = response_body.get('filter')
self.assertIsNotNone(_filter)
self.assertIsNotNone(_filter.get('notificationTypes'))
self.assertIsNotNone(_filter.get('operationTypes'))
self.assertIsNotNone(response_body.get('callbackUri'))
_links = response_body.get('_links')
self.assertIsNotNone(_links)
self.assertIsNotNone(_links.get('self'))
self.assertIsNotNone(_links.get('self').get('href'))

View File

@ -0,0 +1,216 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.tests import uuidsentinel
class Subscription:
@staticmethod
def make_create_request_body(callback_uri):
"""Parameter selection policy.
Set all Notification types and all life cycle types for filter.
Specify OAuth2 for authentication do not set authentication.
Args:
callback_uri (str): Notification URI.
Returns:
dict: Request body
"""
return {
"filter": {
"notificationTypes": [
"VnfLcmOperationOccurrenceNotification",
"VnfIdentifierCreationNotification",
"VnfIdentifierDeletionNotification"
],
"operationTypes": [
"INSTANTIATE",
"SCALE",
"TERMINATE",
"HEAL",
"MODIFY_INFO"
]
},
"callbackUri": callback_uri
}
class VnfInstances:
@staticmethod
def make_create_request_body(vnfd_id):
return {
"vnfdId": vnfd_id,
"vnfInstanceName": "helloworld3",
"vnfInstanceDescription": "Sample VNF",
"metadata": {
"samplekey": "samplevalue"
}
}
@staticmethod
def make_inst_request_body(
tenant_id,
networks_id,
ext_mngd_networks_id,
external_ports_id,
external_subnets_id):
ext_vdu1_cp1 = {
"cpdId": "VDU1_CP1",
"cpConfig": [{
"linkPortId": uuidsentinel.elp1_id
}],
}
ext_vdu2_cp1 = {
"cpdId": "VDU2_CP1",
"cpConfig": [{
"linkPortId": uuidsentinel.elp2_id
}]
}
# set external port_id on vim.
ext_link_port1 = {
"id": uuidsentinel.elp1_id,
"resourceHandle": {
"vimConnectionId": uuidsentinel.vim_connection_id,
"resourceId": external_ports_id[0]
}
}
ext_link_port2 = {
"id": uuidsentinel.elp2_id,
"resourceHandle": {
"vimConnectionId": uuidsentinel.vim_connection_id,
"resourceId": external_ports_id[1]
}
}
ext_virtual_link_cp1 = {
"id": uuidsentinel.evl1_id,
"vimConnectionId": uuidsentinel.vim_connection_id,
# set external nw_id on vim.
"resourceId": networks_id[0],
"extCps": [ext_vdu1_cp1, ext_vdu2_cp1],
"extLinkPorts": [ext_link_port1, ext_link_port2]
}
# set external subet_id on vim.
ext_cps_vdu1_cp2 = {
"cpdId": "VDU1_CP2",
"cpConfig": [{
"cpProtocolData": [{
"layerProtocol": "IP_OVER_ETHERNET",
"ipOverEthernet": {
"ipAddresses": [{
"type": "IPV4",
"numDynamicAddresses": 1,
"subnetId": external_subnets_id[0]
}]
}
}]
}]
}
# set external subet_id on vim.
ext_cps_vdu2_cp2 = {
"cpdId": "VDU2_CP2",
"cpConfig": [{
"cpProtocolData": [{
"layerProtocol": "IP_OVER_ETHERNET",
"ipOverEthernet": {
"ipAddresses": [{
"type": "IPV4",
"numDynamicAddresses": "1",
"subnetId": external_subnets_id[1]
}]
}
}]
}]
}
ext_virtual_link_cp2 = {
"id": uuidsentinel.evl2_id,
"vimConnectionId": uuidsentinel.vim_connection_id,
"resourceId": networks_id[1],
"extCps": [
ext_cps_vdu1_cp2, ext_cps_vdu2_cp2
]
}
# set extManaged internal nw_id on vim.
ext_mng_vtl_lnks = [{
"id": uuidsentinel.emvl1_id,
"vnfVirtualLinkDescId": "internalVL1",
"vimConnectionId": uuidsentinel.vim_connection_id,
"resourceId": ext_mngd_networks_id[0]
}, {
"id": uuidsentinel.emvl2_id,
"vnfVirtualLinkDescId": "internalVL2",
"vimConnectionId": uuidsentinel.vim_connection_id,
"resourceId": ext_mngd_networks_id[1]
}]
data = {
"flavourId": "simple",
"instantiationLevelId": "instantiation_level_1",
"extVirtualLinks": [
ext_virtual_link_cp1, ext_virtual_link_cp2
],
"extManagedVirtualLinks": ext_mng_vtl_lnks,
"vimConnectionInfo": [{
"id": uuidsentinel.vim_connection_id,
"vimType": "ETSINFV.OPENSTACK_KEYSTONE.v_2",
"interfaceInfo": {
"endpoint": "http://127.0.0.1/identity"
},
"accessInfo": {
"username": "nfv_user",
"region": "RegionOne",
"password": "devstack",
"tenant": tenant_id
}
}],
"additionalParams": {
"lcm-operation-user-data": "./UserData/lcm_user_data.py",
"lcm-operation-user-data-class": "SampleUserData"
}
}
return data
@staticmethod
def make_heal_request_body(vnfc_instance_id=None):
data = {
"cause": "ManualHealing"
}
if vnfc_instance_id:
data["vnfcInstanceId"] = vnfc_instance_id
return data
@staticmethod
def make_term_request_body():
"""Parameter selection policy.
As all parameters are set, GRACEFUL is specified for terminationType.
(to specify gracefulTerminationTimeout)
Returns:
dict: Request body
"""
return {
"terminationType": "GRACEFUL",
"gracefulTerminationTimeout": 1,
"additionalParams": {
"samplekey": "samplevalue"
}
}

View File

@ -11,231 +11,18 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import tempfile
import time
import yaml
import zipfile
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from tacker.objects import fields
from tacker.tests.functional import base
from tacker.tests import utils
from tacker.tests.functional.vnflcm import base as vnflcm_base
import time
VNF_PACKAGE_UPLOAD_TIMEOUT = 60
VNF_INSTANTIATE_TIMEOUT = 60
VNF_TERMINATE_TIMEOUT = 60
VNF_INSTANTIATE_ERROR_WAIT = 80
VNF_DELETE_COMPLETION_WAIT = 60
def _get_external_virtual_links(net0_id):
return [
{
"id": "net0",
"resourceId": net0_id,
"extCps": [{
"cpdId": "CP1",
"cpConfig": [{
"cpProtocolData": [{
"layerProtocol": "IP_OVER_ETHERNET",
}]
}]
}]
}
]
def _create_csar_with_unique_vnfd_id(csar_dir):
unique_id = uuidutils.generate_uuid()
tempfd, tempname = tempfile.mkstemp(suffix=".zip",
dir=os.path.dirname(csar_dir))
os.close(tempfd)
common_dir = os.path.join(csar_dir, "../common/")
ud_common_dir = os.path.join(csar_dir, "../user_data_common/")
zcsar = zipfile.ZipFile(tempname, 'w')
target_dir_list = [csar_dir, common_dir, ud_common_dir]
_write_zipfile(zcsar, unique_id, target_dir_list)
zcsar.close()
return tempname, unique_id
def _write_zipfile(zcsar, unique_id, target_dir_list):
for target_dir in target_dir_list:
for (dpath, _, fnames) in os.walk(target_dir):
if not fnames:
continue
for fname in fnames:
src_file = os.path.join(dpath, fname)
dst_file = os.path.relpath(
os.path.join(dpath, fname), target_dir)
if fname.endswith('.yaml') or fname.endswith('.yml'):
with open(src_file, 'rb') as yfile:
data = yaml.safe_load(yfile)
utils._update_unique_id_in_yaml(data, unique_id)
zcsar.writestr(dst_file, yaml.dump(
data, default_flow_style=False,
allow_unicode=True))
else:
zcsar.write(src_file, dst_file)
def _create_and_upload_vnf_package(tacker_client, csar_package_name,
user_defined_data):
# create vnf package
body = jsonutils.dumps({"userDefinedData": user_defined_data})
resp, vnf_package = tacker_client.do_request(
'/vnfpkgm/v1/vnf_packages', "POST", body=body)
# upload vnf package
csar_package_path = \
"../../etc/samples/etsi/nfv/%s" % csar_package_name
file_path = os.path.abspath(os.path.join(os.path.dirname(__file__),
csar_package_path))
# Generating unique vnfd id. This is required when multiple workers
# are running concurrently. The call below creates a new temporary
# CSAR with unique vnfd id.
file_path, uniqueid = _create_csar_with_unique_vnfd_id(file_path)
with open(file_path, 'rb') as file_object:
resp, resp_body = tacker_client.do_request(
'/vnfpkgm/v1/vnf_packages/{id}/package_content'.format(
id=vnf_package['id']),
"PUT", body=file_object, content_type='application/zip')
# wait for onboard
timeout = VNF_PACKAGE_UPLOAD_TIMEOUT
start_time = int(time.time())
show_url = os.path.join('/vnfpkgm/v1/vnf_packages', vnf_package['id'])
vnfd_id = None
while True:
resp, body = tacker_client.do_request(show_url, "GET")
if body['onboardingState'] == "ONBOARDED":
vnfd_id = body['vnfdId']
break
if ((int(time.time()) - start_time) > timeout):
raise Exception("Failed to onboard vnf package")
time.sleep(1)
# remove temporarily created CSAR file
os.remove(file_path)
return vnf_package['id'], vnfd_id
class VnfLcmWithUserDataTest(base.BaseTackerTest):
def setUp(self):
super(VnfLcmWithUserDataTest, self).setUp()
self.tacker_client = base.BaseTackerTest.tacker_http_client()
self.base_url = "/vnflcm/v1/vnf_instances"
vim_list = self.client.list_vims()
self.vim = self.get_vim(vim_list, 'VIM0')
if not self.vim:
assert False, "vim_list is Empty: Default VIM is missing"
neutron_client = self.neutronclient()
net = neutron_client.list_networks()
networks = {}
for network in net['networks']:
networks[network['name']] = network['id']
net0_id = networks.get('net0')
if not net0_id:
self.fail("net0 network is not available")
self.ext_vl = _get_external_virtual_links(net0_id)
def _create_instantiate_vnf_request_body(self, flavour_id,
instantiation_level_id=None, vim_id=None, ext_vl=None,
add_params=None):
request_body = {"flavourId": flavour_id}
if instantiation_level_id:
request_body["instantiationLevelId"] = instantiation_level_id
if ext_vl:
request_body["extVirtualLinks"] = ext_vl
if vim_id:
request_body["vimConnectionInfo"] = [
{"id": uuidutils.generate_uuid(),
"vimId": vim_id,
"vimType": "ETSINFV.OPENSTACK_KEYSTONE.v_2"}]
if add_params:
request_body["additionalParams"] = add_params
return request_body
def _create_vnf_instance(self, vnfd_id, vnf_instance_name=None,
vnf_instance_description=None):
request_body = {'vnfdId': vnfd_id}
if vnf_instance_name:
request_body['vnfInstanceName'] = vnf_instance_name
if vnf_instance_description:
request_body['vnfInstanceDescription'] = vnf_instance_description
resp, response_body = self.http_client.do_request(
self.base_url, "POST", body=jsonutils.dumps(request_body))
return resp, response_body
def _delete_vnf_instance(self, id):
url = os.path.join(self.base_url, id)
resp, body = self.http_client.do_request(url, "DELETE")
self.assertEqual(204, resp.status_code)
# verify vnf instance is deleted
url = os.path.join(self.base_url, id)
resp, body = self.http_client.do_request(url, "GET")
self.assertEqual(404, resp.status_code)
def _show_vnf_instance(self, id, expected_result=None):
show_url = os.path.join(self.base_url, id)
resp, vnf_instance = self.http_client.do_request(show_url, "GET")
self.assertEqual(200, resp.status_code)
if expected_result:
self.assertDictSupersetOf(expected_result, vnf_instance)
return vnf_instance
def _vnf_instance_wait(self, id,
instantiation_state=fields.VnfInstanceState.INSTANTIATED,
timeout=VNF_INSTANTIATE_TIMEOUT):
show_url = os.path.join(self.base_url, id)
start_time = int(time.time())
while True:
resp, body = self.http_client.do_request(show_url, "GET")
if body['instantiationState'] == instantiation_state:
break
if ((int(time.time()) - start_time) > timeout):
error = ("Vnf instance %(id)s status is %(current)s, "
"expected status should be %(expected)s")
self.fail(error % {"id": id,
"current": body['instantiationState'],
"expected": instantiation_state})
time.sleep(5)
class VnfLcmWithUserDataTest(vnflcm_base.BaseVnfLcmTest):
def _vnf_instance_wait_until_fail_detected(self, id,
instantiation_state=fields.VnfInstanceState.NOT_INSTANTIATED,
timeout=VNF_INSTANTIATE_ERROR_WAIT):
show_url = os.path.join(self.base_url, id)
time.sleep(VNF_INSTANTIATE_ERROR_WAIT)
resp, body = self.http_client.do_request(show_url, "GET")
timeout=vnflcm_base.VNF_INSTANTIATE_ERROR_WAIT):
time.sleep(timeout)
_, body = self._show_vnf_instance(id)
if body['instantiationState'] != instantiation_state:
error = ("Vnf instance %(id)s status is %(current)s, "
"expected status should be %(expected)s")
@ -243,259 +30,92 @@ class VnfLcmWithUserDataTest(base.BaseTackerTest):
"current": body['instantiationState'],
"expected": instantiation_state})
def _instantiate_vnf_instance(self, id, request_body):
url = os.path.join(self.base_url, id, "instantiate")
resp, body = self.http_client.do_request(url, "POST",
body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
self._vnf_instance_wait(id)
def _instantiate_vnf_instance_fail(self, id, request_body):
url = os.path.join(self.base_url, id, "instantiate")
resp, body = self.http_client.do_request(url, "POST",
body=jsonutils.dumps(request_body))
resp, _ = self._instantiate_vnf_instance(id, request_body)
self.assertEqual(202, resp.status_code)
# Confirm that the state doesn't change from NOT_INSTANTIATED.
self._vnf_instance_wait_until_fail_detected(id)
def _terminate_vnf_instance(self, id, request_body):
url = os.path.join(self.base_url, id, "terminate")
resp, body = self.http_client.do_request(url, "POST",
body=jsonutils.dumps(request_body))
self.assertEqual(202, resp.status_code)
timeout = request_body.get('gracefulTerminationTimeout')
def _wait_show_subscription(self, subscription_id):
# wait for onboard
timeout = vnflcm_base.VNF_SUBSCRIPTION_TIMEOUT
start_time = int(time.time())
while True:
resp, body = self._show_subscription(subscription_id)
if resp.ok:
return resp, body
self._vnf_instance_wait(id,
instantiation_state=fields.VnfInstanceState.NOT_INSTANTIATED,
timeout=VNF_TERMINATE_TIMEOUT)
if ((int(time.time()) - start_time) > timeout):
if resp:
resp.raise_for_status()
raise Exception("Failed to show_subscription")
# If gracefulTerminationTimeout is set, check whether vnf
# instantiation_state is set to NOT_INSTANTIATED after
# gracefulTerminationTimeout seconds.
if timeout and int(time.time()) - start_time < timeout:
self.fail("Vnf is terminated before graceful termination "
"timeout period")
time.sleep(1)
# wait for status completion
time.sleep(VNF_DELETE_COMPLETION_WAIT)
def assert_create_vnf(self, resp, vnf_instance, vnf_pkg_id):
super().assert_create_vnf(resp, vnf_instance)
def _delete_vnf_package(self, vnf_package_id):
url = '/vnfpkgm/v1/vnf_packages/%s' % vnf_package_id
resp, vnf_pkg_info = vnflcm_base._show_vnf_package(
self.tacker_client, vnf_pkg_id)
self.assert_vnf_package_usage_state(vnf_pkg_info)
# Update vnf package before delete
req_body = jsonutils.dumps({"operationalState": "DISABLED"})
self.tacker_client.do_request(url, "PATCH", body=req_body)
def assert_delete_vnf(self, resp, vnf_instance_id, vnf_pkg_id):
super().assert_delete_vnf(resp, vnf_instance_id)
# Delete vnf package before delete
self.tacker_client.do_request(url, "DELETE")
resp, vnf_pkg_info = vnflcm_base._show_vnf_package(
self.tacker_client, vnf_pkg_id)
self.assert_vnf_package_usage_state(
vnf_pkg_info,
expected_usage_state=fields.PackageUsageStateType.NOT_IN_USE)
def test_instantiate_vnf_normal(self):
# Create vnf package
sample_name = "user_data_sample_normal"
vnf_package_id, vnfd_id = _create_and_upload_vnf_package(
self.tacker_client, sample_name, {"key": sample_name})
def assert_instantiate_vnf(
self,
resp,
vnf_instance_id,
vnf_pkg_id):
super().assert_instantiate_vnf(resp, vnf_instance_id)
# Reserve deleting vnf package
self.addCleanup(self._delete_vnf_package, vnf_package_id)
resp, vnf_pkg_info = vnflcm_base._show_vnf_package(
self.tacker_client, vnf_pkg_id)
self.assert_vnf_package_usage_state(vnf_pkg_info)
# Settings
vnf_instance_name = "vnf_with_user_data-%s" % \
uuidutils.generate_uuid()
vnf_instance_description = "vnf_with_user_data_normal"
def assert_heal_vnf(
self,
resp,
vnf_instance_id,
vnf_pkg_id,
expected_stack_status='UPDATE_COMPLETE'):
super().assert_heal_vnf(
resp, vnf_instance_id, expected_stack_status=expected_stack_status)
add_params = {
"lcm-operation-user-data": "./UserData/lcm_user_data.py",
"lcm-operation-user-data-class": "SampleUserData"}
resp, vnf_pkg_info = vnflcm_base._show_vnf_package(
self.tacker_client, vnf_pkg_id)
self.assert_vnf_package_usage_state(vnf_pkg_info)
# Create vnf instance
resp, vnf_instance = self._create_vnf_instance(vnfd_id,
vnf_instance_name=vnf_instance_name,
vnf_instance_description=vnf_instance_description)
def assert_terminate_vnf(
self,
resp,
vnf_instance_id,
stack_id,
resource_name_list,
glance_image_id_list,
vnf_pkg_id):
super().assert_terminate_vnf(
resp,
vnf_instance_id,
stack_id,
resource_name_list,
glance_image_id_list)
self.assertIsNotNone(vnf_instance['id'])
self.assertEqual(201, resp.status_code)
resp, vnf_pkg_info = vnflcm_base._show_vnf_package(
self.tacker_client, vnf_pkg_id)
self.assert_vnf_package_usage_state(vnf_pkg_info)
# Reserve deleting vnf instance
self.addCleanup(self._delete_vnf_instance, vnf_instance['id'])
request_body = self._create_instantiate_vnf_request_body("simple",
vim_id=self.vim['id'], ext_vl=self.ext_vl, add_params=add_params)
self._instantiate_vnf_instance(vnf_instance['id'], request_body)
vnf_instance = self._show_vnf_instance(vnf_instance['id'])
vdu_count = len(vnf_instance['instantiatedVnfInfo']
['vnfcResourceInfo'])
self.assertEqual(1, vdu_count)
# Terminate vnf forcefully
terminate_req_body = {
"terminationType": fields.VnfInstanceTerminationType.FORCEFUL
}
self._terminate_vnf_instance(vnf_instance['id'], terminate_req_body)
def test_instantiate_vnf_basehot_invalid(self):
# Create vnf package
sample_name = "user_data_sample_basehot_invalid"
vnf_package_id, vnfd_id = _create_and_upload_vnf_package(
self.tacker_client, sample_name, {"key": sample_name})
# Reserve deleting vnf package
self.addCleanup(self._delete_vnf_package, vnf_package_id)
# Settings
vnf_instance_name = "vnf_with_user_data-%s" % \
uuidutils.generate_uuid()
vnf_instance_description = "vnf_with_user_data_basehot_invalid"
add_params = {
"lcm-operation-user-data": "./UserData/lcm_user_data.py",
"lcm-operation-user-data-class": "SampleUserData"}
# Create vnf instance
resp, vnf_instance = self._create_vnf_instance(vnfd_id,
vnf_instance_name=vnf_instance_name,
vnf_instance_description=vnf_instance_description)
self.assertIsNotNone(vnf_instance['id'])
self.assertEqual(201, resp.status_code)
# Reserve deleting vnf instance
self.addCleanup(self._delete_vnf_instance, vnf_instance['id'])
request_body = self._create_instantiate_vnf_request_body("simple",
vim_id=self.vim['id'], ext_vl=self.ext_vl, add_params=add_params)
self._instantiate_vnf_instance_fail(vnf_instance['id'], request_body)
def test_instantiate_vnf_userdata_timeout(self):
# Create vnf package
sample_name = "user_data_sample_userdata_timeout"
vnf_package_id, vnfd_id = _create_and_upload_vnf_package(
self.tacker_client, sample_name, {"key": sample_name})
# Reserve deleting vnf package
self.addCleanup(self._delete_vnf_package, vnf_package_id)
# Settings
vnf_instance_name = "vnf_with_user_data-%s" % \
uuidutils.generate_uuid()
vnf_instance_description = "vnf_with_user_data_timeout"
add_params = {
"lcm-operation-user-data": "./UserData/lcm_user_data_sleeping.py",
"lcm-operation-user-data-class": "SampleUserData"}
# Create vnf instance
resp, vnf_instance = self._create_vnf_instance(vnfd_id,
vnf_instance_name=vnf_instance_name,
vnf_instance_description=vnf_instance_description)
self.assertIsNotNone(vnf_instance['id'])
self.assertEqual(201, resp.status_code)
# Reserve deleting vnf instance
self.addCleanup(self._delete_vnf_instance, vnf_instance['id'])
request_body = self._create_instantiate_vnf_request_body("simple",
vim_id=self.vim['id'], ext_vl=self.ext_vl, add_params=add_params)
self._instantiate_vnf_instance_fail(vnf_instance['id'], request_body)
def test_instantiate_vnf_userdata_invalid_hot_param(self):
# Create vnf package
sample_name = "user_data_sample_userdata_invalid_hot_param"
vnf_package_id, vnfd_id = _create_and_upload_vnf_package(
self.tacker_client, sample_name, {"key": sample_name})
# Reserve deleting vnf package
self.addCleanup(self._delete_vnf_package, vnf_package_id)
# Settings
vnf_instance_name = "vnf_with_user_data-%s" % \
uuidutils.generate_uuid()
vnf_instance_description = "vnf_with_user_data_timeout"
add_params = {
"lcm-operation-user-data": "./UserData/"
"lcm_user_data_invalid_hot_param.py",
"lcm-operation-user-data-class": "SampleUserData"}
# Create vnf instance
resp, vnf_instance = self._create_vnf_instance(vnfd_id,
vnf_instance_name=vnf_instance_name,
vnf_instance_description=vnf_instance_description)
self.assertIsNotNone(vnf_instance['id'])
self.assertEqual(201, resp.status_code)
# Reserve deleting vnf instance
self.addCleanup(self._delete_vnf_instance, vnf_instance['id'])
request_body = self._create_instantiate_vnf_request_body("simple",
vim_id=self.vim['id'], ext_vl=self.ext_vl, add_params=add_params)
self._instantiate_vnf_instance_fail(vnf_instance['id'], request_body)
def test_instantiate_vnf_userdata_none(self):
# Create vnf package
sample_name = "user_data_sample_userdata_none"
vnf_package_id, vnfd_id = _create_and_upload_vnf_package(
self.tacker_client, sample_name, {"key": sample_name})
# Reserve deleting vnf package
self.addCleanup(self._delete_vnf_package, vnf_package_id)
# Settings
vnf_instance_name = "vnf_with_user_data-%s" % \
uuidutils.generate_uuid()
vnf_instance_description = "vnf_with_user_data_timeout"
add_params = {
"lcm-operation-user-data": "./UserData/lcm_user_data.py",
"lcm-operation-user-data-class": "SampleUserData"}
# Create vnf instance
resp, vnf_instance = self._create_vnf_instance(vnfd_id,
vnf_instance_name=vnf_instance_name,
vnf_instance_description=vnf_instance_description)
self.assertIsNotNone(vnf_instance['id'])
self.assertEqual(201, resp.status_code)
# Reserve deleting vnf instance
self.addCleanup(self._delete_vnf_instance, vnf_instance['id'])
request_body = self._create_instantiate_vnf_request_body("simple",
vim_id=self.vim['id'], ext_vl=self.ext_vl, add_params=add_params)
self._instantiate_vnf_instance_fail(vnf_instance['id'], request_body)
def test_instantiate_vnf_userdata_invalid_script(self):
# Create vnf package
sample_name = "user_data_sample_userdata_invalid_script"
vnf_package_id, vnfd_id = _create_and_upload_vnf_package(
self.tacker_client, sample_name, {"key": sample_name})
# Reserve deleting vnf package
self.addCleanup(self._delete_vnf_package, vnf_package_id)
# Settings
vnf_instance_name = "vnf_with_user_data-%s" % \
uuidutils.generate_uuid()
vnf_instance_description = "vnf_with_user_data_timeout"
add_params = {
"lcm-operation-user-data": "./UserData/"
"lcm_user_data_invalid_script.py",
"lcm-operation-user-data-class": "SampleUserData"}
# Create vnf instance
resp, vnf_instance = self._create_vnf_instance(vnfd_id,
vnf_instance_name=vnf_instance_name,
vnf_instance_description=vnf_instance_description)
self.assertIsNotNone(vnf_instance['id'])
self.assertEqual(201, resp.status_code)
# Reserve deleting vnf instance
self.addCleanup(self._delete_vnf_instance, vnf_instance['id'])
request_body = self._create_instantiate_vnf_request_body("simple",
vim_id=self.vim['id'], ext_vl=self.ext_vl, add_params=add_params)
self._instantiate_vnf_instance_fail(vnf_instance['id'], request_body)
def assert_vnf_package_usage_state(
self,
vnf_package_info,
expected_usage_state=fields.PackageUsageStateType.IN_USE):
self.assertEqual(
expected_usage_state,
vnf_package_info['usageState'])