
337 lines
9.9 KiB

# Copyright 2015, Rackspace US, Inc.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
# (c) 2015, Kevin Carter <>
import hashlib
import logging
import os
import re
import urlparse
from ansible import errors
from jinja2.runtime import Undefined
"""Filter usage:
Simple filters that may be useful from within the stack
def _deprecated(new_var, old_var=None, old_var_name=None,
new_var_name=None, removed_in=None, fatal=False):
"""Provide a deprecation warning on deprecated variables.
This filter will return the old_var value if defined along with a
deprecation warning that will inform the user that the old variable
should no longer be used.
In order to use this filter the old and new variable names must be provided
to the filter as a string which is used to render the warning message. The
removed_in option is used to give a date or release name where the old
option will be removed. Optionally, if fatal is set to True, the filter
will raise an exception if the old variable is used.
USAGE: {{ new_var | deprecated(old_var,
false) }}
:param new_var: ``object``
:param old_var: ``object``
:param old_var_name: ``str``
:param new_var_name: ``str``
:param removed_in: ``str``
:param fatal: ``bol``
_usage = (
'{{ new_var | deprecated(old_var=old_var, old_var_name="old_var_name",'
' new_var_name="new_var_name", removed_in="removed_in",'
' fatal=false) }}'
if not old_var_name:
raise errors.AnsibleUndefinedVariable(
'To use this filter you must provide the "old_var_name" option'
' with the string name of the old variable that will be'
' replaced. ' + _usage
if not new_var_name:
raise errors.AnsibleUndefinedVariable(
'To use this filter you must provide the "new_var_name" option'
' with the string name of the new variable that will replace the'
' deprecated one. ' + _usage
if not removed_in:
raise errors.AnsibleUndefinedVariable(
'To use this filter you must provide the "removed_in" option with'
' the string name of the release where the old_var will be'
' removed. ' + _usage
# If old_var is undefined or has a None value return the new_var value
if isinstance(old_var, Undefined) or not old_var:
return new_var
name = 'Ansible-Warning| '
log = logging.getLogger(name)
for handler in log.handlers:
if name ==
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG) = name
stream_format = logging.Formatter(
'%(asctime)s - %(name)s%(levelname)s => %(message)s'
message = (
'Deprecated Option provided: Deprecated variable: "%(old)s", Removal'
' timeframe: "%(removed_in)s", Future usage: "%(new)s"'
% {'old': old_var_name, 'new': new_var_name, 'removed_in': removed_in}
if str(fatal).lower() in ['yes', 'true']:
message = 'Fatally %s' % message
raise RuntimeError(message)
return old_var
def _pip_requirement_split(requirement):
version_descriptors = "(>=|<=|>|<|==|~=|!=)"
requirement = requirement.split(';')
requirement_info = re.split(r'%s\s*' % version_descriptors, requirement[0])
name = requirement_info[0]
marker = None
if len(requirement) > 1:
marker = requirement[1]
versions = None
if len(requirement_info) > 1:
versions = requirement_info[1]
return name, versions, marker
def _lower_set_lists(list_one, list_two):
_list_one = set([i.lower() for i in list_one])
_list_two = set([i.lower() for i in list_two])
return _list_one, _list_two
def bit_length_power_of_2(value):
"""Return the smallest power of 2 greater than a numeric value.
:param value: Number to find the smallest power of 2
:type value: ``int``
:returns: ``int``
return 2**(int(value)-1).bit_length()
def get_netloc(url):
"""Return the netloc from a URL.
If the input value is not a value URL the method will raise an Ansible
filter exception.
:param url: the URL to parse
:type url: ``str``
:returns: ``str``
netloc = urlparse.urlparse(url).netloc
except Exception as exp:
raise errors.AnsibleFilterError(
'Failed to return the netloc of: "%s"' % str(exp)
return netloc
def get_netloc_no_port(url):
"""Return the netloc without a port from a URL.
If the input value is not a value URL the method will raise an Ansible
filter exception.
:param url: the URL to parse
:type url: ``str``
:returns: ``str``
return get_netloc(url=url).split(':')[0]
def get_netorigin(url):
"""Return the netloc from a URL.
If the input value is not a value URL the method will raise an Ansible
filter exception.
:param url: the URL to parse
:type url: ``str``
:returns: ``str``
parsed_url = urlparse.urlparse(url)
netloc = parsed_url.netloc
scheme = parsed_url.scheme
except Exception as exp:
raise errors.AnsibleFilterError(
'Failed to return the netorigin of: "%s"' % str(exp)
return '%s://%s' % (scheme, netloc)
def string_2_int(string):
"""Return the an integer from a string.
The string is hashed, converted to a base36 int, and the modulo of 10240
is returned.
:param string: string to retrieve an int from
:type string: ``str``
:returns: ``int``
# Try to encode utf-8 else pass
string = string.encode('utf-8')
except AttributeError:
hashed_name = hashlib.sha256(string).hexdigest()
return int(hashed_name, 36) % 10240
def pip_requirement_names(requirements):
"""Return a ``str`` of requirement name and list of versions.
:param requirement: Name of a requirement that may have versions within
it. This will use the constant,
:type requirement: ``str``
:return: ``str``
named_requirements = list()
for requirement in requirements:
name = _pip_requirement_split(requirement)[0]
if name and not name.startswith('#'):
return sorted(set(named_requirements))
def pip_constraint_update(list_one, list_two):
_list_one, _list_two = _lower_set_lists(list_one, list_two)
_list_one, _list_two = list(_list_one), list(_list_two)
for item2 in _list_two:
item2_name, item2_versions, _ = _pip_requirement_split(item2)
if item2_versions:
for item1 in _list_one:
if item2_name == _pip_requirement_split(item1)[0]:
item1_index = _list_one.index(item1)
_list_one[item1_index] = item2
return sorted(_list_one)
def splitlines(string_with_lines):
"""Return a ``list`` from a string with lines."""
return string_with_lines.splitlines()
def filtered_list(list_one, list_two):
_list_one, _list_two = _lower_set_lists(list_one, list_two)
return list(_list_one-_list_two)
def git_link_parse(repo):
"""Return a dict containing the parts of a git repository.
:param repo: git repo string to parse.
:type repo: ``str``
:returns: ``dict``
if 'git+' in repo:
_git_url = repo.split('git+', 1)[-1]
_git_url = repo
if '@' in _git_url:
url, branch = _git_url.split('@', 1)
url = _git_url
branch = 'master'
name = os.path.basename(url.rstrip('/'))
_branch = branch.split('#')
branch = _branch[0]
plugin_path = None
# Determine if the package is a plugin type
if len(_branch) > 1 and 'subdirectory=' in _branch[-1]:
plugin_path = _branch[-1].split('subdirectory=')[-1].split('&')[0]
return {
'name': name.split('.git')[0].lower(),
'version': branch,
'plugin_path': plugin_path,
'url': url,
'original': repo
def git_link_parse_name(repo):
"""Return the name of a git repo."""
return git_link_parse(repo)['name']
class FilterModule(object):
"""Ansible jinja2 filters."""
def filters():
return {
'bit_length_power_of_2': bit_length_power_of_2,
'netloc': get_netloc,
'netloc_no_port': get_netloc_no_port,
'netorigin': get_netorigin,
'string_2_int': string_2_int,
'pip_requirement_names': pip_requirement_names,
'pip_constraint_update': pip_constraint_update,
'splitlines': splitlines,
'filtered_list': filtered_list,
'git_link_parse': git_link_parse,
'git_link_parse_name': git_link_parse_name,
'deprecated': _deprecated