openstack-ansible-plugins/filter/osa-filters.py
Nolan Brubaker 80a1262252 Add a get_nested filter
When accessing nested facts, such as in ansible_local sections, guard
statements in the when clause can get quite long, due to having to
repeate the 'in/not in' logic for every key at every level.

The get_nested filter attempts to instead to move the guard into a
single line, making the conditions easier to write and maintain.

As example,

   ('openstack_ansible' not in ansible_local or
    'swift' not in ansible_local['openstack_ansible'] or
    'venv_tag' not in ansible_local['openstack_ansible']['swift'] or
    ansible_local['openstack_ansible']['swift']['venv_tag'] == swift_venv_tag)

 could be rewritten as

     get_nested(ansible_local, 'openstack_ansible.swift.venv_tag') == swift_venv_tag

Change-Id: I3b43c25c8783c43cf5285f2b3e7267b2c5712ea0
2017-07-07 19:42:39 +00:00

369 lines
11 KiB
Python

# Copyright 2015, Rackspace US, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# (c) 2015, Kevin Carter <kevin.carter@rackspace.com>
import hashlib
import logging
import os
import re
from ansible import errors
from jinja2.runtime import Undefined
try:
from urllib.parse import urlparse
except ImportError:
from urlparse import urlparse
"""Filter usage:
Simple filters that may be useful from within the stack
"""
def _deprecated(new_var, old_var=None, old_var_name=None,
new_var_name=None, removed_in=None, fatal=False):
"""Provide a deprecation warning on deprecated variables.
This filter will return the old_var value if defined along with a
deprecation warning that will inform the user that the old variable
should no longer be used.
In order to use this filter the old and new variable names must be provided
to the filter as a string which is used to render the warning message. The
removed_in option is used to give a date or release name where the old
option will be removed. Optionally, if fatal is set to True, the filter
will raise an exception if the old variable is used.
USAGE: {{ new_var | deprecated(old_var,
"old_var_name",
"new_var_name",
"removed_in",
false) }}
:param new_var: ``object``
:param old_var: ``object``
:param old_var_name: ``str``
:param new_var_name: ``str``
:param removed_in: ``str``
:param fatal: ``bol``
"""
_usage = (
'USAGE: '
'{{ new_var | deprecated(old_var=old_var, old_var_name="old_var_name",'
' new_var_name="new_var_name", removed_in="removed_in",'
' fatal=false) }}'
)
if not old_var_name:
raise errors.AnsibleUndefinedVariable(
'To use this filter you must provide the "old_var_name" option'
' with the string name of the old variable that will be'
' replaced. ' + _usage
)
if not new_var_name:
raise errors.AnsibleUndefinedVariable(
'To use this filter you must provide the "new_var_name" option'
' with the string name of the new variable that will replace the'
' deprecated one. ' + _usage
)
if not removed_in:
raise errors.AnsibleUndefinedVariable(
'To use this filter you must provide the "removed_in" option with'
' the string name of the release where the old_var will be'
' removed. ' + _usage
)
# If old_var is undefined or has a None value return the new_var value
if isinstance(old_var, Undefined) or not old_var:
return new_var
name = 'Ansible-Warning| '
log = logging.getLogger(name)
for handler in log.handlers:
if name == handler.name:
break
else:
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
stream_handler.name = name
stream_format = logging.Formatter(
'%(asctime)s - %(name)s%(levelname)s => %(message)s'
)
stream_handler.setFormatter(stream_format)
log.setLevel(logging.DEBUG)
log.addHandler(stream_handler)
message = (
'Deprecated Option provided: Deprecated variable: "%(old)s", Removal'
' timeframe: "%(removed_in)s", Future usage: "%(new)s"'
% {'old': old_var_name, 'new': new_var_name, 'removed_in': removed_in}
)
if str(fatal).lower() in ['yes', 'true']:
message = 'Fatally %s' % message
log.fatal(message)
raise RuntimeError(message)
else:
log.warn(message)
return old_var
def _pip_requirement_split(requirement):
version_descriptors = "(>=|<=|>|<|==|~=|!=)"
requirement = requirement.split(';')
requirement_info = re.split(r'%s\s*' % version_descriptors, requirement[0])
name = requirement_info[0]
marker = None
if len(requirement) > 1:
marker = requirement[1]
versions = None
if len(requirement_info) > 1:
versions = requirement_info[1]
return name, versions, marker
def _lower_set_lists(list_one, list_two):
_list_one = set([i.lower() for i in list_one])
_list_two = set([i.lower() for i in list_two])
return _list_one, _list_two
def bit_length_power_of_2(value):
"""Return the smallest power of 2 greater than a numeric value.
:param value: Number to find the smallest power of 2
:type value: ``int``
:returns: ``int``
"""
return 2**(int(value)-1).bit_length()
def get_netloc(url):
"""Return the netloc from a URL.
If the input value is not a value URL the method will raise an Ansible
filter exception.
:param url: the URL to parse
:type url: ``str``
:returns: ``str``
"""
try:
netloc = urlparse(url).netloc
except Exception as exp:
raise errors.AnsibleFilterError(
'Failed to return the netloc of: "%s"' % str(exp)
)
else:
return netloc
def get_netloc_no_port(url):
"""Return the netloc without a port from a URL.
If the input value is not a value URL the method will raise an Ansible
filter exception.
:param url: the URL to parse
:type url: ``str``
:returns: ``str``
"""
return get_netloc(url=url).split(':')[0]
def get_netorigin(url):
"""Return the netloc from a URL.
If the input value is not a value URL the method will raise an Ansible
filter exception.
:param url: the URL to parse
:type url: ``str``
:returns: ``str``
"""
try:
parsed_url = urlparse(url)
netloc = parsed_url.netloc
scheme = parsed_url.scheme
except Exception as exp:
raise errors.AnsibleFilterError(
'Failed to return the netorigin of: "%s"' % str(exp)
)
else:
return '%s://%s' % (scheme, netloc)
def string_2_int(string):
"""Return the an integer from a string.
The string is hashed, converted to a base36 int, and the modulo of 10240
is returned.
:param string: string to retrieve an int from
:type string: ``str``
:returns: ``int``
"""
# Try to encode utf-8 else pass
try:
string = string.encode('utf-8')
except AttributeError:
pass
hashed_name = hashlib.sha256(string).hexdigest()
return int(hashed_name, 36) % 10240
def pip_requirement_names(requirements):
"""Return a ``str`` of requirement name and list of versions.
:param requirement: Name of a requirement that may have versions within
it. This will use the constant,
VERSION_DESCRIPTORS.
:type requirement: ``str``
:return: ``str``
"""
named_requirements = list()
for requirement in requirements:
name = _pip_requirement_split(requirement)[0]
if name and not name.startswith('#'):
named_requirements.append(name.lower())
return sorted(set(named_requirements))
def pip_constraint_update(list_one, list_two):
_list_one, _list_two = _lower_set_lists(list_one, list_two)
_list_one, _list_two = list(_list_one), list(_list_two)
for item2 in _list_two:
item2_name, item2_versions, _ = _pip_requirement_split(item2)
if item2_versions:
for item1 in _list_one:
if item2_name == _pip_requirement_split(item1)[0]:
item1_index = _list_one.index(item1)
_list_one[item1_index] = item2
break
else:
_list_one.append(item2)
return sorted(_list_one)
def splitlines(string_with_lines):
"""Return a ``list`` from a string with lines."""
return string_with_lines.splitlines()
def filtered_list(list_one, list_two):
_list_one, _list_two = _lower_set_lists(list_one, list_two)
return list(_list_one-_list_two)
def git_link_parse(repo):
"""Return a dict containing the parts of a git repository.
:param repo: git repo string to parse.
:type repo: ``str``
:returns: ``dict``
"""
if 'git+' in repo:
_git_url = repo.split('git+', 1)[-1]
else:
_git_url = repo
if '@' in _git_url:
url, branch = _git_url.split('@', 1)
else:
url = _git_url
branch = 'master'
name = os.path.basename(url.rstrip('/'))
_branch = branch.split('#')
branch = _branch[0]
plugin_path = None
# Determine if the package is a plugin type
if len(_branch) > 1 and 'subdirectory=' in _branch[-1]:
plugin_path = _branch[-1].split('subdirectory=')[-1].split('&')[0]
return {
'name': name.split('.git')[0].lower(),
'version': branch,
'plugin_path': plugin_path,
'url': url,
'original': repo
}
def git_link_parse_name(repo):
"""Return the name of a git repo."""
return git_link_parse(repo)['name']
def get_nested(target_dict, keys):
"""Retrieves values through a nested dictionary.
If any key on the path is missing, return None
This helps solves convoluted guards in roles/plays such as the following:
('openstack_ansible' not in ansible_local or
'swift' not in ansible_local['openstack_ansible'] or
'venv_tag' not in ansible_local['openstack_ansible']['swift'] or
ansible_local['openstack_ansible']['swift']['venv_tag'] == swift_venv_tag)
With this filter, it could be instead written:
ansible_local|get_nested('openstack_ansible.swift.venv_tag') == swift_venv_tag
"""
try:
key, next_keys = keys.split('.', 1)
except ValueError:
return target_dict.get(keys, None)
try:
next_dict = target_dict[key]
except KeyError:
return None
return get_nested(next_dict, next_keys)
class FilterModule(object):
"""Ansible jinja2 filters."""
@staticmethod
def filters():
return {
'bit_length_power_of_2': bit_length_power_of_2,
'netloc': get_netloc,
'netloc_no_port': get_netloc_no_port,
'netorigin': get_netorigin,
'string_2_int': string_2_int,
'pip_requirement_names': pip_requirement_names,
'pip_constraint_update': pip_constraint_update,
'splitlines': splitlines,
'filtered_list': filtered_list,
'git_link_parse': git_link_parse,
'git_link_parse_name': git_link_parse_name,
'deprecated': _deprecated,
'get_nested': get_nested
}