Add translate_properties for Resource

Add specify translate_properties for Resource to
overload for every Resource with old and new properties.

bp deprecating-improvements

Change-Id: Ifd81eac3cd4496c1efdf8fdaa4c28cb02478ba46
This commit is contained in:
Peter Razumovsky
2015-06-03 17:19:26 +03:00
parent d42e4bdc42
commit 126d2a43a3
3 changed files with 583 additions and 0 deletions

View File

@ -602,3 +602,165 @@ class Properties(collections.Mapping):
if s.implemented]
param_items, prop_items = zip(*param_prop_defs)
return dict(param_items), dict(prop_items)
class TranslationRule(object):
"""Translating mechanism one properties to another.
Mechanism uses list of rules, each defines by this class, and can be
executed. Working principe: during resource creating after properties
defining resource take list of rules, specified by method
translation_rules, which should be overloaded for each resource, if it's
needed, and execute each rule using translate_properties method. Next
operations are allowed:
- ADD. This rule allows to add some value to list-type properties. Only
list-type values can be added to such properties. Using for other
cases is prohibited and will be returned with error.
- REPLACE. This rule allows to replace some property value to another. Used
for all types of properties. Note, that if property has list type, then
value will be replaced for all elements of list, where it needed. If
element in such property must be replaced by value of another element of
this property, value_name must be defined.
- DELETE. This rule allows to delete some property. If property has list
type, then deleting affects value in all list elements.
"""
RULE_KEYS = (ADD, REPLACE, DELETE) = ('Add', 'Replace', 'Delete')
def __init__(self, properties, rule, source_path, value=None,
value_name=None, value_path=None):
"""Add new rule for translating mechanism.
:param properties: properties of resource
:param rule: rule from RULE_KEYS
:param source_path: list with path to property, which value will be
affected in rule.
:param value: value which will be involved in rule
:param value_name: value_name which used for replacing properties
inside list-type properties.
:param value_path: path to value, which should be used for translation.
"""
self.properties = properties
self.rule = rule
self.source_path = source_path
self.value = value or None
self.value_name = value_name
self.value_path = value_path
self.validate()
def validate(self):
if self.rule not in self.RULE_KEYS:
raise ValueError(_('There is no rule %(rule)s. List of allowed '
'rules is: %(rules)s.') % {
'rule': self.rule,
'rules': ', '.join(self.RULE_KEYS)})
elif not isinstance(self.properties, Properties):
raise ValueError(_('Properties must be Properties type. '
'Found %s.') % type(self.properties))
elif not isinstance(self.source_path, list):
raise ValueError(_('source_path should be a list with path '
'instead of %s.') % type(self.source_path))
elif len(self.source_path) == 0:
raise ValueError(_('source_path must be non-empty list with '
'path.'))
elif self.value_name and self.rule != self.REPLACE:
raise ValueError(_('Use value_name only for replacing list '
'elements.'))
elif self.rule == self.ADD and not isinstance(self.value, list):
raise ValueError(_('value must be list type when rule is ADD.'))
def execute_rule(self):
(source_key, source_data) = self.get_data_from_source_path(
self.source_path)
if self.value_path:
(value_key, value_data) = self.get_data_from_source_path(
self.value_path)
value = (value_data[value_key]
if value_data and value_data.get(value_key)
else self.value)
else:
(value_key, value_data) = None, None
value = self.value
if (source_data is None or (self.rule != self.DELETE and
(value is None and
self.value_name is None and
(value_data is None or
value_data.get(value_key) is None)))):
return
if self.rule == TranslationRule.ADD:
if isinstance(source_data, list):
source_data.extend(value)
else:
raise ValueError(_('ADD rule must be used only for '
'lists.'))
elif self.rule == TranslationRule.REPLACE:
if isinstance(source_data, list):
for item in source_data:
if item.get(self.value_name) and item.get(source_key):
raise ValueError(_('Cannot use %(key)s and '
'%(name)s at the same time.')
% dict(key=source_key,
name=self.value_name))
elif item.get(self.value_name) is not None:
item[source_key] = item[self.value_name]
del item[self.value_name]
elif value is not None:
item[source_key] = value
else:
if (source_data and source_data.get(source_key) and
value_data and value_data.get(value_key)):
raise ValueError(_('Cannot use %(key)s and '
'%(name)s at the same time.')
% dict(key=source_key,
name=value_key))
source_data[source_key] = value
elif self.rule == TranslationRule.DELETE:
if isinstance(source_data, list):
for item in source_data:
if item.get(source_key) is not None:
del item[source_key]
else:
del source_data[source_key]
def get_data_from_source_path(self, path):
def get_props(props, key):
props = props.get(key)
if props.schema.schema is not None:
keys = list(props.schema.schema)
schemata = dict((k, props.schema.schema[k])
for k in keys)
props = dict((k, Property(s, k))
for k, s in schemata.items())
return props
source_key = path[0]
data = self.properties.data
props = self.properties.props
for key in path:
if isinstance(data, list):
source_key = key
elif data.get(key) is not None and isinstance(data.get(key),
(list, dict)):
data = data.get(key)
props = get_props(props, key)
elif data.get(key) is None:
if (self.rule == TranslationRule.DELETE or
(self.rule == TranslationRule.REPLACE and
self.value_name)):
return None, None
elif props.get(key).type() == Schema.LIST:
data[key] = []
elif props.get(key).type() == Schema.MAP:
data[key] = {}
else:
source_key = key
continue
data = data.get(key)
props = get_props(props, key)
else:
source_key = key
return source_key, data

View File

@ -294,6 +294,7 @@ class Resource(object):
def reparse(self):
self.properties = self.t.properties(self.properties_schema,
self.context)
self.translate_properties()
def __eq__(self, other):
'''Allow == comparison of two resources.'''
@ -794,6 +795,16 @@ class Resource(object):
# save the resource metadata
self.metadata_set(metadata)
def translation_rules(self):
"""Return specified rules for resource."""
return None
def translate_properties(self):
"""Translates old properties to new ones."""
rules = self.translation_rules() or []
for rule in rules:
rule.execute_rule()
def _get_resource_info(self, resource_data):
if not resource_data:
return None, None, None

View File

@ -11,6 +11,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_serialization import jsonutils
import six
@ -1800,3 +1801,412 @@ class PropertiesValidationTest(common.HeatTestCase):
immutable=True)}
props = properties.Properties(schema, {})
self.assertRaises(exception.InvalidSchemaError, props.validate)
class TestTranslationRule(common.HeatTestCase):
def test_translation_rule(self):
for r in properties.TranslationRule.RULE_KEYS:
props = properties.Properties({}, {})
rule = properties.TranslationRule(
props,
r,
['any'],
['value'] if r == 'Add' else 'value',
'value_name' if r == 'Replace' else None)
self.assertEqual(rule.properties, props)
self.assertEqual(rule.rule, r)
if r == 'Add':
self.assertEqual(['value'], rule.value)
else:
self.assertEqual('value', rule.value)
if r == 'Replace':
self.assertEqual('value_name', rule.value_name)
else:
self.assertIsNone(rule.value_name)
def test_invalid_translation_rule(self):
props = properties.Properties({}, {})
exc = self.assertRaises(ValueError,
properties.TranslationRule, 'proppy', mock.ANY,
mock.ANY)
self.assertEqual('Properties must be Properties type. '
'Found %s.' % str, six.text_type(exc))
exc = self.assertRaises(ValueError,
properties.TranslationRule,
props,
'EatTheCookie',
mock.ANY,
mock.ANY)
self.assertEqual('There is no rule EatTheCookie. List of allowed '
'rules is: Add, Replace, Delete.',
six.text_type(exc))
exc = self.assertRaises(ValueError,
properties.TranslationRule,
props,
properties.TranslationRule.ADD,
'networks.network',
'value')
self.assertEqual('source_path should be a list with path instead of '
'%s.' % str, six.text_type(exc))
exc = self.assertRaises(ValueError,
properties.TranslationRule,
props,
properties.TranslationRule.ADD,
[],
mock.ANY)
self.assertEqual('source_path must be non-empty list with path.',
six.text_type(exc))
exc = self.assertRaises(ValueError,
properties.TranslationRule,
props,
properties.TranslationRule.ADD,
['any'],
mock.ANY,
'value_name')
self.assertEqual('Use value_name only for replacing list elements.',
six.text_type(exc))
exc = self.assertRaises(ValueError,
properties.TranslationRule,
props,
properties.TranslationRule.ADD,
['any'],
'value')
self.assertEqual('value must be list type when rule is ADD.',
six.text_type(exc))
def test_add_rule_exist(self):
schema = {
'far': properties.Schema(
properties.Schema.LIST,
schema=properties.Schema(
properties.Schema.MAP,
schema={
'red': properties.Schema(
properties.Schema.STRING
)
}
)
),
'bar': properties.Schema(
properties.Schema.STRING
)}
data = {
'far': [
{'red': 'blue'}
],
'bar': 'dak'
}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.ADD,
['far'],
[{'red': props.get('bar')}])
rule.execute_rule()
self.assertIn({'red': 'dak'}, props.get('far'))
def test_add_rule_dont_exist(self):
schema = {
'far': properties.Schema(
properties.Schema.LIST,
schema=properties.Schema(
properties.Schema.MAP,
schema={
'red': properties.Schema(
properties.Schema.STRING
)
}
)
),
'bar': properties.Schema(
properties.Schema.STRING
)}
data = {
'bar': 'dak'
}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.ADD,
['far'],
[{'red': props.get('bar')}])
rule.execute_rule()
self.assertEqual([{'red': 'dak'}], props.get('far'))
def test_add_rule_invalid(self):
schema = {
'far': properties.Schema(
properties.Schema.MAP,
schema={
'red': properties.Schema(
properties.Schema.STRING
)
}
),
'bar': properties.Schema(
properties.Schema.STRING
)}
data = {
'far': 'tran',
'bar': 'dak'
}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.ADD,
['far'],
[props.get('bar')])
exc = self.assertRaises(ValueError, rule.execute_rule)
self.assertEqual('ADD rule must be used only for lists.',
six.text_type(exc))
def test_replace_rule_map_exist(self):
schema = {
'far': properties.Schema(
properties.Schema.MAP,
schema={
'red': properties.Schema(
properties.Schema.STRING
)
}
),
'bar': properties.Schema(
properties.Schema.STRING
)}
data = {
'far': {'red': 'tran'},
'bar': 'dak'
}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.REPLACE,
['far', 'red'],
props.get('bar'))
rule.execute_rule()
self.assertEqual({'red': 'dak'}, props.get('far'))
def test_replace_rule_map_dont_exist(self):
schema = {
'far': properties.Schema(
properties.Schema.MAP,
schema={
'red': properties.Schema(
properties.Schema.STRING
)
}
),
'bar': properties.Schema(
properties.Schema.STRING
)}
data = {
'bar': 'dak'
}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.REPLACE,
['far', 'red'],
props.get('bar'))
rule.execute_rule()
self.assertEqual({'red': 'dak'}, props.get('far'))
def test_replace_rule_list_different(self):
schema = {
'far': properties.Schema(
properties.Schema.LIST,
schema=properties.Schema(
properties.Schema.MAP,
schema={
'red': properties.Schema(
properties.Schema.STRING
)
}
)
),
'bar': properties.Schema(
properties.Schema.STRING
)}
data = {
'far': [{'red': 'blue'},
{'red': 'roses'}],
'bar': 'dak'
}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.REPLACE,
['far', 'red'],
props.get('bar'))
rule.execute_rule()
self.assertEqual([{'red': 'dak'}, {'red': 'dak'}], props.get('far'))
def test_replace_rule_list_same(self):
schema = {
'far': properties.Schema(
properties.Schema.LIST,
schema=properties.Schema(
properties.Schema.MAP,
schema={
'red': properties.Schema(
properties.Schema.STRING
),
'blue': properties.Schema(
properties.Schema.STRING
)
}
)
)}
data = {
'far': [{'blue': 'white'},
{'red': 'roses'}]
}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.REPLACE,
['far', 'red'],
None,
'blue')
rule.execute_rule()
self.assertEqual([{'red': 'white', 'blue': None},
{'blue': None, 'red': 'roses'}],
props.get('far'))
def test_replace_rule_str(self):
schema = {
'far': properties.Schema(properties.Schema.STRING),
'bar': properties.Schema(properties.Schema.STRING)
}
data = {'far': 'one', 'bar': 'two'}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.REPLACE,
['bar'],
props.get('far'))
rule.execute_rule()
self.assertEqual('one', props.get('bar'))
def test_replace_rule_str_value_path_error(self):
schema = {
'far': properties.Schema(properties.Schema.STRING),
'bar': properties.Schema(properties.Schema.STRING)
}
data = {'far': 'one', 'bar': 'two'}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.REPLACE,
['bar'],
value_path=['far'])
ex = self.assertRaises(ValueError, rule.execute_rule)
self.assertEqual('Cannot use bar and far at the same time.',
six.text_type(ex))
def test_replace_rule_str_value_path(self):
schema = {
'far': properties.Schema(properties.Schema.STRING),
'bar': properties.Schema(properties.Schema.STRING)
}
data = {'far': 'one'}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.REPLACE,
['bar'],
value_path=['far'])
rule.execute_rule()
self.assertEqual('one', props.get('bar'))
def test_replace_rule_str_invalid(self):
schema = {
'far': properties.Schema(properties.Schema.STRING),
'bar': properties.Schema(properties.Schema.INTEGER)
}
data = {'far': 'one', 'bar': 2}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.REPLACE,
['bar'],
props.get('far'))
rule.execute_rule()
exc = self.assertRaises(exception.StackValidationFailed,
props.validate)
self.assertEqual("Property error: bar: Value 'one' is not an integer",
six.text_type(exc))
def test_delete_rule_list(self):
schema = {
'far': properties.Schema(
properties.Schema.LIST,
schema=properties.Schema(
properties.Schema.MAP,
schema={
'red': properties.Schema(
properties.Schema.STRING
)
}
)
)}
data = {
'far': [{'red': 'blue'},
{'red': 'roses'}],
}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.DELETE,
['far', 'red'])
rule.execute_rule()
self.assertEqual([{'red': None}, {'red': None}], props.get('far'))
def test_delete_rule_other(self):
schema = {
'far': properties.Schema(properties.Schema.STRING)
}
data = {'far': 'one'}
props = properties.Properties(schema, data)
rule = properties.TranslationRule(props,
properties.TranslationRule.DELETE,
['far'])
rule.execute_rule()
self.assertIsNone(props.get('far'))