Make security_group_rules use check_deltas() for quota

This converts security_group_rules to use check_deltas() for quota
checks instead of the legacy limit_check().

Part of blueprint cells-count-resources-to-check-quota-in-api

Change-Id: Ief6d0a6284390c23c5080c6d052179c39c0f535a
This commit is contained in:
melanie witt 2017-06-23 02:35:19 +00:00
parent d247e36182
commit 39fed19b12
3 changed files with 72 additions and 5 deletions

View File

@ -5144,12 +5144,10 @@ class SecurityGroupAPI(base.Base, security_group_base.SecurityGroupBase):
this function is written to support both.
"""
count = objects.Quotas.count_as_dict(context,
'security_group_rules', id)
count_value = count['user']['security_group_rules']
try:
projected = count_value + len(vals)
objects.Quotas.limit_check(context, security_group_rules=projected)
objects.Quotas.check_deltas(context,
{'security_group_rules': len(vals)},
id)
except exception.OverQuota:
msg = _("Quota exceeded, too many security group rules.")
self.raise_over_quota(msg)
@ -5159,6 +5157,22 @@ class SecurityGroupAPI(base.Base, security_group_base.SecurityGroupBase):
rules = []
for v in vals:
rule = self.db.security_group_rule_create(context, v)
# NOTE(melwitt): We recheck the quota after creating the object to
# prevent users from allocating more resources than their allowed
# quota in the event of a race. This is configurable because it can
# be expensive if strict quota limits are not required in a
# deployment.
if CONF.quota.recheck_quota:
try:
objects.Quotas.check_deltas(context,
{'security_group_rules': 0},
id)
except exception.OverQuota:
self.db.security_group_rule_destroy(context, rule['id'])
msg = _("Quota exceeded, too many security group rules.")
self.raise_over_quota(msg)
rules.append(rule)
LOG.info(msg, {'name': name,
'protocol': rule.protocol,

View File

@ -491,6 +491,14 @@ class _TestNeutronSecurityGroupRulesBase(object):
# Enforced by neutron
pass
def test_create_rule_over_quota_during_recheck(self):
# Enforced by neutron
pass
def test_create_rule_no_quota_recheck(self):
# Enforced by neutron
pass
class TestNeutronSecurityGroupRulesV21(
_TestNeutronSecurityGroupRulesBase,

View File

@ -1222,6 +1222,51 @@ class TestSecurityGroupRulesV21(test.TestCase):
self.assertRaises(webob.exc.HTTPForbidden, self.controller.create,
self.req, {'security_group_rule': rule})
@mock.patch('nova.objects.Quotas.check_deltas')
def test_create_rule_over_quota_during_recheck(self, mock_check):
# Simulate a race where the first check passes and the recheck fails.
# First check occurs in compute/api.
exc = exception.OverQuota(overs='security_group_rules',
usages={'security_group_rules': 100})
mock_check.side_effect = [None, exc]
rule = {
'ip_protocol': 'tcp', 'from_port': '121', 'to_port': '121',
'parent_group_id': self.sg2['id'], 'group_id': self.sg1['id']
}
self.assertRaises(webob.exc.HTTPForbidden, self.controller.create,
self.req, {'security_group_rule': rule})
ctxt = self.req.environ['nova.context']
self.assertEqual(2, mock_check.call_count)
# parent_group_id is used for adding the rules.
call1 = mock.call(ctxt, {'security_group_rules': 1}, self.sg2['id'])
call2 = mock.call(ctxt, {'security_group_rules': 0}, self.sg2['id'])
mock_check.assert_has_calls([call1, call2])
# Verify we removed the rule that was added after the first quota check
# passed.
rules = objects.SecurityGroupRuleList.get_by_security_group_id(
ctxt, self.sg1['id'])
self.assertEqual(0, len(rules))
@mock.patch('nova.objects.Quotas.check_deltas')
def test_create_rule_no_quota_recheck(self, mock_check):
# Disable recheck_quota.
self.flags(recheck_quota=False, group='quota')
rule = {
'ip_protocol': 'tcp', 'from_port': '121', 'to_port': '121',
'parent_group_id': self.sg2['id'], 'group_id': self.sg1['id']
}
self.controller.create(self.req, {'security_group_rule': rule})
ctxt = self.req.environ['nova.context']
# check_deltas should have been called only once.
# parent_group_id is used for adding the rules.
mock_check.assert_called_once_with(ctxt, {'security_group_rules': 1},
self.sg2['id'])
def test_create_rule_cidr_allow_all(self):
rule = security_group_rule_template(cidr='0.0.0.0/0',
parent_group_id=self.sg2['id'])