Don't drop 'protocol' from client supplied security_group_rule dict

If protocol was present in the dict, but was None, then it was never
re-instantiated after being popped out of the dict. This later resulted
in KeyError when trying to access the key on the dict.

Change-Id: I4985e7b54117bee3241d7365cb438197a09b9b86
Closes-Bug: #1566327
This commit is contained in:
Ihar Hrachyshka 2016-04-05 16:56:16 +02:00
parent b23e52666d
commit 5a41caa47a
2 changed files with 16 additions and 10 deletions

View File

@ -584,23 +584,16 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
# is changed which cannot be because other methods are already
# relying on this behavior. Therefore, we do the filtering
# below to check for these corner cases.
rule_dict = security_group_rule['security_group_rule'].copy()
sg_protocol = rule_dict.pop('protocol', None)
for db_rule in db_rules:
rule_id = db_rule.pop('id', None)
# remove protocol and match separately for number and type
db_protocol = db_rule.pop('protocol', None)
sg_protocol = (
security_group_rule['security_group_rule'].pop('protocol',
None))
is_protocol_matching = (
self._get_ip_proto_name_and_num(db_protocol) ==
self._get_ip_proto_name_and_num(sg_protocol))
are_rules_matching = (
security_group_rule['security_group_rule'] == db_rule)
# reinstate protocol field for further processing
if sg_protocol:
security_group_rule['security_group_rule']['protocol'] = (
sg_protocol)
if (is_protocol_matching and are_rules_matching):
if (is_protocol_matching and rule_dict == db_rule):
raise ext_sg.SecurityGroupRuleExists(rule_id=rule_id)
def _validate_ip_prefix(self, rule):

View File

@ -91,6 +91,19 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
self.mixin.create_security_group_rule(
self.ctx, mock.MagicMock())
def test__check_for_duplicate_rules_in_db_does_not_drop_protocol(self):
with mock.patch.object(self.mixin, 'get_security_group_rules',
return_value=[mock.Mock()]):
context = mock.Mock()
rule_dict = {
'security_group_rule': {'protocol': None,
'tenant_id': 'fake',
'security_group_id': 'fake',
'direction': 'fake'}
}
self.mixin._check_for_duplicate_rules_in_db(context, rule_dict)
self.assertIn('protocol', rule_dict['security_group_rule'])
def test_delete_security_group_rule_in_use(self):
with mock.patch.object(registry, "notify") as mock_notify:
mock_notify.side_effect = exceptions.CallbackFailure(Exception())