Browse Source

Merge "Add validation for hiera interpolation in services" into stable/queens

changes/89/703089/1
Zuul 1 month ago
parent
commit
b431be7510
2 changed files with 183 additions and 2 deletions
  1. +1
    -0
      requirements.txt
  2. +182
    -2
      tools/yaml-validate.py

+ 1
- 0
requirements.txt View File

@@ -3,6 +3,7 @@
# process, which may cause wedges in the gate later.
pbr!=2.1.0,>=2.0.0 # Apache-2.0
Jinja2!=2.9.0,!=2.9.1,!=2.9.2,!=2.9.3,!=2.9.4,>=2.8 # BSD License (3 clause)
PyYAML>=3.10 # MIT
six>=1.10.0 # MIT
tripleo-common>=7.1.0 # Apache-2.0
paunch>=1.0.0 # Apache-2.0

+ 182
- 2
tools/yaml-validate.py View File

@@ -13,10 +13,13 @@

import argparse
import os
import re
import sys
import traceback
import yaml

from copy import copy

# Only permit the template alias versions
# The current template version should be the last element
valid_heat_template_versions = [
@@ -691,6 +694,181 @@ def validate_service(filename, tpl):
return 0


def _rsearch_keys(d, pattern, search_keynames=False, enter_lists=False):
""" Deep regex search through a dict for k or v matching a pattern

Returns a list of the matched parent keys. Nested keypaths are
represented as lists. Looks for either values (default) or keys mathching
the search pattern. A key name may also be joined an integer index, when
the matched value belongs to a list and enter_lists is enabled.

Example:

>>> example_dict = { 'key1' : [ 'value1', { 'key1': 'value2' } ],
'key2' : 'value2',
'key3' : { 'key3a': 'value3a' },
'key4' : { 'key4a': { 'key4aa': 'value4aa',
'key4ab': 'value4ab',
'key4ac': 'value1'},
'key4b': 'value4b'} }
>>>_rsearch_keys(example_dict, 'value1', search_keynames=False,
enter_lists=True)
[['key1', 0], ['key4', 'key4a', 'key4ac']]
>>> _rsearch_keys(example_dict, 'key4aa', search_keynames=True)
[['key4', 'key4a', 'key4aa']]
>>> _rsearch_keys(example_dict, 'key1', True, True)
[['key1', 1, 'key1']]

"""
def _rsearch_keys_nested(d, pattern, search_keynames=False,
enter_lists=False, workset=None, path=None):
if path is None:
path = []
# recursively walk through the dict, optionally entering lists
if isinstance(d, dict):
for k, v in d.items():
path.append(k)
if (isinstance(v, dict) or enter_lists and
isinstance(v, list)):
# results are accumulated in the upper scope result var
_rsearch_keys_nested(v, pattern, search_keynames,
enter_lists, result, path)

if search_keynames:
target = str(k)
else:
target = str(v)

if re.search(pattern, target):
present = False
for entry in result:
if set(path).issubset(set(entry)):
present = True
break
if not present:
result.append(copy(path))

path.pop()

if enter_lists and isinstance(d, list):
for ind in range(len(d)):
path.append(ind)
if (isinstance(d[ind], dict) or
enter_lists and isinstance(d[ind], list)):
_rsearch_keys_nested(d[ind], pattern, search_keynames,
enter_lists, result, path)
if re.search(pattern, str(d[ind])):
present = False
for entry in result:
if set(path).issubset(set(entry)):
present = True
break
if not present:
result.append(copy(path))

path.pop()

return result

result = []
return _rsearch_keys_nested(d, pattern, search_keynames, enter_lists)

def _get(d, path):
""" Get a value (or None) from a dict by path given as a list

Integer values represent indexes in lists, string values are for dict keys
"""
if not isinstance(path, list):
raise LookupError("The path needs to be a list")
for step in path:
try:
d = d[step]
except KeyError:
return None
return d

def validate_service_hiera_interpol(f, tpl):
""" Validate service templates for hiera interpolation rules

Find all {get_param: [ServiceNetMap, ...]} missing hiera
interpolation of IP addresses or network ranges vs
the names of the networks, which needs no interpolation
"""
def _getindex(lst, element):
try:
pos = lst.index(element)
return pos
except ValueError:
return None

if 'ansible' in f or 'endpoint_map' in f:
return 0

failed = False
search_keynames = False
enter_lists = True
if 'outputs' in tpl and 'role_data' in tpl['outputs']:
values_found = _rsearch_keys(tpl['outputs']['role_data'],
'ServiceNetMap',
search_keynames, enter_lists)
for path in values_found:
# Omit if not a part of {get_param: [ServiceNetMap ...
if not enter_lists and path[-1] != 'get_param':
continue
if enter_lists and path[-1] != 0 and path[-2] != 'get_param':
continue

path_str = ';'.join(str(x) for x in path)
# NOTE(bogdando): Omit foo_network keys looking like a network
# name. The only exception is allow anything under
# str_replace['params'] ('str_replace;params' in the str notation).
# We need to escape because of '$' char may be in templated params.
query = re.compile(r'\\;str\\_replace\\;params\\;\S*?net',
re.IGNORECASE)
if not query.search(re.escape(path_str)):
# Keep parsing, if foo_vip_network, or anything
# else that looks like a keystore for an IP address value.
query = re.compile(r'(?!ip|cidr|addr|bind|host)([^;]\S)*?net',
re.IGNORECASE)
if query.search(re.escape(path_str)):
continue

# Omit mappings in tht, like:
# [NetXxxMap, <... ,> {get_param: [ServiceNetMap, ...
if re.search(r'Map.*get\\_param', re.escape(path_str)):
continue

# For the remaining cases, verify if there is a template
# (like str_replace) with the expected format, which is
# containing hiera(param_name) interpolation
str_replace_pos = _getindex(path, 'str_replace')
params_pos = _getindex(path, 'params')
if str_replace_pos is None or params_pos is None:
print("ERROR: Missed hiera interpolation via str_replace "
"in %s, role_data: %s"
% (f, path))
failed = True
continue

# Get the name of the templated param, like NETWORK or $NETWORK
param_name = path[params_pos + 1]
str_replace = _get(tpl['outputs']['role_data'],
path[:(str_replace_pos + 1)])
match_interp = re.search("%%\{hiera\(\S+%s\S+\)\}" %
re.escape(param_name),
str_replace['template'])
if str_replace['template'] is None or match_interp is None:
print("ERROR: Missed %%{hiera('... %s ...')} interpolation "
"in str_replace['template'] "
"in %s, role_data: %s" % (param_name, f, path))
failed = True
continue
# end processing this path and go for the next one

if failed:
return 1
else:
return 0

def validate_upgrade_tasks_duplicate_whens(filename):
"""Take a heat template and starting at the upgrade_tasks
@@ -706,8 +884,8 @@ def validate_upgrade_tasks_duplicate_whens(filename):
if ' when:' in line:
count += 1
if count > 1:
print ("ERROR: found duplicate when statements in %s "
"upgrade_task: %s %s" % (filename, line, duplicate))
print("ERROR: found duplicate when statements in %s "
"upgrade_task: %s %s" % (filename, line, duplicate))
return 1
duplicate = line
elif ' -' in line:
@@ -777,6 +955,8 @@ def validate(filename, param_map):
VALIDATE_PUPPET_OVERRIDE.get(filename, True)):
retval = validate_service(filename, tpl)

if re.search(r'(puppet|docker)\/services', filename):
retval = validate_service_hiera_interpol(filename, tpl)

if filename.startswith('./docker/services/logging/'):
retval = validate_docker_logging_template(filename, tpl)

Loading…
Cancel
Save