diff --git a/neutron/objects/qos/policy.py b/neutron/objects/qos/policy.py index 258512221fe..61ee7c96428 100644 --- a/neutron/objects/qos/policy.py +++ b/neutron/objects/qos/policy.py @@ -64,6 +64,18 @@ class QosPolicy(base.NeutronDbObject): setattr(self, 'rules', rules) self.obj_reset_changes(['rules']) + def get_rule_by_id(self, rule_id): + """Return rule specified by rule_id. + + @raise QosRuleNotFound: if there is no such rule in the policy. + """ + + for rule in self.rules: + if rule_id == rule.id: + return rule + raise exceptions.QosRuleNotFound(policy_id=self.id, + rule_id=rule_id) + @staticmethod def _is_policy_accessible(context, db_obj): #TODO(QoS): Look at I3426b13eede8bfa29729cf3efea3419fb91175c4 for diff --git a/neutron/services/qos/qos_plugin.py b/neutron/services/qos/qos_plugin.py index 154c1b87206..29ff7b58ff1 100644 --- a/neutron/services/qos/qos_plugin.py +++ b/neutron/services/qos/qos_plugin.py @@ -109,6 +109,8 @@ class QoSPlugin(qos.QoSPluginBase): with db_api.autonested_transaction(context.session): # first, validate that we have access to the policy policy = self._get_policy_obj(context, policy_id) + # check if the rule belong to the policy + policy.get_rule_by_id(rule_id) rule = rule_object.QosBandwidthLimitRule( context, **bandwidth_limit_rule['bandwidth_limit_rule']) rule.id = rule_id @@ -122,8 +124,7 @@ class QoSPlugin(qos.QoSPluginBase): with db_api.autonested_transaction(context.session): # first, validate that we have access to the policy policy = self._get_policy_obj(context, policy_id) - rule = rule_object.QosBandwidthLimitRule(context) - rule.id = rule_id + rule = policy.get_rule_by_id(rule_id) rule.delete() policy.reload_rules() self.notification_driver_manager.update_policy(context, policy) diff --git a/neutron/tests/unit/services/qos/test_qos_plugin.py b/neutron/tests/unit/services/qos/test_qos_plugin.py index 246f5fab17f..f6447cce77f 100644 --- a/neutron/tests/unit/services/qos/test_qos_plugin.py +++ b/neutron/tests/unit/services/qos/test_qos_plugin.py @@ -98,19 +98,48 @@ class TestQosPlugin(base.BaseQosTestCase): self._validate_notif_driver_params('update_policy') def test_update_policy_rule(self): + _policy = policy_object.QosPolicy( + self.ctxt, **self.policy_data['policy']) with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id', - return_value=self.policy): + return_value=_policy): + setattr(_policy, "rules", [self.rule]) self.qos_plugin.update_policy_bandwidth_limit_rule( self.ctxt, self.rule.id, self.policy.id, self.rule_data) self._validate_notif_driver_params('update_policy') - def test_delete_policy_rule(self): + def test_update_policy_rule_bad_policy(self): + _policy = policy_object.QosPolicy( + self.ctxt, **self.policy_data['policy']) with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id', - return_value=self.policy): + return_value=_policy): + setattr(_policy, "rules", []) + self.assertRaises( + n_exc.QosRuleNotFound, + self.qos_plugin.update_policy_bandwidth_limit_rule, + self.ctxt, self.rule.id, self.policy.id, + self.rule_data) + + def test_delete_policy_rule(self): + _policy = policy_object.QosPolicy( + self.ctxt, **self.policy_data['policy']) + with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id', + return_value=_policy): + setattr(_policy, "rules", [self.rule]) self.qos_plugin.delete_policy_bandwidth_limit_rule( - self.ctxt, self.rule.id, self.policy.id) + self.ctxt, self.rule.id, _policy.id) self._validate_notif_driver_params('update_policy') + def test_delete_policy_rule_bad_policy(self): + _policy = policy_object.QosPolicy( + self.ctxt, **self.policy_data['policy']) + with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id', + return_value=_policy): + setattr(_policy, "rules", []) + self.assertRaises( + n_exc.QosRuleNotFound, + self.qos_plugin.delete_policy_bandwidth_limit_rule, + self.ctxt, self.rule.id, _policy.id) + def test_get_policy_bandwidth_limit_rules_for_policy(self): with mock.patch('neutron.objects.qos.policy.QosPolicy.get_by_id', return_value=self.policy):