Add a Properties implementation with lazy loading

This will allow property values to be calculated on demand, rather than
stored as state.

Change-Id: I93fefa911b96c931cbf48b82b5a7ccc13ad99168
Signed-off-by: Zane Bitter <zbitter@redhat.com>
This commit is contained in:
Zane Bitter
2012-11-05 14:28:14 +01:00
parent c44c27404f
commit 6a19ccb7b8
2 changed files with 524 additions and 0 deletions

View File

@@ -0,0 +1,193 @@
import collections
import re
SCHEMA_KEYS = (
REQUIRED, IMPLEMENTED, DEFAULT, TYPE, SCHEMA,
PATTERN, MIN_VALUE, MAX_VALUE, VALUES,
) = (
'Required', 'Implemented', 'Default', 'Type', 'Schema',
'AllowedPattern', 'MinValue', 'MaxValue', 'AllowedValues',
)
SCHEMA_TYPES = (
INTEGER,
STRING, NUMBER, BOOLEAN,
MAP, LIST
) = (
'Integer',
'String', 'Number', 'Boolean',
'Map', 'List'
)
class Property(object):
def __init__(self, schema, name=None):
self.schema = schema
self.name = name
for key in self.schema:
assert key in SCHEMA_KEYS, 'Unknown schema key "%s"' % key
assert self.type() in SCHEMA_TYPES, \
'Unknown property type "%s"' % self.type()
def required(self):
return self.schema.get(REQUIRED, False)
def implemented(self):
return self.schema.get(IMPLEMENTED, True)
def has_default(self):
return DEFAULT in self.schema
def default(self):
return self.schema[DEFAULT]
def type(self):
return self.schema[TYPE]
def _check_allowed(self, value):
if VALUES in self.schema:
allowed = self.schema[VALUES]
if value not in allowed:
raise ValueError('"%s" is not an allowed value %s' %
(value, str(allowed)))
@staticmethod
def str_to_num(value):
try:
return int(value)
except ValueError:
return float(value)
def _validate_number(self, value):
self._check_allowed(value)
num = self.str_to_num(value)
minn = self.str_to_num(self.schema.get(MIN_VALUE, value))
maxn = self.str_to_num(self.schema.get(MAX_VALUE, value))
if num > maxn or num < minn:
format = '%d' if isinstance(num, int) else '%f'
raise ValueError('%s is out of range' % (format % num))
return value
def _validate_string(self, value):
if not isinstance(value, basestring):
raise ValueError('Value must be a string')
self._check_allowed(value)
if PATTERN in self.schema:
pattern = self.schema[PATTERN]
match = re.match(pattern, value)
if match is None or match.end() != len(value):
raise ValueError('"%s" does not match pattern "%s"' %
(value, pattern))
return value
def _validate_map(self, value):
if not isinstance(value, collections.Mapping):
raise TypeError('"%s" is not a map' % value)
if SCHEMA in self.schema:
children = dict(Properties(self.schema[SCHEMA], value,
parent_name=self.name))
else:
children = value
return children
def _validate_list(self, value):
if (not isinstance(value, collections.Sequence) or
isinstance(value, basestring)):
raise TypeError('"%s" is not a list' % repr(value))
for v in value:
self._check_allowed(v)
if SCHEMA in self.schema:
prop = Property({TYPE: MAP, SCHEMA: self.schema[SCHEMA]})
children = [prop.validate_data(d) for d in value]
else:
children = value
return children
def _validate_bool(self, value):
normalised = value.lower()
if normalised not in ['true', 'false']:
raise ValueError('"%s" is not a valid boolean')
return normalised
def validate_data(self, value):
t = self.type()
if t == STRING:
return self._validate_string(value)
elif t == INTEGER:
if not isinstance(value, int):
raise TypeError('value is not an integer')
return self._validate_number(value)
elif t == NUMBER:
return self._validate_number(value)
elif t == MAP:
return self._validate_map(value)
elif t == LIST:
return self._validate_list(value)
elif t == BOOLEAN:
return self._validate_bool(value)
class Properties(collections.Mapping):
def __init__(self, schema, data, resolver=lambda d: d, parent_name=None):
self.props = dict((k, Property(s, k)) for k, s in schema.items())
self.resolve = resolver
self.data = data
if parent_name is None:
self.error_prefix = ''
else:
self.error_prefix = parent_name + ': '
def validate(self):
for (key, prop) in self.props.items():
try:
self[key]
except ValueError as e:
return str(e)
# are there unimplemented Properties
if not prop.implemented() and key in self.data:
return (self.error_prefix +
'%s Property not implemented yet' % key)
def __getitem__(self, key):
if key not in self:
raise KeyError(self.error_prefix + 'Invalid Property %s' % key)
prop = self.props[key]
if key in self.data:
value = self.resolve(self.data[key])
try:
return prop.validate_data(value)
except ValueError as e:
raise ValueError(self.error_prefix + '%s %s' % (key, str(e)))
elif prop.has_default():
return prop.default()
elif prop.required():
raise ValueError(self.error_prefix +
'Property %s not assigned' % key)
def __len__(self):
return len(self.props)
def __contains__(self, key):
return key in self.props
def __iter__(self):
return iter(self.props)

View File

@@ -0,0 +1,331 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from nose.plugins.attrib import attr
import mox
from heat.engine.resources import properties
@attr(tag=['unit', 'properties'])
@attr(speed='fast')
class PropertyTest(unittest.TestCase):
def test_required_default(self):
p = properties.Property({'Type': 'String'})
self.assertFalse(p.required())
def test_required_false(self):
p = properties.Property({'Type': 'String', 'Required': False})
self.assertFalse(p.required())
def test_required_true(self):
p = properties.Property({'Type': 'String', 'Required': True})
self.assertTrue(p.required())
def test_implemented_default(self):
p = properties.Property({'Type': 'String'})
self.assertTrue(p.implemented())
def test_implemented_false(self):
p = properties.Property({'Type': 'String', 'Implemented': False})
self.assertFalse(p.implemented())
def test_implemented_true(self):
p = properties.Property({'Type': 'String', 'Implemented': True})
self.assertTrue(p.implemented())
def test_no_default(self):
p = properties.Property({'Type': 'String'})
self.assertFalse(p.has_default())
def test_default(self):
p = properties.Property({'Type': 'String', 'Default': 'wibble'})
self.assertEqual(p.default(), 'wibble')
def test_type(self):
p = properties.Property({'Type': 'String'})
self.assertEqual(p.type(), 'String')
def test_bad_type(self):
self.assertRaises(AssertionError,
properties.Property, {'Type': 'Fish'})
def test_bad_key(self):
self.assertRaises(AssertionError,
properties.Property,
{'Type': 'String', 'Foo': 'Bar'})
def test_string_pattern_good(self):
schema = {'Type': 'String',
'AllowedPattern': '[a-z]*'}
p = properties.Property(schema)
self.assertEqual(p.validate_data('foo'), 'foo')
def test_string_pattern_bad_prefix(self):
schema = {'Type': 'String',
'AllowedPattern': '[a-z]*'}
p = properties.Property(schema)
self.assertRaises(ValueError, p.validate_data, '1foo')
def test_string_pattern_bad_suffix(self):
schema = {'Type': 'String',
'AllowedPattern': '[a-z]*'}
p = properties.Property(schema)
self.assertRaises(ValueError, p.validate_data, 'foo1')
def test_string_value_list_good(self):
schema = {'Type': 'String',
'AllowedValues': ['foo', 'bar', 'baz']}
p = properties.Property(schema)
self.assertEqual(p.validate_data('bar'), 'bar')
def test_string_value_list_bad(self):
schema = {'Type': 'String',
'AllowedValues': ['foo', 'bar', 'baz']}
p = properties.Property(schema)
self.assertRaises(ValueError, p.validate_data, 'blarg')
def test_int_good(self):
schema = {'Type': 'Integer',
'MinValue': 3,
'MaxValue': 3}
p = properties.Property(schema)
self.assertEqual(p.validate_data(3), 3)
def test_int_bad(self):
schema = {'Type': 'Integer'}
p = properties.Property(schema)
self.assertRaises(TypeError, p.validate_data, '3')
def test_integer_low(self):
schema = {'Type': 'Integer',
'MinValue': 4}
p = properties.Property(schema)
self.assertRaises(ValueError, p.validate_data, 3)
def test_integer_high(self):
schema = {'Type': 'Integer',
'MaxValue': 2}
p = properties.Property(schema)
self.assertRaises(ValueError, p.validate_data, 3)
def test_integer_value_list_good(self):
schema = {'Type': 'Integer',
'AllowedValues': [1, 3, 5]}
p = properties.Property(schema)
self.assertEqual(p.validate_data(5), 5)
def test_integer_value_list_bad(self):
schema = {'Type': 'Integer',
'AllowedValues': [1, 3, 5]}
p = properties.Property(schema)
self.assertRaises(ValueError, p.validate_data, 2)
def test_number_good(self):
schema = {'Type': 'Number',
'MinValue': '3',
'MaxValue': '3'}
p = properties.Property(schema)
self.assertEqual(p.validate_data('3'), '3')
def test_number_value_list_good(self):
schema = {'Type': 'Number',
'AllowedValues': ['1', '3', '5']}
p = properties.Property(schema)
self.assertEqual(p.validate_data('5'), '5')
def test_number_value_list_bad(self):
schema = {'Type': 'Number',
'AllowedValues': ['1', '3', '5']}
p = properties.Property(schema)
self.assertRaises(ValueError, p.validate_data, '2')
def test_number_low(self):
schema = {'Type': 'Number',
'MinValue': '4'}
p = properties.Property(schema)
self.assertRaises(ValueError, p.validate_data, '3')
def test_number_high(self):
schema = {'Type': 'Number',
'MaxValue': '2'}
p = properties.Property(schema)
self.assertRaises(ValueError, p.validate_data, '3')
def test_boolean_true(self):
p = properties.Property({'Type': 'Boolean'})
self.assertEqual(p.validate_data('True'), 'true')
def test_boolean_false(self):
p = properties.Property({'Type': 'Boolean'})
self.assertEqual(p.validate_data('False'), 'false')
def test_boolean_invalid(self):
p = properties.Property({'Type': 'Boolean'})
self.assertRaises(ValueError, p.validate_data, 'fish')
def test_list_string(self):
p = properties.Property({'Type': 'List'})
self.assertRaises(TypeError, p.validate_data, 'foo')
def test_list_good(self):
p = properties.Property({'Type': 'List'})
self.assertEqual(p.validate_data(['foo', 'bar']), ['foo', 'bar'])
def test_list_dict(self):
p = properties.Property({'Type': 'List'})
self.assertRaises(TypeError, p.validate_data, {'foo': 'bar'})
def test_list_value_list_bad(self):
schema = {'Type': 'List',
'AllowedValues': ['foo', 'bar', 'baz']}
p = properties.Property(schema)
self.assertRaises(ValueError, p.validate_data, ['foo', 'wibble'])
def test_list_value_list_good(self):
schema = {'Type': 'List',
'AllowedValues': ['foo', 'bar', 'baz']}
p = properties.Property(schema)
self.assertEqual(p.validate_data(['bar', 'foo']), ['bar', 'foo'])
def test_map_string(self):
p = properties.Property({'Type': 'Map'})
self.assertRaises(TypeError, p.validate_data, 'foo')
def test_map_list(self):
p = properties.Property({'Type': 'Map'})
self.assertRaises(TypeError, p.validate_data, ['foo'])
def test_map_schema_good(self):
map_schema = {'valid': {'Type': 'Boolean'}}
p = properties.Property({'Type': 'Map', 'Schema': map_schema})
self.assertEqual(p.validate_data({'valid': 'TRUE'}), {'valid': 'true'})
def test_map_schema_bad_data(self):
map_schema = {'valid': {'Type': 'Boolean'}}
p = properties.Property({'Type': 'Map', 'Schema': map_schema})
self.assertRaises(ValueError, p.validate_data, {'valid': 'fish'})
def test_map_schema_missing_data(self):
map_schema = {'valid': {'Type': 'Boolean'}}
p = properties.Property({'Type': 'Map', 'Schema': map_schema})
self.assertEqual(p.validate_data({}), {'valid': None})
def test_map_schema_missing_required_data(self):
map_schema = {'valid': {'Type': 'Boolean', 'Required': True}}
p = properties.Property({'Type': 'Map', 'Schema': map_schema})
self.assertRaises(ValueError, p.validate_data, {})
def test_list_schema_good(self):
map_schema = {'valid': {'Type': 'Boolean'}}
p = properties.Property({'Type': 'List', 'Schema': map_schema})
self.assertEqual(p.validate_data([{'valid': 'TRUE'},
{'valid': 'False'}]),
[{'valid': 'true'},
{'valid': 'false'}])
def test_list_schema_bad_data(self):
map_schema = {'valid': {'Type': 'Boolean'}}
p = properties.Property({'Type': 'List', 'Schema': map_schema})
self.assertRaises(ValueError, p.validate_data, [{'valid': 'True'},
{'valid': 'fish'}])
@attr(tag=['unit', 'properties'])
@attr(speed='fast')
class PropertiesTest(unittest.TestCase):
def setUp(self):
schema = {
'int': {'Type': 'Integer'},
'string': {'Type': 'String'},
'required_int': {'Type': 'Integer', 'Required': True},
'bad_int': {'Type': 'Integer'},
'missing': {'Type': 'Integer'},
'defaulted': {'Type': 'Integer', 'Default': 1},
'default_override': {'Type': 'Integer', 'Default': 1},
}
data = {
'int': 21,
'string': 'foo',
'bad_int': 'foo',
'default_override': 21,
}
double = lambda d: d * 2
self.props = properties.Properties(schema, data, double, 'wibble')
def test_integer_good(self):
self.assertEqual(self.props['int'], 42)
def test_string_good(self):
self.assertEqual(self.props['string'], 'foofoo')
def test_missing_required(self):
self.assertRaises(ValueError, self.props.get, 'required_int')
def test_integer_bad(self):
self.assertRaises(TypeError, self.props.get, 'bad_int')
def test_missing(self):
self.assertEqual(self.props['missing'], None)
def test_default(self):
self.assertEqual(self.props['defaulted'], 1)
def test_default_override(self):
self.assertEqual(self.props['default_override'], 42)
def test_bad_key(self):
self.assertEqual(self.props.get('foo', 'wibble'), 'wibble')
@attr(tag=['unit', 'properties'])
@attr(speed='fast')
class PropertiesValidationTest(unittest.TestCase):
def test_required(self):
schema = {'foo': {'Type': 'String', 'Required': True}}
props = properties.Properties(schema, {'foo': 'bar'})
self.assertEqual(props.validate(), None)
def test_missing_required(self):
schema = {'foo': {'Type': 'String', 'Required': True}}
props = properties.Properties(schema, {})
self.assertNotEqual(props.validate(), None)
def test_missing_unimplemented(self):
schema = {'foo': {'Type': 'String', 'Implemented': False}}
props = properties.Properties(schema, {})
self.assertEqual(props.validate(), None)
def test_present_unimplemented(self):
schema = {'foo': {'Type': 'String', 'Implemented': False}}
props = properties.Properties(schema, {'foo': 'bar'})
self.assertNotEqual(props.validate(), None)
def test_missing(self):
schema = {'foo': {'Type': 'String'}}
props = properties.Properties(schema, {})
self.assertEqual(props.validate(), None)
def test_bad_data(self):
schema = {'foo': {'Type': 'String'}}
props = properties.Properties(schema, {'foo': 42})
self.assertNotEqual(props.validate(), None)
# allows testing of the test directly, shown below
if __name__ == '__main__':
sys.argv.append(__file__)
nose.main()