Revise package generator

* Fix a bug of using `shutil.move` terminating the command if output
  files already exist.

* Accept lowercases of VIM types because it a little bit annoying if
  run the command on usual terminal.

* Check required networks exist before generating package.

* Revise some duplicated lines.

Change-Id: I5d54371f3faf500b53e60303b404fc5d63fa6e47
This commit is contained in:
Yasufumi Ogawa
2024-10-14 18:51:23 +00:00
parent 752881e570
commit 7e16ed8432
2 changed files with 199 additions and 150 deletions

View File

@@ -14,17 +14,15 @@
# under the License.
import argparse
import glob
import json
import os
import pprint
import shutil
import tempfile
import subprocess
import sys
import textwrap
import yaml
import zipfile
from oslo_utils import uuidutils
from tacker.tests.functional.sol_kubernetes_v2 import paramgen as k8s_paramgen
from tacker.tests.functional.sol_kubernetes_v2 import zipgen as k8s_zipgen
from tacker.tests.functional.sol_v2_common import paramgen
@@ -32,195 +30,232 @@ from tacker.tests.functional.sol_v2_common import utils
from tacker.tests.functional.sol_v2_common import zipgen
STANDARD_OUTPUT="./output/userdata_standard/"
K8S_OUTPUT="./output/test_instantiate_cnf_resources/"
HELM_OUTPUT="./output/helm_instantiate/"
OUTPUT_ROOT=f"{os.path.curdir}/output"
# Output package dirs for each VIM types.
STANDARD_OUTPUT=f"{OUTPUT_ROOT}/userdata_standard"
K8S_OUTPUT=f"{OUTPUT_ROOT}/test_instantiate_cnf_resources"
HELM_OUTPUT=f"{OUTPUT_ROOT}/helm_instantiate"
# Set of names of short and actual VIM type.
VIM_TYPES = (VIM_OPENSTACK, VIM_KUBERNETES, VIM_HELM) = \
('ETSINFV.OPENSTACK_KEYSTONE.V_3',
'ETSINFV.KUBERNETES.V_1',
'ETSINFV.HELM.V_3')
# Networks and subnets required for the Tacker's sample VNF package.
REQ_NWS = [
{"net": "net0", "subnets": ["subnet0"]},
{"net": "net1", "subnets": ["subnet1"]},
{"net": "net_mgmt", "subnets": []}]
def get_args():
"""Return parsed args of argparse"""
parser = argparse.ArgumentParser(
formatter_class=argparse.RawTextHelpFormatter,
description='Create VNF Package zip and parameter files',
add_help=True
)
add_help=True)
parser.add_argument('-t', '--type',
help=textwrap.dedent('''\
specify the vim type
* ETSINFV.OPENSTACK_KEYSTONE.V_3
* ETSINFV.KUBERNETES.V_1
* ETSINFV.HELM.V_3
'''),
vim type (lowercase is also available)
* {}
* {}
* {}
'''.format(*VIM_TYPES)),
type=str, metavar='VIM_TYPE',
choices=['ETSINFV.OPENSTACK_KEYSTONE.V_3', 'ETSINFV.KUBERNETES.V_1', 'ETSINFV.HELM.V_3'],
choices=list(VIM_TYPES)+[s.lower() for s in VIM_TYPES],
required=True)
args = parser.parse_args()
return parser.parse_args()
if args.type == 'ETSINFV.OPENSTACK_KEYSTONE.V_3':
print(f"VIM type = {args.type}")
def ask_overwrite(dst_dir):
"""Ask user to overwrite contents in the dest dir if exists
Get the answer from stdin and return True if it's yes or no input.
"""
yes = ["y", "Y", "yes", "YES", "Yes"]
no = ["n", "N", "no", "NO", "No"]
if os.path.exists(dst_dir) is True:
while True:
print(f"Overwrite contents in {dst_dir!r}? [Y/n]")
line = sys.stdin.readline().rstrip("\n")
if line in yes or line == "":
return True
elif line in no:
return False
return True
def req_nw_names():
"""Get a set of lists of required nets and subnets"""
nets = [n["net"] for n in REQ_NWS]
subnets = [sn for n in REQ_NWS for sn in n["subnets"]]
return nets, subnets
def has_all_req_networks():
"""Check if any required network not exists
Return a combination of required networks not found on openstack. So,
it returns empty list if all the required networks exist."""
nets, subnets = req_nw_names()
nw_no_exist = {"nets": [], "subnets": []}
cmd_nwl = ["openstack", "network", "list", "-f", "json"]
os_nws = json.loads(
subprocess.run(cmd_nwl, capture_output=True).stdout)
net_names = [n["Name"] for n in os_nws]
cmd_snwl = ["openstack", "subnet", "list", "-f", "json"]
os_subnws = json.loads(
subprocess.run(cmd_snwl, capture_output=True).stdout)
subnet_names = [sn["Name"] for sn in os_subnws]
for n in nets:
if n not in net_names:
nw_no_exist["nets"].append(n)
for sn in subnets:
if sn not in subnet_names:
nw_no_exist["subnets"].append(sn)
if nw_no_exist["nets"] or nw_no_exist["subnets"]:
return False
return True
def main():
args = get_args()
if has_all_req_networks() is False:
print("Error: Create all the required nets and subnets below "
"before running this script.")
pprint.pprint(REQ_NWS, indent=2)
exit()
if args.type == VIM_OPENSTACK or args.type == VIM_OPENSTACK.lower():
if ask_overwrite(STANDARD_OUTPUT) is not True:
print("Skip to generate package.")
exit()
os.makedirs(STANDARD_OUTPUT, exist_ok=True)
vnfd_id, zip_path = zipgen.userdata_standard()
print(f"Zip file: {zip_path.rsplit('/', 1)[1]}")
vnfd_id, zip_src = zipgen.userdata_standard()
zip_fn = zip_src.rsplit('/', 1)[1]
zip_dst = f"{STANDARD_OUTPUT}/{zip_fn}"
shutil.move(zip_path, STANDARD_OUTPUT)
shutil.rmtree(zip_path.rsplit('/', 1)[0])
print(f"Generating zip file: {STANDARD_OUTPUT}/{zip_fn} ...")
create_req = paramgen.sample3_create(vnfd_id)
terminate_req = paramgen.sample3_terminate()
shutil.move(os.path.abspath(zip_src), os.path.abspath(zip_dst))
shutil.rmtree(zip_src.rsplit('/', 1)[0])
net_ids = utils.get_network_ids(['net0', 'net1', 'net_mgmt'])
subnet_ids = utils.get_subnet_ids(['subnet0', 'subnet1'])
nets, subnets = req_nw_names()
net_ids = utils.get_network_ids(nets)
subnet_ids = utils.get_subnet_ids(subnets)
instantiate_req = paramgen.sample3_instantiate(
net_ids, subnet_ids, "http://localhost/identity/v3")
req_files = {
"create_req": paramgen.sample3_create(vnfd_id),
"terminate_req": paramgen.sample3_terminate(),
"instantiate_req": paramgen.sample3_instantiate(
net_ids, subnet_ids, "http://localhost/identity/v3"),
"scale_out_req": paramgen.sample3_scale_out(),
"scale_in_req": paramgen.sample3_scale_in(),
"heal_req": paramgen.sample3_heal(),
"change_ext_conn_req": paramgen.sample3_change_ext_conn(net_ids),
"update_req": paramgen.sample3_update_vnf_vnfd_id("replace real vnfd id")
}
scale_out_req = paramgen.sample3_scale_out()
scale_in_req = paramgen.sample3_scale_in()
heal_req = paramgen.sample3_heal()
change_ext_conn_req = paramgen.sample3_change_ext_conn(net_ids)
update_req = paramgen.sample3_update_vnf_vnfd_id("replace real vnfd id")
for fn in req_files.keys():
with open(f"{STANDARD_OUTPUT}/{fn}", "w") as f:
f.write(json.dumps(req_files[fn], indent=2))
with open(f"{STANDARD_OUTPUT}create_req", "w") as f:
f.write(json.dumps(create_req, indent=2))
with open(f"{STANDARD_OUTPUT}terminate_req", "w") as f:
f.write(json.dumps(terminate_req, indent=2))
with open(f"{STANDARD_OUTPUT}instantiate_req", "w") as f:
f.write(json.dumps(instantiate_req, indent=2))
with open(f"{STANDARD_OUTPUT}scale_out_req", "w") as f:
f.write(json.dumps(scale_out_req, indent=2))
with open(f"{STANDARD_OUTPUT}scale_in_req", "w") as f:
f.write(json.dumps(scale_in_req, indent=2))
# NOTE: vnfcInstanceId should be filled by hand
with open(f"{STANDARD_OUTPUT}heal_req", "w") as f:
f.write(json.dumps(heal_req, indent=2))
with open(f"{STANDARD_OUTPUT}change_ext_conn_req", "w") as f:
f.write(json.dumps(change_ext_conn_req, indent=2))
with open(f"{STANDARD_OUTPUT}update_req", "w") as f:
f.write(json.dumps(update_req, indent=2))
print("--------------------------------------------------")
zip_file = zipfile.ZipFile(f"{STANDARD_OUTPUT}userdata_standard.zip")
files = zip_file.namelist()
for file in files:
print(file)
zip_file.close()
print("--------------------------------------------------")
elif args.type == 'ETSINFV.KUBERNETES.V_1':
print(f"VIM type: {args.type}")
with zipfile.ZipFile(zip_dst) as zf:
zf.printdir()
elif args.type == VIM_KUBERNETES or args.type == VIM_KUBERNETES.lower():
if ask_overwrite(K8S_OUTPUT) is not True:
print("Skip to generate package.")
exit()
os.makedirs(K8S_OUTPUT, exist_ok=True)
vnfd_id, zip_path = k8s_zipgen.test_instantiate_cnf_resources()
print(f"Zip file: {zip_path.rsplit('/', 1)[1]}")
vnfd_id, zip_src = k8s_zipgen.test_instantiate_cnf_resources()
zip_fn = zip_src.rsplit('/', 1)[1]
zip_dst = f"{K8S_OUTPUT}/{zip_fn}"
shutil.move(zip_path, K8S_OUTPUT)
shutil.rmtree(zip_path.rsplit('/', 1)[0])
print(f"Generating zip file: {K8S_OUTPUT}/{zip_fn} ...")
create_req = k8s_paramgen.test_instantiate_cnf_resources_create(vnfd_id)
shutil.move(os.path.abspath(zip_src), os.path.abspath(zip_dst))
shutil.rmtree(zip_src.rsplit('/', 1)[0])
# please change auth_url, bearer_token and ssl_ca_cert
# TODO(yasufum) Enable to change auth_url, bearer_token and ssl_ca_cert
# to your own k8s cluster's info
auth_url = "https://127.0.0.1:6443"
bearer_token = "your_k8s_cluster_bearer_token"
ssl_ca_cert = "k8s_ssl_ca_cert"
max_sample_instantiate = k8s_paramgen.max_sample_instantiate(
auth_url, bearer_token, ssl_ca_cert)
max_sample_terminate = k8s_paramgen.max_sample_terminate()
max_sample_scale_out = k8s_paramgen.max_sample_scale_out()
max_sample_scale_in = k8s_paramgen.max_sample_scale_in()
max_sample_heal = k8s_paramgen.max_sample_heal(["replace real vnfc ids"])
req_files = {
"create_req": k8s_paramgen.test_instantiate_cnf_resources_create(vnfd_id),
"max_sample_instantiate": k8s_paramgen.max_sample_instantiate(
auth_url, bearer_token, ssl_ca_cert),
"max_sample_terminate": k8s_paramgen.max_sample_terminate(),
"max_sample_scale_out": k8s_paramgen.max_sample_scale_out(),
"max_sample_scale_in": k8s_paramgen.max_sample_scale_in(),
"max_sample_heal": k8s_paramgen.max_sample_heal(["replace real vnfc ids"])
}
with open(f"{K8S_OUTPUT}create_req", "w", encoding='utf-8') as f:
f.write(json.dumps(create_req, indent=2))
for fn in req_files.keys():
with open(f"{K8S_OUTPUT}/{fn}", "w", encoding='utf-8') as f:
f.write(json.dumps(req_files[fn], indent=2))
with open(f"{K8S_OUTPUT}max_sample_instantiate", "w", encoding='utf-8') as f:
f.write(json.dumps(max_sample_instantiate, indent=2))
with open(f"{K8S_OUTPUT}max_sample_terminate", "w", encoding='utf-8') as f:
f.write(json.dumps(max_sample_terminate, indent=2))
with open(f"{K8S_OUTPUT}max_sample_scale_out", "w", encoding='utf-8') as f:
f.write(json.dumps(max_sample_scale_out, indent=2))
with open(f"{K8S_OUTPUT}max_sample_scale_in", "w", encoding='utf-8') as f:
f.write(json.dumps(max_sample_scale_in, indent=2))
with open(f"{K8S_OUTPUT}max_sample_heal", "w", encoding='utf-8') as f:
f.write(json.dumps(max_sample_heal, indent=2))
print("--------------------------------------------------")
zip_file = zipfile.ZipFile(f"{K8S_OUTPUT}test_instantiate_cnf_resources.zip")
files = zip_file.namelist()
for file in files:
print(file)
zip_file.close()
print("--------------------------------------------------")
elif args.type == 'ETSINFV.HELM.V_3':
print(f"VIM type = {args.type}")
with zipfile.ZipFile(zip_dst) as zf:
zf.printdir()
elif args.type == VIM_HELM or args.type == VIM_HELM.lower():
if ask_overwrite(HELM_OUTPUT) is not True:
print("Skip to generate package.")
exit()
os.makedirs(HELM_OUTPUT, exist_ok=True)
vnfd_id, zip_path = k8s_zipgen.test_helm_instantiate()
print(f"Zip file: {zip_path.rsplit('/', 1)[1]}")
vnfd_id, zip_src = k8s_zipgen.test_helm_instantiate()
zip_fn = zip_src.rsplit('/', 1)[1]
zip_dst = f"{HELM_OUTPUT}/{zip_fn}"
shutil.move(zip_path, HELM_OUTPUT)
shutil.rmtree(zip_path.rsplit('/', 1)[0])
print(f"Generating zip file: {HELM_OUTPUT}/{zip_fn} ...")
create_req = k8s_paramgen.test_helm_instantiate_create(vnfd_id)
shutil.move(os.path.abspath(zip_src), os.path.abspath(zip_dst))
shutil.rmtree(zip_src.rsplit('/', 1)[0])
# please change auth_url, bearer_token and ssl_ca_cert
# TODO(yasufum) Enable to change auth_url, bearer_token and ssl_ca_cert
# to your own k8s cluster's info
auth_url = "https://127.0.0.1:6443"
bearer_token = "your_k8s_cluster_bearer_token"
ssl_ca_cert = "k8s_ssl_ca_cert"
helm_instantiate_req = k8s_paramgen.helm_instantiate(
auth_url, bearer_token, ssl_ca_cert)
helm_terminate_req = k8s_paramgen.helm_terminate()
helm_scale_out = k8s_paramgen.helm_scale_out()
helm_scale_in = k8s_paramgen.helm_scale_in()
helm_heal = k8s_paramgen.helm_heal(["replace real vnfc ids"])
req_files = {
"create_req": k8s_paramgen.test_helm_instantiate_create(vnfd_id),
"helm_instantiate_req": k8s_paramgen.helm_instantiate(
auth_url, bearer_token, ssl_ca_cert),
"helm_terminate_req": k8s_paramgen.helm_terminate(),
"helm_scale_out": k8s_paramgen.helm_scale_out(),
"helm_scale_in": k8s_paramgen.helm_scale_in(),
"helm_heal": k8s_paramgen.helm_heal(["replace real vnfc ids"])
}
for fn in req_files.keys():
with open(f"{HELM_OUTPUT}/{fn}", "w", encoding='utf-8') as f:
f.write(json.dumps(req_files[fn], indent=2))
with open(f"{HELM_OUTPUT}create_req", "w", encoding='utf-8') as f:
f.write(json.dumps(create_req, indent=2))
with zipfile.ZipFile(zip_dst) as zf:
zf.printdir()
with open(f"{HELM_OUTPUT}helm_instantiate_req", "w", encoding='utf-8') as f:
f.write(json.dumps(helm_instantiate_req, indent=2))
with open(f"{HELM_OUTPUT}helm_terminate_req", "w", encoding='utf-8') as f:
f.write(json.dumps(helm_terminate_req, indent=2))
with open(f"{HELM_OUTPUT}helm_scale_out", "w", encoding='utf-8') as f:
f.write(json.dumps(helm_scale_out, indent=2))
with open(f"{HELM_OUTPUT}helm_scale_in", "w", encoding='utf-8') as f:
f.write(json.dumps(helm_scale_in, indent=2))
with open(f"{HELM_OUTPUT}helm_heal", "w", encoding='utf-8') as f:
f.write(json.dumps(helm_heal, indent=2))
print("--------------------------------------------------")
zip_file = zipfile.ZipFile(f"{HELM_OUTPUT}test_helm_instantiate.zip")
files = zip_file.namelist()
for file in files:
print(file)
zip_file.close()
print("--------------------------------------------------")
# NOTE(yasufum): Do not examine if __file__ is '__main__' for considering
# this script can be run from tox.
main()

14
tox.ini
View File

@@ -325,3 +325,17 @@ commands =
# further relies on "tox.skipsdist = True" above).
deps = bindep
commands = bindep test
[testenv:gen-pkg]
pass_env =
OS_REGION_NAME
OS_PROJECT_DOMAIN_ID
OS_AUTH_URL
OS_USER_DOMAIN_ID
OS_USERNAME
OS_VOLUME_API_VERSION
OS_AUTH_TYPE
OS_PROJECT_NAME
OS_PASSWORD
commands =
python3 tools/gen_vnf_pkg.py {posargs}