ce07c3b7dc
Change-Id: I86c27b0d9be0af6ef58748d97938a2df54b297c3
196 lines
5.3 KiB
Python
196 lines
5.3 KiB
Python
# Copyright 2014 Huawei Technologies Co. Ltd
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Validator methods."""
|
|
import logging
|
|
import netaddr
|
|
import re
|
|
import socket
|
|
|
|
from compass.utils import setting_wrapper as setting
|
|
from compass.utils import util
|
|
|
|
|
|
def is_valid_ip(name, ip_addr, **kwargs):
|
|
"""Valid the format of an IP address."""
|
|
if isinstance(ip_addr, list):
|
|
return all([
|
|
is_valid_ip(name, item, **kwargs) for item in ip_addr
|
|
])
|
|
try:
|
|
netaddr.IPAddress(ip_addr)
|
|
except Exception:
|
|
logging.debug('%s invalid ip addr %s', name, ip_addr)
|
|
return False
|
|
return True
|
|
|
|
|
|
def is_valid_network(name, ip_network, **kwargs):
|
|
"""Valid the format of an Ip network."""
|
|
if isinstance(ip_network, list):
|
|
return all([
|
|
is_valid_network(name, item, **kwargs) for item in ip_network
|
|
])
|
|
try:
|
|
netaddr.IPNetwork(ip_network)
|
|
except Exception:
|
|
logging.debug('%s invalid network %s', name, ip_network)
|
|
return False
|
|
return True
|
|
|
|
|
|
def is_valid_netmask(name, ip_addr, **kwargs):
|
|
"""Valid the format of a netmask."""
|
|
if isinstance(ip_addr, list):
|
|
return all([
|
|
is_valid_netmask(name, item, **kwargs) for item in ip_addr
|
|
])
|
|
if not is_valid_ip(ip_addr):
|
|
return False
|
|
ip = netaddr.IPAddress(ip_addr)
|
|
if ip.is_netmask():
|
|
return True
|
|
logging.debug('%s invalid netmask %s', name, ip_addr)
|
|
return False
|
|
|
|
|
|
def is_valid_gateway(name, ip_addr, **kwargs):
|
|
"""Valid the format of gateway."""
|
|
if isinstance(ip_addr, list):
|
|
return all([
|
|
is_valid_gateway(name, item, **kwargs) for item in ip_addr
|
|
])
|
|
if not is_valid_ip(ip_addr):
|
|
return False
|
|
ip = netaddr.IPAddress(ip_addr)
|
|
if ip.is_private() or ip.is_public():
|
|
return True
|
|
logging.debug('%s invalid gateway %s', name, ip_addr)
|
|
return False
|
|
|
|
|
|
def is_valid_dns(name, dns, **kwargs):
|
|
"""Valid the format of DNS."""
|
|
if isinstance(dns, list):
|
|
return all([is_valid_dns(name, item, **kwargs) for item in dns])
|
|
if is_valid_ip(dns):
|
|
return True
|
|
try:
|
|
socket.gethostbyname_ex(dns)
|
|
except Exception:
|
|
logging.debug('%s invalid dns name %s', name, dns)
|
|
return False
|
|
return True
|
|
|
|
|
|
def is_valid_url(name, url, **kwargs):
|
|
"""Valid the format of url."""
|
|
if isinstance(url, list):
|
|
return all([
|
|
is_valid_url(name, item, **kwargs) for item in url
|
|
])
|
|
if re.match(
|
|
r'^(http|https|ftp)://([0-9A-Za-z_-]+)(\.[0-9a-zA-Z_-]+)*'
|
|
r'(:\d+)?(/[0-9a-zA-Z_-]+)*$',
|
|
url
|
|
):
|
|
return True
|
|
logging.debug(
|
|
'%s invalid url %s', name, url
|
|
)
|
|
return False
|
|
|
|
|
|
def is_valid_domain(name, domain, **kwargs):
|
|
"""Validate the format of domain."""
|
|
if isinstance(domain, list):
|
|
return all([
|
|
is_valid_domain(name, item, **kwargs) for item in domain
|
|
])
|
|
if re.match(
|
|
r'^([0-9a-zA-Z_-]+)(\.[0-9a-zA-Z_-]+)*$',
|
|
domain
|
|
):
|
|
return True
|
|
logging.debug(
|
|
'%s invalid domain %s', name, domain
|
|
)
|
|
return False
|
|
|
|
|
|
def is_valid_username(name, username, **kwargs):
|
|
"""Valid the format of username."""
|
|
if bool(username):
|
|
return True
|
|
logging.debug(
|
|
'%s username is empty', name
|
|
)
|
|
|
|
|
|
def is_valid_password(name, password, **kwargs):
|
|
"""Valid the format of password."""
|
|
if bool(password):
|
|
return True
|
|
logging.debug('%s password is empty', name)
|
|
return False
|
|
|
|
|
|
def is_valid_partition(name, partition, **kwargs):
|
|
"""Valid the format of partition name."""
|
|
if name != 'swap' and not name.startswith('/'):
|
|
logging.debug(
|
|
'%s is not started with / or swap', name
|
|
)
|
|
return False
|
|
if 'size' not in partition and 'percentage' not in partition:
|
|
logging.debug(
|
|
'%s partition does not contain sie or percentage',
|
|
name
|
|
)
|
|
return False
|
|
return True
|
|
|
|
|
|
def is_valid_percentage(name, percentage, **kwargs):
|
|
"""Valid the percentage."""
|
|
if 0 <= percentage <= 100:
|
|
return True
|
|
logging.debug('%s invalid percentage %s', name, percentage)
|
|
|
|
|
|
def is_valid_port(name, port, **kwargs):
|
|
"""Valid the format of port."""
|
|
if 0 < port < 65536:
|
|
return True
|
|
logging.debug('%s invalid port %s', name, port)
|
|
|
|
|
|
def is_valid_size(name, size, **kwargs):
|
|
if re.match(r'^(\d+)(K|M|G|T)$', size):
|
|
return True
|
|
logging.debug('%s invalid size %s', name, size)
|
|
return False
|
|
|
|
|
|
VALIDATOR_GLOBALS = globals()
|
|
VALIDATOR_LOCALS = locals()
|
|
VALIDATOR_CONFIGS = util.load_configs(
|
|
setting.VALIDATOR_DIR,
|
|
config_name_suffix='.py',
|
|
env_globals=VALIDATOR_GLOBALS,
|
|
env_locals=VALIDATOR_LOCALS
|
|
)
|
|
for validator_config in VALIDATOR_CONFIGS:
|
|
VALIDATOR_LOCALS.update(validator_config)
|