Factor Schema out of properties.py for re-use

There is currently some function duplication in Heat for schema
validation. There is properties.Schema and a ParamSchema class which
do mostly the same, but each with their own code.
It would be good to have a common implementation for property and
parameter validation with only property or parameter specific code
implemented in sub-classes.
The current Schema class in properties.py cannot be used in
parameters.py due to cyclic import dependencies that would arise.

This patch factors the Schema class and Constraint classes into their
own file which can then be imported by both properties.py and
parameters.py. For property schema specific code, a class
PropertySchema is introduced as derived from the common Schema class.

The code of the Schema and Constraint classes is unchanged except for
absolutely  refactoring work.

In a subsequent patch, I plan to provide the code to base ParamSchema
on the common Schema class.

Change-Id: I833edd8fad316220f56d6727fe1e3409f8fda6ee
Partial-Bug: #1230229
This commit is contained in:
Thomas Spatzier 2013-11-22 20:24:41 +01:00
parent e726c533d7
commit eae9a2ad3f
11 changed files with 801 additions and 642 deletions

View File

@ -69,12 +69,12 @@ class ResourcePages(Directive):
def _prop_syntax_example(self, prop):
if not prop:
return 'Value'
if prop.type == properties.LIST:
if prop.type == properties.Schema.LIST:
schema = lambda i: prop.schema[i] if prop.schema else None
sub_type = [self._prop_syntax_example(schema(i))
for i in range(2)]
return '[%s, %s, ...]' % tuple(sub_type)
elif prop.type == properties.MAP:
elif prop.type == properties.Schema.MAP:
def sub_props():
for sub_key, sub_value in prop.schema.items():
if sub_value.implemented:
@ -193,12 +193,12 @@ Resources:
definition.append(para)
sub_schema = None
if prop.schema and prop.type == properties.MAP:
if prop.schema and prop.type == properties.Schema.MAP:
para = nodes.emphasis('', _('Map properties:'))
definition.append(para)
sub_schema = prop.schema
elif prop.schema and prop.type == properties.LIST:
elif prop.schema and prop.type == properties.Schema.LIST:
para = nodes.emphasis(
'', _('List contents:'))
definition.append(para)

417
heat/engine/constraints.py Normal file
View File

@ -0,0 +1,417 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import numbers
import re
class InvalidSchemaError(Exception):
pass
class Schema(collections.Mapping):
"""
Schema base class for validating properties or parameters.
Schema objects are serialisable to dictionaries following a superset of
the HOT input Parameter schema using dict().
Serialises to JSON in the form::
{
'type': 'list',
'required': False
'constraints': [
{
'length': {'min': 1},
'description': 'List must not be empty'
}
],
'schema': {
'*': {
'type': 'string'
}
},
'description': 'An example list property.'
}
"""
KEYS = (
TYPE, DESCRIPTION, DEFAULT, SCHEMA, REQUIRED, CONSTRAINTS,
) = (
'type', 'description', 'default', 'schema', 'required', 'constraints',
)
TYPES = (
INTEGER,
STRING, NUMBER, BOOLEAN,
MAP, LIST
) = (
'Integer',
'String', 'Number', 'Boolean',
'Map', 'List'
)
def __init__(self, data_type, description=None,
default=None, schema=None,
required=False, constraints=[]):
self._len = None
self.type = data_type
if self.type not in Schema.TYPES:
raise InvalidSchemaError(_('Invalid type (%s)') % self.type)
self.description = description
self.required = required
if isinstance(schema, type(self)):
if self.type != Schema.LIST:
msg = _('Single schema valid only for '
'%(ltype)s, not %(utype)s') % dict(ltype=Schema.LIST,
utype=self.type)
raise InvalidSchemaError(msg)
self.schema = AnyIndexDict(schema)
else:
self.schema = schema
if self.schema is not None and self.type not in (Schema.LIST,
Schema.MAP):
msg = _('Schema valid only for %(ltype)s or '
'%(mtype)s, not %(utype)s') % dict(ltype=Schema.LIST,
mtype=Schema.MAP,
utype=self.type)
raise InvalidSchemaError(msg)
self.constraints = constraints
for c in constraints:
if self.type not in c.valid_types:
err_msg = _('%(name)s constraint '
'invalid for %(utype)s') % dict(
name=type(c).__name__,
utype=self.type)
raise InvalidSchemaError(err_msg)
self.default = default
if self.default is not None:
try:
self.validate_constraints(self.default)
except (ValueError, TypeError) as exc:
raise InvalidSchemaError(_('Invalid default '
'%(default)s (%(exc)s)') %
dict(default=self.default, exc=exc))
@staticmethod
def str_to_num(value):
"""Convert a string representation of a number into a numeric type."""
if isinstance(value, numbers.Number):
return value
try:
return int(value)
except ValueError:
return float(value)
def validate_constraints(self, value):
for constraint in self.constraints:
constraint.validate(value)
def __getitem__(self, key):
if key == self.TYPE:
return self.type.lower()
elif key == self.DESCRIPTION:
if self.description is not None:
return self.description
elif key == self.DEFAULT:
if self.default is not None:
return self.default
elif key == self.SCHEMA:
if self.schema is not None:
return dict((n, dict(s)) for n, s in self.schema.items())
elif key == self.REQUIRED:
return self.required
elif key == self.CONSTRAINTS:
if self.constraints:
return [dict(c) for c in self.constraints]
raise KeyError(key)
def __iter__(self):
for k in self.KEYS:
try:
self[k]
except KeyError:
pass
else:
yield k
def __len__(self):
if self._len is None:
self._len = len(list(iter(self)))
return self._len
class AnyIndexDict(collections.Mapping):
"""
A Mapping that returns the same value for any integer index.
Used for storing the schema for a list. When converted to a dictionary,
it contains a single item with the key '*'.
"""
ANYTHING = '*'
def __init__(self, value):
self.value = value
def __getitem__(self, key):
if key != self.ANYTHING and not isinstance(key, (int, long)):
raise KeyError(_('Invalid key %s') % str(key))
return self.value
def __iter__(self):
yield self.ANYTHING
def __len__(self):
return 1
class Constraint(collections.Mapping):
"""
Parent class for constraints on allowable values for a Property.
Constraints are serialisable to dictionaries following the HOT input
Parameter constraints schema using dict().
"""
(DESCRIPTION,) = ('description',)
def __init__(self, description=None):
self.description = description
def __str__(self):
def desc():
if self.description:
yield self.description
yield self._str()
return '\n'.join(desc())
def validate(self, value):
if not self._is_valid(value):
if self.description:
err_msg = self.description
else:
err_msg = self._err_msg(value)
raise ValueError(err_msg)
@classmethod
def _name(cls):
return '_'.join(w.lower() for w in re.findall('[A-Z]?[a-z]+',
cls.__name__))
def __getitem__(self, key):
if key == self.DESCRIPTION:
if self.description is None:
raise KeyError(key)
return self.description
if key == self._name():
return self._constraint()
raise KeyError(key)
def __iter__(self):
if self.description is not None:
yield self.DESCRIPTION
yield self._name()
def __len__(self):
return 2 if self.description is not None else 1
class Range(Constraint):
"""
Constrain values within a range.
Serialises to JSON as::
{
'range': {'min': <min>, 'max': <max>},
'description': <description>
}
"""
(MIN, MAX) = ('min', 'max')
valid_types = (Schema.INTEGER, Schema.NUMBER)
def __init__(self, min=None, max=None, description=None):
super(Range, self).__init__(description)
self.min = min
self.max = max
for param in (min, max):
if not isinstance(param, (float, int, long, type(None))):
raise InvalidSchemaError(_('min/max must be numeric'))
if min is max is None:
raise InvalidSchemaError(_('range must have min and/or max'))
def _str(self):
if self.max is None:
fmt = _('The value must be at least %(min)s.')
elif self.min is None:
fmt = _('The value must be no greater than %(max)s.')
else:
fmt = _('The value must be in the range %(min)s to %(max)s.')
return fmt % self._constraint()
def _err_msg(self, value):
return '%s is out of range (min: %s, max: %s)' % (value,
self.min,
self.max)
def _is_valid(self, value):
value = Schema.str_to_num(value)
if self.min is not None:
if value < self.min:
return False
if self.max is not None:
if value > self.max:
return False
return True
def _constraint(self):
def constraints():
if self.min is not None:
yield self.MIN, self.min
if self.max is not None:
yield self.MAX, self.max
return dict(constraints())
class Length(Range):
"""
Constrain the length of values within a range.
Serialises to JSON as::
{
'length': {'min': <min>, 'max': <max>},
'description': <description>
}
"""
valid_types = (Schema.STRING, Schema.LIST)
def __init__(self, min=None, max=None, description=None):
super(Length, self).__init__(min, max, description)
for param in (min, max):
if not isinstance(param, (int, long, type(None))):
msg = _('min/max length must be integral')
raise InvalidSchemaError(msg)
def _str(self):
if self.max is None:
fmt = _('The length must be at least %(min)s.')
elif self.min is None:
fmt = _('The length must be no greater than %(max)s.')
else:
fmt = _('The length must be in the range %(min)s to %(max)s.')
return fmt % self._constraint()
def _err_msg(self, value):
return 'length (%d) is out of range (min: %s, max: %s)' % (len(value),
self.min,
self.max)
def _is_valid(self, value):
return super(Length, self)._is_valid(len(value))
class AllowedValues(Constraint):
"""
Constrain values to a predefined set.
Serialises to JSON as::
{
'allowed_values': [<allowed1>, <allowed2>, ...],
'description': <description>
}
"""
valid_types = (Schema.STRING, Schema.INTEGER, Schema.NUMBER,
Schema.BOOLEAN)
def __init__(self, allowed, description=None):
super(AllowedValues, self).__init__(description)
if (not isinstance(allowed, collections.Sequence) or
isinstance(allowed, basestring)):
raise InvalidSchemaError(_('AllowedValues must be a list'))
self.allowed = tuple(allowed)
def _str(self):
allowed = ', '.join(str(a) for a in self.allowed)
return _('Allowed values: %s') % allowed
def _err_msg(self, value):
allowed = '[%s]' % ', '.join(str(a) for a in self.allowed)
return '"%s" is not an allowed value %s' % (value, allowed)
def _is_valid(self, value):
return value in self.allowed
def _constraint(self):
return list(self.allowed)
class AllowedPattern(Constraint):
"""
Constrain values to a predefined regular expression pattern.
Serialises to JSON as::
{
'allowed_pattern': <pattern>,
'description': <description>
}
"""
valid_types = (Schema.STRING,)
def __init__(self, pattern, description=None):
super(AllowedPattern, self).__init__(description)
self.pattern = pattern
self.match = re.compile(pattern).match
def _str(self):
return _('Value must match pattern: %s') % self.pattern
def _err_msg(self, value):
return '"%s" does not match pattern "%s"' % (value, self.pattern)
def _is_valid(self, value):
match = self.match(value)
return match is not None and match.end() == len(value)
def _constraint(self):
return self.pattern

View File

@ -14,11 +14,10 @@
# under the License.
import collections
import numbers
import re
from heat.common import exception
from heat.engine import parameters
from heat.engine import constraints as constr
from heat.engine import hot
SCHEMA_KEYS = (
@ -31,111 +30,38 @@ SCHEMA_KEYS = (
'MinLength', 'MaxLength', 'Description', 'UpdateAllowed',
)
SCHEMA_TYPES = (
INTEGER,
STRING, NUMBER, BOOLEAN,
MAP, LIST
) = (
'Integer',
'String', 'Number', 'Boolean',
'Map', 'List'
)
class InvalidPropertySchemaError(Exception):
pass
class Schema(collections.Mapping):
class Schema(constr.Schema):
"""
A Schema for a resource Property.
Schema class for validating resource properties.
Schema objects are serialisable to dictionaries following a superset of
the HOT input Parameter schema using dict().
Serialises to JSON in the form::
{
'type': 'list',
'required': False
'constraints': [
{
'length': {'min': 1},
'description': 'List must not be empty'
}
],
'schema': {
'*': {
'type': 'string'
}
},
'description': 'An example list property.'
}
This class is used for defining schema constraints for resource properties.
It inherits generic validation features from the base Schema class and add
processing that is specific to resource properties.
"""
KEYS = (
TYPE, DESCRIPTION, DEFAULT, SCHEMA, REQUIRED, CONSTRAINTS,
UPDATE_ALLOWED,
UPDATE_ALLOWED
) = (
'type', 'description', 'default', 'schema', 'required', 'constraints',
'update_allowed',
'update_allowed'
)
def __init__(self, data_type, description=None,
default=None, schema=None,
required=False, constraints=[],
implemented=True, update_allowed=False):
self._len = None
self.type = data_type
if self.type not in SCHEMA_TYPES:
raise InvalidPropertySchemaError(_('Invalid type (%s)') %
self.type)
self.description = description
self.required = required
implemented=True,
update_allowed=False):
super(Schema, self).__init__(data_type, description, default,
schema, required, constraints)
self.implemented = implemented
self.update_allowed = update_allowed
if isinstance(schema, type(self)):
if self.type != LIST:
msg = _('Single schema valid only for '
'%(ltype)s, not %(utype)s') % dict(ltype=LIST,
utype=self.type)
raise InvalidPropertySchemaError(msg)
self.schema = AnyIndexDict(schema)
else:
self.schema = schema
if self.schema is not None and self.type not in (LIST, MAP):
msg = _('Schema valid only for %(ltype)s or '
'%(mtype)s, not %(utype)s') % dict(ltype=LIST,
mtype=MAP,
utype=self.type)
raise InvalidPropertySchemaError(msg)
self.constraints = constraints
for c in constraints:
if self.type not in c.valid_types:
err_msg = _('%(name)s constraint '
'invalid for %(utype)s') % dict(
name=type(c).__name__,
utype=self.type)
raise InvalidPropertySchemaError(err_msg)
self.default = default
if self.default is not None:
try:
self.validate_constraints(self.default)
except (ValueError, TypeError) as exc:
raise InvalidPropertySchemaError(_('Invalid default '
'%(default)s (%(exc)s)') %
dict(default=self.default,
exc=exc))
@classmethod
def from_legacy(cls, schema_dict):
"""
Return a new Schema object from a legacy schema dictionary.
Return a Property Schema object from a legacy schema dictionary.
"""
# Check for fully-fledged Schema objects
@ -144,43 +70,41 @@ class Schema(collections.Mapping):
unknown = [k for k in schema_dict if k not in SCHEMA_KEYS]
if unknown:
raise InvalidPropertySchemaError(_('Unknown key(s) %s') % unknown)
raise constr.InvalidSchemaError(_('Unknown key(s) %s') % unknown)
def constraints():
def get_num(key):
val = schema_dict.get(key)
if val is not None:
val = Property.str_to_num(val)
val = Schema.str_to_num(val)
return val
if MIN_VALUE in schema_dict or MAX_VALUE in schema_dict:
yield Range(get_num(MIN_VALUE),
get_num(MAX_VALUE))
yield constr.Range(get_num(MIN_VALUE), get_num(MAX_VALUE))
if MIN_LENGTH in schema_dict or MAX_LENGTH in schema_dict:
yield Length(get_num(MIN_LENGTH),
get_num(MAX_LENGTH))
yield constr.Length(get_num(MIN_LENGTH), get_num(MAX_LENGTH))
if ALLOWED_VALUES in schema_dict:
yield AllowedValues(schema_dict[ALLOWED_VALUES])
yield constr.AllowedValues(schema_dict[ALLOWED_VALUES])
if ALLOWED_PATTERN in schema_dict:
yield AllowedPattern(schema_dict[ALLOWED_PATTERN])
yield constr.AllowedPattern(schema_dict[ALLOWED_PATTERN])
try:
data_type = schema_dict[TYPE]
except KeyError:
raise InvalidPropertySchemaError(_('No %s specified') % TYPE)
raise constr.InvalidSchemaError(_('No %s specified') % TYPE)
if SCHEMA in schema_dict:
if data_type == LIST:
if data_type == Schema.LIST:
ss = cls.from_legacy(schema_dict[SCHEMA])
elif data_type == MAP:
elif data_type == Schema.MAP:
schema_dicts = schema_dict[SCHEMA].items()
ss = dict((n, cls.from_legacy(sd)) for n, sd in schema_dicts)
else:
raise InvalidPropertySchemaError(_('%(schema)s supplied for'
' %(type)s %(data)s') %
dict(schema=SCHEMA,
type=TYPE,
data=data_type))
raise constr.InvalidSchemaError(_('%(schema)s supplied for '
' for %(type)s %(data)s') %
dict(schema=SCHEMA,
type=TYPE,
data=data_type))
else:
ss = None
@ -196,38 +120,40 @@ class Schema(collections.Mapping):
@classmethod
def from_parameter(cls, param):
"""
Return a property Schema corresponding to a parameter.
Return a Property Schema corresponding to a parameter.
Convert a parameter schema from a provider template to a property
Schema for the corresponding resource facade.
"""
param_type_map = {
parameters.STRING: STRING,
parameters.NUMBER: NUMBER,
parameters.COMMA_DELIMITED_LIST: LIST,
parameters.JSON: MAP
parameters.STRING: Schema.STRING,
parameters.NUMBER: Schema.NUMBER,
parameters.COMMA_DELIMITED_LIST: Schema.LIST,
parameters.JSON: Schema.MAP
}
def get_num(key, context=param):
val = context.get(key)
if val is not None:
val = Property.str_to_num(val)
val = Schema.str_to_num(val)
return val
def constraints():
desc = param.get(parameters.CONSTRAINT_DESCRIPTION)
if parameters.MIN_VALUE in param or parameters.MAX_VALUE in param:
yield Range(get_num(parameters.MIN_VALUE),
get_num(parameters.MAX_VALUE))
yield constr.Range(get_num(parameters.MIN_VALUE),
get_num(parameters.MAX_VALUE))
if (parameters.MIN_LENGTH in param or
parameters.MAX_LENGTH in param):
yield Length(get_num(parameters.MIN_LENGTH),
get_num(parameters.MAX_LENGTH))
yield constr.Length(get_num(parameters.MIN_LENGTH),
get_num(parameters.MAX_LENGTH))
if parameters.ALLOWED_VALUES in param:
yield AllowedValues(param[parameters.ALLOWED_VALUES], desc)
yield constr.AllowedValues(param[parameters.ALLOWED_VALUES],
desc)
if parameters.ALLOWED_PATTERN in param:
yield AllowedPattern(param[parameters.ALLOWED_PATTERN], desc)
yield constr.AllowedPattern(param[parameters.ALLOWED_PATTERN],
desc)
def constraints_hot():
constraints = param.get(hot.CONSTRAINTS)
@ -238,18 +164,18 @@ class Schema(collections.Mapping):
desc = constraint.get(hot.DESCRIPTION)
if hot.RANGE in constraint:
const_def = constraint.get(hot.RANGE)
yield Range(get_num(hot.MIN, const_def),
get_num(hot.MAX, const_def), desc)
yield constr.Range(get_num(hot.MIN, const_def),
get_num(hot.MAX, const_def), desc)
if hot.LENGTH in constraint:
const_def = constraint.get(hot.LENGTH)
yield Length(get_num(hot.MIN, const_def),
get_num(hot.MAX, const_def), desc)
yield constr.Length(get_num(hot.MIN, const_def),
get_num(hot.MAX, const_def), desc)
if hot.ALLOWED_VALUES in constraint:
const_def = constraint.get(hot.ALLOWED_VALUES)
yield AllowedValues(const_def, desc)
yield constr.AllowedValues(const_def, desc)
if hot.ALLOWED_PATTERN in constraint:
const_def = constraint.get(hot.ALLOWED_PATTERN)
yield AllowedPattern(const_def, desc)
yield constr.AllowedPattern(const_def, desc)
if isinstance(param, hot.HOTParamSchema):
constraint_list = list(constraints_hot())
@ -258,307 +184,29 @@ class Schema(collections.Mapping):
# make update_allowed true by default on TemplateResources
# as the template should deal with this.
return cls(param_type_map.get(param[parameters.TYPE], MAP),
return cls(param_type_map.get(param[parameters.TYPE], Schema.MAP),
description=param.get(parameters.DESCRIPTION),
required=parameters.DEFAULT not in param,
constraints=constraint_list,
update_allowed=True)
def validate_constraints(self, value):
for constraint in self.constraints:
constraint.validate(value)
def __getitem__(self, key):
if key == self.TYPE:
return self.type.lower()
elif key == self.DESCRIPTION:
if self.description is not None:
return self.description
elif key == self.DEFAULT:
if self.default is not None:
return self.default
elif key == self.SCHEMA:
if self.schema is not None:
return dict((n, dict(s)) for n, s in self.schema.items())
elif key == self.REQUIRED:
return self.required
elif key == self.CONSTRAINTS:
if self.constraints:
return [dict(c) for c in self.constraints]
elif key == self.UPDATE_ALLOWED:
if key == self.UPDATE_ALLOWED:
return self.update_allowed
else:
return super(Schema, self).__getitem__(key)
raise KeyError(key)
def __iter__(self):
for k in self.KEYS:
try:
self[k]
except KeyError:
pass
else:
yield k
def __len__(self):
if self._len is None:
self._len = len(list(iter(self)))
return self._len
class AnyIndexDict(collections.Mapping):
def schemata(schema_dicts):
"""
A Mapping that returns the same value for any integer index.
Return dictionary of Schema objects for given dictionary of schemata.
Used for storing the schema for a list. When converted to a dictionary,
it contains a single item with the key '*'.
The input schemata are converted from the legacy (dictionary-based)
format to Schema objects where necessary.
"""
ANYTHING = '*'
def __init__(self, value):
self.value = value
def __getitem__(self, key):
if key != self.ANYTHING and not isinstance(key, (int, long)):
raise KeyError(_('Invalid key %s') % str(key))
return self.value
def __iter__(self):
yield self.ANYTHING
def __len__(self):
return 1
class Constraint(collections.Mapping):
"""
Parent class for constraints on allowable values for a Property.
Constraints are serialisable to dictionaries following the HOT input
Parameter constraints schema using dict().
"""
(DESCRIPTION,) = ('description',)
def __init__(self, description=None):
self.description = description
def __str__(self):
def desc():
if self.description:
yield self.description
yield self._str()
return '\n'.join(desc())
def validate(self, value):
if not self._is_valid(value):
if self.description:
err_msg = self.description
else:
err_msg = self._err_msg(value)
raise ValueError(err_msg)
@classmethod
def _name(cls):
return '_'.join(w.lower() for w in re.findall('[A-Z]?[a-z]+',
cls.__name__))
def __getitem__(self, key):
if key == self.DESCRIPTION:
if self.description is None:
raise KeyError(key)
return self.description
if key == self._name():
return self._constraint()
raise KeyError(key)
def __iter__(self):
if self.description is not None:
yield self.DESCRIPTION
yield self._name()
def __len__(self):
return 2 if self.description is not None else 1
class Range(Constraint):
"""
Constrain values within a range.
Serialises to JSON as::
{
'range': {'min': <min>, 'max': <max>},
'description': <description>
}
"""
(MIN, MAX) = ('min', 'max')
valid_types = (INTEGER, NUMBER)
def __init__(self, min=None, max=None, description=None):
super(Range, self).__init__(description)
self.min = min
self.max = max
for param in (min, max):
if not isinstance(param, (float, int, long, type(None))):
raise InvalidPropertySchemaError(_('min/max must be numeric'))
if min is max is None:
raise InvalidPropertySchemaError(_('range must have '
'min and/or max'))
def _str(self):
if self.max is None:
fmt = _('The value must be at least %(min)s.')
elif self.min is None:
fmt = _('The value must be no greater than %(max)s.')
else:
fmt = _('The value must be in the range %(min)s to %(max)s.')
return fmt % self._constraint()
def _err_msg(self, value):
return '%s is out of range (min: %s, max: %s)' % (value,
self.min,
self.max)
def _is_valid(self, value):
value = Property.str_to_num(value)
if self.min is not None:
if value < self.min:
return False
if self.max is not None:
if value > self.max:
return False
return True
def _constraint(self):
def constraints():
if self.min is not None:
yield self.MIN, self.min
if self.max is not None:
yield self.MAX, self.max
return dict(constraints())
class Length(Range):
"""
Constrain the length of values within a range.
Serialises to JSON as::
{
'length': {'min': <min>, 'max': <max>},
'description': <description>
}
"""
valid_types = (STRING, LIST)
def __init__(self, min=None, max=None, description=None):
super(Length, self).__init__(min, max, description)
for param in (min, max):
if not isinstance(param, (int, long, type(None))):
msg = _('min/max length must be integral')
raise InvalidPropertySchemaError(msg)
def _str(self):
if self.max is None:
fmt = _('The length must be at least %(min)s.')
elif self.min is None:
fmt = _('The length must be no greater than %(max)s.')
else:
fmt = _('The length must be in the range %(min)s to %(max)s.')
return fmt % self._constraint()
def _err_msg(self, value):
return 'length (%d) is out of range (min: %s, max: %s)' % (len(value),
self.min,
self.max)
def _is_valid(self, value):
return super(Length, self)._is_valid(len(value))
class AllowedValues(Constraint):
"""
Constrain values to a predefined set.
Serialises to JSON as::
{
'allowed_values': [<allowed1>, <allowed2>, ...],
'description': <description>
}
"""
valid_types = (STRING, INTEGER, NUMBER, BOOLEAN)
def __init__(self, allowed, description=None):
super(AllowedValues, self).__init__(description)
if (not isinstance(allowed, collections.Sequence) or
isinstance(allowed, basestring)):
raise InvalidPropertySchemaError(_('AllowedValues must be a list'))
self.allowed = tuple(allowed)
def _str(self):
allowed = ', '.join(str(a) for a in self.allowed)
return _('Allowed values: %s') % allowed
def _err_msg(self, value):
allowed = '[%s]' % ', '.join(str(a) for a in self.allowed)
return '"%s" is not an allowed value %s' % (value, allowed)
def _is_valid(self, value):
return value in self.allowed
def _constraint(self):
return list(self.allowed)
class AllowedPattern(Constraint):
"""
Constrain values to a predefined regular expression pattern.
Serialises to JSON as::
{
'allowed_pattern': <pattern>,
'description': <description>
}
"""
valid_types = (STRING,)
def __init__(self, pattern, description=None):
super(AllowedPattern, self).__init__(description)
self.pattern = pattern
self.match = re.compile(pattern).match
def _str(self):
return _('Value must match pattern: %s') % self.pattern
def _err_msg(self, value):
return '"%s" does not match pattern "%s"' % (value, self.pattern)
def _is_valid(self, value):
match = self.match(value)
return match is not None and match.end() == len(value)
def _constraint(self):
return self.pattern
return dict((n, Schema.from_legacy(s)) for n, s in schema_dicts.items())
class Property(object):
@ -585,15 +233,6 @@ class Property(object):
def type(self):
return self.schema.type
@staticmethod
def str_to_num(value):
if isinstance(value, numbers.Number):
return value
try:
return int(value)
except ValueError:
return float(value)
def _validate_integer(self, value):
if value is None:
value = self.has_default() and self.default() or 0
@ -604,7 +243,7 @@ class Property(object):
def _validate_number(self, value):
if value is None:
value = self.has_default() and self.default() or 0
return self.str_to_num(value)
return Schema.str_to_num(value)
def _validate_string(self, value):
if value is None:
@ -655,17 +294,17 @@ class Property(object):
def _validate_data_type(self, value):
t = self.type()
if t == STRING:
if t == Schema.STRING:
return self._validate_string(value)
elif t == INTEGER:
elif t == Schema.INTEGER:
return self._validate_integer(value)
elif t == NUMBER:
elif t == Schema.NUMBER:
return self._validate_number(value)
elif t == MAP:
elif t == Schema.MAP:
return self._validate_map(value)
elif t == LIST:
elif t == Schema.LIST:
return self._validate_list(value)
elif t == BOOLEAN:
elif t == Schema.BOOLEAN:
return self._validate_bool(value)
def validate_data(self, value):
@ -674,16 +313,6 @@ class Property(object):
return value
def schemata(schema_dicts):
"""
Return a dictionary of Schema objects for the given dictionary of schemata.
The input schemata are converted from the legacy (dictionary-based) format
to Schema objects where necessary.
"""
return dict((n, Schema.from_legacy(s)) for n, s in schema_dicts.items())
class Properties(collections.Mapping):
def __init__(self, schema, data, resolver=lambda d: d, parent_name=None):
@ -772,11 +401,11 @@ class Properties(collections.Mapping):
if schema.get('Implemented') is False:
return
if schema[TYPE] == LIST:
if schema[TYPE] == Schema.LIST:
params[path] = {parameters.TYPE: parameters.COMMA_DELIMITED_LIST}
return {'Fn::Split': {'Ref': path}}
elif schema[TYPE] == MAP:
elif schema[TYPE] == Schema.MAP:
params[path] = {parameters.TYPE: parameters.JSON}
return {'Ref': path}

View File

@ -96,15 +96,14 @@ class InstanceGroup(stack_resource.StackResource):
"(Heat extension).")
}
rolling_update_schema = {
'MinInstancesInService': properties.Schema(properties.NUMBER,
'MinInstancesInService': properties.Schema(properties.Schema.NUMBER,
default=0),
'MaxBatchSize': properties.Schema(properties.NUMBER,
default=1),
'PauseTime': properties.Schema(properties.STRING,
'MaxBatchSize': properties.Schema(properties.Schema.NUMBER, default=1),
'PauseTime': properties.Schema(properties.Schema.STRING,
default='PT0S')
}
update_policy_schema = {
'RollingUpdate': properties.Schema(properties.MAP,
'RollingUpdate': properties.Schema(properties.Schema.MAP,
schema=rolling_update_schema)
}
@ -418,16 +417,16 @@ class AutoScalingGroup(InstanceGroup, CooldownMixin):
'Description': _('Tags to attach to this group.')}
}
rolling_update_schema = {
'MinInstancesInService': properties.Schema(properties.NUMBER,
'MinInstancesInService': properties.Schema(properties.Schema.NUMBER,
default=0),
'MaxBatchSize': properties.Schema(properties.NUMBER,
default=1),
'PauseTime': properties.Schema(properties.STRING,
'MaxBatchSize': properties.Schema(properties.Schema.NUMBER, default=1),
'PauseTime': properties.Schema(properties.Schema.STRING,
default='PT0S')
}
update_policy_schema = {
'AutoScalingRollingUpdate': properties.Schema(
properties.MAP, schema=rolling_update_schema)
'AutoScalingRollingUpdate': properties.Schema(properties.Schema.MAP,
schema=
rolling_update_schema)
}
update_allowed_keys = ('Properties', 'UpdatePolicy')

View File

@ -16,6 +16,7 @@
from heat.common import exception
from heat.engine import clients
from heat.engine import properties
from heat.engine import constraints
from heat.engine.resources.neutron import neutron
from heat.openstack.common import log as logging
@ -29,23 +30,23 @@ class SecurityGroup(neutron.NeutronResource):
rule_schema = {
'direction': properties.Schema(
properties.STRING,
properties.Schema.STRING,
_('The direction in which the security group rule is applied. '
'For a compute instance, an ingress security group rule '
'matches traffic that is incoming (ingress) for that '
'instance. An egress rule is applied to traffic leaving '
'the instance.'),
default='ingress',
constraints=[properties.AllowedValues(('ingress', 'egress'))]
constraints=[constraints.AllowedValues(('ingress', 'egress'))]
),
'ethertype': properties.Schema(
properties.STRING,
properties.Schema.STRING,
_('Ethertype of the traffic.'),
default='IPv4',
constraints=[properties.AllowedValues(('IPv4', 'IPv6'))]
constraints=[constraints.AllowedValues(('IPv4', 'IPv6'))]
),
'port_range_min': properties.Schema(
properties.INTEGER,
properties.Schema.INTEGER,
_('The minimum port number in the range that is matched by the '
'security group rule. If the protocol is TCP or UDP, this '
'value must be less than or equal to the value of the '
@ -53,32 +54,32 @@ class SecurityGroup(neutron.NeutronResource):
'value must be an ICMP type.')
),
'port_range_max': properties.Schema(
properties.INTEGER,
properties.Schema.INTEGER,
_('The maximum port number in the range that is matched by the '
'security group rule. The port_range_min attribute constrains '
'the port_range_max attribute. If the protocol is ICMP, this '
'value must be an ICMP type.')
),
'protocol': properties.Schema(
properties.STRING,
properties.Schema.STRING,
_('The protocol that is matched by the security group rule. '
'Valid values include tcp, udp, and icmp.')
),
'remote_mode': properties.Schema(
properties.STRING,
properties.Schema.STRING,
_('Whether to specify a remote group or a remote IP prefix.'),
default='remote_ip_prefix',
constraints=[properties.AllowedValues((
constraints=[constraints.AllowedValues((
'remote_ip_prefix', 'remote_group_id'))]
),
'remote_group_id': properties.Schema(
properties.STRING,
properties.Schema.STRING,
_('The remote group ID to be associated with this security group '
'rule. If no value is specified then this rule will use this '
'security group for the remote_group_id.')
),
'remote_ip_prefix': properties.Schema(
properties.STRING,
properties.Schema.STRING,
_('The remote IP prefix (CIDR) to be associated with this '
'security group rule.')
),
@ -86,25 +87,23 @@ class SecurityGroup(neutron.NeutronResource):
properties_schema = {
'name': properties.Schema(
properties.STRING,
properties.Schema.STRING,
_('A string specifying a symbolic name for '
'the security group, which is not required to be '
'unique.'),
update_allowed=True
),
'description': properties.Schema(
properties.STRING,
properties.Schema.STRING,
_('Description of the security group.'),
update_allowed=True
),
'rules': properties.Schema(
properties.LIST,
properties.Schema.LIST,
_('List of security group rules.'),
default=[],
schema=properties.Schema(
properties.MAP,
schema=rule_schema
),
schema=properties.Schema(properties.Schema.MAP,
schema=rule_schema),
update_allowed=True
)
}

View File

@ -16,6 +16,7 @@
from heat.db import api as db_api
from heat.engine import resource
from heat.engine import properties
from heat.engine import constraints
import random
import string
@ -29,19 +30,19 @@ class RandomString(resource.Resource):
'''
properties_schema = {
'length': properties.Schema(
properties.INTEGER,
properties.Schema.INTEGER,
_('Length of the string to generate.'),
default=32,
constraints=[properties.Range(1, 512)]),
constraints=[constraints.Range(1, 512)]),
'sequence': properties.Schema(
properties.STRING,
properties.Schema.STRING,
_('Sequence of characters to build the random string from.'),
default='lettersdigits',
constraints=[properties.AllowedValues((
constraints=[constraints.AllowedValues((
'lettersdigits', 'letters', 'lowercase', 'uppercase', 'digits',
'hexdigits', 'octdigits'))]),
'salt': properties.Schema(
properties.STRING,
properties.Schema.STRING,
_('Value which can be set or changed on stack update to trigger '
'the resource for replacement with a new random string . '
'The salt value itself is ignored by the random generator.'))

View File

@ -17,6 +17,7 @@ import copy
from heat.engine import parser
from heat.engine import properties
from heat.engine import constraints
from heat.engine import stack_resource
from heat.common import exception
@ -43,29 +44,29 @@ class ResourceGroup(stack_resource.StackResource):
min_resource_schema = {
"type": properties.Schema(
properties.STRING,
properties.Schema.STRING,
_("The type of the resources in the group"),
required=True
),
"properties": properties.Schema(
properties.MAP,
properties.Schema.MAP,
_("Property values for the resources in the group")
)
}
properties_schema = {
"count": properties.Schema(
properties.INTEGER,
properties.Schema.INTEGER,
_("The number of instances to create."),
default=1,
required=True,
update_allowed=True,
constraints=[
properties.Range(1)
constraints.Range(1)
]
),
"resource_def": properties.Schema(
properties.MAP,
properties.Schema.MAP,
_("Resource definition for the resources in the group. The value "
"of this property is the definition of a resource just as if it"
" had been declared in the template itself."),

View File

@ -16,6 +16,7 @@
from requests import exceptions
from heat.common import exception
from heat.common import template_format
from heat.common import urlfetch
from heat.engine import attributes
@ -81,7 +82,7 @@ class TemplateResource(stack_resource.StackResource):
val = self.properties[pname]
if val is not None:
# take a list and create a CommaDelimitedList
if pval.type() == properties.LIST:
if pval.type() == properties.Schema.LIST:
if len(val) == 0:
params[pname] = ''
elif isinstance(val[0], dict):

View File

@ -0,0 +1,220 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from heat.engine import constraints
class SchemaTest(testtools.TestCase):
def test_range_schema(self):
d = {'range': {'min': 5, 'max': 10}, 'description': 'a range'}
r = constraints.Range(5, 10, description='a range')
self.assertEqual(d, dict(r))
def test_range_min_schema(self):
d = {'range': {'min': 5}, 'description': 'a range'}
r = constraints.Range(min=5, description='a range')
self.assertEqual(d, dict(r))
def test_range_max_schema(self):
d = {'range': {'max': 10}, 'description': 'a range'}
r = constraints.Range(max=10, description='a range')
self.assertEqual(d, dict(r))
def test_length_schema(self):
d = {'length': {'min': 5, 'max': 10}, 'description': 'a length range'}
r = constraints.Length(5, 10, description='a length range')
self.assertEqual(d, dict(r))
def test_length_min_schema(self):
d = {'length': {'min': 5}, 'description': 'a length range'}
r = constraints.Length(min=5, description='a length range')
self.assertEqual(d, dict(r))
def test_length_max_schema(self):
d = {'length': {'max': 10}, 'description': 'a length range'}
r = constraints.Length(max=10, description='a length range')
self.assertEqual(d, dict(r))
def test_allowed_values_schema(self):
d = {'allowed_values': ['foo', 'bar'], 'description': 'allowed values'}
r = constraints.AllowedValues(['foo', 'bar'],
description='allowed values')
self.assertEqual(d, dict(r))
def test_allowed_pattern_schema(self):
d = {'allowed_pattern': '[A-Za-z0-9]', 'description': 'alphanumeric'}
r = constraints.AllowedPattern('[A-Za-z0-9]',
description='alphanumeric')
self.assertEqual(d, dict(r))
def test_range_validate(self):
r = constraints.Range(min=5, max=5, description='a range')
r.validate(5)
def test_range_min_fail(self):
r = constraints.Range(min=5, description='a range')
self.assertRaises(ValueError, r.validate, 4)
def test_range_max_fail(self):
r = constraints.Range(max=5, description='a range')
self.assertRaises(ValueError, r.validate, 6)
def test_length_validate(self):
l = constraints.Length(min=5, max=5, description='a range')
l.validate('abcde')
def test_length_min_fail(self):
l = constraints.Length(min=5, description='a range')
self.assertRaises(ValueError, l.validate, 'abcd')
def test_length_max_fail(self):
l = constraints.Length(max=5, description='a range')
self.assertRaises(ValueError, l.validate, 'abcdef')
def test_schema_all(self):
d = {
'type': 'string',
'description': 'A string',
'default': 'wibble',
'required': True,
'constraints': [
{'length': {'min': 4, 'max': 8}},
]
}
s = constraints.Schema(constraints.Schema.STRING, 'A string',
default='wibble', required=True,
constraints=[constraints.Length(4, 8)])
self.assertEqual(d, dict(s))
def test_schema_list_schema(self):
d = {
'type': 'list',
'description': 'A list',
'schema': {
'*': {
'type': 'string',
'description': 'A string',
'default': 'wibble',
'required': True,
'constraints': [
{'length': {'min': 4, 'max': 8}},
]
}
},
'required': False,
}
s = constraints.Schema(constraints.Schema.STRING, 'A string',
default='wibble', required=True,
constraints=[constraints.Length(4, 8)])
l = constraints.Schema(constraints.Schema.LIST, 'A list', schema=s)
self.assertEqual(d, dict(l))
def test_schema_map_schema(self):
d = {
'type': 'map',
'description': 'A map',
'schema': {
'Foo': {
'type': 'string',
'description': 'A string',
'default': 'wibble',
'required': True,
'constraints': [
{'length': {'min': 4, 'max': 8}},
]
}
},
'required': False,
}
s = constraints.Schema(constraints.Schema.STRING, 'A string',
default='wibble', required=True,
constraints=[constraints.Length(4, 8)])
m = constraints.Schema(constraints.Schema.MAP, 'A map',
schema={'Foo': s})
self.assertEqual(d, dict(m))
def test_schema_nested_schema(self):
d = {
'type': 'list',
'description': 'A list',
'schema': {
'*': {
'type': 'map',
'description': 'A map',
'schema': {
'Foo': {
'type': 'string',
'description': 'A string',
'default': 'wibble',
'required': True,
'constraints': [
{'length': {'min': 4, 'max': 8}},
]
}
},
'required': False,
}
},
'required': False,
}
s = constraints.Schema(constraints.Schema.STRING, 'A string',
default='wibble', required=True,
constraints=[constraints.Length(4, 8)])
m = constraints.Schema(constraints.Schema.MAP, 'A map',
schema={'Foo': s})
l = constraints.Schema(constraints.Schema.LIST, 'A list', schema=m)
self.assertEqual(d, dict(l))
def test_invalid_type(self):
self.assertRaises(constraints.InvalidSchemaError, constraints.Schema,
'Fish')
def test_schema_invalid_type(self):
self.assertRaises(constraints.InvalidSchemaError,
constraints.Schema,
'String',
schema=constraints.Schema('String'))
def test_range_invalid_type(self):
self.assertRaises(constraints.InvalidSchemaError,
constraints.Schema,
'String',
constraints=[constraints.Range(1, 10)])
def test_length_invalid_type(self):
self.assertRaises(constraints.InvalidSchemaError,
constraints.Schema,
'Integer',
constraints=[constraints.Length(1, 10)])
def test_allowed_pattern_invalid_type(self):
self.assertRaises(constraints.InvalidSchemaError,
constraints.Schema,
'Integer',
constraints=[constraints.AllowedPattern('[0-9]*')])
def test_range_vals_invalid_type(self):
self.assertRaises(constraints.InvalidSchemaError,
constraints.Range, '1', 10)
self.assertRaises(constraints.InvalidSchemaError,
constraints.Range, 1, '10')
def test_length_vals_invalid_type(self):
self.assertRaises(constraints.InvalidSchemaError,
constraints.Length, '1', 10)
self.assertRaises(constraints.InvalidSchemaError,
constraints.Length, 1, '10')

View File

@ -13,82 +13,16 @@
# under the License.
import testtools
from heat.engine import constraints
from heat.common import exception
from heat.engine import hot
from heat.engine import parameters
from heat.engine import properties
from heat.engine import resources
from heat.engine import hot
from heat.common import exception
import testtools
class SchemaTest(testtools.TestCase):
def test_range_schema(self):
d = {'range': {'min': 5, 'max': 10}, 'description': 'a range'}
r = properties.Range(5, 10, description='a range')
self.assertEqual(d, dict(r))
def test_range_min_schema(self):
d = {'range': {'min': 5}, 'description': 'a range'}
r = properties.Range(min=5, description='a range')
self.assertEqual(d, dict(r))
def test_range_max_schema(self):
d = {'range': {'max': 10}, 'description': 'a range'}
r = properties.Range(max=10, description='a range')
self.assertEqual(d, dict(r))
def test_length_schema(self):
d = {'length': {'min': 5, 'max': 10}, 'description': 'a length range'}
r = properties.Length(5, 10, description='a length range')
self.assertEqual(d, dict(r))
def test_length_min_schema(self):
d = {'length': {'min': 5}, 'description': 'a length range'}
r = properties.Length(min=5, description='a length range')
self.assertEqual(d, dict(r))
def test_length_max_schema(self):
d = {'length': {'max': 10}, 'description': 'a length range'}
r = properties.Length(max=10, description='a length range')
self.assertEqual(d, dict(r))
def test_allowed_values_schema(self):
d = {'allowed_values': ['foo', 'bar'], 'description': 'allowed values'}
r = properties.AllowedValues(['foo', 'bar'],
description='allowed values')
self.assertEqual(d, dict(r))
def test_allowed_pattern_schema(self):
d = {'allowed_pattern': '[A-Za-z0-9]', 'description': 'alphanumeric'}
r = properties.AllowedPattern('[A-Za-z0-9]',
description='alphanumeric')
self.assertEqual(d, dict(r))
def test_range_validate(self):
r = properties.Range(min=5, max=5, description='a range')
r.validate(5)
def test_range_min_fail(self):
r = properties.Range(min=5, description='a range')
self.assertRaises(ValueError, r.validate, 4)
def test_range_max_fail(self):
r = properties.Range(max=5, description='a range')
self.assertRaises(ValueError, r.validate, 6)
def test_length_validate(self):
l = properties.Length(min=5, max=5, description='a range')
l.validate('abcde')
def test_length_min_fail(self):
l = properties.Length(min=5, description='a range')
self.assertRaises(ValueError, l.validate, 'abcd')
def test_length_max_fail(self):
l = properties.Length(max=5, description='a range')
self.assertRaises(ValueError, l.validate, 'abcdef')
class PropertySchemaTest(testtools.TestCase):
def test_schema_all(self):
d = {
'type': 'string',
@ -100,9 +34,9 @@ class SchemaTest(testtools.TestCase):
{'length': {'min': 4, 'max': 8}},
]
}
s = properties.Schema(properties.STRING, 'A string',
s = properties.Schema(properties.Schema.STRING, 'A string',
default='wibble', required=True,
constraints=[properties.Length(4, 8)])
constraints=[constraints.Length(4, 8)])
self.assertEqual(d, dict(s))
def test_schema_list_schema(self):
@ -124,11 +58,10 @@ class SchemaTest(testtools.TestCase):
'required': False,
'update_allowed': False
}
s = properties.Schema(properties.STRING, 'A string',
s = properties.Schema(properties.Schema.STRING, 'A string',
default='wibble', required=True,
constraints=[properties.Length(4, 8)])
l = properties.Schema(properties.LIST, 'A list',
schema=s)
constraints=[constraints.Length(4, 8)])
l = properties.Schema(properties.Schema.LIST, 'A list', schema=s)
self.assertEqual(d, dict(l))
def test_schema_map_schema(self):
@ -150,10 +83,10 @@ class SchemaTest(testtools.TestCase):
'required': False,
'update_allowed': False,
}
s = properties.Schema(properties.STRING, 'A string',
s = properties.Schema(properties.Schema.STRING, 'A string',
default='wibble', required=True,
constraints=[properties.Length(4, 8)])
m = properties.Schema(properties.MAP, 'A map',
constraints=[constraints.Length(4, 8)])
m = properties.Schema(properties.Schema.MAP, 'A map',
schema={'Foo': s})
self.assertEqual(d, dict(m))
@ -184,13 +117,12 @@ class SchemaTest(testtools.TestCase):
'required': False,
'update_allowed': False,
}
s = properties.Schema(properties.STRING, 'A string',
s = properties.Schema(properties.Schema.STRING, 'A string',
default='wibble', required=True,
constraints=[properties.Length(4, 8)])
m = properties.Schema(properties.MAP, 'A map',
constraints=[constraints.Length(4, 8)])
m = properties.Schema(properties.Schema.MAP, 'A map',
schema={'Foo': s})
l = properties.Schema(properties.LIST, 'A list',
schema=m)
l = properties.Schema(properties.Schema.LIST, 'A list', schema=m)
self.assertEqual(d, dict(l))
def test_all_resource_schemata(self):
@ -200,61 +132,19 @@ class SchemaTest(testtools.TestCase):
{}).itervalues():
properties.Schema.from_legacy(schema)
def test_invalid_type(self):
self.assertRaises(properties.InvalidPropertySchemaError,
properties.Schema,
'Fish')
def test_schema_invalid_type(self):
self.assertRaises(properties.InvalidPropertySchemaError,
properties.Schema,
'String',
schema=properties.Schema('String'))
def test_range_invalid_type(self):
self.assertRaises(properties.InvalidPropertySchemaError,
properties.Schema,
'String',
constraints=[properties.Range(1, 10)])
def test_length_invalid_type(self):
self.assertRaises(properties.InvalidPropertySchemaError,
properties.Schema,
'Integer',
constraints=[properties.Length(1, 10)])
def test_allowed_pattern_invalid_type(self):
self.assertRaises(properties.InvalidPropertySchemaError,
properties.Schema,
'Integer',
constraints=[properties.AllowedPattern('[0-9]*')])
def test_range_vals_invalid_type(self):
self.assertRaises(properties.InvalidPropertySchemaError,
properties.Range, '1', 10)
self.assertRaises(properties.InvalidPropertySchemaError,
properties.Range, 1, '10')
def test_length_vals_invalid_type(self):
self.assertRaises(properties.InvalidPropertySchemaError,
properties.Length, '1', 10)
self.assertRaises(properties.InvalidPropertySchemaError,
properties.Length, 1, '10')
def test_from_legacy_idempotency(self):
s = properties.Schema(properties.STRING)
s = properties.Schema(properties.Schema.STRING)
self.assertTrue(properties.Schema.from_legacy(s) is s)
def test_from_legacy_minimal_string(self):
s = properties.Schema.from_legacy({
'Type': 'String',
})
self.assertEqual(properties.STRING, s.type)
self.assertEqual(properties.Schema.STRING, s.type)
self.assertEqual(None, s.description)
self.assertEqual(None, s.default)
self.assertFalse(s.required)
self.assertEqual(0, len(s.constraints))
self.assertTrue(s.implemented)
def test_from_legacy_string(self):
s = properties.Schema.from_legacy({
@ -268,12 +158,11 @@ class SchemaTest(testtools.TestCase):
'AllowedValues': ['blarg', 'wibble'],
'AllowedPattern': '[a-z]*',
})
self.assertEqual(properties.STRING, s.type)
self.assertEqual(properties.Schema.STRING, s.type)
self.assertEqual('a string', s.description)
self.assertEqual('wibble', s.default)
self.assertTrue(s.required)
self.assertEqual(3, len(s.constraints))
self.assertFalse(s.implemented)
def test_from_legacy_min_length(self):
s = properties.Schema.from_legacy({
@ -282,7 +171,7 @@ class SchemaTest(testtools.TestCase):
})
self.assertEqual(1, len(s.constraints))
c = s.constraints[0]
self.assertEqual(properties.Length, type(c))
self.assertEqual(constraints.Length, type(c))
self.assertEqual(4, c.min)
self.assertEqual(None, c.max)
@ -293,7 +182,7 @@ class SchemaTest(testtools.TestCase):
})
self.assertEqual(1, len(s.constraints))
c = s.constraints[0]
self.assertEqual(properties.Length, type(c))
self.assertEqual(constraints.Length, type(c))
self.assertEqual(None, c.min)
self.assertEqual(8, c.max)
@ -305,7 +194,7 @@ class SchemaTest(testtools.TestCase):
})
self.assertEqual(1, len(s.constraints))
c = s.constraints[0]
self.assertEqual(properties.Length, type(c))
self.assertEqual(constraints.Length, type(c))
self.assertEqual(4, c.min)
self.assertEqual(8, c.max)
@ -317,7 +206,7 @@ class SchemaTest(testtools.TestCase):
})
self.assertEqual(1, len(s.constraints))
c = s.constraints[0]
self.assertEqual(properties.Length, type(c))
self.assertEqual(constraints.Length, type(c))
self.assertEqual(4, c.min)
self.assertEqual(8, c.max)
@ -328,7 +217,7 @@ class SchemaTest(testtools.TestCase):
})
self.assertEqual(1, len(s.constraints))
c = s.constraints[0]
self.assertEqual(properties.Range, type(c))
self.assertEqual(constraints.Range, type(c))
self.assertEqual(4, c.min)
self.assertEqual(None, c.max)
@ -339,7 +228,7 @@ class SchemaTest(testtools.TestCase):
})
self.assertEqual(1, len(s.constraints))
c = s.constraints[0]
self.assertEqual(properties.Range, type(c))
self.assertEqual(constraints.Range, type(c))
self.assertEqual(None, c.min)
self.assertEqual(8, c.max)
@ -351,7 +240,7 @@ class SchemaTest(testtools.TestCase):
})
self.assertEqual(1, len(s.constraints))
c = s.constraints[0]
self.assertEqual(properties.Range, type(c))
self.assertEqual(constraints.Range, type(c))
self.assertEqual(4, c.min)
self.assertEqual(8, c.max)
@ -363,7 +252,7 @@ class SchemaTest(testtools.TestCase):
})
self.assertEqual(1, len(s.constraints))
c = s.constraints[0]
self.assertEqual(properties.Range, type(c))
self.assertEqual(constraints.Range, type(c))
self.assertEqual(4, c.min)
self.assertEqual(8, c.max)
@ -374,7 +263,7 @@ class SchemaTest(testtools.TestCase):
})
self.assertEqual(1, len(s.constraints))
c = s.constraints[0]
self.assertEqual(properties.AllowedValues, type(c))
self.assertEqual(constraints.AllowedValues, type(c))
self.assertEqual(('blarg', 'wibble'), c.allowed)
def test_from_legacy_allowed_pattern(self):
@ -384,7 +273,7 @@ class SchemaTest(testtools.TestCase):
})
self.assertEqual(1, len(s.constraints))
c = s.constraints[0]
self.assertEqual(properties.AllowedPattern, type(c))
self.assertEqual(constraints.AllowedPattern, type(c))
self.assertEqual('[a-z]*', c.pattern)
def test_from_legacy_list(self):
@ -397,11 +286,11 @@ class SchemaTest(testtools.TestCase):
'MaxLength': 8,
}
})
self.assertEqual(properties.LIST, l.type)
self.assertEqual(properties.Schema.LIST, l.type)
self.assertEqual(['wibble'], l.default)
ss = l.schema[0]
self.assertEqual(properties.STRING, ss.type)
self.assertEqual(properties.Schema.STRING, ss.type)
self.assertEqual('wibble', ss.default)
def test_from_legacy_map(self):
@ -414,14 +303,14 @@ class SchemaTest(testtools.TestCase):
}
}
})
self.assertEqual(properties.MAP, l.type)
self.assertEqual(properties.Schema.MAP, l.type)
ss = l.schema['foo']
self.assertEqual(properties.STRING, ss.type)
self.assertEqual(properties.Schema.STRING, ss.type)
self.assertEqual('wibble', ss.default)
def test_from_legacy_invalid_key(self):
self.assertRaises(properties.InvalidPropertySchemaError,
self.assertRaises(constraints.InvalidSchemaError,
properties.Schema.from_legacy,
{'Type': 'String', 'Foo': 'Bar'})
@ -441,7 +330,7 @@ class SchemaTest(testtools.TestCase):
schema = properties.Schema.from_parameter(param)
self.assertEqual(properties.STRING, schema.type)
self.assertEqual(properties.Schema.STRING, schema.type)
self.assertEqual(description, schema.description)
self.assertEqual(None, schema.default)
self.assertFalse(schema.required)
@ -466,7 +355,7 @@ class SchemaTest(testtools.TestCase):
schema = properties.Schema.from_parameter(param)
self.assertEqual(properties.STRING, schema.type)
self.assertEqual(properties.Schema.STRING, schema.type)
self.assertEqual(description, schema.description)
self.assertEqual(None, schema.default)
self.assertFalse(schema.required)
@ -492,7 +381,7 @@ class SchemaTest(testtools.TestCase):
schema = properties.Schema.from_parameter(param)
self.assertEqual(properties.STRING, schema.type)
self.assertEqual(properties.Schema.STRING, schema.type)
self.assertEqual(description, schema.description)
self.assertEqual(None, schema.default)
self.assertFalse(schema.required)
@ -579,7 +468,7 @@ class SchemaTest(testtools.TestCase):
schema = properties.Schema.from_parameter(param)
self.assertEqual(properties.NUMBER, schema.type)
self.assertEqual(properties.Schema.NUMBER, schema.type)
self.assertEqual(None, schema.default)
self.assertFalse(schema.required)
self.assertEqual(1, len(schema.constraints))
@ -599,7 +488,7 @@ class SchemaTest(testtools.TestCase):
schema = properties.Schema.from_parameter(param)
self.assertEqual(properties.NUMBER, schema.type)
self.assertEqual(properties.Schema.NUMBER, schema.type)
self.assertEqual(None, schema.default)
self.assertFalse(schema.required)
self.assertEqual(1, len(schema.constraints))
@ -620,7 +509,7 @@ class SchemaTest(testtools.TestCase):
schema = properties.Schema.from_parameter(param)
self.assertEqual(properties.NUMBER, schema.type)
self.assertEqual(properties.Schema.NUMBER, schema.type)
self.assertEqual(None, schema.default)
self.assertFalse(schema.required)
self.assertEqual(1, len(schema.constraints))
@ -642,7 +531,7 @@ class SchemaTest(testtools.TestCase):
schema = properties.Schema.from_parameter(param)
self.assertEqual(properties.NUMBER, schema.type)
self.assertEqual(properties.Schema.NUMBER, schema.type)
self.assertEqual(None, schema.default)
self.assertFalse(schema.required)
self.assertEqual(1, len(schema.constraints))
@ -660,7 +549,7 @@ class SchemaTest(testtools.TestCase):
schema = properties.Schema.from_parameter(param)
self.assertEqual(properties.LIST, schema.type)
self.assertEqual(properties.Schema.LIST, schema.type)
self.assertEqual(None, schema.default)
self.assertFalse(schema.required)
@ -672,7 +561,7 @@ class SchemaTest(testtools.TestCase):
schema = properties.Schema.from_parameter(param)
self.assertEqual(properties.MAP, schema.type)
self.assertEqual(properties.Schema.MAP, schema.type)
self.assertEqual(None, schema.default)
self.assertFalse(schema.required)
@ -715,11 +604,11 @@ class PropertyTest(testtools.TestCase):
self.assertEqual(p.type(), 'String')
def test_bad_type(self):
self.assertRaises(properties.InvalidPropertySchemaError,
self.assertRaises(constraints.InvalidSchemaError,
properties.Property, {'Type': 'Fish'})
def test_bad_key(self):
self.assertRaises(properties.InvalidPropertySchemaError,
self.assertRaises(constraints.InvalidSchemaError,
properties.Property,
{'Type': 'String', 'Foo': 'Bar'})

View File

@ -226,8 +226,9 @@ class ProviderTemplateTest(HeatTestCase):
files = {'test_resource.template': json.dumps(provider)}
class DummyResource(object):
properties_schema = {"Foo": properties.Schema(properties.STRING,
required=True)}
properties_schema = {"Foo":
properties.Schema(properties.Schema.STRING,
required=True)}
attributes_schema = {}
json_snippet = {
@ -258,8 +259,9 @@ class ProviderTemplateTest(HeatTestCase):
files = {'test_resource.template': json.dumps(provider)}
class DummyResource(object):
properties_schema = {"Foo": properties.Schema(properties.STRING,
required=True)}
properties_schema = {"Foo":
properties.Schema(properties.Schema.STRING,
required=True)}
attributes_schema = {}
json_snippet = {
@ -320,7 +322,8 @@ class ProviderTemplateTest(HeatTestCase):
files = {'test_resource.template': json.dumps(provider)}
class DummyResource(object):
properties_schema = {"Foo": properties.Schema(properties.MAP)}
properties_schema = {"Foo":
properties.Schema(properties.Schema.MAP)}
attributes_schema = {}
json_snippet = {