Merge "Prevent all primary keys in Neutron OVOs from being updated"

This commit is contained in:
Jenkins 2016-03-16 13:52:47 +00:00 committed by Gerrit Code Review
commit 8ac3f269df
3 changed files with 61 additions and 18 deletions

View File

@ -12,6 +12,7 @@
import abc
import copy
import itertools
from neutron_lib import exceptions
from oslo_db import exception as obj_exc
@ -107,6 +108,18 @@ class NeutronObject(obj_base.VersionedObject,
raise NotImplementedError()
class DeclarativeObject(abc.ABCMeta):
def __init__(cls, name, bases, dct):
super(DeclarativeObject, cls).__init__(name, bases, dct)
for base in itertools.chain([cls], bases):
if hasattr(base, 'primary_keys'):
cls.fields_no_update += base.primary_keys
# avoid duplicate entries
cls.fields_no_update = list(set(cls.fields_no_update))
@six.add_metaclass(DeclarativeObject)
class NeutronDbObject(NeutronObject):
# should be overridden for all persistent objects
@ -214,10 +227,6 @@ class NeutronDbObject(NeutronObject):
def _validate_changed_fields(self, fields):
fields = fields.copy()
# We won't allow id update anyway, so let's pop it out not to trigger
# update on id field touched by the consumer
fields.pop('id', None)
forbidden_updates = set(self.fields_no_update) & set(fields.keys())
if forbidden_updates:
raise NeutronObjectUpdateForbidden(fields=forbidden_updates)

View File

@ -48,11 +48,14 @@ class QoSPlugin(qos.QoSPluginBase):
@db_base_plugin_common.convert_result_to_dict
def update_policy(self, context, policy_id, policy):
policy = policy_object.QosPolicy(context, **policy['policy'])
policy.id = policy_id
policy.update()
self.notification_driver_manager.update_policy(context, policy)
return policy
obj = policy_object.QosPolicy(context, id=policy_id)
obj.obj_reset_changes()
for k, v in policy['policy'].items():
if k != 'id':
setattr(obj, k, v)
obj.update()
self.notification_driver_manager.update_policy(context, obj)
return obj
def delete_policy(self, context, policy_id):
policy = policy_object.QosPolicy(context)
@ -107,8 +110,11 @@ class QoSPlugin(qos.QoSPluginBase):
# check if the rule belong to the policy
policy.get_rule_by_id(rule_id)
rule = rule_object.QosBandwidthLimitRule(
context, **bandwidth_limit_rule['bandwidth_limit_rule'])
rule.id = rule_id
context, id=rule_id)
rule.obj_reset_changes()
for k, v in bandwidth_limit_rule['bandwidth_limit_rule'].items():
if k != 'id':
setattr(rule, k, v)
rule.update()
policy.reload_rules()
self.notification_driver_manager.update_policy(context, policy)

View File

@ -51,7 +51,9 @@ class FakeNeutronObject(base.NeutronDbObject):
'field2': obj_fields.StringField()
}
fields_no_update = ['id']
primary_keys = ['id']
fields_no_update = ['field1']
synthetic_fields = ['field2']
@ -115,8 +117,6 @@ class FakeNeutronObjectRenamedField(base.NeutronDbObject):
synthetic_fields = ['field2']
fields_no_update = ['id']
fields_need_translation = {'field_ovo': 'field_db'}
@ -137,8 +137,6 @@ class FakeNeutronObjectCompositePrimaryKeyWithId(base.NeutronDbObject):
synthetic_fields = ['field2']
fields_no_update = ['id']
FIELD_TYPE_VALUE_GENERATOR_MAP = {
obj_fields.BooleanField: tools.get_random_boolean,
@ -333,6 +331,10 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
@mock.patch.object(obj_db_api, 'update_object')
def test_update_changes(self, update_mock):
fields_to_update = self.get_updatable_fields(self.db_obj)
if not fields_to_update:
self.skipTest('No updatable fields found in test class %r' %
self._test_class)
with mock.patch.object(base.NeutronDbObject,
'_get_changed_persistent_fields',
return_value=fields_to_update):
@ -360,6 +362,9 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
return_value=self.db_obj):
obj = self._test_class(self.context, **self.obj_fields[1])
fields_to_update = self.get_updatable_fields(self.obj_fields[1])
if not fields_to_update:
self.skipTest('No updatable fields found in test class %r' %
self._test_class)
with mock.patch.object(base.NeutronDbObject,
'_get_changed_persistent_fields',
return_value=fields_to_update):
@ -383,6 +388,21 @@ class BaseObjectIfaceTestCase(_BaseObjectTestCase, test_base.BaseTestCase):
self.assertIs(expected_obj, observed_obj)
self.assertTrue(observed_obj.obj_reset_changes.called)
def test_update_primary_key_forbidden_fail(self):
obj = self._test_class(self.context, **self.db_obj)
obj.obj_reset_changes()
if not self._test_class.primary_keys:
self.skipTest(
'All non-updatable fields found in test class %r '
'are primary keys' % self._test_class)
for key, val in self.db_obj.items():
if key in self._test_class.primary_keys:
setattr(obj, key, val)
self.assertRaises(base.NeutronObjectUpdateForbidden, obj.update)
class BaseDbObjectNonStandardPrimaryKeyTestCase(BaseObjectIfaceTestCase):
@ -455,7 +475,11 @@ class BaseDbObjectTestCase(_BaseObjectTestCase):
obj = self._test_class(self.context, **self.obj_fields[0])
obj.obj_reset_changes()
for key, val in self.get_updatable_fields(self.obj_fields[0]).items():
fields_to_update = self.get_updatable_fields(self.obj_fields[0])
if not fields_to_update:
self.skipTest('No updatable fields found in test class %r' %
self._test_class)
for key, val in fields_to_update.items():
setattr(obj, key, val)
self.assertRaises(n_exc.ObjectNotFound, obj.update)
@ -474,7 +498,11 @@ class BaseDbObjectTestCase(_BaseObjectTestCase):
obj = self._test_class(self.context, **self.obj_fields[0])
obj.create()
for key, val in self.get_updatable_fields(self.obj_fields[1]).items():
fields_to_update = self.get_updatable_fields(self.obj_fields[1])
if not fields_to_update:
self.skipTest('No updatable fields found in test class %r' %
self._test_class)
for key, val in fields_to_update.items():
setattr(obj, key, val)
with mock.patch(SQLALCHEMY_COMMIT) as mock_commit: