Merge "Refactors quantum.api.v2.attributes.py"

This commit is contained in:
Jenkins
2012-11-27 01:58:14 +00:00
committed by Gerrit Code Review
2 changed files with 96 additions and 100 deletions

View File

@@ -1,21 +1,19 @@
# Copyright (c) 2012 OpenStack, LLC.
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# vim: tabstop=4 shiftwidth=4 softtabstop=4
ATTR_NOT_SPECIFIED = object()
# Defining a constant to avoid repeating string literal in several modules
SHARED = 'shared'
# Copyright (c) 2012 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License'); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
import re
@@ -27,15 +25,29 @@ from quantum.openstack.common import uuidutils
LOG = logging.getLogger(__name__)
ATTR_NOT_SPECIFIED = object()
# Defining a constant to avoid repeating string literal in several modules
SHARED = 'shared'
def _verify_dict_keys(expected_keys, target_dict):
if not isinstance(target_dict, dict):
msg = _("Invalid input. %s must be a dictionary.") % target_dict
return msg
provided_keys = target_dict.keys()
if set(expected_keys) != set(provided_keys):
msg = (_("Expected keys not found. Expected: %(expected_keys)s "
"Provided: %(provided_keys)s") % locals())
return msg
def is_attr_set(attribute):
return not (attribute is None or attribute is ATTR_NOT_SPECIFIED)
def _validate_boolean(data, valid_values=None):
if isinstance(data, bool):
return
else:
if not isinstance(data, bool):
msg = _("'%s' is not boolean") % data
LOG.debug("validate_boolean: %s", msg)
return msg
@@ -73,7 +85,6 @@ def _validate_range(data, valid_values=None):
def _validate_mac_address(data, valid_values=None):
try:
netaddr.EUI(data)
return
except Exception:
msg = _("'%s' is not a valid MAC address") % data
LOG.debug("validate_mac_address: %s", msg)
@@ -83,7 +94,6 @@ def _validate_mac_address(data, valid_values=None):
def _validate_ip_address(data, valid_values=None):
try:
netaddr.IPAddress(data)
return
except Exception:
msg = _("'%s' is not a valid IP address") % data
LOG.debug("validate_ip_address: %s", msg)
@@ -93,121 +103,103 @@ def _validate_ip_address(data, valid_values=None):
def _validate_ip_pools(data, valid_values=None):
"""Validate that start and end IP addresses are present
In addition to this the IP addresses will also be validated"""
In addition to this the IP addresses will also be validated
"""
if not isinstance(data, list):
msg = _("'%s' in not a valid IP pool") % data
msg = _("'%s' is not a valid IP pool") % data
LOG.debug("validate_ip_pools: %s", msg)
return msg
expected_keys = set(['start', 'end'])
try:
for ip_pool in data:
if set(ip_pool.keys()) != expected_keys:
msg = _("Expected keys not found. Expected: %s "
"Provided: %s") % (expected_keys, ip_pool.keys())
expected_keys = ['start', 'end']
for ip_pool in data:
msg = _verify_dict_keys(expected_keys, ip_pool)
if msg:
LOG.debug("validate_ip_pools: %s", msg)
return msg
for k in expected_keys:
msg = _validate_ip_address(ip_pool[k])
if msg:
LOG.debug("validate_ip_pools: %s", msg)
return msg
for k in expected_keys:
msg = _validate_ip_address(ip_pool[k])
if msg:
LOG.debug("validate_ip_pools: %s", msg)
return msg
except KeyError, e:
args = {'key_name': e.message, 'ip_pool': ip_pool}
msg = _("Invalid input. Required key: '%(key_name)s' "
"missing from %(ip_pool)s.") % args
LOG.debug("validate_ip_pools: %s", msg)
return msg
except TypeError, e:
msg = _("Invalid input. Pool %s must be a dictionary.") % ip_pool
LOG.debug("validate_ip_pools: %s", msg)
return msg
except Exception:
msg = _("'%s' in not a valid IP pool") % data
LOG.debug("validate_ip_pools: %s", msg)
return msg
def _validate_fixed_ips(data, valid_values=None):
if not isinstance(data, list):
msg = _("'%s' in not a valid fixed IP") % data
msg = _("'%s' is not a valid fixed IP") % data
LOG.debug("validate_fixed_ips: %s", msg)
return msg
ips = []
try:
for fixed_ip in data:
if 'ip_address' in fixed_ip:
msg = _validate_ip_address(fixed_ip['ip_address'])
if msg:
LOG.debug("validate_fixed_ips: %s", msg)
return msg
if 'subnet_id' in fixed_ip:
msg = _validate_regex(fixed_ip['subnet_id'], UUID_PATTERN)
if msg:
LOG.debug("validate_fixed_ips: %s", msg)
return msg
for fixed_ip in data:
if 'ip_address' in fixed_ip:
# Ensure that duplicate entries are not set - just checking IP
# suffices. Duplicate subnet_id's are legitimate.
if 'ip_address' in fixed_ip:
if fixed_ip['ip_address'] in ips:
msg = _("Duplicate entry %s") % fixed_ip
LOG.debug("validate_fixed_ips: %s", msg)
return msg
ips.append(fixed_ip['ip_address'])
except Exception:
msg = _("'%s' in not a valid fixed IP") % data
LOG.debug("validate_fixed_ips: %s", msg)
return msg
fixed_ip_address = fixed_ip['ip_address']
if fixed_ip_address in ips:
msg = _("Duplicate entry %s") % fixed_ip
else:
msg = _validate_ip_address(fixed_ip_address)
if msg:
LOG.debug("validate_fixed_ips: %s", msg)
return msg
ips.append(fixed_ip_address)
if 'subnet_id' in fixed_ip:
msg = _validate_uuid(fixed_ip['subnet_id'])
if msg:
LOG.debug("validate_fixed_ips: %s", msg)
return msg
def _validate_nameservers(data, valid_values=None):
if not hasattr(data, '__iter__'):
msg = _("'%s' in not a valid nameserver") % data
msg = _("'%s' is not a valid nameserver") % data
LOG.debug("validate_nameservers: %s", msg)
return msg
ips = set()
ips = []
for ip in data:
msg = _validate_ip_address(ip)
if msg:
# This may be a hostname
msg = _validate_regex(ip, HOSTNAME_PATTERN)
if msg:
msg = _("'%s' in not a valid nameserver") % ip
msg = _("'%s' is not a valid nameserver") % ip
LOG.debug("validate_nameservers: %s", msg)
return msg
if ip in ips:
msg = _("Duplicate nameserver %s") % ip
LOG.debug("validate_nameservers: %s", msg)
return msg
ips.add(ip)
ips.append(ip)
def _validate_hostroutes(data, valid_values=None):
if not isinstance(data, list):
msg = _("'%s' in not a valid hostroute") % data
msg = _("'%s' is not a valid hostroute") % data
LOG.debug("validate_hostroutes: %s", msg)
return msg
expected_keys = ['destination', 'nexthop']
hostroutes = []
try:
for hostroute in data:
msg = _validate_subnet(hostroute['destination'])
if msg:
LOG.debug("validate_hostroutes: %s", msg)
return msg
msg = _validate_ip_address(hostroute['nexthop'])
if msg:
LOG.debug("validate_hostroutes: %s", msg)
return msg
if hostroute in hostroutes:
msg = _("Duplicate hostroute %s") % hostroute
LOG.debug("validate_hostroutes: %s", msg)
if msg:
return msg
hostroutes.append(hostroute)
except:
msg = _("'%s' in not a valid hostroute") % data
LOG.debug("validate_hostroutes: %s", msg)
return msg
for hostroute in data:
msg = _verify_dict_keys(expected_keys, hostroute)
if msg:
LOG.debug("validate_hostroutes: %s", msg)
return msg
msg = _validate_subnet(hostroute['destination'])
if msg:
LOG.debug("validate_hostroutes: %s", msg)
return msg
msg = _validate_ip_address(hostroute['nexthop'])
if msg:
LOG.debug("validate_hostroutes: %s", msg)
return msg
if hostroute in hostroutes:
msg = _("Duplicate hostroute %s") % hostroute
LOG.debug("validate_hostroutes: %s", msg)
return msg
hostroutes.append(hostroute)
def _validate_ip_address_or_none(data, valid_values=None):
@@ -236,7 +228,7 @@ def _validate_regex(data, valid_values=None):
except TypeError:
pass
msg = _("'%s' is not valid input") % data
msg = _("'%s' is not a valid input") % data
LOG.debug("validate_regex: %s", msg)
return msg

View File

@@ -207,6 +207,9 @@ class TestAttributes(unittest2.TestCase):
def test_validate_hostroutes(self):
hostroute_pools = [[{'destination': '100.0.0.0/24'}],
[{'nexthop': '10.0.2.20'}],
[{'nexthop': '10.0.2.20',
'forza': 'juve',
'destination': '100.0.0.0/8'}],
[{'nexthop': '1110.0.2.20',
'destination': '100.0.0.0/8'}],
[{'nexthop': '10.0.2.20',
@@ -215,6 +218,7 @@ class TestAttributes(unittest2.TestCase):
'destination': '100.0.0.0/8'},
{'nexthop': '10.0.2.20',
'destination': '100.0.0.0/8'}],
[None],
None]
for host_routes in hostroute_pools:
msg = attributes._validate_hostroutes(host_routes, None)
@@ -368,11 +372,11 @@ class TestAttributes(unittest2.TestCase):
data = None
msg = attributes._validate_regex(data, pattern)
self.assertEquals(msg, "'%s' is not valid input" % data)
self.assertEquals(msg, "'%s' is not a valid input" % data)
data = 'bat'
msg = attributes._validate_regex(data, pattern)
self.assertEquals(msg, "'%s' is not valid input" % data)
self.assertEquals(msg, "'%s' is not a valid input" % data)
data = 'hat'
msg = attributes._validate_regex(data, pattern)