Add Seperated NFVO FT for V2 API

Only test cases using local NFVO are included in the FT of the
current v2 API.
This patch provides a new FT job for the v2 API to test seperated
NFVO scenarios.

1. The split NFVO in this patch is to be simulated using code.
2. This patch will merge the fake_grant_server.py of v1 as a common
   fake_grant_server for users to test manually.

Change-Id: Ic67295d425b19fc6de6f5c49dc02478068c17224
This commit is contained in:
Yi Feng 2022-02-25 14:36:58 +09:00
parent acf84a168a
commit fb72043c8e
109 changed files with 3665 additions and 2021 deletions

View File

@ -306,6 +306,25 @@
stack_retries: 120
tox_envlist: dsvm-functional-sol-separated-nfvo
- job:
name: tacker-functional-devstack-multinode-sol-separated-nfvo-v2
parent: tacker-functional-devstack-multinode-sol
description: |
Multinodes job for SOL devstack-based functional tests
with separated V2 NFVO
host-vars:
controller-tacker:
devstack_local_conf:
post-config:
$TACKER_CONF:
v2_nfvo:
use_external_nfvo: True
endpoint: http://127.0.0.1:9990
token_endpoint: http://127.0.0.1:9990
client_id: 229ec984de7547b2b662e968961af5a4
client_password: devstack
tox_envlist: dsvm-functional-sol-separated-nfvo-v2
- job:
name: tacker-functional-devstack-multinode-sol-kubernetes
parent: devstack
@ -568,6 +587,7 @@
- tacker-functional-devstack-multinode-sol-kubernetes
- tacker-functional-devstack-multinode-libs-master
- tacker-functional-devstack-multinode-sol-v2
- tacker-functional-devstack-multinode-sol-separated-nfvo-v2
- tacker-functional-devstack-multinode-sol-kubernetes-v2
- tacker-functional-devstack-multinode-sol-multi-tenant
- tacker-functional-devstack-multinode-sol-kubernetes-multi-tenant

View File

@ -270,12 +270,12 @@ The following is sample Base HOT corresponding to above sample userdata script.
**top Base HOT**
.. literalinclude:: ../../../tacker/tests/functional/sol_v2/samples/basic_lcms_max/contents/BaseHOT/simple/sample1.yaml
.. literalinclude:: ../../../tacker/tests/functional/sol_v2_common/samples/basic_lcms_max/contents/BaseHOT/simple/sample1.yaml
:language: yaml
**nested Base HOT**
.. literalinclude:: ../../../tacker/tests/functional/sol_v2/samples/basic_lcms_max/contents/BaseHOT/simple/nested/VDU1.yaml
.. literalinclude:: ../../../tacker/tests/functional/sol_v2_common/samples/basic_lcms_max/contents/BaseHOT/simple/nested/VDU1.yaml
:language: yaml

View File

@ -234,4 +234,6 @@ class OAuth2AuthHandle(AuthHandle):
self.client_id, self.client_password)
def get_session(self, auth, service_type):
return session.Session(auth=auth, verify=False)
_session = session.Session(auth=auth, verify=False)
return adapter.Adapter(session=_session,
service_type=service_type)

View File

@ -0,0 +1,501 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ast
import hashlib
import io
import json
import os
import random
import re
import shutil
import tempfile
import zipfile
from flask import Flask
from flask import request
from flask import Response
from flask import send_file
from keystoneauth1.identity import v3
from keystoneauth1 import session
from novaclient import client as nova_client
from oslo_utils import uuidutils
from tackerclient.v1_0 import client as tacker_client
import yaml
from tacker.tests import constants
from tacker.tests.functional.sol_separated_nfvo.vnflcm.fake_grant import Grant
from tacker.tests.functional.sol_separated_nfvo_v2 import fake_grant_v2
from tacker.tests.functional.sol_separated_nfvo_v2 import fake_vnfpkgm_v2
from tacker.tests import utils
from tacker.tests.utils import read_file
# sample_vnf_package's abs path v1
V1_VNF_PACKAGE_PATH = 'sample_v1.zip'
V1_VNFD_FILE_NAME = 'Definitions/helloworld3_df_simple.yaml'
# sample_vnf_package's abs path v2
V2_VNF_PACKAGE_PATH = 'sample_v2.zip'
V2_VNFD_FILE_NAME = 'Definitions/v2_sample2_df_simple.yaml'
class GrantServer:
app = Flask(__name__)
def __init__(self):
self.client = self.tackerclient()
self.nova_client = self.novaclient()
@staticmethod
def log_http_request():
message = "Method:{0}, Url:{1}, Headers:{2}, Body:{3}"
body = ""
ct = "{0}".format(request.headers.get("Content-type"))
print(ct)
if len(request.get_data()) > 0 and not re.match(
".*?application/zip.*?", ct):
body = request.get_data().decode("utf-8")
hs = ""
ff = "{0}:{1};"
for k, v in request.headers.items():
hs += ff.format(k, v)
message = message.format(request.method, request.url, hs, body)
print(message)
@staticmethod
def log_http_response(resp):
message = "Status:{0}, Body:{1}"
if resp.content_type != 'application/zip' and len(resp.get_data()) > 0:
try:
body = resp.get_data().decode("utf-8")
except AttributeError:
body = "binary file."
else:
body = 'binary file.'
message = message.format(resp.status_code, body)
print(message)
return resp
def get_auth_session(self):
vim_params = self.get_credentials()
auth = v3.Password(
auth_url=vim_params['auth_url'],
username=vim_params['username'],
password=vim_params['password'],
project_name=vim_params['project_name'],
user_domain_name=vim_params['user_domain_name'],
project_domain_name=vim_params['project_domain_name'])
verify = 'True' == vim_params.pop('cert_verify', 'False')
auth_ses = session.Session(auth=auth, verify=verify)
return auth_ses
def get_credentials(self):
vim_params = yaml.safe_load(read_file('local-vim.yaml'))
vim_params['auth_url'] += '/v3'
return vim_params
def tackerclient(self):
auth_session = self.get_auth_session()
return tacker_client.Client(session=auth_session, retries=5)
def novaclient(self):
vim_params = self.get_credentials()
auth = v3.Password(auth_url=vim_params['auth_url'],
username=vim_params['username'],
password=vim_params['password'],
project_name=vim_params['project_name'],
user_domain_name=vim_params['user_domain_name'],
project_domain_name=vim_params['project_domain_name'])
verify = 'True' == vim_params.pop('cert_verify', 'False')
auth_ses = session.Session(auth=auth, verify=verify)
return nova_client.Client(constants.NOVA_CLIENT_VERSION,
session=auth_ses)
def list_zone(self):
try:
zone = self.nova_client.services.list()
except nova_client.exceptions.ClientException:
print("availability zone does not exists.", flush=True)
return []
return zone
def get_vim(self):
vim_list = self.client.list_vims()
vim = self.get_vim_specified(vim_list, 'openstack-admin-vim')
if not vim:
assert False, "vim_list is Empty: Default VIM is missing"
return vim
def get_vim_specified(self, vim_list, vim_name):
if len(vim_list.values()) == 0:
assert False, "vim_list is Empty: Default VIM is missing"
for vim_list in vim_list.values():
for vim in vim_list:
if vim['name'] == vim_name:
return vim
return None
@staticmethod
def make_get_package_content_response_body(vnfd_id):
csar_package_path, _ = get_package_path(vnfd_id)
tempfd, tempname = tempfile.mkstemp(
suffix=".zip", dir='/tmp')
os.close(tempfd)
with zipfile.ZipFile(tempname, 'w') as zcsar:
_write_zipfile(zcsar, vnfd_id, [csar_package_path])
shutil.rmtree(csar_package_path)
_update_hash(tempname)
return tempname
@staticmethod
def make_get_package_vnfd(vnfd_id, path):
csar_package_path, _ = get_package_path(vnfd_id, path)
tempfd, tempname = tempfile.mkstemp(
suffix=".zip", dir='/tmp')
os.close(tempfd)
with zipfile.ZipFile(tempname, 'w') as zcsar:
_write_zipfile(zcsar, vnfd_id, [csar_package_path],
operation='vnfd')
shutil.rmtree(csar_package_path)
return tempname
def _write_zipfile(zcsar, unique_id, target_dir_list, operation='package'):
common_def = ['etsi_nfv_sol001_common_types.yaml',
'etsi_nfv_sol001_vnfd_types.yaml']
new_names = {}
for target_dir in target_dir_list:
for (dpath, _, fnames) in os.walk(target_dir):
if not fnames:
continue
for fname in fnames:
src_file = os.path.join(dpath, fname)
dst_file = os.path.relpath(
os.path.join(dpath, fname), target_dir)
if operation == 'package':
if 'kubernetes' in dst_file.split('/'):
with open(src_file, 'rb') as yfile:
data = yaml.safe_load(yfile)
old_name, new_name = _update_res_name(data)
zcsar.writestr(dst_file, yaml.dump(
data, default_flow_style=False,
allow_unicode=True, sort_keys=False))
new_names[old_name] = new_name
elif not dst_file.startswith('Definitions') and (
not dst_file.startswith('TOSCA')):
zcsar.write(src_file, dst_file)
for target_dir in target_dir_list:
for (dpath, _, fnames) in os.walk(target_dir):
if not fnames:
continue
for fname in fnames:
src_file = os.path.join(dpath, fname)
dst_file = os.path.relpath(
os.path.join(dpath, fname), target_dir)
if dst_file.startswith('Definitions') and (
fname not in common_def):
with open(src_file, 'rb') as yfile:
data = yaml.safe_load(yfile)
utils._update_unique_id_in_yaml(data, unique_id)
if new_names:
data = _update_df_name(data, new_names)
zcsar.writestr(dst_file, yaml.dump(
data, default_flow_style=False,
allow_unicode=True))
if fname == 'TOSCA.meta':
zcsar.write(src_file, dst_file)
if dst_file.startswith('Definitions') and (
fname in common_def):
zcsar.write(src_file, dst_file)
def _update_hash(tempname):
old_hash = {}
new_hash = {}
file_content = {}
with zipfile.ZipFile(tempname, 'r') as z:
paths = [file for file in z.namelist() if 'kubernetes' in file]
tosca_content = z.read('TOSCA-Metadata/TOSCA.meta')
contents = re.split(b'\n\n+', z.read('TOSCA-Metadata/TOSCA.meta'))
if paths:
for path in paths:
hash_obj = hashlib.sha256()
hash_obj.update(z.read(path))
new_hash[path] = hash_obj.hexdigest()
old_hash[path] = [
yaml.safe_load(content)['Hash'] for content
in contents if yaml.safe_load(content).get(
'Name') == path][0]
name_list = z.namelist()
for name in z.namelist():
if not name.startswith('TOSCA'):
file_content[name] = z.read(name)
if new_hash:
with zipfile.ZipFile(tempname, 'w') as z:
for name in name_list:
if name.startswith('TOSCA'):
for file, hash in new_hash.items():
old_value = [value for key, value in old_hash.items()
if key == file][0]
new_tosca = tosca_content.replace(
bytes(old_value, 'utf-8'), bytes(hash, 'utf-8'))
z.writestr('TOSCA-Metadata/TOSCA.meta', new_tosca)
else:
z.writestr(name, file_content[name])
def _update_df_name(data, new_names):
data_str = str(data)
for old_name, new_name in new_names.items():
data_str = data_str.replace(old_name, new_name)
data = ast.literal_eval(data_str)
return data
def _get_random_string(slen=5):
random_str = ''
base_str = 'abcdefghigklmnopqrstuvwxyz0123456789'
length = len(base_str) - 1
for i in range(slen):
random_str += base_str[random.randint(0, length)]
return random_str
def _update_res_name(data):
old_name = data['metadata']['name']
data['metadata']['name'] = data['metadata'][
'name'] + '-' + _get_random_string()
return old_name, data['metadata']['name']
def get_package_path(vnfd_id, version='v1'):
if version == 'v1':
csar_package_path = V1_VNF_PACKAGE_PATH
vnfd_path = V1_VNFD_FILE_NAME
else:
csar_package_path = V2_VNF_PACKAGE_PATH
vnfd_path = V2_VNFD_FILE_NAME
(tmp_path, _) = os.path.split(csar_package_path)
tmp_abs_path = os.path.join(tmp_path, vnfd_id)
with zipfile.ZipFile(csar_package_path) as zf_obj:
zf_obj.extractall(path=tmp_abs_path)
return tmp_abs_path, vnfd_path
def get_common_resp_info(request_body):
csar_path, vnfd_path = get_package_path(
request_body['vnfdId'])
glance_image = fake_grant_v2.GrantV2.get_sw_image(csar_path, vnfd_path)
flavour_vdu_dict = fake_grant_v2.GrantV2.get_compute_flavor(
csar_path, vnfd_path)
availability_zone_info = GrantServer().list_zone()
zone_name_list = list(set(
[zone.zone for zone in availability_zone_info
if zone.binary == 'nova-compute']))
return glance_image, flavour_vdu_dict, zone_name_list
@GrantServer.app.route('/grant/v1/grants', methods=['POST'])
def grant():
body = request.json
request_body = Grant.convert_body_to_dict(body)
glance_image, flavour_vdu_dict, zone_name_list = get_common_resp_info(
request_body)
vim = GrantServer().get_vim()
if request_body['operation'] == 'INSTANTIATE':
return Grant.make_inst_response_body(
body, vim['tenant_id'], glance_image, flavour_vdu_dict,
zone_name_list)
if request_body['operation'] == 'SCALE':
return Grant.make_scale_response_body(
body, vim['tenant_id'], glance_image, flavour_vdu_dict,
zone_name_list)
if request_body['operation'] == 'HEAL':
return Grant.make_heal_response_body(
body, vim['tenant_id'], glance_image, flavour_vdu_dict,
zone_name_list)
if request_body['operation'] == 'CHANGE_EXT_CONN':
return Grant.make_change_ext_conn_response_body(
body, vim['tenant_id'], zone_name_list)
if request_body['operation'] == 'TERMINATE':
return Grant.make_term_response_body(body)
@GrantServer.app.route('/grant/v2/grants', methods=['POST'])
def grant_v2():
body = request.json
request_body = fake_grant_v2.GrantV2.convert_body_to_dict(body)
glance_image, flavour_vdu_dict, zone_name_list = get_common_resp_info(
request_body)
if request_body['operation'] == 'INSTANTIATE':
resp_body = fake_grant_v2.GrantV2.make_inst_response_body(
body, glance_image, flavour_vdu_dict,
zone_name_list)
if request_body['operation'] == 'SCALE':
resp_body = fake_grant_v2.GrantV2.make_scale_response_body(
body, glance_image, flavour_vdu_dict,
zone_name_list)
if request_body['operation'] == 'HEAL':
resp_body = fake_grant_v2.GrantV2.make_heal_response_body(
body, glance_image, flavour_vdu_dict,
zone_name_list)
if request_body['operation'] == 'CHANGE_EXT_CONN':
resp_body = fake_grant_v2.GrantV2.make_change_ext_conn_response_body(
body, zone_name_list)
if request_body['operation'] == 'CHANGE_VNFPKG':
resp_body = fake_grant_v2.GrantV2.make_change_vnfpkg_response_body(
body)
if request_body['operation'] == 'TERMINATE':
resp_body = fake_grant_v2.GrantV2.make_term_response_body(body)
resp = (resp_body, '201', {"content-type": "application/json"})
return resp
@GrantServer.app.route('/token', methods=['POST'])
def get_token():
resp_body = {"access_token": 'fake_token'}
resp = (resp_body, '200', {"content-type": "application/json"})
return resp
@GrantServer.app.route('/notification/callbackuri/<vnfdid>',
methods=['GET', 'POST'])
def callback(vnfdid):
resp = Response(status=204)
return resp
@GrantServer.app.route('/vnfpkgm/v1/vnf_packages',
methods=['GET'])
def get_vnf_package_v1():
vnfd_id = request.url
vnfd_id = vnfd_id.split("vnfdId%2C")[1].split("%29")[0]
resp_body = [
{
"id": uuidutils.generate_uuid(),
"vnfdId": vnfd_id,
"vnfProvider": "Company",
"vnfProductName": "Sample VNF",
"vnfSoftwareVersion": "1.0",
"vnfdVersion": "1.0",
"onboardingState": "ONBOARDED",
"operationalState": "ENABLED",
"usageState": "NOT_IN_USE"
}
]
vnf_package_path = "/tmp/vnf_package_data"
data_file = vnf_package_path + "/" + resp_body[0]["id"]
tempname = GrantServer().make_get_package_content_response_body(vnfd_id)
if not os.path.exists(vnf_package_path):
os.makedirs(vnf_package_path)
else:
with open(data_file, "w") as f:
f.write(tempname)
resp = (json.dumps(resp_body), '200', {'Content-Type': 'application/json'})
return resp
@GrantServer.app.route(
'/vnfpkgm/v1/vnf_packages/<vnf_package_id>/vnfd',
methods=['GET'])
def get_vnf_package_vnfd(vnf_package_id):
zip_path = "/tmp/vnf_package_data/" + vnf_package_id
with open(zip_path, "rb") as f:
tempname = f.read()
with open(tempname, "rb") as f:
bytes_file = io.BytesIO(f.read())
return (
send_file(bytes_file, mimetype='application/zip'), '200',
{'Content-Type': 'application/zip'})
@GrantServer.app.route(
'/vnfpkgm/v1/vnf_packages/<vnf_package_id>/artifacts/<artifact_path>',
methods=['GET'])
def get_vnf_package_artifact_path():
resp = Response(status=200)
return resp
@GrantServer.app.route(
'/vnfpkgm/v1/vnf_packages/<vnf_package_id>/package_content',
methods=['GET'])
def get_vnf_package_content_v1(vnf_package_id):
zip_path = "/tmp/vnf_package_data/" + vnf_package_id
with open(zip_path, "rb") as f:
tempname = f.read()
with open(tempname, "rb") as f:
bytes_file = io.BytesIO(f.read())
return (send_file(
bytes_file, mimetype='application/zip'), '200',
{'Content-Type': 'application/zip'})
@GrantServer.app.route(
'/vnfpkgm/v2/onboarded_vnf_packages/<vnfdid>/package_content',
methods=['GET'])
def get_vnf_package_content(vnfdid):
resp_body = GrantServer().make_get_package_content_response_body(vnfdid)
with open(resp_body, "rb") as f:
bytes_file = io.BytesIO(f.read())
return (send_file(
bytes_file, mimetype='application/zip'), '200',
{'Content-Type': 'application/zip'})
@GrantServer.app.route('/vnfpkgm/v2/onboarded_vnf_packages/<vnfdid>',
methods=['GET'])
def get_vnf_package(vnfdid):
resp_body = (
fake_vnfpkgm_v2.VnfPackage.make_get_vnf_pkg_info_resp(
vnfdid))
resp = (resp_body, '200', {'Content-Type': 'application/json'})
return resp
@GrantServer.app.route('/vnfpkgm/v2/onboarded_vnf_packages/<vnfdid>/vnfd',
methods=['GET'])
def get_vnfd(vnfdid):
resp_body = GrantServer().make_get_package_content_response_body(vnfdid)
with open(resp_body, "rb") as f:
bytes_file = io.BytesIO(f.read())
return (send_file(bytes_file, mimetype='application/zip'),
'200', {'Content-Type': 'application/zip'})
# Start Fake_Grant_Server for manual test
GrantServer.app.before_request(GrantServer.log_http_request)
GrantServer.app.after_request(GrantServer.log_http_response)
GrantServer.app.run(host="127.0.0.1", port=9990, debug=False)

View File

@ -70,7 +70,11 @@ def PrepareRequestHandler(manager):
response_body_str = open(mock_info.get('content'), 'rb').read()
elif len(mock_body) > 0:
response_body_str = json.dumps(mock_body).encode('utf-8')
mock_headers['Content-Length'] = str(len(response_body_str))
if '/token' in self.path or 'v2' in self.path:
pass
else:
mock_headers['Content-Length'] = str(len(
response_body_str))
# Send custom header if exist
for key, val in mock_headers.items():
@ -87,10 +91,13 @@ def PrepareRequestHandler(manager):
response_headers=copy.deepcopy(mock_headers),
response_body=copy.deepcopy(mock_body)))
if mock_info.get('content') is None:
if mock_info.get('content') is None and 'v2/grants' not in path:
self.end_headers()
def _parse_request_body(self):
if '/token' in self.path:
return {}
if 'content-length' not in self.headers:
return {}

View File

@ -1,159 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from flask import Flask
from flask import request
from keystoneauth1.identity import v3
from keystoneauth1 import session
from novaclient import client as nova_client
from tackerclient.v1_0 import client as tacker_client
import yaml
from tacker.tests import constants
from tacker.tests.functional.sol_separated_nfvo.vnflcm.fake_grant import Grant
from tacker.tests.utils import read_file
class GrantServer:
app = Flask(__name__)
def __init__(self):
self.client = self.tackerclient()
self.nova_client = self.novaclient()
@staticmethod
def log_http_request():
message = "Method:{0}, Url:{1}, Headers:{2}, Body:{3}"
body = ""
ct = "{0}".format(request.headers.get("Content-type"))
print(ct)
if len(request.get_data()) > 0 and not re.match(
".*?application/zip.*?", ct):
body = request.get_data().decode("utf-8")
hs = ""
ff = "{0}:{1};"
for k, v in request.headers.items():
hs += ff.format(k, v)
message = message.format(request.method, request.url, hs, body)
print(message)
@staticmethod
def log_http_response(resp):
message = "Status:{0}, Body:{1}"
body = ""
if len(resp.get_data()) > 0:
try:
body = resp.get_data().decode("utf-8")
except AttributeError:
body = "binary file."
message = message.format(resp.status_code, body)
print(message)
return resp
def get_auth_session(self):
vim_params = self.get_credentials()
auth = v3.Password(
auth_url=vim_params['auth_url'],
username=vim_params['username'],
password=vim_params['password'],
project_name=vim_params['project_name'],
user_domain_name=vim_params['user_domain_name'],
project_domain_name=vim_params['project_domain_name'])
verify = 'True' == vim_params.pop('cert_verify', 'False')
auth_ses = session.Session(auth=auth, verify=verify)
return auth_ses
def get_credentials(self):
vim_params = yaml.safe_load(read_file('local-vim.yaml'))
vim_params['auth_url'] += '/v3'
return vim_params
def tackerclient(self):
auth_session = self.get_auth_session()
return tacker_client.Client(session=auth_session, retries=5)
def novaclient(self):
vim_params = self.get_credentials()
auth = v3.Password(auth_url=vim_params['auth_url'],
username=vim_params['username'],
password=vim_params['password'],
project_name=vim_params['project_name'],
user_domain_name=vim_params['user_domain_name'],
project_domain_name=vim_params['project_domain_name'])
verify = 'True' == vim_params.pop('cert_verify', 'False')
auth_ses = session.Session(auth=auth, verify=verify)
return nova_client.Client(constants.NOVA_CLIENT_VERSION,
session=auth_ses)
def list_zone(self):
try:
zone = self.nova_client.services.list()
except nova_client.exceptions.ClientException:
print("availability zone does not exists.", flush=True)
return []
return zone
def get_vim(self):
vim_list = self.client.list_vims()
vim = self.get_vim_specified(vim_list, 'openstack-admin-vim')
if not vim:
assert False, "vim_list is Empty: Default VIM is missing"
return vim
def get_vim_specified(self, vim_list, vim_name):
if len(vim_list.values()) == 0:
assert False, "vim_list is Empty: Default VIM is missing"
for vim_list in vim_list.values():
for vim in vim_list:
if vim['name'] == vim_name:
return vim
return None
@GrantServer.app.route('/grant/v1/grants', methods=['POST'])
def grant():
body = request.json
request_body = Grant.convert_body_to_dict(body)
glance_image = Grant.get_sw_image("functional5")
flavour_vdu_dict = Grant.get_compute_flavor("functional5")
availability_zone_info = GrantServer().list_zone()
zone_name_list = list(set(
[zone.zone for zone in availability_zone_info
if zone.binary == 'nova-compute']))
vim = GrantServer().get_vim()
if request_body['operation'] == 'INSTANTIATE':
return Grant.make_inst_response_body(
body, vim['tenant_id'], glance_image, flavour_vdu_dict,
zone_name_list)
if request_body['operation'] == 'SCALE':
return Grant.make_scale_response_body(
body, vim['tenant_id'], glance_image, flavour_vdu_dict,
zone_name_list)
if request_body['operation'] == 'HEAL':
return Grant.make_heal_response_body(
body, vim['tenant_id'], glance_image, flavour_vdu_dict,
zone_name_list)
if request_body['operation'] == 'CHANGE_EXT_CONN':
return Grant.make_change_ext_conn_response_body(
body, vim['tenant_id'], zone_name_list)
if request_body['operation'] == 'TERMINATE':
return Grant.make_term_response_body(body)
# Start Fake_Grant_Server for manual test
GrantServer.app.before_request(GrantServer.log_http_request)
GrantServer.app.after_request(GrantServer.log_http_response)
GrantServer.app.run(host="127.0.0.1", port=9990, debug=False)

View File

@ -0,0 +1,428 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import yaml
from oslo_serialization import jsonutils
from oslo_utils import uuidutils
from tacker.tests import uuidsentinel
class GrantV2:
GRANT_REQ_PATH = '/grant/v2/grants'
TOKEN = '/token'
ZONES = [
{
"id": uuidsentinel.zone_id,
"zoneId": "nova",
"vimConnectionId": uuidsentinel.vim_connection_id
}
]
ADDITIONAL_PARAMS = {
"key": "value"
}
@staticmethod
def _make_vim_connection_info():
access_info = {
"username": "nfv_user",
"region": "RegionOne",
"password": "devstack",
"project": "nfv",
"projectDomain": "Default",
"userDomain": "Default"
}
return {
"vim1": {
"vimId": uuidsentinel.vim_connection_id,
"vimType": "ETSINFV.OPENSTACK_KEYSTONE.V_3",
"interfaceInfo": {"endpoint": 'http://127.0.0.1/identity/v3'},
"accessInfo": access_info,
"extra": {"dummy-key": "dummy-val"}
}
}
@staticmethod
def make_grant_v2_links(request_body):
link = {
"self": {
# set fake server port.
"href": os.path.join(
'http://localhost:9990',
'grant/v2/grants')},
"vnfLcmOpOcc": {
"href": os.path.join(
'http://localhost:9890/vnflcm/v1/vnf_lcm_op_occs',
request_body['vnfLcmOpOccId'])},
"vnfInstance": {
"href": os.path.join(
'http://localhost:9890/vnflcm/v1/vnf_instances',
request_body['vnfInstanceId'])}}
return link
@staticmethod
def _make_add_resources(req_add_resources, zones=None,
placement_constraints=None):
add_resources = []
for req_add_resource in req_add_resources:
res_add_resource = {
"resourceDefinitionId": req_add_resource['id'],
"vimConnectionId": uuidsentinel.vim_connection_id
}
if req_add_resource['type'] == 'COMPUTE':
if placement_constraints:
zone_id_dict = (GrantV2.
_get_zone_id_from_placement_constraint(
placement_constraints, zones))
if zone_id_dict[req_add_resource['id']]:
res_add_resource["zoneId"] = zone_id_dict[
req_add_resource['id']]
add_resources.append(res_add_resource)
return add_resources
@staticmethod
def _get_zone_id_from_placement_constraint(placement_constraints, zones):
zone_id_dict = {}
for placement_constraint in placement_constraints:
for index in range(len(placement_constraint['resource'])):
if placement_constraint[
'affinityOrAntiAffinity'] == 'AFFINITY':
# In this fake_grant, if the user set 'AFFINITY'
# rule in VNF Package, it will always use the first
# `Availability Zone` to create VM.
zone_id_dict[placement_constraint[
'resource'][index]['resourceId']] = zones[0]['id']
else:
try:
zone_id_dict[placement_constraint[
'resource'][index]['resourceId']] = zones[
index]['id']
except IndexError:
print(
"The number of 'Availability Zone'"
"cannot support current case.")
raise IndexError
return zone_id_dict
@staticmethod
def _make_zones(zone_name_list):
zone = []
for name in zone_name_list:
zone_dict = {
"id": uuidutils.generate_uuid(),
"zoneId": name,
"vimConnectionId": uuidsentinel.vim_connection_id
}
zone.append(zone_dict)
return zone
@staticmethod
def _make_remove_resources(req_remove_resources):
res_remove_resources = []
for req_remove_resource in req_remove_resources:
res_remove_resource = {
"resourceDefinitionId": req_remove_resource['id']
}
res_remove_resources.append(res_remove_resource)
return res_remove_resources
@staticmethod
def _make_update_resources(req_update_resources,
zones=None, placement_constraints=None):
res_update_resources = []
for req_update_resource in req_update_resources:
res_update_resource = {
"resourceDefinitionId": req_update_resource['id'],
"vimConnectionId": uuidsentinel.vim_connection_id
}
if req_update_resource['type'] == 'COMPUTE':
if placement_constraints:
zone_id_dict = (GrantV2.
_get_zone_id_from_placement_constraint(
placement_constraints, zones))
if zone_id_dict[req_update_resource['id']]:
res_update_resource["zoneId"] = zone_id_dict[
req_update_resource['id']]
res_update_resources.append(res_update_resource)
return res_update_resources
@staticmethod
def get_vdu_list(add_resources):
vdu_list = []
for add_resource in add_resources:
if add_resource['type'] == 'COMPUTE' or (
add_resource['type'] == 'STORAGE'):
if add_resource.get('vduId'):
vdu_list.append(add_resource['vduId'])
else:
vdu_list.append(add_resource['resourceTemplateId'])
return vdu_list
@staticmethod
def _make_vim_assets(add_resources, image_id_dict, flavour_id_dict):
# set m1.tiny="1" for flavour_id
vdu_list = GrantV2.get_vdu_list(add_resources)
flavors = [GrantV2._generate_flavour(vdu, flavour_id_dict)
for vdu in vdu_list if flavour_id_dict.get(vdu)]
images = [GrantV2._generate_image(vdu, image_id_dict)
for vdu in vdu_list if image_id_dict.get(vdu)]
vim_assets = {
'computeResourceFlavours': flavors,
'softwareImages': images,
}
return vim_assets
@staticmethod
def _generate_flavour(vdu, flavour_id_dict):
if flavour_id_dict.get(vdu):
return {
"vimConnectionId": uuidsentinel.vim_connection_id,
"vnfdVirtualComputeDescId": vdu,
"vimFlavourId": flavour_id_dict[vdu]
}
return None
@staticmethod
def _generate_image(vdu, image_id_dict):
if image_id_dict.get(vdu):
return {
"vimConnectionId": uuidsentinel.vim_connection_id,
"vnfdSoftwareImageId": vdu,
"vimSoftwareImageId": image_id_dict[vdu]
}
return None
@staticmethod
def _make_response_template(request_body):
res = {
"id": uuidsentinel.__getattr__(request_body['vnfLcmOpOccId']),
"vnfInstanceId": request_body['vnfInstanceId'],
"vnfLcmOpOccId": request_body['vnfLcmOpOccId'],
}
res["_links"] = {
"self": {
# set fake server port.
"href": os.path.join(
'http://localhost:9990',
GrantV2.GRANT_REQ_PATH)},
"vnfLcmOpOcc": {
"href": os.path.join(
'http://localhost:9890/vnflcm/v2/vnf_lcm_op_occs',
request_body['vnfLcmOpOccId'])},
"vnfInstance": {
"href": os.path.join(
'http://localhost:9890/vnflcm/v2/vnf_instances',
request_body['vnfInstanceId'])}}
return res
@staticmethod
def convert_body_to_dict(body):
if isinstance(body, str):
return jsonutils.loads(body)
return body
@staticmethod
def make_inst_response_body(
request_body, image_id_dict,
flavour_id_dict, zone_name_list):
request_body = GrantV2.convert_body_to_dict(request_body)
res = GrantV2._make_response_template(request_body)
res["vimConnections"] = GrantV2._make_vim_connection_info()
res["zones"] = GrantV2._make_zones(zone_name_list)
if 'addResources' in request_body.keys():
res["addResources"] = GrantV2._make_add_resources(
request_body['addResources'], res["zones"],
request_body.get('placementConstraints'))
res["vimAssets"] = GrantV2._make_vim_assets(
request_body['addResources'],
image_id_dict,
flavour_id_dict)
res["additionalParams"] = GrantV2.ADDITIONAL_PARAMS
return res
@staticmethod
def make_heal_response_body(request_body, image_id_dict,
flavour_id_dict, zone_name_list):
request_body = GrantV2.convert_body_to_dict(request_body)
res = GrantV2._make_response_template(request_body)
res["vimConnections"] = GrantV2._make_vim_connection_info()
res["zones"] = GrantV2._make_zones(zone_name_list)
if 'addResources' in request_body.keys():
res["addResources"] = GrantV2._make_add_resources(
request_body['addResources'], res["zones"],
request_body.get('placementConstraints'))
res["vimAssets"] = GrantV2._make_vim_assets(
request_body['addResources'],
image_id_dict,
flavour_id_dict)
if 'removeResources' in request_body.keys():
res["removeResources"] = GrantV2._make_remove_resources(
request_body['removeResources'])
if 'updateResources' in request_body.keys():
res["updateResources"] = GrantV2._make_update_resources(
request_body['updateResources'], res["zones"],
request_body.get('placementConstraints'))
return res
@staticmethod
def make_scale_response_body(
request_body, image_id_dict, flavour_id_dict,
zone_name_list):
request_body = GrantV2.convert_body_to_dict(request_body)
res = GrantV2._make_response_template(request_body)
if 'addResources' in request_body.keys():
res["zones"] = GrantV2._make_zones(zone_name_list)
res["addResources"] = GrantV2._make_add_resources(
request_body['addResources'], res["zones"],
request_body.get('placementConstraints'))
res["vimAssets"] = GrantV2._make_vim_assets(
request_body['addResources'],
image_id_dict,
flavour_id_dict)
res["vimConnections"] = GrantV2._make_vim_connection_info()
if 'removeResources' in request_body.keys():
res["removeResources"] = GrantV2._make_remove_resources(
request_body['removeResources'])
return res
@staticmethod
def make_term_response_body(request_body):
request_body = GrantV2.convert_body_to_dict(request_body)
res = GrantV2._make_response_template(request_body)
if 'removeResources' in request_body.keys():
res["removeResources"] = GrantV2._make_remove_resources(
request_body['removeResources'])
return res
@staticmethod
def make_change_ext_conn_response_body(
request_body, zone_name_list):
request_body = GrantV2.convert_body_to_dict(request_body)
res = GrantV2._make_response_template(request_body)
res["vimConnections"] = GrantV2._make_vim_connection_info()
res["zones"] = GrantV2._make_zones(zone_name_list)
if 'updateResources' in request_body.keys():
res["updateResources"] = GrantV2._make_update_resources(
request_body['updateResources'], res["zones"],
request_body.get('placementConstraints'))
res["additionalParams"] = GrantV2.ADDITIONAL_PARAMS
return res
@staticmethod
def make_change_vnfpkg_response_body(
request_body):
request_body = GrantV2.convert_body_to_dict(request_body)
res = GrantV2._make_response_template(request_body)
if 'addResources' in request_body.keys():
res["addResources"] = GrantV2._make_add_resources(
request_body['addResources'])
if 'removeResources' in request_body.keys():
res["removeResources"] = GrantV2._make_remove_resources(
request_body['removeResources'])
if 'updateResources' in request_body.keys():
res["updateResources"] = GrantV2._make_update_resources(
request_body['updateResources'])
return res
@staticmethod
def get_sw_image(package_dir, vnfd_path):
csar_package_path = package_dir
yaml_file = os.path.join(csar_package_path,
vnfd_path)
with open(yaml_file, 'r', encoding='utf-8') as f:
config_content = yaml.safe_load(f.read())
nodes = (config_content
.get('topology_template', {})
.get('node_templates', {}))
types = ['tosca.nodes.nfv.Vdu.Compute',
'tosca.nodes.nfv.Vdu.VirtualBlockStorage']
sw_image = {}
for name, data in nodes.items():
if (data['type'] in types and
data.get('properties', {}).get('sw_image_data')):
image = data['properties']['sw_image_data']['name']
sw_image[name] = image
return sw_image
@staticmethod
def get_compute_flavor(package_dir, vnfd_path):
csar_package_path = package_dir
yaml_file = os.path.join(csar_package_path,
vnfd_path)
with open(yaml_file, 'r', encoding='utf-8') as f:
config_content = yaml.safe_load(f.read())
nodes = (config_content
.get('topology_template', {})
.get('node_templates', {}))
types = ['tosca.nodes.nfv.Vdu.Compute',
'tosca.nodes.nfv.Vdu.VirtualBlockStorage']
flavor = {}
for name, data in nodes.items():
if (data['type'] in types and
data.get('capabilities', {})
.get('virtual_compute', {})
.get('properties', {})
.get('requested_additional_capabilities', {})
.get('properties')):
flavour = data['capabilities']['virtual_compute'][
'properties']['requested_additional_capabilities'][
'properties']['requested_additional_capability_name']
flavor[name] = flavour
return flavor
@staticmethod
def get_sw_data(package_dir, vnfd_path):
csar_package_path = package_dir
yaml_file = os.path.join(csar_package_path,
vnfd_path)
with open(yaml_file, 'r', encoding='utf-8') as f:
config_content = yaml.safe_load(f.read())
nodes = (config_content
.get('topology_template', {})
.get('node_templates', {}))
types = ['tosca.nodes.nfv.Vdu.Compute',
'tosca.nodes.nfv.Vdu.VirtualBlockStorage']
sw_data = {}
for name, data in nodes.items():
if (data['type'] in types and
data.get('properties', {}).get('sw_image_data')):
image_data = data['properties']['sw_image_data']
sw_data[name] = image_data
return sw_data

View File

@ -0,0 +1,34 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
class VnfPackage:
@staticmethod
def make_get_vnf_pkg_info_resp(vnfdid):
data = {
"id": uuidutils.generate_uuid(),
"vnfdId": vnfdid,
"vnfProvider": "Company",
"vnfProductName": "Sample VNF",
"vnfSoftwareVersion": "1.0",
"vnfdVersion": "1.0",
"onboardingState": "ONBOARDED",
"operationalState": "ENABLED",
"usageState": "NOT_IN_USE"
}
return data

View File

@ -0,0 +1,167 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.tests.functional.sol_v2_common import test_vnflcm_basic_common
class VnfLcmWithNfvoSeparator(test_vnflcm_basic_common.CommonVnfLcmTest):
def test_basic_lcms_max(self):
"""Test LCM operations with all attributes set
* About attributes:
All of the following cardinality attributes are set.
In addition, 0..N or 1..N attributes are set to 2 or more.
- 0..1 (1)
- 0..N (2 or more)
- 1
- 1..N (2 or more)
* About LCM operations:
This test includes the following operations.
- 0. Pre-setting
- 1. Create subscription
- 2. Test notification
- 3. Show subscription
- 4. Create VNF instance
- 5. Instantiate VNF
- 6. Show VNF instance
- 7. Show VNF LCM operation occurrence
- 8. Heal VNF(all with omit all parameter)
- 9. Heal VNF(all with all=True parameter)
- 10. Scale out operation
- 11. Show VNF instance
- 12. Scale in operation
- 13. Show VNF instance
- 14. Heal VNF(vnfc)
- 15. Change external connectivity
- 16. Show VNF LCM operation occurrence
- 17. Heal VNF(vnfc with omit all parameter)
- 18. Heal VNF(vnfc with all=false parameter)
- 19. Heal VNF(vnfc with all=true parameter)
- 20. Update VNF
- 21. Show VNF instance
- 22. Terminate VNF
- 23. Delete VNF instance
- 24. Delete subscription
"""
self.basic_lcms_max_common_test(True)
def test_basic_lcms_min(self):
"""Test LCM operations with omitting except for required attributes
The change_ext_conn can't be tested here because VNF package 2 don't
have external connectivity. So moved it to the test_scale_other_lcm().
* About attributes:
Omit except for required attributes.
Only the following cardinality attributes are set.
- 1
- 1..N (1)
* About LCM operations:
This test includes the following operations.
- 1. Create subscription
- 2. Test notification
- 3. Create VNF instance
- 4. Instantiate VNF
- 5. Show VNF instance
- 6. Heal VNF(all with omit all parameter)
- 7. Show VNF instance
- 8. Update VNF
- 9. Heal VNF(vnfc)
- 10. Show VNF instance
- 11. Scale out operation
- 12. Show VNF instance
- 13. Scale in operation
- 14. Terminate VNF
- 15. Delete VNF instance
- 16. Delete subscription
- 17. Show subscription
"""
self.basic_lcms_min_common_test(True)
def test_change_vnfpkg(self):
"""Test basic life cycle operations with sample VNFD with UserData.
In this test case, we do following steps.
- Create subscription.
- Create VNF instance.
- Instantiate VNF.
- Show VNF instance.
- Change VNF Package.
- Show VNF instance.
- Terminate VNF.
- Delete VNF.
- Delete subscription.
- Show subscription.
"""
self.change_vnfpkg_from_image_to_volume_common_test(True)
def test_retry_rollback_scale_out(self):
"""Test LCM operations with omitting except for required attributes
* About attributes:
All of the following cardinality attributes are set.
In addition, 0..N or 1..N attributes are set to 2 or more.
- 0..1 (1)
- 0..N (2 or more)
- 1
- 1..N (2 or more)
* About LCM operations:
This test includes the following operations.
- 0. Pre-setting
- 1. Create subscription
- 2. Test notification
- 3. Create VNF instance
- 4. Instantiate VNF
- 5. Show VNF instance
- 6. Scale out operation(will fail)
- 7. Show VNF instance
- 8. Retry operation
- 9. Rollback scale out operation
- 10. Show VNF LCM operation occurrence
- 11. List VNF LCM operation occurrence
- 12. Terminate VNF
- 13. Delete VNF instance
- 14. Delete subscription
- 15. Show subscription
"""
self.retry_rollback_scale_out_common_test(True)
def test_fail_instantiate(self):
"""Test LCM operations with omitting except for required attributes
* About attributes:
Omit except for required attributes.
Only the following cardinality attributes are set.
- 1
- 1..N (1)
* About LCM operations:
This test includes the following operations.
- 1. Create subscription
- 2. Test notification
- 3. Create VNF instance
- 4. Instantiate VNF(will fail)
- 5. Show VNF instance
- 6. Fail instantiation operation
- 7. Show VNF LCM operation occurrence
- 8. List VNF LCM operation occurrence
- 9. Delete VNF instance
- 10. Delete subscription
"""
self.fail_instantiate_common_test(True)

View File

@ -17,12 +17,12 @@ import ddt
import os
import time
from tacker.tests.functional.sol_v2 import base_v2
from tacker.tests.functional.sol_v2 import paramgen
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import test_vnflcm_basic_common
@ddt.ddt
class ChangeVnfPkgVnfLcmTest(base_v2.BaseSolV2Test):
class ChangeVnfPkgVnfLcmTest(test_vnflcm_basic_common.CommonVnfLcmTest):
@classmethod
def setUpClass(cls):
@ -37,22 +37,26 @@ class ChangeVnfPkgVnfLcmTest(base_v2.BaseSolV2Test):
image_path = os.path.abspath(os.path.join(image_dir, image_file))
change_vnfpkg_from_image_to_image_path = os.path.join(
cur_dir, "samples/test_instantiate_vnf_with_old_image_or_volume")
cur_dir, "../sol_v2_common/samples/"
"test_instantiate_vnf_with_old_image_or_volume")
cls.vnf_pkg_1, cls.vnfd_id_1 = cls.create_vnf_package(
change_vnfpkg_from_image_to_image_path)
change_vnfpkg_from_image_to_image_path_2 = os.path.join(
cur_dir, "samples/test_change_vnf_pkg_with_new_image")
cur_dir, "../sol_v2_common/samples/"
"test_change_vnf_pkg_with_new_image")
cls.vnf_pkg_2, cls.vnfd_id_2 = cls.create_vnf_package(
change_vnfpkg_from_image_to_image_path_2, image_path=image_path)
change_vnfpkg_from_image_to_volume_path = os.path.join(
cur_dir, "samples/test_change_vnf_pkg_with_new_volume")
cur_dir, "../sol_v2_common/samples/"
"test_change_vnf_pkg_with_new_volume")
cls.vnf_pkg_3, cls.vnfd_id_3 = cls.create_vnf_package(
change_vnfpkg_from_image_to_volume_path, image_path=image_path)
change_vnfpkg_failed_in_update_path = os.path.join(
cur_dir, "samples/test_change_vnf_pkg_with_update_failed")
cur_dir, "../sol_v2_common/samples/"
"test_change_vnf_pkg_with_update_failed")
cls.vnf_pkg_4, cls.vnfd_id_4 = cls.create_vnf_package(
change_vnfpkg_failed_in_update_path, image_path=image_path)
@ -157,99 +161,7 @@ class ChangeVnfPkgVnfLcmTest(base_v2.BaseSolV2Test):
self.check_resp_headers_in_delete(resp)
def test_change_vnfpkg_from_image_to_volume(self):
create_req = paramgen.change_vnfpkg_create(self.vnfd_id_1)
resp, body = self.create_vnf_instance(create_req)
expected_inst_attrs = [
'id',
'vnfInstanceName',
'vnfInstanceDescription',
'vnfdId',
'vnfProvider',
'vnfProductName',
'vnfSoftwareVersion',
'vnfdVersion',
# 'vnfConfigurableProperties', # omitted
# 'vimConnectionInfo', # omitted
'instantiationState',
# 'instantiatedVnfInfo', # omitted
'metadata',
# 'extensions', # omitted
'_links'
]
self.assertEqual(201, resp.status_code)
self.check_resp_headers_in_create(resp)
self.check_resp_body(body, expected_inst_attrs)
inst_id = body['id']
net_ids = self.get_network_ids(['net0', 'net1', 'net_mgmt'])
subnet_ids = self.get_subnet_ids(['subnet0', 'subnet1'])
instantiate_req = paramgen.change_vnfpkg_instantiate(
net_ids, subnet_ids, self.auth_url)
resp, body = self.instantiate_vnf_instance(inst_id, instantiate_req)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_operation_task(resp)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
additional_inst_attrs = [
'vimConnectionInfo',
'instantiatedVnfInfo'
]
expected_inst_attrs.extend(additional_inst_attrs)
resp_1, body_1 = self.show_vnf_instance(inst_id)
stack_name = "vnf-{}".format(inst_id)
stack_id = self.heat_client.get_stack_resource(stack_name)['stack'][
'id']
image_id_1 = self.get_current_vdu_image(stack_id, stack_name, 'VDU2')
self.assertEqual(200, resp_1.status_code)
self.check_resp_headers_in_get(resp_1)
self.check_resp_body(body_1, expected_inst_attrs)
resource_ids_1 = [obj['id'] for obj in body_1[
'instantiatedVnfInfo']['vnfcResourceInfo'] if obj[
'vduId'] == 'VDU2'][0]
change_vnfpkg_req = paramgen.change_vnfpkg(self.vnfd_id_3)
del change_vnfpkg_req['additionalParams']['vdu_params'][0]
resp, body = self.change_vnfpkg(inst_id, change_vnfpkg_req)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_operation_task(resp)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
resp_2, body_2 = self.show_vnf_instance(inst_id)
image_id_2 = self.get_current_vdu_image(stack_id, stack_name, 'VDU2')
storageResourceIds = [obj.get('storageResourceIds') for obj in body_2[
'instantiatedVnfInfo']['vnfcResourceInfo'] if obj[
'vduId'] == 'VDU2']
self.assertIsNotNone(storageResourceIds)
resource_ids_2 = [obj['id'] for obj in body_2[
'instantiatedVnfInfo']['vnfcResourceInfo'] if obj[
'vduId'] == 'VDU2'][0]
self.assertNotEqual(resource_ids_1, resource_ids_2)
self.assertNotEqual(image_id_1, image_id_2)
self.assertEqual(200, resp_2.status_code)
self.check_resp_headers_in_get(resp_2)
self.check_resp_body(body_2, expected_inst_attrs)
terminate_req = paramgen.terminate_vnf_min()
resp, body = self.terminate_vnf_instance(inst_id, terminate_req)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_operation_task(resp)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
# wait a bit because there is a bit time lag between lcmocc DB
# update and terminate completion.
time.sleep(10)
resp, body = self.delete_vnf_instance(inst_id)
self.assertEqual(204, resp.status_code)
self.check_resp_headers_in_delete(resp)
self.change_vnfpkg_from_image_to_volume_common_test(False)
def test_change_vnfpkg_from_volume_to_image(self):
create_req = paramgen.change_vnfpkg_create(self.vnfd_id_1)

File diff suppressed because it is too large Load Diff

View File

@ -18,12 +18,13 @@ import os
import time
from tacker.objects import fields
from tacker.tests.functional.sol_v2 import base_v2
from tacker.tests.functional.sol_v2 import paramgen
from tacker.tests.functional.sol_v2_common import base_v2
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import test_vnflcm_basic_common
@ddt.ddt
class VnfLcmErrorHandlingTest(base_v2.BaseSolV2Test):
class VnfLcmErrorHandlingTest(test_vnflcm_basic_common.CommonVnfLcmTest):
@classmethod
def setUpClass(cls):
@ -43,13 +44,15 @@ class VnfLcmErrorHandlingTest(base_v2.BaseSolV2Test):
scale_ng_path, image_path=image_path)
# Instantiate VNF will fail
error_network_path = os.path.join(cur_dir, "samples/error_network")
error_network_path = os.path.join(cur_dir, "../sol_v2_common/"
"samples/error_network")
# no image contained
cls.vnf_pkg_2, cls.vnfd_id_2 = cls.create_vnf_package(
error_network_path)
# update VNF or change external VNF connectivity will fail
update_change_ng_path = os.path.join(cur_dir,
"../sol_v2_common/"
"samples/basic_lcms_min")
# no image contained
cls.vnf_pkg_3, cls.vnfd_id_3 = cls.create_vnf_package(
@ -96,254 +99,7 @@ class VnfLcmErrorHandlingTest(base_v2.BaseSolV2Test):
- 14. Delete subscription
- 15. Show subscription
"""
# 0. Pre-setting
# Create a new network and subnet to check the IP allocation of
# IPv4 and IPv6
ft_net0_name = 'ft-net0'
ft_net0_subs = {
'ft-ipv4-subnet0': {
'range': '100.100.100.0/24',
'ip_version': 4
},
'ft-ipv6-subnet0': {
'range': '1111:2222:3333::/64',
'ip_version': 6
}
}
ft_net0_id = self.create_network(ft_net0_name)
self.addCleanup(self.delete_network, ft_net0_id)
for sub_name, val in ft_net0_subs.items():
# subnet is automatically deleted with network deletion
self.create_subnet(
ft_net0_id, sub_name, val['range'], val['ip_version'])
net_ids = self.get_network_ids(
['net0', 'net1', 'net_mgmt', 'ft-net0'])
subnet_ids = self.get_subnet_ids(
['subnet0', 'subnet1', 'ft-ipv4-subnet0', 'ft-ipv6-subnet0'])
port_names = ['VDU2_CP1-1', 'VDU2_CP1-2']
port_ids = {}
for port_name in port_names:
port_id = self.create_port(net_ids['net0'], port_name)
port_ids[port_name] = port_id
self.addCleanup(self.delete_port, port_id)
# 1. Create subscription
callback_url = os.path.join(base_v2.MOCK_NOTIFY_CALLBACK_URL,
self._testMethodName)
callback_uri = ('http://localhost:'
f'{base_v2.FAKE_SERVER_MANAGER.SERVER_PORT}'
f'{callback_url}')
sub_req = paramgen.sub_create_max(callback_uri)
resp, body = self.create_subscription(sub_req)
self.assertEqual(201, resp.status_code)
self.check_resp_headers_in_create(resp)
sub_id = body['id']
# 2. Test notification
self.assert_notification_get(callback_url)
# check usageState of VNF Package
usage_state = self.get_vnf_package(self.vnf_pkg_1)['usageState']
self.assertEqual('NOT_IN_USE', usage_state)
# 3. Create VNF instance
# ETSI NFV SOL003 v3.3.1 5.5.2.2 VnfInstance
expected_inst_attrs = [
'id',
'vnfInstanceName',
'vnfInstanceDescription',
'vnfdId',
'vnfProvider',
'vnfProductName',
'vnfSoftwareVersion',
'vnfdVersion',
# 'vnfConfigurableProperties', # omitted
# 'vimConnectionInfo', # omitted
'instantiationState',
# 'instantiatedVnfInfo', # omitted
'metadata',
# 'extensions', # omitted
'_links'
]
create_req = paramgen.create_vnf_max(self.vnfd_id_1)
resp, body = self.create_vnf_instance(create_req)
self.assertEqual(201, resp.status_code)
self.check_resp_headers_in_create(resp)
self.check_resp_body(body, expected_inst_attrs)
inst_id = body['id']
# check usageState of VNF Package
usage_state = self.get_vnf_package(self.vnf_pkg_1)['usageState']
self.assertEqual('IN_USE', usage_state)
# check instantiationState of VNF
self.assertEqual(fields.VnfInstanceState.NOT_INSTANTIATED,
body['instantiationState'])
# 4. Instantiate VNF
instantiate_req = paramgen.instantiate_vnf_max(
net_ids, subnet_ids, port_ids, self.auth_url)
resp, body = self.instantiate_vnf_instance(inst_id, instantiate_req)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_operation_task(resp)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
# check usageState of VNF Package
usage_state = self.get_vnf_package(self.vnf_pkg_1)['usageState']
self.assertEqual('IN_USE', usage_state)
# 5. Show VNF instance
additional_inst_attrs = [
'vimConnectionInfo',
'instantiatedVnfInfo',
'extensions',
'vnfConfigurableProperties'
]
expected_inst_attrs.extend(additional_inst_attrs)
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
self.check_resp_headers_in_get(resp)
self.check_resp_body(body, expected_inst_attrs)
# check instantiationState of VNF
self.assertEqual(fields.VnfInstanceState.INSTANTIATED,
body.get('instantiationState'))
# check vnfState of VNF
self.assertEqual(fields.VnfOperationalStateType.STARTED,
body['instantiatedVnfInfo']['vnfState'])
# 6. Scale out operation(will fail)
scaleout_req = paramgen.scaleout_vnf_max()
resp, body = self.scale_vnf_instance(inst_id, scaleout_req)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_operation_task(resp)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_failed_temp(lcmocc_id)
# check usageState of VNF Package
usage_state = self.get_vnf_package(self.vnf_pkg_1)['usageState']
self.assertEqual('IN_USE', usage_state)
# 7. Show VNF instance
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
self.check_resp_headers_in_get(resp)
self.check_resp_body(body, expected_inst_attrs)
# 8. Retry scale out operation
resp, body = self.retry_lcmocc(lcmocc_id)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_delete(resp)
self.wait_lcmocc_failed_temp(lcmocc_id)
# 9. Rollback scale out operation
resp, body = self.rollback_lcmocc(lcmocc_id)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_delete(resp)
self.wait_lcmocc_rolled_back(lcmocc_id)
# 10. Show VNF LCM operation occurrence
# ETSI NFV SOL003 v3.3.1 5.5.2.13 VnfLcmOpOcc
# NOTE: omitted values are not supported at that time
expected_attrs = [
'id',
'operationState',
'stateEnteredTime',
'startTime',
'vnfInstanceId',
'grantId',
'operation',
'isAutomaticInvocation',
'operationParams',
'isCancelPending',
# 'cancelMode', # omitted
'error',
# 'resourceChanges', # omitted
# 'changedInfo', # omitted
# 'changedExtConnectivity', # omitted
# 'modificationsTriggeredByVnfPkgChange', # omitted
# 'vnfSnapshotInfoId', # omitted
'_links'
]
resp, body = self.show_lcmocc(lcmocc_id)
self.assertEqual(200, resp.status_code)
self.check_resp_headers_in_get(resp)
self.check_resp_body(body, expected_attrs)
# 11. List VNF LCM operation occurrence
# NOTE: omitted values are not supported at that time
expected_attrs = [
'id',
'operationState',
'stateEnteredTime',
'startTime',
'vnfInstanceId',
# 'grantId', # omitted
'operation',
'isAutomaticInvocation',
# 'operationParams', # omitted
'isCancelPending',
# 'cancelMode', # omitted
# 'error', # omitted
# 'resourceChanges', # omitted
# 'changedInfo', # omitted
# 'changedExtConnectivity', # omitted
# 'modificationsTriggeredByVnfPkgChange', # omitted
# 'vnfSnapshotInfoId', # omitted
'_links'
]
resp, body = self.list_lcmocc()
self.assertEqual(200, resp.status_code)
self.check_resp_headers_in_get(resp)
for lcmocc in body:
self.check_resp_body(lcmocc, expected_attrs)
# 12. Terminate VNF instance
terminate_req = paramgen.terminate_vnf_max()
resp, body = self.terminate_vnf_instance(inst_id, terminate_req)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_operation_task(resp)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_complete(lcmocc_id)
# check usageState of VNF Package
usage_state = self.get_vnf_package(self.vnf_pkg_1)['usageState']
self.assertEqual('IN_USE', usage_state)
# check instantiationState of VNF
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
self.assertEqual(fields.VnfInstanceState.NOT_INSTANTIATED,
body.get('instantiationState'))
# wait a bit because there is a bit time lag between vnf instance DB
# terminate and delete completion.
time.sleep(5)
# 13. Delete VNF instance
resp, body = self.delete_vnf_instance(inst_id)
self.assertEqual(204, resp.status_code)
self.check_resp_headers_in_delete(resp)
# check usageState of VNF Package
usage_state = self.get_vnf_package(self.vnf_pkg_1)['usageState']
self.assertEqual('NOT_IN_USE', usage_state)
# 14. Delete subscription
resp, body = self.delete_subscription(sub_id)
self.assertEqual(204, resp.status_code)
self.check_resp_headers_in_delete(resp)
# 15. Show subscription
resp, body = self.show_subscription(sub_id)
self.assertEqual(404, resp.status_code)
self.check_resp_headers_in_get(resp)
self.retry_rollback_scale_out_common_test(False)
def test_rollback_instantiate(self):
"""Test rollback instantiate operation
@ -542,161 +298,7 @@ class VnfLcmErrorHandlingTest(base_v2.BaseSolV2Test):
- 9. Delete VNF instance
- 10. Delete subscription
"""
# 1. Create subscription
callback_url = os.path.join(base_v2.MOCK_NOTIFY_CALLBACK_URL,
self._testMethodName)
callback_uri = ('http://localhost:'
f'{base_v2.FAKE_SERVER_MANAGER.SERVER_PORT}'
f'{callback_url}')
sub_req = paramgen.sub_create_min(callback_uri)
resp, body = self.create_subscription(sub_req)
self.assertEqual(201, resp.status_code)
self.check_resp_headers_in_create(resp)
sub_id = body['id']
# 2. Test notification
self.assert_notification_get(callback_url)
# check usageState of VNF Package
usage_state = self.get_vnf_package(self.vnf_pkg_2)['usageState']
self.assertEqual('NOT_IN_USE', usage_state)
# 3. Create VNF instance
# ETSI NFV SOL003 v3.3.1 5.5.2.2 VnfInstance
expected_inst_attrs = [
'id',
# 'vnfInstanceName', # omitted
# 'vnfInstanceDescription', # omitted
'vnfdId',
'vnfProvider',
'vnfProductName',
'vnfSoftwareVersion',
'vnfdVersion',
# 'vnfConfigurableProperties', # omitted
# 'vimConnectionInfo', # omitted
'instantiationState',
# 'instantiatedVnfInfo', # omitted
# 'metadata', # omitted
# 'extensions', # omitted
'_links'
]
create_req = paramgen.create_vnf_min(self.vnfd_id_2)
resp, body = self.create_vnf_instance(create_req)
self.assertEqual(201, resp.status_code)
self.check_resp_headers_in_create(resp)
self.check_resp_body(body, expected_inst_attrs)
inst_id = body['id']
# check usageState of VNF Package
usage_state = self.get_vnf_package(self.vnf_pkg_2)['usageState']
self.assertEqual('IN_USE', usage_state)
# check instantiationState of VNF
self.assertEqual(fields.VnfInstanceState.NOT_INSTANTIATED,
body.get('instantiationState'))
# 4. Instantiate VNF(will fail)
instantiate_req = paramgen.instantiate_vnf_min()
resp, body = self.instantiate_vnf_instance(inst_id, instantiate_req)
self.assertEqual(202, resp.status_code)
self.check_resp_headers_in_operation_task(resp)
lcmocc_id = os.path.basename(resp.headers['Location'])
self.wait_lcmocc_failed_temp(lcmocc_id)
# check usageState of VNF Package
usage_state = self.get_vnf_package(self.vnf_pkg_2)['usageState']
self.assertEqual('IN_USE', usage_state)
# 5. Show VNF instance
resp, body = self.show_vnf_instance(inst_id)
self.assertEqual(200, resp.status_code)
self.check_resp_headers_in_get(resp)
self.check_resp_body(body, expected_inst_attrs)
# check instantiationState of VNF
self.assertEqual(fields.VnfInstanceState.NOT_INSTANTIATED,
body.get('instantiationState'))
# 6. Fail instantiation operation
expected_attrs = [
'id',
'operationState',
'stateEnteredTime',
'startTime',
'vnfInstanceId',
'grantId',
'operation',
'isAutomaticInvocation',
'operationParams',
'isCancelPending',
# 'cancelMode', # omitted
'error',
# 'resourceChanges', # omitted
# 'changedInfo', # omitted
# 'changedExtConnectivity', # omitted
# 'modificationTriggeredByVnfPkgChange', # omitted
# 'vnfSnapshotInfoId', # omitted
'_links'
]
resp, body = self.fail_lcmocc(lcmocc_id)
self.assertEqual(200, resp.status_code)
self.check_resp_headers_in_get(resp)
self.check_resp_body(body, expected_attrs)
self.assertEqual('FAILED', body['operationState'])
# 7. Show VNF LCM operation occurrence
# ETSI NFV SOL003 v3.3.1 5.5.2.13 VnfLcmOpOcc
resp, body = self.show_lcmocc(lcmocc_id)
self.assertEqual(200, resp.status_code)
self.check_resp_headers_in_get(resp)
self.check_resp_body(body, expected_attrs)
# 8. List VNF LCM operation occurrence
# NOTE: omitted values are not supported at that time
expected_attrs = [
'id',
'operationState',
'stateEnteredTime',
'startTime',
'vnfInstanceId',
# 'grantId', # omitted
'operation',
'isAutomaticInvocation',
# 'operationParams', # omitted
'isCancelPending',
# 'cancelMode', # omitted
# 'error', # omitted
# 'resourceChanges', # omitted
# 'changedInfo', # omitted
# 'changedExtConnectivity', # omitted
# 'modificationsTriggeredByVnfPkgChange', # omitted
# 'vnfSnapshotInfoId', # omitted
'_links'
]
resp, body = self.list_lcmocc()
self.assertEqual(200, resp.status_code)
self.check_resp_headers_in_get(resp)
for lcmocc in body:
self.check_resp_body(lcmocc, expected_attrs)
# 9. Delete VNF instance
# Delete Stack
self.heat_client.delete_stack(f'vnf-{inst_id}')
resp, body = self.delete_vnf_instance(inst_id)
self.assertEqual(204, resp.status_code)
self.check_resp_headers_in_delete(resp)
# check usageState of VNF Package
usage_state = self.get_vnf_package(self.vnf_pkg_2)['usageState']
self.assertEqual('NOT_IN_USE', usage_state)
# 10. Delete subscription
resp, body = self.delete_subscription(sub_id)
self.assertEqual(204, resp.status_code)
self.check_resp_headers_in_delete(resp)
self.fail_instantiate_common_test(False)
def test_rollback_update(self):
"""Test rollback update VNF operation

View File

@ -25,11 +25,13 @@ from oslo_log import log as logging
from oslo_utils import uuidutils
from tempest.lib import base
from tacker.common import utils as common_utils
from tacker.sol_refactored.common import http_client
from tacker.sol_refactored.infra_drivers.openstack import heat_utils
from tacker.sol_refactored.nfvo import glance_utils
from tacker.sol_refactored import objects
from tacker.tests.functional.common.fake_server import FakeServerManager
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import utils
from tacker.tests import utils as base_utils
from tacker import version
@ -104,7 +106,8 @@ class BaseSolV2Test(base.BaseTestCase):
return vim_info
@classmethod
def create_vnf_package(cls, sample_path, user_data={}, image_path=None):
def create_vnf_package(cls, sample_path, user_data={},
image_path=None, nfvo=False):
vnfd_id = uuidutils.generate_uuid()
tmp_dir = tempfile.mkdtemp()
@ -113,37 +116,42 @@ class BaseSolV2Test(base.BaseTestCase):
zip_file_name = os.path.basename(os.path.abspath(sample_path)) + ".zip"
zip_file_path = os.path.join(tmp_dir, zip_file_name)
path = "/vnfpkgm/v1/vnf_packages"
req_body = {'userDefinedData': user_data}
resp, body = cls.tacker_client.do_request(
path, "POST", expected_status=[201], body=req_body)
if nfvo:
return zip_file_path, vnfd_id
pkg_id = body['id']
with open(zip_file_path, 'rb') as fp:
path = "/vnfpkgm/v1/vnf_packages/{}/package_content".format(pkg_id)
else:
path = "/vnfpkgm/v1/vnf_packages"
req_body = {'userDefinedData': user_data}
resp, body = cls.tacker_client.do_request(
path, "PUT", body=fp, content_type='application/zip',
expected_status=[202])
path, "POST", expected_status=[201], body=req_body)
# wait for onboard
timeout = VNF_PACKAGE_UPLOAD_TIMEOUT
start_time = int(time.time())
path = "/vnfpkgm/v1/vnf_packages/{}".format(pkg_id)
while True:
resp, body = cls.tacker_client.do_request(
path, "GET", expected_status=[200])
if body['onboardingState'] == "ONBOARDED":
break
pkg_id = body['id']
if ((int(time.time()) - start_time) > timeout):
raise Exception("Failed to onboard vnf package")
with open(zip_file_path, 'rb') as fp:
path = "/vnfpkgm/v1/vnf_packages/{}/package_content".format(
pkg_id)
resp, body = cls.tacker_client.do_request(
path, "PUT", body=fp, content_type='application/zip',
expected_status=[202])
time.sleep(5)
# wait for onboard
timeout = VNF_PACKAGE_UPLOAD_TIMEOUT
start_time = int(time.time())
path = "/vnfpkgm/v1/vnf_packages/{}".format(pkg_id)
while True:
resp, body = cls.tacker_client.do_request(
path, "GET", expected_status=[200])
if body['onboardingState'] == "ONBOARDED":
break
shutil.rmtree(tmp_dir)
if ((int(time.time()) - start_time) > timeout):
raise Exception("Failed to onboard vnf package")
return pkg_id, vnfd_id
time.sleep(5)
shutil.rmtree(tmp_dir)
return pkg_id, vnfd_id
@classmethod
def delete_vnf_package(cls, pkg_id):
@ -284,6 +292,66 @@ class BaseSolV2Test(base.BaseTestCase):
server_details = server
return server_details
def get_zone_list(self):
path = "/os-services"
resp, resp_body = self.nova_client.do_request(path, "GET")
zone_name_list = [zone.get("zone") for zone in
resp_body.get('services')
if zone.get("binary") == 'nova-compute']
return zone_name_list
def glance_create_image(
self, vim_info, filename, sw_data, inst_id, num_vdu=1):
min_disk = 0
if 'min_disk' in sw_data:
min_disk = common_utils.MemoryUnit.convert_unit_size_to_num(
sw_data['min_disk'], 'GB')
min_ram = 0
if 'min_ram' in sw_data:
min_ram = common_utils.MemoryUnit.convert_unit_size_to_num(
sw_data['min_ram'], 'MB')
# NOTE: use tag to find to delete images when terminate vnf instance.
create_args = {
'min_disk': min_disk,
'min_ram': min_ram,
'disk_format': sw_data.get('disk_format'),
'container_format': sw_data.get('container_format'),
'filename': filename,
'visibility': 'private',
'tags': [inst_id]
}
vim = objects.VimConnectionInfo(
vimId=vim_info.get("vimId"),
vimType=vim_info.get("vimType"),
interfaceInfo=vim_info.get("interfaceInfo"),
accessInfo=vim_info.get("accessInfo")
)
glance_client = glance_utils.GlanceClient(vim)
if num_vdu == 1:
image = glance_client.create_image(
sw_data['VDU2-VirtualStorage']['name'], **create_args)
return image.id
image_1 = glance_client.create_image(
sw_data['VDU1-VirtualStorage']['name'], **create_args)
image_2 = glance_client.create_image(
sw_data['VDU2-VirtualStorage']['name'], **create_args)
return image_1.id, image_2.id
def glance_delete_image(self, vim_info, image_ids):
vim = objects.VimConnectionInfo(
vimId=vim_info.get("vimId"),
vimType=vim_info.get("vimType"),
interfaceInfo=vim_info.get("interfaceInfo"),
accessInfo=vim_info.get("accessInfo")
)
glance_client = glance_utils.GlanceClient(vim)
for image_id in image_ids:
glance_client.delete_image(image_id)
def create_vnf_instance(self, req_body):
path = "/vnflcm/v2/vnf_instances"
return self.tacker_client.do_request(

View File

@ -20,8 +20,8 @@ import tempfile
from oslo_utils import uuidutils
from tacker.tests.functional.sol_v2 import paramgen
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import utils
zip_file_name = os.path.basename(os.path.abspath(".")) + '.zip'

View File

@ -12,7 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import utils
utils.delete_network('ft-net0')
utils.delete_network('ft-net1')

View File

@ -12,7 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import utils
utils.create_network('ft-net0')
utils.create_subnet('ft-ipv4-subnet0', 'ft-net0', '100.100.100.0/24', '4')

View File

@ -20,8 +20,8 @@ import tempfile
from oslo_utils import uuidutils
from tacker.tests.functional.sol_v2 import paramgen
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import utils
zip_file_name = os.path.basename(os.path.abspath(".")) + '.zip'

View File

@ -20,8 +20,8 @@ import tempfile
from oslo_utils import uuidutils
from tacker.tests.functional.sol_v2 import paramgen
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import utils
zip_file_name = os.path.basename(os.path.abspath(".")) + '.zip'

View File

@ -20,8 +20,8 @@ import tempfile
from oslo_utils import uuidutils
from tacker.tests.functional.sol_v2 import paramgen
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import utils
zip_file_name = os.path.basename(os.path.abspath(".")) + '.zip'

View File

@ -12,7 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import utils
utils.delete_network('ft-net0')
# NOTE: subnet is automatically deleted by network deletion

View File

@ -12,7 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import utils
utils.create_network('ft-net0')
utils.create_subnet('ft-ipv4-subnet0', 'ft-net0', '100.100.100.0/24', '4')

View File

@ -20,8 +20,8 @@ import tempfile
from oslo_utils import uuidutils
from tacker.tests.functional.sol_v2 import paramgen
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import utils
zip_file_name = os.path.basename(os.path.abspath(".")) + '.zip'

View File

@ -20,8 +20,8 @@ import tempfile
from oslo_utils import uuidutils
from tacker.tests.functional.sol_v2 import paramgen
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import utils
zip_file_name = os.path.basename(os.path.abspath(".")) + '.zip'

View File

@ -20,8 +20,8 @@ import tempfile
from oslo_utils import uuidutils
from tacker.tests.functional.sol_v2 import paramgen
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import utils
zip_file_name = os.path.basename(os.path.abspath(".")) + '.zip'

View File

@ -20,8 +20,8 @@ import tempfile
from oslo_utils import uuidutils
from tacker.tests.functional.sol_v2 import paramgen
from tacker.tests.functional.sol_v2 import utils
from tacker.tests.functional.sol_v2_common import paramgen
from tacker.tests.functional.sol_v2_common import utils
zip_file_name = os.path.basename(os.path.abspath(".")) + '.zip'

Some files were not shown because too many files have changed in this diff Show More