#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
#
# Copyright (c) 2013-2018 Wind River Systems, Inc.
#

import subprocess
import socket
import jsonpatch
import os
import pecan
import re
import wsme
import netaddr
import tsconfig.tsconfig as tsc

from oslo_config import cfg
from sysinv.common import constants
from sysinv.common import exception
from sysinv.common.utils import memoized
from sysinv.openstack.common.gettextutils import _
from sysinv.openstack.common import log

LOG = log.getLogger(__name__)
CONF = cfg.CONF

# Exceptions treated as "the JSON patch could not be applied".
JSONPATCH_EXCEPTIONS = (jsonpatch.JsonPatchException,
                        jsonpatch.JsonPointerException,
                        KeyError)


def ip_version_to_string(ip_version):
    """Return the ``constants.IP_FAMILIES`` entry for an IP version as a str.

    :param ip_version: key to look up in ``constants.IP_FAMILIES``.
    :returns: ``str()`` of the matching entry.

    Lookup errors from ``constants.IP_FAMILIES`` (defined outside this
    module) are not caught here.
    """
    return str(constants.IP_FAMILIES[ip_version])


def validate_limit(limit):
    """Clamp a requested page size to ``CONF.api_limit_max``.

    :param limit: requested number of items, or None when not supplied.
    :returns: ``CONF.api_limit_max`` when limit is None or 0, otherwise
              the smaller of limit and ``CONF.api_limit_max``.
    :raises wsme.exc.ClientSideError: if limit is negative.
    """
    # Handle None explicitly: min(int, None) only works on Python 2 and
    # raises TypeError on Python 3.
    if limit is None:
        return CONF.api_limit_max

    if limit < 0:
        raise wsme.exc.ClientSideError(_("Limit must be positive"))

    # A limit of 0 is falsy and therefore falls back to the configured max.
    return min(CONF.api_limit_max, limit) or CONF.api_limit_max


def validate_sort_dir(sort_dir):
    """Check that a sort direction is either 'asc' or 'desc'.

    :param sort_dir: sort direction supplied by the caller.
    :returns: sort_dir unchanged when it is valid.
    :raises wsme.exc.ClientSideError: for any other value.
    """
    if sort_dir not in ['asc', 'desc']:
        raise wsme.exc.ClientSideError(_("Invalid sort direction: %s. "
                                         "Acceptable values are "
                                         "'asc' or 'desc'") % sort_dir)
    return sort_dir
def validate_patch(patch):
    """Performs a basic validation on patch.

    :param patch: a single patch dict or a list of patch dicts; each must
                  carry "path" and "op" keys.
    :raises wsme.exc.ClientSideError: if an entry is not a dict, lacks
        "path" or "op", uses an operation other than add/replace/remove,
        has a path that does not match the allowed pattern, or tries to
        "add" a top-level attribute (a path with a single '/').
    """
    # Compiled once per call instead of once per patch entry.
    path_pattern = re.compile("^/[a-zA-Z0-9-_]+(/[a-zA-Z0-9-_]+)*$")

    if not isinstance(patch, list):
        patch = [patch]

    for p in patch:
        if not isinstance(p, dict) or "path" not in p or "op" not in p:
            raise wsme.exc.ClientSideError(_("Invalid patch format: %s")
                                           % str(p))

        path = p["path"]
        op = p["op"]

        if op not in ["add", "replace", "remove"]:
            raise wsme.exc.ClientSideError(_("Operation not supported: %s")
                                           % op)

        if not path_pattern.match(path):
            raise wsme.exc.ClientSideError(_("Invalid path: %s") % path)

        # "add" is only allowed below an existing attribute, never at the
        # top level of the resource.
        if op == "add" and path.count('/') == 1:
            raise wsme.exc.ClientSideError(_("Adding an additional "
                                             "attribute (%s) to the "
                                             "resource is not allowed")
                                           % path)


def validate_mtu(mtu):
    """Check if MTU is valid.

    :param mtu: MTU value in bytes.
    :raises wsme.exc.ClientSideError: if mtu is outside 576..9216 inclusive.
    """
    if mtu < 576 or mtu > 9216:
        raise wsme.exc.ClientSideError(_(
            "MTU must be between 576 and 9216 bytes."))


def validate_address_within_address_pool(ip, pool):
    """Determine whether an IP address is within the specified IP address pool.

    :param ip: address to check; it is passed through netaddr.IPAddress().
    :param pool: address pool object whose ``ranges`` attribute yields
                 (start, end) pairs.
    :raises wsme.exc.ClientSideError: if the address is in none of the
        pool's ranges.
    """
    ipset = netaddr.IPSet()
    for start, end in pool.ranges:
        ipset.update(netaddr.IPRange(start, end))

    if netaddr.IPAddress(ip) not in ipset:
        # Interpolate after the _() lookup so the untranslated template,
        # not the already-formatted message, is used as the catalog key.
        raise wsme.exc.ClientSideError(_(
            "IP address %s is not within address pool ranges") % str(ip))


def validate_address_within_nework(ip, network):
    """Determine whether an IP address is within the specified IP network.

    The misspelled function name is kept as-is for existing callers.

    :param ip: address to check (see validate_address_within_address_pool).
    :param network: network object whose ``pool_uuid`` identifies the
                    address pool to check against.
    :raises wsme.exc.ClientSideError: if the address is outside the pool
        ranges of the network.
    """
    pool = pecan.request.dbapi.address_pool_get(network.pool_uuid)
    validate_address_within_address_pool(ip, pool)
class ValidTypes(wsme.types.UserType):
    """User type for validating that a value has one of a few types."""

    def __init__(self, *types):
        """Remember the accepted types, in the order they are tried."""
        self.types = types

    def validate(self, value):
        """Return value if it is an instance of one of the accepted types.

        When an accepted type is ``wsme.types.text`` and the value is
        ``wsme.types.bytes``, the value is decoded first; the decoded value
        is then used for the remaining checks as well.

        :raises ValueError: if the value matches none of the types.
        """
        for t in self.types:
            if t is wsme.types.text and isinstance(value, wsme.types.bytes):
                value = value.decode()
            if isinstance(value, t):
                return value

        raise ValueError("Wrong type. Expected '%s', got '%s'" % (
            self.types, type(value)))


def is_valid_subnet(subnet, ip_version=None):
    """Determine whether an IP subnet is valid.

    Raise Client-Side Error on failure.

    :param subnet: subnet object exposing ``version``, ``size``, ``ip`` and
                   ``network`` (presumably a netaddr.IPNetwork - confirm
                   against callers).
    :param ip_version: required IP version, or None to accept any version.
    :raises wsme.exc.ClientSideError: if the version does not match, the
        subnet has fewer than 8 addresses, or the given address is not the
        network address of the subnet.
    """
    if ip_version is not None and subnet.version != ip_version:
        raise wsme.exc.ClientSideError(_(
            "Invalid IP version %s %s. "
            "Please configure valid %s subnet") %
            (subnet.version, subnet, ip_version_to_string(ip_version)))

    # NOTE(review): the check is on 8 addresses while the message asks for
    # a /24; left unchanged pending confirmation of the intended minimum.
    if subnet.size < 8:
        raise wsme.exc.ClientSideError(_(
            "Invalid subnet size %s with %s. "
            "Please configure at least size /24 subnet") %
            (subnet.size, subnet))

    if subnet.ip != subnet.network:
        # Use the subnet's own version: ip_version may be None here, and
        # when it is not None it already equals subnet.version.
        raise wsme.exc.ClientSideError(_(
            "Invalid network address %s."
            "Network address of subnet is %s. "
            "Please configure valid %s subnet.") %
            (subnet.ip, subnet.network,
             ip_version_to_string(subnet.version)))


def is_valid_address_within_subnet(ip_address, subnet):
    """Determine whether an IP address is valid and within the specified subnet.

    Raise Client-Side Error on failure.

    :param ip_address: address object exposing ``version``.
    :param subnet: subnet object exposing ``version``, ``network`` and
                   ``broadcast`` and supporting ``in``.
    :returns: True when every check passes.
    :raises wsme.exc.ClientSideError: if the IP versions differ, the address
        is the network or broadcast address, or it lies outside the subnet.
    """
    family = subnet.version

    if ip_address.version != family:
        raise wsme.exc.ClientSideError(_(
            "Invalid IP version %s %s. "
            "Please configure valid %s address.") %
            (ip_address.version, subnet, ip_version_to_string(family)))

    if ip_address == subnet.network:
        raise wsme.exc.ClientSideError(_(
            "Invalid IP address: %s. "
            "Cannot use network address: %s. "
            "Please configure valid %s address.") %
            (ip_address, subnet.network, ip_version_to_string(family)))

    if ip_address == subnet.broadcast:
        raise wsme.exc.ClientSideError(_(
            "Cannot use broadcast address: %s. "
            "Please configure valid %s address.") %
            (subnet.broadcast, ip_version_to_string(family)))

    if ip_address not in subnet:
        raise wsme.exc.ClientSideError(_(
            "IP Address %s is not in subnet: %s. "
            "Please configure valid %s address.") %
            (ip_address, subnet, ip_version_to_string(family)))

    return True
" "Please configure valid %s address.") % (ip_address, subnet.network, ip_version_to_string(subnet.version))) elif ip_address == subnet.broadcast: raise wsme.exc.ClientSideError(_( "Cannot use broadcast address: %s. " "Please configure valid %s address.") % (subnet.broadcast, ip_version_to_string(subnet.version))) elif ip_address not in subnet: raise wsme.exc.ClientSideError(_( "IP Address %s is not in subnet: %s. " "Please configure valid %s address.") % (ip_address, subnet, ip_version_to_string(subnet.version))) return True def is_valid_hostname(hostname): """Determine whether an address is valid as per RFC 1123. """ # Maximum length of 255 rc = True length = len(hostname) if length > 255: raise wsme.exc.ClientSideError(_( "Hostname %s is too long. Length %s is greater than 255." "Please configure valid hostname.") % (hostname, length)) # Allow a single dot on the right hand side if hostname[-1] == ".": hostname = hostname[:-1] # Create a regex to ensure: # - hostname does not begin or end with a dash # - each segment is 1 to 63 characters long # - valid characters are A-Z (any case) and 0-9 valid_re = re.compile("(?!-)[A-Z\d-]{1,63}(?