neutron/neutron/tests/unit/db/test_securitygroups_db.py
Slawek Kaplonski 013c183d7c Don't try to create default SG when security groups are disabled
If the security group API is disabled, there is no point in creating a default
security group for the tenant when, e.g., a network is created.

Closes-Bug: #1913297
Change-Id: Ib73babdd563e3e8c21ce6f63456cc87af414c5aa
2021-02-05 16:07:39 +01:00

620 lines
31 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from unittest import mock
from neutron_lib.callbacks import events
from neutron_lib.callbacks import exceptions
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.objects import exceptions as obj_exc
import sqlalchemy
import testtools
from neutron.db import securitygroups_db
from neutron.extensions import securitygroup
from neutron import quota
from neutron.services.revisions import revision_plugin
from neutron.tests.unit import testlib_api
# Request body used by the tests to create a (non-default) security group.
FAKE_SECGROUP = {
    'security_group': {
        'tenant_id': 'fake',
        'description': 'fake',
        'name': 'fake',
    },
}

# Request body used by the tests to create a security group rule.  Note that
# 'security_group_id' is the *string* 'None'; tests that need a real group
# put the id of a group they created into their own copy of this body.
FAKE_SECGROUP_RULE = {
    'security_group_rule': {
        'tenant_id': 'fake',
        'description': 'fake',
        'name': 'fake',
        'port_range_min': '21',
        'protocol': 'tcp',
        'port_range_max': '23',
        'remote_ip_prefix': '10.0.0.1',
        'ethertype': 'IPv4',
        'remote_group_id': None,
        'remote_address_group_id': None,
        'security_group_id': 'None',
        'direction': 'ingress',
    },
}

# Dotted path of the core plugin handed to setup_coreplugin() by the tests.
DB_PLUGIN_KLASS = 'neutron.db.db_base_plugin_v2.NeutronDbPluginV2'
def fake_callback(resource, event, *args, **kwargs):
    """Callback stub that always fails.

    The tests subscribe it to precommit events; whatever arguments it is
    invoked with are ignored.

    :raises KeyError: always, with the single argument ``'bar'``.
    """
    failure = KeyError('bar')
    raise failure
class SecurityGroupDbMixinImpl(securitygroups_db.SecurityGroupDbMixin):
    """Concrete subclass of SecurityGroupDbMixin used as the test subject.

    It adds nothing to the mixin; the tests instantiate it directly.
    """
class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
    """Unit tests for ``securitygroups_db.SecurityGroupDbMixin``.

    Every test works on a bare ``SecurityGroupDbMixinImpl`` instance; the
    quota engine reservation calls and the extension-support lookup are
    patched in ``setUp``.

    ``FAKE_SECGROUP`` and ``FAKE_SECGROUP_RULE`` are module-level fixtures
    shared by all tests.  A test that needs to change one of them works on
    a ``copy.deepcopy`` so that no state leaks from one test to another.
    """

    def setUp(self):
        super().setUp()
        self.setup_coreplugin(core_plugin=DB_PLUGIN_KLASS)
        self.ctx = context.get_admin_context()
        self.mixin = SecurityGroupDbMixinImpl()
        # NOTE(review): these patchers are started but not stopped here;
        # presumably a base class registers mock.patch.stopall as a
        # cleanup - confirm.
        self.mock_quota_make_res = mock.patch.object(
            quota.QuotaEngine, 'make_reservation').start()
        self.mock_quota_commit_res = mock.patch.object(
            quota.QuotaEngine, 'commit_reservation').start()
        self.is_ext_supported = mock.patch(
            'neutron_lib.api.extensions.is_extension_supported').start()
        # Tests that need the extension reported as disabled flip this.
        self.is_ext_supported.return_value = True

    def _make_fake_rule(self, security_group_id):
        """Return a private copy of FAKE_SECGROUP_RULE bound to a group.

        :param security_group_id: value stored under 'security_group_id'.
        :returns: a deep copy of FAKE_SECGROUP_RULE, so the shared fixture
                  itself is never modified.
        """
        rule = copy.deepcopy(FAKE_SECGROUP_RULE)
        rule['security_group_rule']['security_group_id'] = security_group_id
        return rule

    def test_create_security_group_conflict(self):
        """A failing 'publish' callback surfaces as SecurityGroupConflict."""
        with mock.patch.object(registry, "publish") as mock_publish:
            mock_publish.side_effect = exceptions.CallbackFailure(
                Exception())
            secgroup = {'security_group': mock.ANY}
            with testtools.ExpectedException(
                    securitygroup.SecurityGroupConflict):
                self.mixin.create_security_group(self.ctx, secgroup)

    def test_delete_security_group_in_use(self):
        """A failing 'notify' callback on delete raises SecurityGroupInUse."""
        with mock.patch.object(self.mixin,
                               '_get_port_security_group_bindings'), \
                mock.patch.object(self.mixin, '_get_security_group'), \
                mock.patch.object(registry, "notify") as mock_notify:
            mock_notify.side_effect = exceptions.CallbackFailure(
                Exception())
            with testtools.ExpectedException(
                    securitygroup.SecurityGroupInUse):
                self.mixin.delete_security_group(self.ctx, mock.ANY)

    def test_update_security_group_statefulness_binded_conflict(self):
        """Flipping 'stateful' with a failing callback raises InUse."""
        # Work on a copy: the shared fixture must not gain a 'stateful' key.
        secgroup = copy.deepcopy(FAKE_SECGROUP)
        secgroup['security_group']['stateful'] = mock.ANY
        sg_dict = self.mixin.create_security_group(self.ctx, secgroup)
        # Request the opposite of whatever statefulness was stored.
        secgroup['security_group']['stateful'] = not sg_dict['stateful']
        with mock.patch.object(self.mixin,
                               '_get_port_security_group_bindings'), \
                mock.patch.object(registry, "notify") as mock_notify:
            mock_notify.side_effect = exceptions.CallbackFailure(
                Exception())
            with testtools.ExpectedException(
                    securitygroup.SecurityGroupInUse):
                self.mixin.update_security_group(self.ctx, sg_dict['id'],
                                                 secgroup)

    def test_update_security_group_conflict(self):
        """A failing 'notify' callback on update raises a conflict."""
        with mock.patch.object(registry, "notify") as mock_notify:
            mock_notify.side_effect = exceptions.CallbackFailure(
                Exception())
            # NOTE(review): FAKE_SECGROUP already carries a
            # 'security_group' key, so this body is doubly nested.  Kept
            # as it was; the test only expects the mocked notification
            # failure to surface as a conflict - confirm intent.
            secgroup = {'security_group': FAKE_SECGROUP}
            with testtools.ExpectedException(
                    securitygroup.SecurityGroupConflict):
                self.mixin.update_security_group(self.ctx, 'foo_id',
                                                 secgroup)

    def test_create_security_group_rule_conflict(self):
        """A failing 'notify' callback on rule create raises a conflict."""
        with mock.patch.object(self.mixin,
                               '_validate_security_group_rule'), \
                mock.patch.object(self.mixin,
                                  '_check_for_duplicate_rules'), \
                mock.patch.object(registry, "notify") as mock_notify:
            mock_notify.side_effect = exceptions.CallbackFailure(
                Exception())
            with testtools.ExpectedException(
                    securitygroup.SecurityGroupConflict):
                self.mixin.create_security_group_rule(
                    self.ctx, FAKE_SECGROUP_RULE)

    def test__check_for_duplicate_rules_does_not_drop_protocol(self):
        """The 'protocol' key is still in the rule after the check."""
        with mock.patch.object(self.mixin, 'get_security_group',
                               return_value=None):
            # Named 'ctx' so the imported 'context' module is not shadowed.
            ctx = mock.Mock()
            rule_dict = {
                'security_group_rule': {'protocol': None,
                                        'tenant_id': 'fake',
                                        'security_group_id': 'fake',
                                        'direction': 'fake'}
            }
            self.mixin._check_for_duplicate_rules(ctx, 'fake', [rule_dict])
        self.assertIn('protocol', rule_dict['security_group_rule'])

    def test__check_for_duplicate_rules_ignores_rule_id(self):
        """Rules differing only by 'id' are reported as duplicates."""
        rules = [{'security_group_rule': {'protocol': 'tcp', 'id': 'fake1'}},
                 {'security_group_rule': {'protocol': 'tcp', 'id': 'fake2'}}]
        # This test used to pass the imported ``context`` *module* as the
        # request context because no local of that name existed; use a
        # mock context like the sibling duplicate-rule tests do.
        ctx = mock.Mock()

        # NOTE(arosen): the name of this exception is a little misleading
        # in this case, as this test checks that the id fields are dropped
        # while the rules are being compared. This covers the case where a
        # plugin specifies the rule ids itself.
        with mock.patch.object(self.mixin, 'get_security_group',
                               return_value=None):
            self.assertRaises(securitygroup.DuplicateSecurityGroupRuleInPost,
                              self.mixin._check_for_duplicate_rules,
                              ctx, 'fake', rules)

    def test_check_for_duplicate_diff_rules_remote_ip_prefix_ipv4(self):
        """IPv4: prefix None (stored) vs '0.0.0.0/0' (new) is a duplicate."""
        fake_secgroup = copy.deepcopy(FAKE_SECGROUP)
        fake_secgroup['security_group_rules'] = [
            {'id': 'fake', 'tenant_id': 'fake', 'ethertype': 'IPv4',
             'direction': 'ingress', 'security_group_id': 'fake',
             'remote_ip_prefix': None}]
        with mock.patch.object(self.mixin, 'get_security_group',
                               return_value=fake_secgroup):
            ctx = mock.Mock()
            rule_dict = {
                'security_group_rule': {'id': 'fake2',
                                        'tenant_id': 'fake',
                                        'security_group_id': 'fake',
                                        'ethertype': 'IPv4',
                                        'direction': 'ingress',
                                        'remote_ip_prefix': '0.0.0.0/0'}
            }
            self.assertRaises(securitygroup.SecurityGroupRuleExists,
                              self.mixin._check_for_duplicate_rules,
                              ctx, 'fake', [rule_dict])

    def test_check_for_duplicate_diff_rules_remote_ip_prefix_ipv6(self):
        """IPv6: prefix None (stored) vs '::/0' (new) is a duplicate."""
        fake_secgroup = copy.deepcopy(FAKE_SECGROUP)
        fake_secgroup['security_group_rules'] = [
            {'id': 'fake', 'tenant_id': 'fake', 'ethertype': 'IPv6',
             'direction': 'ingress', 'security_group_id': 'fake',
             'remote_ip_prefix': None}]
        with mock.patch.object(self.mixin, 'get_security_group',
                               return_value=fake_secgroup):
            ctx = mock.Mock()
            rule_dict = {
                'security_group_rule': {'id': 'fake2',
                                        'tenant_id': 'fake',
                                        'security_group_id': 'fake',
                                        'ethertype': 'IPv6',
                                        'direction': 'ingress',
                                        'remote_ip_prefix': '::/0'}
            }
            self.assertRaises(securitygroup.SecurityGroupRuleExists,
                              self.mixin._check_for_duplicate_rules,
                              ctx, 'fake', [rule_dict])

    def test_delete_security_group_rule_in_use(self):
        """A failing 'notify' callback on rule delete raises RuleInUse."""
        with mock.patch.object(registry, "notify") as mock_notify:
            mock_notify.side_effect = exceptions.CallbackFailure(
                Exception())
            with testtools.ExpectedException(
                    securitygroup.SecurityGroupRuleInUse):
                self.mixin.delete_security_group_rule(self.ctx, mock.ANY)

    def test_delete_security_group_rule_raise_error_on_not_found(self):
        """Deleting an unknown rule id raises SecurityGroupRuleNotFound."""
        with testtools.ExpectedException(
                securitygroup.SecurityGroupRuleNotFound):
            self.mixin.delete_security_group_rule(self.ctx, 'foo_rule')

    def test_validate_ethertype_and_protocol(self):
        """Each listed IPv6 protocol is rejected for ethertype IPv4."""
        # Protocol names first, then protocol numbers given as strings.
        ipv6_protocols = [
            constants.PROTO_NAME_IPV6_ICMP,
            constants.PROTO_NAME_IPV6_ICMP_LEGACY,
            constants.PROTO_NAME_IPV6_ENCAP,
            constants.PROTO_NAME_IPV6_ROUTE,
            constants.PROTO_NAME_IPV6_FRAG,
            constants.PROTO_NAME_IPV6_NONXT,
            constants.PROTO_NAME_IPV6_OPTS,
            str(constants.PROTO_NUM_IPV6_ICMP),
            str(constants.PROTO_NUM_IPV6_ENCAP),
            str(constants.PROTO_NUM_IPV6_ROUTE),
            str(constants.PROTO_NUM_IPV6_FRAG),
            str(constants.PROTO_NUM_IPV6_NONXT),
            str(constants.PROTO_NUM_IPV6_OPTS),
        ]
        expected = securitygroup.SecurityGroupEthertypeConflictWithProtocol
        for protocol in ipv6_protocols:
            rule = {'protocol': protocol, 'ethertype': constants.IPv4}
            with testtools.ExpectedException(expected):
                self.mixin._validate_ethertype_and_protocol(rule)

    def test_security_group_precommit_create_event_fail(self):
        """A raising PRECOMMIT_CREATE subscriber rolls the create back."""
        registry.subscribe(fake_callback, resources.SECURITY_GROUP,
                           events.PRECOMMIT_CREATE)
        with mock.patch.object(sqlalchemy.orm.session.SessionTransaction,
                               'rollback') as mock_rollback:
            self.assertRaises(securitygroup.SecurityGroupConflict,
                              self.mixin.create_security_group,
                              self.ctx, FAKE_SECGROUP)
            self.assertTrue(mock_rollback.called)

    def test_security_group_precommit_update_event_fail(self):
        """A raising PRECOMMIT_UPDATE subscriber rolls the update back."""
        registry.subscribe(fake_callback, resources.SECURITY_GROUP,
                           events.PRECOMMIT_UPDATE)
        sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
        with mock.patch.object(sqlalchemy.orm.session.SessionTransaction,
                               'rollback') as mock_rollback:
            self.assertRaises(securitygroup.SecurityGroupConflict,
                              self.mixin.update_security_group,
                              self.ctx, sg_dict['id'], FAKE_SECGROUP)
            self.assertTrue(mock_rollback.called)

    def test_security_group_precommit_delete_event_fail(self):
        """A raising PRECOMMIT_DELETE subscriber rolls the delete back."""
        registry.subscribe(fake_callback, resources.SECURITY_GROUP,
                           events.PRECOMMIT_DELETE)
        sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
        with mock.patch.object(sqlalchemy.orm.session.SessionTransaction,
                               'rollback') as mock_rollback:
            self.assertRaises(securitygroup.SecurityGroupInUse,
                              self.mixin.delete_security_group,
                              self.ctx, sg_dict['id'])
            self.assertTrue(mock_rollback.called)

    def _test_security_group_precommit_create_event(self,
                                                    with_revisions=False):
        """Check the events emitted while creating a security group.

        :param with_revisions: when True the expected default security
            group dict also carries a 'revision_number' key.
        """
        tenant_id = FAKE_SECGROUP['security_group']['tenant_id']
        # Body expected in the 'before_create' payload of the default group.
        default_sg_body = {
            'tenant_id': tenant_id,
            'name': 'default',
            'description': 'Default security group',
        }
        # Dict expected in the notifications about the default group.
        default_sg_dict = {
            'id': mock.ANY,
            'tenant_id': tenant_id,
            'project_id': tenant_id,
            'name': 'default',
            'description': 'Default security group',
            'stateful': mock.ANY,
            'standard_attr_id': mock.ANY,
            'security_group_rules': [
                # Four rules for egress/ingress and ipv4/ipv6
                mock.ANY, mock.ANY, mock.ANY, mock.ANY,
            ],
        }
        if with_revisions:
            default_sg_dict['revision_number'] = mock.ANY

        with mock.patch.object(registry, 'publish') as publish, \
                mock.patch.object(registry, "notify") as mock_notify:
            sg_dict = self.mixin.create_security_group(self.ctx,
                                                       FAKE_SECGROUP)
            # The default group is notified before the requested one.
            mock_notify.assert_has_calls([
                mock.call('security_group', 'precommit_create', mock.ANY,
                          context=mock.ANY, is_default=True,
                          security_group=default_sg_dict),
                mock.call('security_group', 'after_create', mock.ANY,
                          context=mock.ANY, is_default=True,
                          security_group=default_sg_dict),
                mock.call('security_group', 'precommit_create', mock.ANY,
                          context=mock.ANY, is_default=False,
                          security_group=sg_dict),
                mock.call('security_group', 'after_create', mock.ANY,
                          context=mock.ANY, is_default=False,
                          security_group=sg_dict)])

            before_create = mock.call('security_group', 'before_create',
                                      mock.ANY, payload=mock.ANY)
            publish.assert_has_calls([before_create, before_create])

            # mock_calls entries are (name, args, kwargs) triples.
            payload = publish.mock_calls[0][2]['payload']
            self.assertDictEqual(payload.desired_state,
                                 FAKE_SECGROUP['security_group'])
            payload = publish.mock_calls[1][2]['payload']
            self.assertDictEqual(payload.desired_state, default_sg_body)

            # Ensure that the result of create is same as get.
            # Especially we want to check the revision number here.
            sg_dict_got = self.mixin.get_security_group(
                self.ctx, sg_dict['id'])
            self.assertEqual(sg_dict, sg_dict_got)

    def test_security_group_precommit_create_event_with_revisions(self):
        revision = revision_plugin.RevisionPlugin()
        self._test_security_group_precommit_create_event(True)
        del revision  # appease pep8

    def test_security_group_precommit_create_event(self):
        self._test_security_group_precommit_create_event()

    def test_security_group_precommit_update_event(self):
        """Update publishes precommit_update with old and new state."""
        # Copies keep the shared fixture free of the 'stateful' key.
        create_body = copy.deepcopy(FAKE_SECGROUP)
        create_body['security_group']['stateful'] = mock.ANY
        original_sg_dict = self.mixin.create_security_group(self.ctx,
                                                            create_body)
        sg_id = original_sg_dict['id']
        with mock.patch.object(self.mixin,
                               '_get_port_security_group_bindings'), \
                mock.patch.object(registry, "publish") as mock_publish:
            update_body = copy.deepcopy(FAKE_SECGROUP)
            update_body['security_group']['name'] = 'updated_fake'
            update_body['security_group']['stateful'] = mock.ANY
            sg_dict = self.mixin.update_security_group(
                self.ctx, sg_id, update_body)

            mock_publish.assert_has_calls(
                [mock.call('security_group', 'precommit_update', mock.ANY,
                           payload=mock.ANY)])
            # call_args[1] holds the keyword arguments of the last call.
            payload = mock_publish.call_args[1]['payload']
            self.assertEqual(original_sg_dict, payload.states[0])
            self.assertEqual(sg_id, payload.resource_id)
            self.assertEqual(sg_dict, payload.desired_state)

    def test_security_group_precommit_and_after_delete_event(self):
        """Delete notifies precommit_delete and then after_delete."""
        sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
        with mock.patch.object(registry, "notify") as mock_notify:
            self.mixin.delete_security_group(self.ctx, sg_dict['id'])
            # The rules themselves are not compared, only their ids.
            sg_dict['security_group_rules'] = mock.ANY
            rule_ids = [mock.ANY, mock.ANY]
            mock_notify.assert_has_calls(
                [mock.call('security_group', 'precommit_delete',
                           mock.ANY, context=mock.ANY,
                           security_group=sg_dict,
                           security_group_id=sg_dict['id'],
                           security_group_rule_ids=rule_ids),
                 mock.call('security_group', 'after_delete',
                           mock.ANY, context=mock.ANY,
                           security_group_id=sg_dict['id'],
                           security_group_rule_ids=rule_ids,
                           name=sg_dict['name'])])

    def test_security_group_rule_precommit_create_event_fail(self):
        """A raising rule PRECOMMIT_CREATE subscriber causes a rollback."""
        registry.subscribe(fake_callback, resources.SECURITY_GROUP_RULE,
                           events.PRECOMMIT_CREATE)
        sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
        fake_rule = self._make_fake_rule(sg_dict['id'])
        with mock.patch.object(sqlalchemy.orm.session.SessionTransaction,
                               'rollback') as mock_rollback, \
                mock.patch.object(self.mixin, '_get_security_group'):
            self.assertRaises(securitygroup.SecurityGroupConflict,
                              self.mixin.create_security_group_rule,
                              self.ctx, fake_rule)
            self.assertTrue(mock_rollback.called)

    def test_security_group_rule_precommit_delete_event_fail(self):
        """A raising rule PRECOMMIT_DELETE subscriber causes a rollback."""
        registry.subscribe(fake_callback, resources.SECURITY_GROUP_RULE,
                           events.PRECOMMIT_DELETE)
        sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
        fake_rule = self._make_fake_rule(sg_dict['id'])
        with mock.patch.object(sqlalchemy.orm.session.SessionTransaction,
                               'rollback') as mock_rollback, \
                mock.patch.object(self.mixin, '_get_security_group'):
            sg_rule_dict = self.mixin.create_security_group_rule(
                self.ctx, fake_rule)
            self.assertRaises(securitygroup.SecurityGroupRuleInUse,
                              self.mixin.delete_security_group_rule,
                              self.ctx, sg_rule_dict['id'])
            self.assertTrue(mock_rollback.called)

    def test_security_group_rule_precommit_create_event(self):
        """Creating a rule notifies precommit_create with the new rule."""
        sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
        fake_rule = self._make_fake_rule(sg_dict['id'])
        with mock.patch.object(registry, "notify") as mock_notify, \
                mock.patch.object(self.mixin, '_get_security_group'):
            # Run the call under test first, then assert on its result.
            sg_rule_dict = self.mixin.create_security_group_rule(
                self.ctx, fake_rule)
            mock_notify.assert_has_calls([
                mock.call('security_group_rule', 'precommit_create',
                          mock.ANY, context=mock.ANY,
                          security_group_rule=sg_rule_dict)])

    def test_sg_rule_before_precommit_and_after_delete_event(self):
        """Deleting a rule notifies before/precommit/after delete."""
        sg_dict = self.mixin.create_security_group(self.ctx, FAKE_SECGROUP)
        fake_rule = self._make_fake_rule(sg_dict['id'])
        with mock.patch.object(registry, "notify") as mock_notify, \
                mock.patch.object(self.mixin, '_get_security_group'):
            sg_rule_dict = self.mixin.create_security_group_rule(
                self.ctx, fake_rule)
            self.mixin.delete_security_group_rule(self.ctx,
                                                  sg_rule_dict['id'])
            # Checked one by one on purpose: a single assert_has_calls
            # with all three would require them to be consecutive.
            mock_notify.assert_has_calls([
                mock.call('security_group_rule', 'before_delete',
                          mock.ANY, context=mock.ANY,
                          security_group_rule_id=sg_rule_dict['id'])])
            mock_notify.assert_has_calls([
                mock.call('security_group_rule', 'precommit_delete',
                          mock.ANY, context=mock.ANY,
                          security_group_id=sg_dict['id'],
                          security_group_rule_id=sg_rule_dict['id'])])
            mock_notify.assert_has_calls([
                mock.call('security_group_rule', 'after_delete',
                          mock.ANY, context=mock.ANY,
                          security_group_rule_id=sg_rule_dict['id'],
                          security_group_id=sg_dict['id'])])

    def test_get_ip_proto_name_and_num(self):
        """Known protocols give [name, number]; others are echoed twice."""
        protocols = [constants.PROTO_NAME_UDP, str(constants.PROTO_NUM_TCP),
                     'blah', '111']
        protocol_names_nums = [
            [constants.PROTO_NAME_UDP, str(constants.PROTO_NUM_UDP)],
            [constants.PROTO_NAME_TCP, str(constants.PROTO_NUM_TCP)],
            ['blah', 'blah'],
            ['111', '111'],
        ]
        for protocol, expected in zip(protocols, protocol_names_nums):
            self.assertEqual(expected,
                             self.mixin._get_ip_proto_name_and_num(protocol))

    def test__validate_port_range_for_icmp_exception(self):
        """Bad ICMP type/code values are rejected for each ICMP protocol."""
        # (port_range_min, port_range_max, expected exception)
        states = [(1, 256, securitygroup.SecurityGroupInvalidIcmpValue),
                  (None, 6, securitygroup.SecurityGroupMissingIcmpType),
                  (300, 1, securitygroup.SecurityGroupInvalidIcmpValue)]
        icmp_protocols = (constants.PROTO_NAME_ICMP,
                          constants.PROTO_NAME_IPV6_ICMP,
                          constants.PROTO_NAME_IPV6_ICMP_LEGACY)
        for protocol in icmp_protocols:
            for pmin, pmax, exception in states:
                self.assertRaises(exception,
                                  self.mixin._validate_port_range,
                                  {'port_range_min': pmin,
                                   'port_range_max': pmax,
                                   'protocol': protocol})

    def test__validate_port_range_exception(self):
        """Invalid port ranges raise the matching exception."""
        invalid_protocol = securitygroup.SecurityGroupInvalidProtocolForPort
        # (expected exception, port_range_min, port_range_max, protocol)
        cases = [
            (securitygroup.SecurityGroupInvalidPortValue,
             0, None, constants.PROTO_NAME_TCP),
            (securitygroup.SecurityGroupInvalidPortRange,
             1, None, constants.PROTO_NAME_SCTP),
            (securitygroup.SecurityGroupInvalidPortRange,
             1000, 1, constants.PROTO_NAME_UDPLITE),
            (invalid_protocol, 100, 200, '111'),
            (invalid_protocol, 100, None, constants.PROTO_NAME_VRRP),
            (invalid_protocol, None, 200, constants.PROTO_NAME_VRRP),
        ]
        for exception, pmin, pmax, protocol in cases:
            self.assertRaises(exception,
                              self.mixin._validate_port_range,
                              {'port_range_min': pmin,
                               'port_range_max': pmax,
                               'protocol': protocol})

    def _create_environment(self):
        """Create a user context, an admin context and a user-owned group.

        Sets ``self.sg``, ``self.user_ctx``, ``self.admin_ctx`` and
        ``self.sg_user`` (a group created with the user context for
        'tenant_1').
        """
        self.sg = copy.deepcopy(FAKE_SECGROUP)
        self.user_ctx = context.Context(user_id='user1',
                                        tenant_id='tenant_1',
                                        is_admin=False, overwrite=False)
        self.admin_ctx = context.Context(user_id='user2',
                                         tenant_id='tenant_2',
                                         is_admin=True, overwrite=False)
        self.sg_user = self.mixin.create_security_group(
            self.user_ctx, {'security_group': {'name': 'name',
                                               'tenant_id': 'tenant_1',
                                               'description': 'fake'}})

    def test_get_security_group_rules(self):
        """A user lists a rule the admin tenant added to the user's group."""
        self._create_environment()
        rules_before = self.mixin.get_security_group_rules(self.user_ctx)

        rule = self._make_fake_rule(self.sg_user['id'])
        rule['security_group_rule']['tenant_id'] = 'tenant_2'
        self.mixin.create_security_group_rule(self.admin_ctx, rule)

        rules_after = self.mixin.get_security_group_rules(self.user_ctx)
        self.assertEqual(len(rules_before) + 1, len(rules_after))
        new_rules = [r for r in rules_after if r not in rules_before]
        for new_rule in new_rules:
            self.assertEqual('tenant_2', new_rule['tenant_id'])

    def test_get_security_group_rules_filters_passed(self):
        """A rule added to another group is not in the filtered listing."""
        self._create_environment()
        filters = {'security_group_id': self.sg_user['id']}
        rules_before = self.mixin.get_security_group_rules(self.user_ctx,
                                                           filters=filters)

        default_sg = self.mixin.get_security_groups(
            self.user_ctx, filters={'name': 'default'})[0]
        rule = self._make_fake_rule(default_sg['id'])
        rule['security_group_rule']['tenant_id'] = 'tenant_1'
        self.mixin.create_security_group_rule(self.user_ctx, rule)

        rules_after = self.mixin.get_security_group_rules(self.user_ctx,
                                                          filters=filters)
        self.assertEqual(rules_before, rules_after)

    def test_get_security_group_rules_admin_context(self):
        """The admin context lists a rule created by the user."""
        self._create_environment()
        rules_before = self.mixin.get_security_group_rules(self.ctx)

        rule = self._make_fake_rule(self.sg_user['id'])
        rule['security_group_rule']['tenant_id'] = 'tenant_1'
        self.mixin.create_security_group_rule(self.user_ctx, rule)

        rules_after = self.mixin.get_security_group_rules(self.ctx)
        self.assertEqual(len(rules_before) + 1, len(rules_after))
        new_rules = [r for r in rules_after if r not in rules_before]
        for new_rule in new_rules:
            self.assertEqual('tenant_1', new_rule['tenant_id'])
            self.assertEqual(self.sg_user['id'],
                             new_rule['security_group_id'])

    def test__ensure_default_security_group(self):
        """With no default group, one is created for the tenant."""
        with mock.patch.object(
                self.mixin, '_get_default_sg_id') as get_default_sg_id, \
                mock.patch.object(
                    self.mixin, 'create_security_group') as create_sg:
            get_default_sg_id.return_value = None
            self.mixin._ensure_default_security_group(self.ctx, 'tenant_1')
            create_sg.assert_called_once_with(
                self.ctx,
                {'security_group': {
                    'name': 'default',
                    'tenant_id': 'tenant_1',
                    'description': securitygroups_db.DEFAULT_SG_DESCRIPTION}},
                default_sg=True)
            get_default_sg_id.assert_called_once_with(self.ctx, 'tenant_1')

    def test__ensure_default_security_group_already_exists(self):
        """With an existing default group, nothing is created."""
        with mock.patch.object(
                self.mixin, '_get_default_sg_id') as get_default_sg_id, \
                mock.patch.object(
                    self.mixin, 'create_security_group') as create_sg:
            get_default_sg_id.return_value = 'default_sg_id'
            self.mixin._ensure_default_security_group(self.ctx, 'tenant_1')
            create_sg.assert_not_called()
            get_default_sg_id.assert_called_once_with(self.ctx, 'tenant_1')

    def test__ensure_default_security_group_created_in_parallel(self):
        """A duplicate-entry error on create leads to a second lookup."""
        with mock.patch.object(
                self.mixin, '_get_default_sg_id') as get_default_sg_id, \
                mock.patch.object(
                    self.mixin, 'create_security_group') as create_sg:
            # First lookup finds nothing, the second one finds the group.
            get_default_sg_id.side_effect = [None, 'default_sg_id']
            create_sg.side_effect = obj_exc.NeutronDbObjectDuplicateEntry(
                mock.Mock(), mock.Mock())
            self.mixin._ensure_default_security_group(self.ctx, 'tenant_1')
            create_sg.assert_called_once_with(
                self.ctx,
                {'security_group': {
                    'name': 'default',
                    'tenant_id': 'tenant_1',
                    'description': securitygroups_db.DEFAULT_SG_DESCRIPTION}},
                default_sg=True)
            get_default_sg_id.assert_has_calls([
                mock.call(self.ctx, 'tenant_1'),
                mock.call(self.ctx, 'tenant_1')])

    def test__ensure_default_security_group_when_disabled(self):
        """With the extension unsupported, no lookup or create happens."""
        with mock.patch.object(
                self.mixin, '_get_default_sg_id') as get_default_sg_id, \
                mock.patch.object(
                    self.mixin, 'create_security_group') as create_sg:
            self.is_ext_supported.return_value = False
            self.mixin._ensure_default_security_group(self.ctx, 'tenant_1')
            create_sg.assert_not_called()
            get_default_sg_id.assert_not_called()