399 lines
12 KiB
Python
399 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""
|
|
Contains all core validation functions.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import socket
|
|
import logging
|
|
|
|
from . import utils
|
|
|
|
from .exceptions import ParamValidationError
|
|
|
|
|
|
__all__ = ('ParamValidationError', 'validate_integer', 'validate_float',
|
|
'validate_regexp', 'validate_port', 'validate_not_empty',
|
|
'validate_options', 'validate_multi_options', 'validate_ip',
|
|
'validate_multi_ip', 'validate_file', 'validate_ping',
|
|
'validate_multi_ping', 'validate_ssh', 'validate_multi_ssh',
|
|
'validate_sshkey', 'validate_ldap_url', 'validate_ldap_dn',
|
|
'validate_export', 'validate_multi_export',
|
|
'validate_writeable_directory')
|
|
|
|
|
|
def validate_integer(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if given param is not integer.
|
|
"""
|
|
if not param:
|
|
return
|
|
options = options or []
|
|
try:
|
|
int(param)
|
|
except ValueError:
|
|
logging.debug('validate_integer(%s, options=%s) failed.' %
|
|
(param, options))
|
|
msg = 'Given value is not an integer: %s'
|
|
raise ParamValidationError(msg % param)
|
|
|
|
|
|
def validate_float(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if given param is not a float.
|
|
"""
|
|
if not param:
|
|
return
|
|
options = options or []
|
|
try:
|
|
float(param)
|
|
except ValueError:
|
|
logging.debug('validate_float(%s, options=%s) failed.' %
|
|
(param, options))
|
|
msg = 'Given value is not a float: %s'
|
|
raise ParamValidationError(msg % param)
|
|
|
|
|
|
def validate_regexp(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if given param doesn't match at least
|
|
one of regular expressions given in options.
|
|
"""
|
|
if not param:
|
|
return
|
|
options = options or []
|
|
for regex in options:
|
|
if re.search(regex, param):
|
|
break
|
|
else:
|
|
logging.debug('validate_regexp(%s, options=%s) failed.' %
|
|
(param, options))
|
|
msg = 'Given value does not match required regular expression: %s'
|
|
raise ParamValidationError(msg % param)
|
|
|
|
|
|
def validate_multi_regexp(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if any of the comma separated values given
|
|
in param doesn't match one of the regular expressions given in options.
|
|
"""
|
|
options = options or []
|
|
for i in param.split(','):
|
|
validate_regexp(i.strip(), options=options)
|
|
|
|
|
|
def validate_port(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if given param is not a decimal number
|
|
in range (0, 65535).
|
|
"""
|
|
if not param:
|
|
return
|
|
options = options or []
|
|
validate_integer(param, options)
|
|
port = int(param)
|
|
if not (port >= 0 and port < 65535):
|
|
logging.debug('validate_port(%s, options=%s) failed.' %
|
|
(param, options))
|
|
msg = 'Given value is outside the range of (0, 65535): %s'
|
|
raise ParamValidationError(msg % param)
|
|
|
|
|
|
def validate_not_empty(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if given param is empty.
|
|
"""
|
|
options = options or []
|
|
if not param and param is not False:
|
|
logging.debug('validate_not_empty(%s, options=%s) failed.' %
|
|
(param, options))
|
|
msg = 'Given value is not allowed: %s'
|
|
raise ParamValidationError(msg % param)
|
|
|
|
|
|
def validate_options(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if given param is not member of options.
|
|
"""
|
|
if not param:
|
|
return
|
|
|
|
options = options or []
|
|
validate_not_empty(param, options)
|
|
if param not in options:
|
|
logging.debug('validate_options(%s, options=%s) failed.' %
|
|
(param, options))
|
|
msg = 'Given value is not member of allowed values %s: %s'
|
|
raise ParamValidationError(msg % (options, param))
|
|
|
|
|
|
def validate_multi_options(param, options=None):
|
|
"""
|
|
Validates if comma separated values given in params are members
|
|
of options.
|
|
"""
|
|
if not param:
|
|
return
|
|
options = options or []
|
|
for i in param.split(','):
|
|
validate_options(i.strip(), options=options)
|
|
|
|
|
|
def validate_ip(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if given parameter value is not in IPv4
|
|
or IPv6 address.
|
|
"""
|
|
if not param:
|
|
return
|
|
for family in (socket.AF_INET, socket.AF_INET6):
|
|
try:
|
|
socket.inet_pton(family, param)
|
|
return family
|
|
except socket.error:
|
|
continue
|
|
else:
|
|
logging.debug('validate_ip(%s, options=%s) failed.' %
|
|
(param, options))
|
|
msg = 'Given host is not in IP address format: %s'
|
|
raise ParamValidationError(msg % param)
|
|
|
|
|
|
def validate_multi_ip(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if comma separated IP addresses given
|
|
parameter value are in IPv4 or IPv6 aformat.
|
|
"""
|
|
for host in param.split(','):
|
|
host = host.split('/', 1)[0]
|
|
validate_ip(host.strip(), options)
|
|
|
|
|
|
def validate_file(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if provided file in param does not exist.
|
|
"""
|
|
if not param:
|
|
return
|
|
|
|
options = options or []
|
|
if not os.path.isfile(param):
|
|
logging.debug('validate_file(%s, options=%s) failed.' %
|
|
(param, options))
|
|
msg = 'Given file does not exist: %s'
|
|
raise ParamValidationError(msg % param)
|
|
|
|
|
|
def validate_writeable_directory(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if provided directory does not exist or
|
|
is not writeable.
|
|
"""
|
|
if not param:
|
|
return
|
|
|
|
options = options or []
|
|
path = os.path.expanduser(param)
|
|
if not ((os.path.isdir(path) and os.access(path, os.W_OK)) or
|
|
os.access(
|
|
os.path.normpath(os.path.join(path, os.pardir)), os.W_OK)):
|
|
logging.debug('validate_writeable_directory(%s, options=%s) failed.' %
|
|
(param, options))
|
|
msg = 'Given directory does not exist or is not writeable: %s'
|
|
raise ParamValidationError(msg % param)
|
|
|
|
|
|
def validate_ping(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if provided host does not answer to ICMP
|
|
echo request.
|
|
"""
|
|
if not param:
|
|
return
|
|
options = options or []
|
|
rc, out = utils.execute(['/bin/ping', '-c', '1', str(param)],
|
|
can_fail=False)
|
|
if rc != 0:
|
|
logging.debug('validate_ping(%s, options=%s) failed.' %
|
|
(param, options))
|
|
msg = 'Given host is unreachable: %s'
|
|
raise ParamValidationError(msg % param)
|
|
|
|
|
|
def validate_multi_ping(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if comma separated host given in param
|
|
do not answer to ICMP echo request.
|
|
"""
|
|
options = options or []
|
|
for host in param.split(","):
|
|
validate_ping(host.strip())
|
|
|
|
|
|
_tested_ports = []
|
|
|
|
|
|
def touch_port(host, port):
|
|
"""
|
|
Check that provided host is listening on provided port.
|
|
"""
|
|
key = "%s:%d" % (host, port)
|
|
if key in _tested_ports:
|
|
return
|
|
sock = socket.create_connection((host, port))
|
|
sock.shutdown(socket.SHUT_RDWR)
|
|
sock.close()
|
|
_tested_ports.append(key)
|
|
|
|
|
|
def validate_ssh(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if provided host does not listen
|
|
on port 22.
|
|
"""
|
|
if not param:
|
|
return
|
|
options = options or []
|
|
try:
|
|
touch_port(param.strip(), 22)
|
|
except socket.error:
|
|
logging.debug('validate_ssh(%s, options=%s) failed.' %
|
|
(param, options))
|
|
msg = 'Given host does not listen on port 22: %s'
|
|
raise ParamValidationError(msg % param)
|
|
|
|
|
|
def validate_multi_ssh(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if comma separated host provided
|
|
in param do not listen on port 22.
|
|
"""
|
|
options = options or []
|
|
for host in param.split(","):
|
|
validate_ssh(host)
|
|
|
|
|
|
def validate_sshkey(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if provided sshkey file is not public key.
|
|
"""
|
|
if not param:
|
|
return
|
|
with open(param) as sshkey:
|
|
line = sshkey.readline()
|
|
msg = None
|
|
if not re.search('ssh-|ecdsa', line):
|
|
msg = ('Invalid content header in %s, Public SSH key is required.'
|
|
% param)
|
|
if re.search('BEGIN [RD]SA PRIVATE KEY', line):
|
|
msg = 'Public SSH key is required. You passed private key.'
|
|
if msg:
|
|
raise ParamValidationError(msg)
|
|
|
|
|
|
def validate_ldap_url(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if provided param is not a valid LDAP URL
|
|
"""
|
|
if not param:
|
|
return
|
|
try:
|
|
import ldapurl
|
|
except ImportError:
|
|
msg = (
|
|
'The python ldap package is required to use this functionality.'
|
|
)
|
|
raise ParamValidationError(msg)
|
|
|
|
try:
|
|
ldapurl.LDAPUrl(param)
|
|
except ValueError as ve:
|
|
msg = ('The given string [%s] is not a valid LDAP URL: %s' %
|
|
(param, ve))
|
|
raise ParamValidationError(msg)
|
|
|
|
|
|
def validate_ldap_dn(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if provided param is not a valid LDAP DN
|
|
"""
|
|
if not param:
|
|
return
|
|
try:
|
|
import ldap
|
|
import ldap.dn
|
|
except ImportError:
|
|
msg = (
|
|
'The python ldap package is required to use this functionality.'
|
|
)
|
|
raise ParamValidationError(msg)
|
|
|
|
try:
|
|
ldap.dn.str2dn(param)
|
|
except ldap.DECODING_ERROR as de:
|
|
msg = ('The given string [%s] is not a valid LDAP DN: %s' %
|
|
(param, de))
|
|
raise ParamValidationError(msg)
|
|
|
|
|
|
def validate_export(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if the nfs export is not valid.
|
|
"""
|
|
msg = ('The nfs export [%s] is not a valid export - use squares around ipv6 addresses -.' %
|
|
param)
|
|
try:
|
|
[ip, export] = param.split(':/')
|
|
except ValueError:
|
|
raise ParamValidationError(msg)
|
|
get_squares = re.search(r'\[([^]]+)\]', ip)
|
|
ip_to_test = ip
|
|
if get_squares:
|
|
# this should be a valid ipv6 address.
|
|
ip_to_test = get_squares.group(1)
|
|
if not utils.network.is_ipv6(ip_to_test):
|
|
raise ParamValidationError(msg)
|
|
else:
|
|
# this should be an ipv4. Cannot have ipv6 without square braquet
|
|
# notation here, as the mount will fail.
|
|
if not utils.network.is_ipv4(ip):
|
|
raise ParamValidationError(msg)
|
|
validate_ip(ip_to_test, options)
|
|
if not export:
|
|
raise ParamValidationError(msg)
|
|
|
|
|
|
def validate_multi_export(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if comma separated nfs export given
|
|
in param is not valid
|
|
"""
|
|
for export in param.split(","):
|
|
validate_export(export)
|
|
|
|
|
|
def validate_neutron(param, options=None):
|
|
"""
|
|
Raises ParamValidationError if neutron is not enabled.
|
|
This is intended to make user aware nova-network has been removed
|
|
in ocata cycle.
|
|
"""
|
|
validate_options(param, options=options)
|
|
if param != 'y':
|
|
msg = ('Nova network support has been removed in Ocata. Neutron service must be enabled')
|
|
raise ParamValidationError(msg)
|