Remove the CheckedDict class
Its functionality has been completely superceded by the new Properties and Parameters implementations. Change-Id: I33a1990ee543a189dd3ec8f22eec27e2b0514d09 Signed-off-by: Zane Bitter <zbitter@redhat.com>
This commit is contained in:
parent
275eb17e97
commit
8e16b5168c
heat
@ -1,159 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import re
|
||||
from copy import deepcopy
|
||||
|
||||
from heat.openstack.common import log as logging
|
||||
|
||||
logger = logging.getLogger('heat.engine.checkeddict')
|
||||
|
||||
|
||||
class CheckedDict(collections.MutableMapping):
|
||||
|
||||
def __init__(self, name):
|
||||
self.data = {}
|
||||
self.name = name
|
||||
|
||||
def addschema(self, key, schema):
|
||||
self.data[key] = deepcopy(schema)
|
||||
|
||||
def get_attr(self, key, attr):
|
||||
return self.data[key].get(attr, '')
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
'''Since this function gets called whenever we modify the
|
||||
dictionary (object), we can (and do) validate those keys that we
|
||||
know how to validate.
|
||||
'''
|
||||
def str_to_num(s):
|
||||
try:
|
||||
return int(s)
|
||||
except ValueError:
|
||||
return float(s)
|
||||
|
||||
num_converter = {'Integer': int,
|
||||
'Number': str_to_num,
|
||||
'Float': float}
|
||||
|
||||
if not key in self.data:
|
||||
raise KeyError('%s: key %s not found' % (self.name, key))
|
||||
|
||||
if 'Type' in self.data[key]:
|
||||
t = self.data[key]['Type']
|
||||
if t == 'String':
|
||||
if not isinstance(value, basestring):
|
||||
raise ValueError('%s: %s Value must be a string' %
|
||||
(self.name, key))
|
||||
if 'MaxLength' in self.data[key]:
|
||||
if len(value) > int(self.data[key]['MaxLength']):
|
||||
raise ValueError('%s: %s is too long; MaxLength %s' %
|
||||
(self.name, key,
|
||||
self.data[key]['MaxLength']))
|
||||
if 'MinLength' in self.data[key]:
|
||||
if len(value) < int(self.data[key]['MinLength']):
|
||||
raise ValueError('%s: %s is too short; MinLength %s' %
|
||||
(self.name, key,
|
||||
self.data[key]['MinLength']))
|
||||
if 'AllowedPattern' in self.data[key]:
|
||||
rc = re.match('^%s$' % self.data[key]['AllowedPattern'],
|
||||
value)
|
||||
if rc is None:
|
||||
raise ValueError('%s: Pattern %s does not match %s' %
|
||||
(self.name,
|
||||
self.data[key]['AllowedPattern'],
|
||||
key))
|
||||
|
||||
elif t in ['Integer', 'Number', 'Float']:
|
||||
# just try convert and see if it will throw a ValueError
|
||||
num = num_converter[t](value)
|
||||
minn = num
|
||||
maxn = num
|
||||
if 'MaxValue' in self.data[key]:
|
||||
maxn = num_converter[t](self.data[key]['MaxValue'])
|
||||
if 'MinValue' in self.data[key]:
|
||||
minn = num_converter[t](self.data[key]['MinValue'])
|
||||
if num > maxn or num < minn:
|
||||
raise ValueError('%s: %s is out of range' % (self.name,
|
||||
key))
|
||||
|
||||
elif t == 'Map':
|
||||
if not isinstance(value, dict):
|
||||
raise ValueError('%s: %s Value must be a map' %
|
||||
(self.name, key))
|
||||
if 'Schema' in self.data[key]:
|
||||
cdict = CheckedDict(key)
|
||||
schema = self.data[key]['Schema']
|
||||
for n, s in schema.items():
|
||||
cdict.addschema(n, s)
|
||||
for k, v in value.items():
|
||||
cdict[k] = v
|
||||
|
||||
elif t == 'List':
|
||||
if not isinstance(value, (list, tuple)):
|
||||
raise ValueError('%s: %s Value must be a list, not %s' %
|
||||
(self.name, key, value))
|
||||
if 'Schema' in self.data[key]:
|
||||
for item in value:
|
||||
cdict = CheckedDict(key)
|
||||
schema = self.data[key]['Schema']
|
||||
for n, s in schema.items():
|
||||
cdict.addschema(n, s)
|
||||
for k, v in item.items():
|
||||
cdict[k] = v
|
||||
|
||||
elif t == 'CommaDelimitedList':
|
||||
sp = value.split(',')
|
||||
|
||||
else:
|
||||
logger.warn('Unknown value type "%s"' % t)
|
||||
|
||||
if 'AllowedValues' in self.data[key]:
|
||||
if not value in self.data[key]['AllowedValues']:
|
||||
raise ValueError('%s: %s Value must be one of %s' %
|
||||
(self.name, key,
|
||||
str(self.data[key]['AllowedValues'])))
|
||||
|
||||
self.data[key]['Value'] = value
|
||||
|
||||
def __getitem__(self, key):
|
||||
if not key in self.data:
|
||||
raise KeyError('%s: key %s not found' % (self.name, key))
|
||||
|
||||
if 'Value' in self.data[key]:
|
||||
return self.data[key]['Value']
|
||||
elif 'Default' in self.data[key]:
|
||||
return self.data[key]['Default']
|
||||
elif 'Required' in self.data[key]:
|
||||
if not self.data[key]['Required']:
|
||||
return None
|
||||
else:
|
||||
raise ValueError('%s: Property %s not assigned' % (self.name,
|
||||
key))
|
||||
else:
|
||||
raise ValueError('%s: Property %s not assigned' % (self.name, key))
|
||||
|
||||
def __len__(self):
|
||||
return len(self.data)
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self.data
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.data)
|
||||
|
||||
def __delitem__(self, k):
|
||||
del self.data[k]
|
@ -1,134 +0,0 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
import nose
|
||||
import unittest
|
||||
import mox
|
||||
import json
|
||||
|
||||
from nose.plugins.attrib import attr
|
||||
from nose import with_setup
|
||||
|
||||
from heat.engine import checkeddict
|
||||
|
||||
|
||||
@attr(tag=['unit', 'checkeddict'])
|
||||
@attr(speed='fast')
|
||||
class CheckedDictTest(unittest.TestCase):
|
||||
|
||||
def test_paramerters(self):
|
||||
parms = '''
|
||||
{
|
||||
"Parameters" : {
|
||||
"TODOList" : {
|
||||
"Description" : "stuff",
|
||||
"Type" : "CommaDelimitedList"
|
||||
},
|
||||
"SomeNumber" : {
|
||||
"Type" : "Number",
|
||||
"Default" : "56",
|
||||
"MaxValue": "6778",
|
||||
"MinValue": "15.78"
|
||||
},
|
||||
"DBUsername": {
|
||||
"Default": "admin",
|
||||
"NoEcho": "true",
|
||||
"Description" : "The WordPress database admin account username",
|
||||
"Type": "String",
|
||||
"MinLength": "1",
|
||||
"MaxLength": "16",
|
||||
"AllowedPattern" : "[a-zA-Z][a-zA-Z0-9]*",
|
||||
"ConstraintDescription" : "begin with a letter & \
|
||||
contain only alphanumeric characters."
|
||||
},
|
||||
"LinuxDistribution": {
|
||||
"Default": "F16",
|
||||
"Description" : "Distribution of choice",
|
||||
"Type": "String",
|
||||
"AllowedValues" : [ "F16", "F17", "U10", "RHEL-6.1", "RHEL-6.3" ]
|
||||
}
|
||||
}
|
||||
}
|
||||
'''
|
||||
ps = json.loads(parms)
|
||||
cd = checkeddict.CheckedDict('test_paramerters')
|
||||
for p in ps['Parameters']:
|
||||
cd.addschema(p, ps['Parameters'][p])
|
||||
|
||||
# AllowedValues
|
||||
self.assertRaises(ValueError, cd.__setitem__, 'LinuxDistribution',
|
||||
'f16')
|
||||
# MaxLength
|
||||
self.assertRaises(ValueError, cd.__setitem__, 'DBUsername',
|
||||
'Farstarststrststrstrstrst144')
|
||||
# MinLength
|
||||
self.assertRaises(ValueError, cd.__setitem__, 'DBUsername', '')
|
||||
# AllowedPattern
|
||||
self.assertRaises(ValueError, cd.__setitem__, 'DBUsername', '4me')
|
||||
|
||||
cd['DBUsername'] = 'wtf'
|
||||
self.assertTrue(cd['DBUsername'] == 'wtf')
|
||||
cd['LinuxDistribution'] = 'U10'
|
||||
self.assertTrue(cd['LinuxDistribution'] == 'U10')
|
||||
|
||||
# int
|
||||
cd['SomeNumber'] = '98'
|
||||
self.assertTrue(cd['SomeNumber'] == '98')
|
||||
|
||||
# float
|
||||
cd['SomeNumber'] = '54.345'
|
||||
self.assertTrue(cd['SomeNumber'] == '54.345')
|
||||
|
||||
# not a num
|
||||
self.assertRaises(ValueError, cd.__setitem__, 'SomeNumber', 'S8')
|
||||
# range errors
|
||||
self.assertRaises(ValueError, cd.__setitem__, 'SomeNumber', '8')
|
||||
self.assertRaises(ValueError, cd.__setitem__, 'SomeNumber', '9048.56')
|
||||
# lists
|
||||
cd['TODOList'] = "'one', 'two', 'three'"
|
||||
|
||||
def test_nested_paramerters(self):
|
||||
listeners_schema = {
|
||||
'InstancePort': {'Type': 'Integer',
|
||||
'Required': True},
|
||||
'LoadBalancerPort': {'Type': 'Integer',
|
||||
'Required': True}
|
||||
}
|
||||
|
||||
healthcheck_schema = {
|
||||
'HealthyThreshold': {'Type': 'Number',
|
||||
'Required': True},
|
||||
'Interval': {'Type': 'Number',
|
||||
'Required': True}
|
||||
}
|
||||
|
||||
properties_schema = {
|
||||
'HealthCheck': {'Type': 'Map',
|
||||
'Implemented': False,
|
||||
'Schema': healthcheck_schema},
|
||||
'Listeners': {'Type': 'List',
|
||||
'Schema': listeners_schema}
|
||||
}
|
||||
|
||||
cd = checkeddict.CheckedDict('nested')
|
||||
for p, s in properties_schema.items():
|
||||
cd.addschema(p, s)
|
||||
|
||||
hc = {'HealthyThreshold': 'bla', 'Interval': '45'}
|
||||
self.assertRaises(ValueError, cd.__setitem__, 'HealthCheck', hc)
|
||||
|
||||
hc = {'HealthyThreshold': '246', 'Interval': '45'}
|
||||
cd['HealthCheck'] = hc
|
||||
self.assertTrue(cd['HealthCheck']['HealthyThreshold'] == '246')
|
||||
self.assertTrue(cd['HealthCheck']['Interval'] == '45')
|
||||
|
||||
li = [{'InstancePort': 'bla', 'LoadBalancerPort': '45'},
|
||||
{'InstancePort': '66', 'LoadBalancerPort': '775'}]
|
||||
self.assertRaises(ValueError, cd.__setitem__, 'Listeners', li)
|
||||
|
||||
li2 = [{'InstancePort': '674', 'LoadBalancerPort': '45'},
|
||||
{'InstancePort': '66', 'LoadBalancerPort': '775'}]
|
||||
cd['Listeners'] = li2
|
||||
|
||||
self.assertTrue(cd['Listeners'][0]['LoadBalancerPort'] == '45')
|
||||
self.assertTrue(cd['Listeners'][1]['LoadBalancerPort'] == '775')
|
Loading…
x
Reference in New Issue
Block a user