Add validation for hiera interpolation in services
Walk through services' templates role_data to identify missing hiera interplolation of networks. Use additionally provided interfaces for validations: * search in dicts by keys or values matching some regex, entering into lists as an option; * safe get values by the discovered paths casted as lists, like get_param/get_attr works for heat templates. Add PyYAML missing to the requirements.txt. Closes-bug: #1764315 Change-Id: Idef66ee96cbd67d23760a1cce9537ecc157c3429 Signed-off-by: Bogdan Dobrelya <bdobreli@redhat.com>
This commit is contained in:
parent
2e224ddaaa
commit
0b44170e73
@ -2,6 +2,7 @@
|
||||
# of appearance. Changing the order has an impact on the overall integration
|
||||
# process, which may cause wedges in the gate later.
|
||||
pbr!=2.1.0,>=2.0.0 # Apache-2.0
|
||||
PyYAML>=3.12 # MIT
|
||||
Jinja2>=2.10 # BSD License (3 clause)
|
||||
six>=1.10.0 # MIT
|
||||
tripleo-common>=7.1.0 # Apache-2.0
|
||||
|
@ -13,10 +13,14 @@
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import traceback
|
||||
import yaml
|
||||
|
||||
from copy import copy
|
||||
from sets import Set
|
||||
|
||||
# Only permit the template alias versions
|
||||
# The current template version should be the last element
|
||||
valid_heat_template_versions = [
|
||||
@ -604,6 +608,181 @@ def validate_service(filename, tpl):
|
||||
return 0
|
||||
|
||||
|
||||
def _rsearch_keys(d, pattern, search_keynames=False, enter_lists=False):
|
||||
""" Deep regex search through a dict for k or v matching a pattern
|
||||
|
||||
Returns a list of the matched parent keys. Nested keypaths are
|
||||
represented as lists. Looks for either values (default) or keys mathching
|
||||
the search pattern. A key name may also be joined an integer index, when
|
||||
the matched value belongs to a list and enter_lists is enabled.
|
||||
|
||||
Example:
|
||||
|
||||
>>> example_dict = { 'key1' : [ 'value1', { 'key1': 'value2' } ],
|
||||
'key2' : 'value2',
|
||||
'key3' : { 'key3a': 'value3a' },
|
||||
'key4' : { 'key4a': { 'key4aa': 'value4aa',
|
||||
'key4ab': 'value4ab',
|
||||
'key4ac': 'value1'},
|
||||
'key4b': 'value4b'} }
|
||||
>>>_rsearch_keys(example_dict, 'value1', search_keynames=False,
|
||||
enter_lists=True)
|
||||
[['key1', 0], ['key4', 'key4a', 'key4ac']]
|
||||
>>> _rsearch_keys(example_dict, 'key4aa', search_keynames=True)
|
||||
[['key4', 'key4a', 'key4aa']]
|
||||
>>> _rsearch_keys(example_dict, 'key1', True, True)
|
||||
[['key1', 1, 'key1']]
|
||||
|
||||
"""
|
||||
def _rsearch_keys_nested(d, pattern, search_keynames=False,
|
||||
enter_lists=False, workset=None, path=None):
|
||||
if path is None:
|
||||
path = []
|
||||
# recursively walk through the dict, optionally entering lists
|
||||
if isinstance(d, dict):
|
||||
for k, v in d.iteritems():
|
||||
path.append(k)
|
||||
if (isinstance(v, dict) or enter_lists and
|
||||
isinstance(v, list)):
|
||||
# results are accumulated in the upper scope result var
|
||||
_rsearch_keys_nested(v, pattern, search_keynames,
|
||||
enter_lists, result, path)
|
||||
|
||||
if search_keynames:
|
||||
target = str(k)
|
||||
else:
|
||||
target = str(v)
|
||||
|
||||
if re.search(pattern, target):
|
||||
present = False
|
||||
for entry in result:
|
||||
if Set(path).issubset(Set(entry)):
|
||||
present = True
|
||||
break
|
||||
if not present:
|
||||
result.append(copy(path))
|
||||
|
||||
path.pop()
|
||||
|
||||
if enter_lists and isinstance(d, list):
|
||||
for ind in range(len(d)):
|
||||
path.append(ind)
|
||||
if (isinstance(d[ind], dict) or
|
||||
enter_lists and isinstance(d[ind], list)):
|
||||
_rsearch_keys_nested(d[ind], pattern, search_keynames,
|
||||
enter_lists, result, path)
|
||||
if re.search(pattern, str(d[ind])):
|
||||
present = False
|
||||
for entry in result:
|
||||
if Set(path).issubset(Set(entry)):
|
||||
present = True
|
||||
break
|
||||
if not present:
|
||||
result.append(copy(path))
|
||||
|
||||
path.pop()
|
||||
|
||||
return result
|
||||
|
||||
result = []
|
||||
return _rsearch_keys_nested(d, pattern, search_keynames, enter_lists)
|
||||
|
||||
def _get(d, path):
|
||||
""" Get a value (or None) from a dict by path given as a list
|
||||
|
||||
Integer values represent indexes in lists, string values are for dict keys
|
||||
"""
|
||||
if not isinstance(path, list):
|
||||
raise LookupError("The path needs to be a list")
|
||||
for step in path:
|
||||
try:
|
||||
d = d[step]
|
||||
except KeyError:
|
||||
return None
|
||||
return d
|
||||
|
||||
def validate_service_hiera_interpol(f, tpl):
|
||||
""" Validate service templates for hiera interpolation rules
|
||||
|
||||
Find all {get_param: [ServiceNetMap, ...]} missing hiera
|
||||
interpolation of IP addresses or network ranges vs
|
||||
the names of the networks, which needs no interpolation
|
||||
"""
|
||||
def _getindex(lst, element):
|
||||
try:
|
||||
pos = lst.index(element)
|
||||
return pos
|
||||
except ValueError:
|
||||
return None
|
||||
|
||||
if 'ansible' in f or 'endpoint_map' in f:
|
||||
return 0
|
||||
|
||||
failed = False
|
||||
search_keynames = False
|
||||
enter_lists = True
|
||||
if 'outputs' in tpl and 'role_data' in tpl['outputs']:
|
||||
values_found = _rsearch_keys(tpl['outputs']['role_data'],
|
||||
'ServiceNetMap',
|
||||
search_keynames, enter_lists)
|
||||
for path in values_found:
|
||||
# Omit if not a part of {get_param: [ServiceNetMap ...
|
||||
if not enter_lists and path[-1] != 'get_param':
|
||||
continue
|
||||
if enter_lists and path[-1] != 0 and path[-2] != 'get_param':
|
||||
continue
|
||||
|
||||
path_str = ';'.join(str(x) for x in path)
|
||||
# NOTE(bogdando): Omit foo_network keys looking like a network
|
||||
# name. The only exception is allow anything under
|
||||
# str_replace['params'] ('str_replace;params' in the str notation).
|
||||
# We need to escape because of '$' char may be in templated params.
|
||||
query = re.compile(r'\\;str\\_replace\\;params\\;\S*?net',
|
||||
re.IGNORECASE)
|
||||
if not query.search(re.escape(path_str)):
|
||||
# Keep parsing, if foo_vip_network, or anything
|
||||
# else that looks like a keystore for an IP address value.
|
||||
query = re.compile(r'(?!ip|cidr|addr|bind|host)([^;]\S)*?net',
|
||||
re.IGNORECASE)
|
||||
if query.search(re.escape(path_str)):
|
||||
continue
|
||||
|
||||
# Omit mappings in tht, like:
|
||||
# [NetXxxMap, <... ,> {get_param: [ServiceNetMap, ...
|
||||
if re.search(r'Map.*get\\_param', re.escape(path_str)):
|
||||
continue
|
||||
|
||||
# For the remaining cases, verify if there is a template
|
||||
# (like str_replace) with the expected format, which is
|
||||
# containing hiera(param_name) interpolation
|
||||
str_replace_pos = _getindex(path, 'str_replace')
|
||||
params_pos = _getindex(path, 'params')
|
||||
if str_replace_pos is None or params_pos is None:
|
||||
print("ERROR: Missed hiera interpolation via str_replace "
|
||||
"in %s, role_data: %s"
|
||||
% (f, path))
|
||||
failed = True
|
||||
continue
|
||||
|
||||
# Get the name of the templated param, like NETWORK or $NETWORK
|
||||
param_name = path[params_pos + 1]
|
||||
str_replace = _get(tpl['outputs']['role_data'],
|
||||
path[:(str_replace_pos + 1)])
|
||||
match_interp = re.search("%%\{hiera\(\S+%s\S+\)\}" %
|
||||
re.escape(param_name),
|
||||
str_replace['template'])
|
||||
if str_replace['template'] is None or match_interp is None:
|
||||
print("ERROR: Missed %%{hiera('... %s ...')} interpolation "
|
||||
"in str_replace['template'] "
|
||||
"in %s, role_data: %s" % (param_name, f, path))
|
||||
failed = True
|
||||
continue
|
||||
# end processing this path and go for the next one
|
||||
|
||||
if failed:
|
||||
return 1
|
||||
else:
|
||||
return 0
|
||||
|
||||
def validate_upgrade_tasks_duplicate_whens(filename):
|
||||
"""Take a heat template and starting at the upgrade_tasks
|
||||
@ -619,8 +798,8 @@ def validate_upgrade_tasks_duplicate_whens(filename):
|
||||
if ' when:' in line:
|
||||
count += 1
|
||||
if count > 1:
|
||||
print ("ERROR: found duplicate when statements in %s "
|
||||
"upgrade_task: %s %s" % (filename, line, duplicate))
|
||||
print("ERROR: found duplicate when statements in %s "
|
||||
"upgrade_task: %s %s" % (filename, line, duplicate))
|
||||
return 1
|
||||
duplicate = line
|
||||
elif ' -' in line:
|
||||
@ -690,6 +869,8 @@ def validate(filename, param_map):
|
||||
VALIDATE_PUPPET_OVERRIDE.get(filename, True)):
|
||||
retval = validate_service(filename, tpl)
|
||||
|
||||
if re.search(r'(puppet|docker)\/services', filename):
|
||||
retval = validate_service_hiera_interpol(filename, tpl)
|
||||
|
||||
if filename.startswith('./docker/services/logging/'):
|
||||
retval = validate_docker_logging_template(filename, tpl)
|
||||
|
Loading…
Reference in New Issue
Block a user