Browse Source

Add validation for hiera interpolation in services

Walk through services' templates role_data to identify
missing hiera interplolation of networks.

Use additionally provided interfaces for validations:
 * search in dicts by keys or values matching some regex,
   entering into lists as an option;
 * safe get values by the discovered paths casted as lists,
   like get_param/get_attr works for heat templates.

Add PyYAML missing to the requirements.txt.

Closes-bug: #1764315

NOTE: Includes

Change-Id: Idef66ee96cbd67d23760a1cce9537ecc157c3429
Signed-off-by: Bogdan Dobrelya <>
(cherry picked from commit 0b44170e73)
Bogdan Dobrelya 3 years ago
2 changed files with 183 additions and 2 deletions
  1. +1
  2. +182

+ 1
- 0
requirements.txt View File

@ -3,6 +3,7 @@
# process, which may cause wedges in the gate later.
pbr!=2.1.0,>=2.0.0 # Apache-2.0
Jinja2!=2.9.0,!=2.9.1,!=2.9.2,!=2.9.3,!=2.9.4,>=2.8 # BSD License (3 clause)
PyYAML>=3.10 # MIT
six>=1.10.0 # MIT
tripleo-common>=7.1.0 # Apache-2.0
paunch>=1.0.0 # Apache-2.0

+ 182
- 2
tools/ View File

@ -13,10 +13,13 @@
import argparse
import os
import re
import sys
import traceback
import yaml
from copy import copy
# Only permit the template alias versions
# The current template version should be the last element
valid_heat_template_versions = [
@ -691,6 +694,181 @@ def validate_service(filename, tpl):
return 0
def _rsearch_keys(d, pattern, search_keynames=False, enter_lists=False):
""" Deep regex search through a dict for k or v matching a pattern
Returns a list of the matched parent keys. Nested keypaths are
represented as lists. Looks for either values (default) or keys mathching
the search pattern. A key name may also be joined an integer index, when
the matched value belongs to a list and enter_lists is enabled.
>>> example_dict = { 'key1' : [ 'value1', { 'key1': 'value2' } ],
'key2' : 'value2',
'key3' : { 'key3a': 'value3a' },
'key4' : { 'key4a': { 'key4aa': 'value4aa',
'key4ab': 'value4ab',
'key4ac': 'value1'},
'key4b': 'value4b'} }
>>>_rsearch_keys(example_dict, 'value1', search_keynames=False,
[['key1', 0], ['key4', 'key4a', 'key4ac']]
>>> _rsearch_keys(example_dict, 'key4aa', search_keynames=True)
[['key4', 'key4a', 'key4aa']]
>>> _rsearch_keys(example_dict, 'key1', True, True)
[['key1', 1, 'key1']]
def _rsearch_keys_nested(d, pattern, search_keynames=False,
enter_lists=False, workset=None, path=None):
if path is None:
path = []
# recursively walk through the dict, optionally entering lists
if isinstance(d, dict):
for k, v in d.items():
if (isinstance(v, dict) or enter_lists and
isinstance(v, list)):
# results are accumulated in the upper scope result var
_rsearch_keys_nested(v, pattern, search_keynames,
enter_lists, result, path)
if search_keynames:
target = str(k)
target = str(v)
if, target):
present = False
for entry in result:
if set(path).issubset(set(entry)):
present = True
if not present:
if enter_lists and isinstance(d, list):
for ind in range(len(d)):
if (isinstance(d[ind], dict) or
enter_lists and isinstance(d[ind], list)):
_rsearch_keys_nested(d[ind], pattern, search_keynames,
enter_lists, result, path)
if, str(d[ind])):
present = False
for entry in result:
if set(path).issubset(set(entry)):
present = True
if not present:
return result
result = []
return _rsearch_keys_nested(d, pattern, search_keynames, enter_lists)
def _get(d, path):
""" Get a value (or None) from a dict by path given as a list
Integer values represent indexes in lists, string values are for dict keys
if not isinstance(path, list):
raise LookupError("The path needs to be a list")
for step in path:
d = d[step]
except KeyError:
return None
return d
def validate_service_hiera_interpol(f, tpl):
""" Validate service templates for hiera interpolation rules
Find all {get_param: [ServiceNetMap, ...]} missing hiera
interpolation of IP addresses or network ranges vs
the names of the networks, which needs no interpolation
def _getindex(lst, element):
pos = lst.index(element)
return pos
except ValueError:
return None
if 'ansible' in f or 'endpoint_map' in f:
return 0
failed = False
search_keynames = False
enter_lists = True
if 'outputs' in tpl and 'role_data' in tpl['outputs']:
values_found = _rsearch_keys(tpl['outputs']['role_data'],
search_keynames, enter_lists)
for path in values_found:
# Omit if not a part of {get_param: [ServiceNetMap ...
if not enter_lists and path[-1] != 'get_param':
if enter_lists and path[-1] != 0 and path[-2] != 'get_param':
path_str = ';'.join(str(x) for x in path)
# NOTE(bogdando): Omit foo_network keys looking like a network
# name. The only exception is allow anything under
# str_replace['params'] ('str_replace;params' in the str notation).
# We need to escape because of '$' char may be in templated params.
query = re.compile(r'\\;str\\_replace\\;params\\;\S*?net',
if not
# Keep parsing, if foo_vip_network, or anything
# else that looks like a keystore for an IP address value.
query = re.compile(r'(?!ip|cidr|addr|bind|host)([^;]\S)*?net',
# Omit mappings in tht, like:
# [NetXxxMap, <... ,> {get_param: [ServiceNetMap, ...
if'Map.*get\\_param', re.escape(path_str)):
# For the remaining cases, verify if there is a template
# (like str_replace) with the expected format, which is
# containing hiera(param_name) interpolation
str_replace_pos = _getindex(path, 'str_replace')
params_pos = _getindex(path, 'params')
if str_replace_pos is None or params_pos is None:
print("ERROR: Missed hiera interpolation via str_replace "
"in %s, role_data: %s"
% (f, path))
failed = True
# Get the name of the templated param, like NETWORK or $NETWORK
param_name = path[params_pos + 1]
str_replace = _get(tpl['outputs']['role_data'],
path[:(str_replace_pos + 1)])
match_interp ="%%\{hiera\(\S+%s\S+\)\}" %
if str_replace['template'] is None or match_interp is None:
print("ERROR: Missed %%{hiera('... %s ...')} interpolation "
"in str_replace['template'] "
"in %s, role_data: %s" % (param_name, f, path))
failed = True
# end processing this path and go for the next one
if failed:
return 1
return 0
def validate_upgrade_tasks_duplicate_whens(filename):
"""Take a heat template and starting at the upgrade_tasks
@ -706,8 +884,8 @@ def validate_upgrade_tasks_duplicate_whens(filename):
if ' when:' in line:
count += 1
if count > 1:
print ("ERROR: found duplicate when statements in %s "
"upgrade_task: %s %s" % (filename, line, duplicate))
print("ERROR: found duplicate when statements in %s "
"upgrade_task: %s %s" % (filename, line, duplicate))
return 1
duplicate = line
elif ' -' in line:
@ -777,6 +955,8 @@ def validate(filename, param_map):
VALIDATE_PUPPET_OVERRIDE.get(filename, True)):
retval = validate_service(filename, tpl)
if'(puppet|docker)\/services', filename):
retval = validate_service_hiera_interpol(filename, tpl)
if filename.startswith('./docker/services/logging/'):
retval = validate_docker_logging_template(filename, tpl)