Pass Loader to yaml.load to remove YAMLLoadWarning

Pass explicitly Loader to yaml.load so that we can get rid of
YAMLLoadWarning [1].
[1] https://msg.pyyaml.org/load

Also make sure to use context manager when opening file,
so that we don't miss to close it.

Change-Id: Ic12499c3286c6bed5e0ee231ad0c25e01f82b58e
This commit is contained in:
Takashi Kajinami 2019-09-06 10:38:46 +09:00
parent b8b75f5285
commit 0bef980f59
3 changed files with 49 additions and 20 deletions

View File

@ -246,7 +246,9 @@ def backup_template():
def merge_from_processed(reference_params):
template = yaml.load(open(OPTS.template).read(), Loader=TemplateLoader)
with open(OPTS.template, 'r') as f:
template = yaml.load(f.read(), Loader=TemplateLoader)
for param in reference_params:
if param not in template['parameters']:
template['parameters'][param] = reference_params[param]

View File

@ -176,7 +176,8 @@ def write_template(template, filename=None):
def convert(filename, script_path):
print('Converting %s' % filename)
try:
tpl = yaml.load(open(filename).read(), Loader=TemplateLoader)
with open(filename, 'r') as f:
tpl = yaml.load(f.read(), Loader=TemplateLoader)
except Exception:
print(traceback.format_exc())
return 0
@ -219,7 +220,7 @@ def convert(filename, script_path):
def check_old_style(filename):
with open(filename, 'r') as f:
tpl = yaml.load(f.read())
tpl = yaml.load(f.read(), Loader=yaml.FullLoader)
if isinstance(tpl.get('resources', {}), dict):
for r in (tpl.get('resources', {})).items():

View File

@ -275,7 +275,8 @@ def to_camel_case(string):
def get_base_endpoint_map(filename):
try:
tpl = yaml.load(open(filename).read())
with open(filename, 'r') as f:
tpl = yaml.load(f.read(), Loader=yaml.FullLoader)
return tpl['parameters']['EndpointMap']['default']
except Exception:
print(traceback.format_exc())
@ -284,7 +285,8 @@ def get_base_endpoint_map(filename):
def get_endpoint_map_from_env(filename):
try:
tpl = yaml.load(open(filename).read())
with open(filename, 'r') as f:
tpl = yaml.load(f.read(), Loader=yaml.FullLoader)
return {
'file': filename,
'map': tpl['parameter_defaults']['EndpointMap']
@ -299,7 +301,10 @@ def validate_endpoint_map(base_map, env_map):
def validate_role_name(filename):
role_data = yaml.load(open(filename).read())[0]
with open(filename, 'r') as f:
tpl = yaml.load(f.read(), Loader=yaml.FullLoader)
role_data = tpl[0]
if role_data['name'] != os.path.basename(filename).split('.')[0]:
print('ERROR: role name should match file name for role : %s.'
% filename)
@ -312,7 +317,9 @@ def validate_hci_compute_services_default(env_filename, env_tpl):
env_services_list.remove('OS::TripleO::Services::CephOSD')
roles_filename = os.path.join(os.path.dirname(env_filename),
'../roles/Compute.yaml')
roles_tpl = yaml.load(open(roles_filename).read())
with open(roles_filename, 'r') as f:
roles_tpl = yaml.load(f.read(), Loader=yaml.FullLoader)
for role in roles_tpl:
if role['name'] == 'Compute':
roles_services_list = role['ServicesDefault']
@ -326,7 +333,9 @@ def validate_hci_compute_services_default(env_filename, env_tpl):
def validate_hci_computehci_role(hci_role_filename, hci_role_tpl):
compute_role_filename = os.path.join(os.path.dirname(hci_role_filename),
'./Compute.yaml')
compute_role_tpl = yaml.load(open(compute_role_filename).read())
with open(compute_role_filename, 'r') as f:
compute_role_tpl = yaml.load(f.read(), Loader=yaml.FullLoader)
compute_role_services = compute_role_tpl[0]['ServicesDefault']
for role in hci_role_tpl:
if role['name'] == 'ComputeHCI':
@ -342,7 +351,9 @@ def validate_hci_computehci_role(hci_role_filename, hci_role_tpl):
def validate_controller_dashboard(filename, tpl):
control_role_filename = os.path.join(os.path.dirname(filename),
'./Controller.yaml')
control_role_tpl = yaml.load(open(control_role_filename).read())
with open(control_role_filename, 'r') as f:
control_role_tpl = yaml.load(f.read(), Loader=yaml.FullLoader)
control_role_services = control_role_tpl[0]['ServicesDefault']
for role in tpl:
if role['name'] == 'ControllerStorageDashboard':
@ -359,7 +370,9 @@ def validate_hci_role(hci_role_filename, hci_role_tpl):
if hci_role_filename in ['./roles/' + x + '.yaml' for x in role_files]:
compute_role_filename = \
os.path.join(os.path.dirname(hci_role_filename), './Compute.yaml')
compute_role_tpl = yaml.load(open(compute_role_filename).read())
with open(compute_role_filename, 'r') as f:
compute_role_tpl = yaml.load(f.read(), Loader=yaml.FullLoader)
compute_role_services = compute_role_tpl[0]['ServicesDefault']
for role in hci_role_tpl:
if role['name'] == 'HciCephAll':
@ -396,7 +409,9 @@ def validate_ceph_role(ceph_role_filename, ceph_role_tpl):
if ceph_role_filename in ['./roles/' + x + '.yaml' for x in role_files]:
ceph_storage_role_filename = \
os.path.join(os.path.dirname(ceph_role_filename), './CephStorage.yaml')
ceph_storage_role_tpl = yaml.load(open(ceph_storage_role_filename).read())
with open(ceph_storage_role_filename, 'r') as f:
ceph_storage_role_tpl = yaml.load(f.read(), Loader=yaml.FullLoader)
ceph_storage_role_services = ceph_storage_role_tpl[0]['ServicesDefault']
for role in ceph_role_tpl:
if role['name'] == 'CephAll':
@ -425,7 +440,9 @@ def validate_ceph_role(ceph_role_filename, ceph_role_tpl):
def validate_controller_no_ceph_role(filename, tpl):
control_role_filename = os.path.join(os.path.dirname(filename),
'./Controller.yaml')
control_role_tpl = yaml.load(open(control_role_filename).read())
with open(control_role_filename, 'r') as f:
control_role_tpl = yaml.load(f.read(), Loader=yaml.FullLoader)
control_role_services = control_role_tpl[0]['ServicesDefault']
for role in tpl:
if role['name'] == 'ControllerNoCeph':
@ -447,7 +464,9 @@ def validate_controller_no_ceph_role(filename, tpl):
def validate_with_compute_role_services(role_filename, role_tpl, exclude_service=()):
cmpt_filename = os.path.join(os.path.dirname(role_filename),
'./Compute.yaml')
cmpt_tpl = yaml.load(open(cmpt_filename).read())
with open(cmpt_filename, 'r') as f:
cmpt_tpl = yaml.load(f.read(), Loader=yaml.FullLoader)
cmpt_services = cmpt_tpl[0]['ServicesDefault']
cmpt_services = [x for x in cmpt_services if (x not in exclude_service)]
@ -490,7 +509,7 @@ def validate_multiarch_compute_roles(role_filename, role_tpl):
for arch in ['ppc64le']:
arch_filename = os.path.join(roles_dir,
'Compute%s.yaml' % (arch.upper()))
with open(arch_filename) as f:
with open(arch_filename, 'r') as f:
arch_tpl = yaml.safe_load(f)
arch_services = set(arch_tpl[0].get('ServicesDefault', []))
@ -579,8 +598,10 @@ def validate_docker_service_mysql_usage(filename, tpl):
# disregard class names, only consider file names
if 'OS::' in f:
continue
newfile = os.path.normpath(os.path.dirname(incfile) + '/' + f)
newtmp = yaml.load(open(newfile).read())
newfilename = \
os.path.normpath(os.path.join(os.path.dirname(incfile), f))
with open(newfilename, 'r') as newfile:
newtmp = yaml.load(newfile.read(), Loader=yaml.FullLoader)
read_all(newfile, newtmp)
read_all(filename, tpl)
@ -985,7 +1006,7 @@ def validate_upgrade_tasks_duplicate_whens(filename):
"""Take a heat template and starting at the upgrade_tasks
try to detect duplicate 'when:' statements
"""
with open(filename) as template:
with open(filename, 'r') as template:
contents = template.read()
upgrade_task_position = contents.index('upgrade_tasks')
lines = contents[upgrade_task_position:].splitlines()
@ -1034,7 +1055,8 @@ def validate(filename, param_map):
print('Validating %s' % filename)
retval = 0
try:
tpl = yaml.load(open(filename).read())
with open(filename, 'r') as f:
tpl = yaml.load(f.read(), Loader=yaml.FullLoader)
is_heat_template = 'heat_template_version' in tpl
@ -1219,9 +1241,13 @@ def validate_upgrade_tasks(upgrade_tasks):
def validate_network_data_file(data_file_path):
try:
data_file = yaml.load(open(data_file_path).read())
with open(data_file_path, 'r') as data_file:
data_file = yaml.load(data_file.read(), Loader=yaml.FullLoader)
base_file_path = os.path.dirname(data_file_path) + "/network_data.yaml"
base_file = yaml.load(open(base_file_path).read())
with open(base_file_path, 'r') as base_file:
base_file = yaml.load(base_file.read(), Loader=yaml.FullLoader)
retval = 0
for n in base_file:
if n not in data_file: