Add policy and policy rule belongs check

before updating and deletion of a qos rule under a policy,
we check if the qos is binding to the policy to avoid users
operating on policy rules binding to other policy.

Change-Id: I04723fa9dd37409cb211c35e701f352419b2d6fa
Closes-bug: #1485993
This commit is contained in:
gong yong sheng 2015-08-25 16:21:39 +08:00
parent 3703a1d4b0
commit c28b8a5ca6
3 changed files with 48 additions and 6 deletions

View File

@ -64,6 +64,18 @@ class QosPolicy(base.NeutronDbObject):
setattr(self, 'rules', rules)
self.obj_reset_changes(['rules'])
def get_rule_by_id(self, rule_id):
"""Return rule specified by rule_id.
@raise QosRuleNotFound: if there is no such rule in the policy.
"""
for rule in self.rules:
if rule_id == rule.id:
return rule
raise exceptions.QosRuleNotFound(policy_id=self.id,
rule_id=rule_id)
@staticmethod
def _is_policy_accessible(context, db_obj):
#TODO(QoS): Look at I3426b13eede8bfa29729cf3efea3419fb91175c4 for

View File

@ -109,6 +109,8 @@ class QoSPlugin(qos.QoSPluginBase):
with db_api.autonested_transaction(context.session):
# first, validate that we have access to the policy
policy = self._get_policy_obj(context, policy_id)
# check if the rule belong to the policy
policy.get_rule_by_id(rule_id)
rule = rule_object.QosBandwidthLimitRule(
context, **bandwidth_limit_rule['bandwidth_limit_rule'])
rule.id = rule_id
@ -122,8 +124,7 @@ class QoSPlugin(qos.QoSPluginBase):
with db_api.autonested_transaction(context.session):
# first, validate that we have access to the policy
policy = self._get_policy_obj(context, policy_id)
rule = rule_object.QosBandwidthLimitRule(context)
rule.id = rule_id
rule = policy.get_rule_by_id(rule_id)
rule.delete()
policy.reload_rules()
self.notification_driver_manager.update_policy(context, policy)

View File

@ -98,19 +98,48 @@ class TestQosPlugin(base.BaseQosTestCase):
self._validate_notif_driver_params('update_policy')
def test_update_policy_rule(self):
_policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=self.policy):
return_value=_policy):
setattr(_policy, "rules", [self.rule])
self.qos_plugin.update_policy_bandwidth_limit_rule(
self.ctxt, self.rule.id, self.policy.id, self.rule_data)
self._validate_notif_driver_params('update_policy')
def test_delete_policy_rule(self):
def test_update_policy_rule_bad_policy(self):
_policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=self.policy):
return_value=_policy):
setattr(_policy, "rules", [])
self.assertRaises(
n_exc.QosRuleNotFound,
self.qos_plugin.update_policy_bandwidth_limit_rule,
self.ctxt, self.rule.id, self.policy.id,
self.rule_data)
def test_delete_policy_rule(self):
_policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=_policy):
setattr(_policy, "rules", [self.rule])
self.qos_plugin.delete_policy_bandwidth_limit_rule(
self.ctxt, self.rule.id, self.policy.id)
self.ctxt, self.rule.id, _policy.id)
self._validate_notif_driver_params('update_policy')
def test_delete_policy_rule_bad_policy(self):
_policy = policy_object.QosPolicy(
self.ctxt, **self.policy_data['policy'])
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=_policy):
setattr(_policy, "rules", [])
self.assertRaises(
n_exc.QosRuleNotFound,
self.qos_plugin.delete_policy_bandwidth_limit_rule,
self.ctxt, self.rule.id, _policy.id)
def test_get_policy_bandwidth_limit_rules_for_policy(self):
with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id',
return_value=self.policy):