# Copyright 2012 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. # # Copyright 2012 Nebula, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from unittest import mock from django.conf import settings from django.urls import reverse from django.utils import html from horizon import exceptions from horizon import forms from openstack_dashboard import api from openstack_dashboard.dashboards.project.security_groups import tables from openstack_dashboard.test import helpers as test from openstack_dashboard.usage import quotas INDEX_URL = reverse('horizon:project:security_groups:index') SG_CREATE_URL = reverse('horizon:project:security_groups:create') SG_VIEW_PATH = 'horizon:project:security_groups:%s' SG_DETAIL_VIEW = SG_VIEW_PATH % 'detail' SG_UPDATE_VIEW = SG_VIEW_PATH % 'update' SG_ADD_RULE_VIEW = SG_VIEW_PATH % 'add_rule' SG_TEMPLATE_PATH = 'project/security_groups/%s' SG_DETAIL_TEMPLATE = SG_TEMPLATE_PATH % 'detail.html' SG_CREATE_TEMPLATE = SG_TEMPLATE_PATH % 'create.html' SG_UPDATE_TEMPLATE = SG_TEMPLATE_PATH % '_update.html' def strip_absolute_base(uri): return uri.split(settings.TESTSERVER, 1)[-1] class SecurityGroupsViewTests(test.TestCase): secgroup_backend = 'neutron' def setUp(self): super().setUp() sec_group = self.security_groups.first() self.detail_url = reverse(SG_DETAIL_VIEW, args=[sec_group.id]) self.edit_url = reverse(SG_ADD_RULE_VIEW, args=[sec_group.id]) self.update_url = reverse(SG_UPDATE_VIEW, args=[sec_group.id]) @test.create_mocks({api.neutron: ('security_group_list', 'is_extension_supported'), quotas: ('tenant_quota_usages',)}) def test_index(self): sec_groups = self.security_groups.list() quota_data = self.neutron_quota_usages.first() quota_data['security_group']['available'] = 10 self.mock_security_group_list.return_value = sec_groups self.mock_is_extension_supported.return_value = True self.mock_tenant_quota_usages.return_value = quota_data res = self.client.get(INDEX_URL) self.assertTemplateUsed(res, 'horizon/common/_data_table_view.html') # Security groups sec_groups_from_ctx = res.context['security_groups_table'].data # Context data needs to contains all items from the test data. self.assertCountEqual(sec_groups_from_ctx, sec_groups) # Sec groups in context need to be sorted by their ``name`` attribute. # This assertion is somewhat weak since it's only meaningful as long as # the sec groups in the test data are *not* sorted by name (which is # the case as of the time of this addition). self.assertTrue( all([sec_groups_from_ctx[i].name <= sec_groups_from_ctx[i + 1].name for i in range(len(sec_groups_from_ctx) - 1)])) self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.assert_mock_multiple_calls_with_same_arguments( self.mock_tenant_quota_usages, 2, mock.call(test.IsHttpRequest(), targets=('security_group', ))) @test.create_mocks({api.neutron: ('security_group_list', 'is_extension_supported'), quotas: ('tenant_quota_usages',)}) def test_create_button_attributes(self): sec_groups = self.security_groups.list() quota_data = self.neutron_quota_usages.first() quota_data['security_group']['available'] = 10 self.mock_security_group_list.return_value = sec_groups self.mock_is_extension_supported.return_value = True self.mock_tenant_quota_usages.return_value = quota_data res = self.client.get(INDEX_URL) security_groups = res.context['security_groups_table'].data self.assertCountEqual(security_groups, self.security_groups.list()) create_action = self.getAndAssertTableAction(res, 'security_groups', 'create') self.assertEqual('Create Security Group', create_action.verbose_name) self.assertEqual((('network', 'create_security_group'),), create_action.policy_rules) self.assertEqual(set(['ajax-modal']), set(create_action.classes)) url = 'horizon:project:security_groups:create' self.assertEqual(url, create_action.url) self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.assert_mock_multiple_calls_with_same_arguments( self.mock_tenant_quota_usages, 3, mock.call(test.IsHttpRequest(), targets=('security_group', ))) @test.create_mocks({api.neutron: ('security_group_list',), quotas: ('tenant_quota_usages',)}) def _test_create_button_disabled_when_quota_exceeded(self, network_enabled): sec_groups = self.security_groups.list() quota_data = self.neutron_quota_usages.first() quota_data['security_group']['available'] = 0 self.mock_security_group_list.return_value = sec_groups self.mock_tenant_quota_usages.return_value = quota_data res = self.client.get(INDEX_URL) security_groups = res.context['security_groups_table'].data self.assertCountEqual(security_groups, self.security_groups.list()) create_action = self.getAndAssertTableAction(res, 'security_groups', 'create') self.assertIn('disabled', create_action.classes, 'The create button should be disabled') self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.assert_mock_multiple_calls_with_same_arguments( self.mock_tenant_quota_usages, 3, mock.call(test.IsHttpRequest(), targets=('security_group', ))) @test.create_mocks({api.neutron: ('is_extension_supported',)}) def test_create_button_disabled_when_quota_exceeded_neutron_disabled(self): self.mock_is_extension_supported.return_value = True self._test_create_button_disabled_when_quota_exceeded(False) @test.create_mocks({api.neutron: ('is_extension_supported',)}) def test_create_button_disabled_when_quota_exceeded_neutron_enabled(self): self.mock_is_extension_supported.return_value = True self._test_create_button_disabled_when_quota_exceeded(True) def _add_security_group_rule_fixture(self, is_desc_support=True, **kwargs): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self.security_group_rules.first() self.mock_is_extension_supported.return_value = is_desc_support self.mock_security_group_rule_create.return_value = rule self.mock_security_group_list.return_value = sec_group_list return sec_group, rule def _check_add_security_group_rule(self, **kwargs): sec_group = self.security_groups.first() rule = self.security_group_rules.first() extra_params = {} if 'description' in kwargs: if kwargs['description'] is not None: extra_params['description'] = kwargs['description'] else: extra_params['description'] = rule.description self.mock_security_group_rule_create.assert_called_once_with( test.IsHttpRequest(), kwargs.get('sec_group', sec_group.id), kwargs.get('ingress', 'ingress'), kwargs.get('ethertype', 'IPv4'), kwargs.get('ip_protocol', rule.ip_protocol), kwargs.get('from_port', int(rule.from_port)), kwargs.get('to_port', int(rule.to_port)), kwargs.get('cidr', rule.ip_range['cidr']), kwargs.get('security_group', '%s' % sec_group.id), **extra_params) self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_get',)}) def test_update_security_groups_get(self): sec_group = self.security_groups.first() self.mock_security_group_get.return_value = sec_group res = self.client.get(self.update_url) self.assertTemplateUsed(res, SG_UPDATE_TEMPLATE) self.assertEqual(res.context['security_group'].name, sec_group.name) self.mock_security_group_get.assert_called_once_with( test.IsHttpRequest(), sec_group.id) @test.create_mocks({api.neutron: ('security_group_update', 'security_group_get')}) def test_update_security_groups_post(self): """Ensure that we can change a group name. The name must not be restricted to alphanumeric characters. bug #1233501 Security group names cannot contain at characters bug #1224576 Security group names cannot contain spaces """ sec_group = self.security_groups.first() sec_group.name = "@new name" self.mock_security_group_update.return_value = sec_group self.mock_security_group_get.return_value = sec_group form_data = {'method': 'UpdateGroup', 'id': sec_group.id, 'name': sec_group.name, 'description': sec_group.description} res = self.client.post(self.update_url, form_data) self.assertRedirectsNoFollow(res, INDEX_URL) self.mock_security_group_update.assert_called_once_with( test.IsHttpRequest(), str(sec_group.id), sec_group.name, sec_group.description) self.mock_security_group_get.assert_called_once_with( test.IsHttpRequest(), sec_group.id) def test_create_security_groups_get(self): res = self.client.get(SG_CREATE_URL) self.assertTemplateUsed(res, SG_CREATE_TEMPLATE) def test_create_security_groups_post(self): self._create_security_group() def test_create_security_groups_special_chars(self): """Ensure non-alphanumeric characters can be used as a group name. bug #1233501 Security group names cannot contain at characters bug #1224576 Security group names cannot contain spaces """ sg_name = b'@group name-\xe3\x82\xb3'.decode('utf8') self._create_security_group(sg_name=sg_name) @test.create_mocks({api.neutron: ('security_group_create',)}) def _create_security_group(self, sg_name=None): sec_group = self.security_groups.first() if sg_name is not None: sec_group.name = sg_name self.mock_security_group_create.return_value = sec_group form_data = {'method': 'CreateGroup', 'name': sec_group.name, 'description': sec_group.description} res = self.client.post(SG_CREATE_URL, form_data) self.assertRedirectsNoFollow(res, self.detail_url) self.mock_security_group_create.assert_called_once_with( test.IsHttpRequest(), sec_group.name, sec_group.description) @test.create_mocks({api.neutron: ('security_group_create',)}) def test_create_security_groups_post_exception(self): sec_group = self.security_groups.first() self.mock_security_group_create.side_effect = self.exceptions.nova formData = {'method': 'CreateGroup', 'name': sec_group.name, 'description': sec_group.description} res = self.client.post(SG_CREATE_URL, formData) self.assertMessageCount(error=1) self.assertRedirectsNoFollow(res, INDEX_URL) self.mock_security_group_create.assert_called_once_with( test.IsHttpRequest(), sec_group.name, sec_group.description) @test.create_mocks({api.neutron: ('security_group_get', 'is_extension_supported'), quotas: ('tenant_quota_usages',)}) def test_detail_get(self): sec_group = self.security_groups.first() quota_data = self.neutron_quota_usages.first() self.mock_tenant_quota_usages.return_value = quota_data self.mock_security_group_get.return_value = sec_group self.mock_is_extension_supported.return_value = True res = self.client.get(self.detail_url) self.assertTemplateUsed(res, SG_DETAIL_TEMPLATE) self.mock_security_group_get.assert_called_once_with( test.IsHttpRequest(), sec_group.id) self.assert_mock_multiple_calls_with_same_arguments( self.mock_tenant_quota_usages, 2, mock.call(test.IsHttpRequest(), targets=('security_group_rule', ))) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_get', 'is_extension_supported')}) def test_detail_get_exception(self): sec_group = self.security_groups.first() self.mock_security_group_get.side_effect = self.exceptions.nova self.mock_is_extension_supported.return_value = True res = self.client.get(self.detail_url) self.assertRedirectsNoFollow(res, INDEX_URL) self.mock_security_group_get.assert_called_once_with( test.IsHttpRequest(), sec_group.id) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_cidr(self): sec_group, rule = self._add_security_group_rule_fixture( security_group=None) formData = {'method': 'AddRule', 'id': sec_group.id, 'description': rule.description, 'port_or_range': 'port', 'port': rule.from_port, 'rule_menu': rule.ip_protocol, 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertRedirectsNoFollow(res, self.detail_url) self._check_add_security_group_rule( security_group=None) @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_cidr_with_invalid_unused_fields(self): sec_group, rule = self._add_security_group_rule_fixture( security_group=None) formData = {'method': 'AddRule', 'id': sec_group.id, 'description': rule.description, 'port_or_range': 'port', 'port': rule.from_port, 'to_port': 'INVALID', 'from_port': 'INVALID', 'icmp_code': 'INVALID', 'icmp_type': 'INVALID', 'security_group': 'INVALID', 'ip_protocol': 'INVALID', 'rule_menu': rule.ip_protocol, 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertNoFormErrors(res) self.assertRedirectsNoFollow(res, self.detail_url) self._check_add_security_group_rule( security_group=None) @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_securitygroup_with_invalid_unused_fields(self): sec_group, rule = self._add_security_group_rule_fixture( cidr=None, ethertype='') formData = {'method': 'AddRule', 'id': sec_group.id, 'description': rule.description, 'port_or_range': 'port', 'port': rule.from_port, 'to_port': 'INVALID', 'from_port': 'INVALID', 'icmp_code': 'INVALID', 'icmp_type': 'INVALID', 'security_group': sec_group.id, 'ip_protocol': 'INVALID', 'rule_menu': rule.ip_protocol, 'cidr': 'INVALID', 'remote': 'sg'} res = self.client.post(self.edit_url, formData) self.assertNoFormErrors(res) self.assertRedirectsNoFollow(res, self.detail_url) self._check_add_security_group_rule( cidr=None, ethertype='') @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_icmp_with_invalid_unused_fields(self): sec_group, rule = self._add_security_group_rule_fixture( ip_protocol='icmp', security_group=None) formData = {'method': 'AddRule', 'id': sec_group.id, 'description': rule.description, 'port_or_range': 'port', 'port': 'INVALID', 'to_port': 'INVALID', 'from_port': 'INVALID', 'icmp_code': rule.to_port, 'icmp_type': rule.from_port, 'security_group': sec_group.id, 'ip_protocol': 'INVALID', 'rule_menu': 'icmp', 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertNoFormErrors(res) self.assertRedirectsNoFollow(res, self.detail_url) self._check_add_security_group_rule( ip_protocol='icmp', security_group=None) @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_without_description_support(self): sec_group, rule = self._add_security_group_rule_fixture( security_group=None, is_desc_support=False) formData = {'method': 'AddRule', 'id': sec_group.id, 'port_or_range': 'port', 'port': rule.from_port, 'rule_menu': rule.ip_protocol, 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertRedirectsNoFollow(res, self.detail_url) self._check_add_security_group_rule( security_group=None, description=None) @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_cidr_with_template(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self.security_group_rules.first() self.mock_is_extension_supported.return_value = True self.mock_security_group_rule_create.return_value = rule self.mock_security_group_list.return_value = sec_group_list formData = {'method': 'AddRule', 'id': sec_group.id, 'rule_menu': 'http', 'port_or_range': 'port', 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertRedirectsNoFollow(res, self.detail_url) self.mock_security_group_rule_create.assert_called_once_with( test.IsHttpRequest(), sec_group.id, 'ingress', 'IPv4', rule.ip_protocol, int(rule.from_port), int(rule.to_port), rule.ip_range['cidr'], None, description='') self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') def _get_source_group_rule(self): for rule in self.security_group_rules.list(): if rule.group: return rule raise Exception("No matches found.") @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_self_as_source_group(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self._get_source_group_rule() self.mock_is_extension_supported.return_value = True self.mock_security_group_rule_create.return_value = rule self.mock_security_group_list.return_value = sec_group_list formData = {'method': 'AddRule', 'id': sec_group.id, 'port_or_range': 'port', 'port': rule.from_port, 'rule_menu': rule.ip_protocol, 'cidr': '0.0.0.0/0', 'security_group': sec_group.id, 'remote': 'sg'} res = self.client.post(self.edit_url, formData) self.assertRedirectsNoFollow(res, self.detail_url) self.mock_security_group_rule_create.assert_called_once_with( test.IsHttpRequest(), sec_group.id, 'ingress', # ethertype is empty for source_group of Nova Security Group '', rule.ip_protocol, int(rule.from_port), int(rule.to_port), None, '%s' % sec_group.id, description='') self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_self_as_source_group_with_template(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self._get_source_group_rule() self.mock_is_extension_supported.return_value = True self.mock_security_group_rule_create.return_value = rule self.mock_security_group_list.return_value = sec_group_list formData = {'method': 'AddRule', 'id': sec_group.id, 'rule_menu': 'http', 'port_or_range': 'port', 'cidr': '0.0.0.0/0', 'security_group': sec_group.id, 'remote': 'sg'} res = self.client.post(self.edit_url, formData) self.assertRedirectsNoFollow(res, self.detail_url) self.mock_security_group_rule_create.assert_called_once_with( test.IsHttpRequest(), sec_group.id, 'ingress', # ethertype is empty for source_group of Nova Security Group '', rule.ip_protocol, int(rule.from_port), int(rule.to_port), None, '%s' % sec_group.id, description='') self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_list', 'is_extension_supported')}) def test_detail_invalid_port(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self.security_group_rules.first() self.mock_security_group_list.return_value = sec_group_list self.mock_is_extension_supported.return_value = True # note that 'port' is not passed formData = {'method': 'AddRule', 'id': sec_group.id, 'port_or_range': 'port', 'rule_menu': rule.ip_protocol, 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertNoMessages() self.assertContains(res, "The specified port is invalid") self.assert_mock_multiple_calls_with_same_arguments( self.mock_security_group_list, 2, mock.call(test.IsHttpRequest())) self.assert_mock_multiple_calls_with_same_arguments( self.mock_is_extension_supported, 2, mock.call(test.IsHttpRequest(), 'standard-attr-description')) @test.create_mocks({api.neutron: ('security_group_list', 'is_extension_supported')}) def test_detail_invalid_port_range(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self.security_group_rules.first() self.mock_is_extension_supported.return_value = True self.mock_security_group_list.return_value = sec_group_list formData = {'method': 'AddRule', 'id': sec_group.id, 'port_or_range': 'range', 'from_port': rule.from_port, 'to_port': int(rule.from_port) - 1, 'rule_menu': rule.ip_protocol, 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertNoMessages() self.assertContains(res, "greater than or equal to") # note that 'from_port' is not passed formData = {'method': 'AddRule', 'id': sec_group.id, 'port_or_range': 'range', 'to_port': rule.to_port, 'rule_menu': rule.ip_protocol, 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertNoMessages() self.assertContains(res, html.escape('"from" port number is invalid')) # note that 'to_port' is not passed formData = {'method': 'AddRule', 'id': sec_group.id, 'port_or_range': 'range', 'from_port': rule.from_port, 'rule_menu': rule.ip_protocol, 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertNoMessages() self.assertContains(res, html.escape('"to" port number is invalid')) self.assert_mock_multiple_calls_with_same_arguments( self.mock_security_group_list, 6, mock.call(test.IsHttpRequest())) self.assert_mock_multiple_calls_with_same_arguments( self.mock_is_extension_supported, 6, mock.call(test.IsHttpRequest(), 'standard-attr-description')) @test.create_mocks({api.neutron: ('security_group_list', 'is_extension_supported')}) def test_detail_invalid_icmp_rule(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() icmp_rule = self.security_group_rules.list()[1] self.mock_security_group_list.return_value = sec_group_list self.mock_is_extension_supported.return_value = True formData = {'method': 'AddRule', 'id': sec_group.id, 'port_or_range': 'port', 'icmp_type': 256, 'icmp_code': icmp_rule.to_port, 'rule_menu': icmp_rule.ip_protocol, 'cidr': icmp_rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertNoMessages() self.assertContains(res, "The ICMP type not in range (-1, 255)") formData = {'method': 'AddRule', 'id': sec_group.id, 'port_or_range': 'port', 'icmp_type': icmp_rule.from_port, 'icmp_code': 256, 'rule_menu': icmp_rule.ip_protocol, 'cidr': icmp_rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertNoMessages() self.assertContains(res, "The ICMP code not in range (-1, 255)") formData = {'method': 'AddRule', 'id': sec_group.id, 'port_or_range': 'port', 'icmp_type': -1, 'icmp_code': icmp_rule.to_port, 'rule_menu': icmp_rule.ip_protocol, 'cidr': icmp_rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertNoMessages() self.assertContains( res, "ICMP code is provided but ICMP type is missing.") self.assert_mock_multiple_calls_with_same_arguments( self.mock_security_group_list, 6, mock.call(test.IsHttpRequest())) self.assert_mock_multiple_calls_with_same_arguments( self.mock_is_extension_supported, 6, mock.call(test.IsHttpRequest(), 'standard-attr-description')) @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_exception(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self.security_group_rules.first() self.mock_is_extension_supported.return_value = True self.mock_security_group_rule_create.side_effect = self.exceptions.nova self.mock_security_group_list.return_value = sec_group_list formData = {'method': 'AddRule', 'id': sec_group.id, 'port_or_range': 'port', 'port': rule.from_port, 'rule_menu': rule.ip_protocol, 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertRedirectsNoFollow(res, self.detail_url) self.mock_security_group_rule_create.assert_called_once_with( test.IsHttpRequest(), sec_group.id, 'ingress', 'IPv4', rule.ip_protocol, int(rule.from_port), int(rule.to_port), rule.ip_range['cidr'], None, description='') self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_duplicated(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self.security_group_rules.first() self.mock_is_extension_supported.return_value = True self.mock_security_group_rule_create.side_effect = exceptions.Conflict self.mock_security_group_list.return_value = sec_group_list formData = {'method': 'AddRule', 'id': sec_group.id, 'port_or_range': 'port', 'port': rule.from_port, 'rule_menu': rule.ip_protocol, 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertNoFormErrors(res) self.assertRedirectsNoFollow(res, self.detail_url) self.mock_security_group_rule_create.assert_called_once_with( test.IsHttpRequest(), sec_group.id, 'ingress', 'IPv4', rule.ip_protocol, int(rule.from_port), int(rule.to_port), rule.ip_range['cidr'], None, description='') self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_rule_delete', 'is_extension_supported')}) def test_detail_delete_rule(self): sec_group = self.security_groups.first() rule = self.security_group_rules.first() self.mock_security_group_rule_delete.return_value = None self.mock_is_extension_supported.return_value = True form_data = {"action": "rules__delete__%s" % rule.id} req = self.factory.post(self.edit_url, form_data) kwargs = {'security_group_id': sec_group.id} table = tables.RulesTable(req, sec_group.rules, **kwargs) handled = table.maybe_handle() self.assertEqual(strip_absolute_base(handled['location']), self.detail_url) self.mock_security_group_rule_delete.assert_called_once_with( test.IsHttpRequest(), rule.id) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_rule_delete', 'is_extension_supported')}) def test_detail_delete_rule_exception(self): sec_group = self.security_groups.first() rule = self.security_group_rules.first() self.mock_security_group_rule_delete.side_effect = self.exceptions.nova self.mock_is_extension_supported.return_value = True form_data = {"action": "rules__delete__%s" % rule.id} req = self.factory.post(self.edit_url, form_data) kwargs = {'security_group_id': sec_group.id} table = tables.RulesTable( req, self.security_group_rules.list(), **kwargs) handled = table.maybe_handle() self.assertEqual(strip_absolute_base(handled['location']), self.detail_url) self.mock_security_group_rule_delete.assert_called_once_with( test.IsHttpRequest(), rule.id) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_delete', 'is_extension_supported')}) def test_delete_group(self): sec_group = self.security_groups.get(name="other_group") self.mock_security_group_delete.return_value = None self.mock_is_extension_supported.return_value = True form_data = {"action": "security_groups__delete__%s" % sec_group.id} req = self.factory.post(INDEX_URL, form_data) table = tables.SecurityGroupsTable(req, self.security_groups.list()) handled = table.maybe_handle() self.assertEqual(strip_absolute_base(handled['location']), INDEX_URL) self.mock_security_group_delete.assert_called_once_with( test.IsHttpRequest(), sec_group.id) @test.create_mocks({api.neutron: ('security_group_delete', 'is_extension_supported')}) def test_delete_group_exception(self): sec_group = self.security_groups.get(name="other_group") self.mock_security_group_delete.side_effect = self.exceptions.nova self.mock_is_extension_supported.return_value = True form_data = {"action": "security_groups__delete__%s" % sec_group.id} req = self.factory.post(INDEX_URL, form_data) table = tables.SecurityGroupsTable(req, self.security_groups.list()) handled = table.maybe_handle() self.assertEqual(strip_absolute_base(handled['location']), INDEX_URL) self.mock_security_group_delete.assert_called_once_with( test.IsHttpRequest(), sec_group.id) @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_custom_protocol(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self.security_group_rules.first() self.mock_is_extension_supported.return_value = True self.mock_security_group_rule_create.return_value = rule self.mock_security_group_list.return_value = sec_group_list formData = {'method': 'AddRule', 'id': sec_group.id, 'rule_menu': 'custom', 'direction': 'ingress', 'port_or_range': 'port', 'ip_protocol': 37, 'cidr': 'fe80::/48', 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertRedirectsNoFollow(res, self.detail_url) self.mock_security_group_rule_create.assert_called_once_with( test.IsHttpRequest(), sec_group.id, 'ingress', 'IPv6', 37, None, None, 'fe80::/48', None, description='') self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_egress(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self.security_group_rules.first() self.mock_is_extension_supported.return_value = True self.mock_security_group_rule_create.return_value = rule self.mock_security_group_list.return_value = sec_group_list formData = {'method': 'AddRule', 'id': sec_group.id, 'direction': 'egress', 'rule_menu': 'udp', 'port_or_range': 'port', 'port': 80, 'cidr': '10.1.1.0/24', 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertRedirectsNoFollow(res, self.detail_url) self.mock_security_group_rule_create.assert_called_once_with( test.IsHttpRequest(), sec_group.id, 'egress', 'IPv4', 'udp', 80, 80, '10.1.1.0/24', None, description='') self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_egress_with_all_tcp(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self.security_group_rules.list()[3] self.mock_is_extension_supported.return_value = True self.mock_security_group_rule_create.return_value = rule self.mock_security_group_list.return_value = sec_group_list formData = {'method': 'AddRule', 'id': sec_group.id, 'direction': 'egress', 'port_or_range': 'range', 'rule_menu': 'all_tcp', 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertRedirectsNoFollow(res, self.detail_url) self.mock_security_group_rule_create.assert_called_once_with( test.IsHttpRequest(), sec_group.id, 'egress', 'IPv4', rule.ip_protocol, int(rule.from_port), int(rule.to_port), rule.ip_range['cidr'], None, description='') self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_source_group_with_direction_ethertype(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self._get_source_group_rule() self.mock_is_extension_supported.return_value = True self.mock_security_group_rule_create.return_value = rule self.mock_security_group_list.return_value = sec_group_list formData = {'method': 'AddRule', 'id': sec_group.id, 'direction': 'egress', 'port_or_range': 'port', 'port': rule.from_port, 'rule_menu': rule.ip_protocol, 'cidr': '0.0.0.0/0', 'security_group': sec_group.id, 'remote': 'sg', 'ethertype': 'IPv6'} res = self.client.post(self.edit_url, formData) self.assertRedirectsNoFollow(res, self.detail_url) self.mock_security_group_rule_create.assert_called_once_with( test.IsHttpRequest(), sec_group.id, 'egress', # ethertype is empty for source_group of Nova Security Group 'IPv6', rule.ip_protocol, int(rule.from_port), int(rule.to_port), None, '%s' % sec_group.id, description='') self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.update_settings( OPENSTACK_NEUTRON_NETWORK={'enable_ipv6': False}) @test.create_mocks({api.neutron: ('security_group_list', 'is_extension_supported')}) def test_add_rule_ethertype_with_ipv6_disabled(self): self.mock_security_group_list.return_value = \ self.security_groups.list() self.mock_is_extension_supported.return_value = True res = self.client.get(self.edit_url) self.assertIsInstance( res.context['form']['ethertype'].field.widget, forms.TextInput ) self.assertIn( 'readonly', res.context['form']['ethertype'].field.widget.attrs ) self.assertEqual( res.context['form']['ethertype'].field.initial, 'IPv4' ) self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.update_settings( OPENSTACK_NEUTRON_NETWORK={'enable_ipv6': False}) @test.create_mocks({api.neutron: ('security_group_list', 'is_extension_supported')}) def test_add_rule_cidr_with_ipv6_disabled(self): sec_group = self.security_groups.first() self.mock_security_group_list.return_value = \ self.security_groups.list() self.mock_is_extension_supported.return_value = True formData = {'method': 'AddRule', 'id': sec_group.id, 'rule_menu': 'custom', 'direction': 'ingress', 'port_or_range': 'port', 'ip_protocol': 37, 'cidr': 'fe80::/48', 'etherype': 'IPv4', 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertFormError(res, 'form', 'cidr', 'Invalid version for IP address') self.assert_mock_multiple_calls_with_same_arguments( self.mock_security_group_list, 2, mock.call(test.IsHttpRequest())) self.assert_mock_multiple_calls_with_same_arguments( self.mock_is_extension_supported, 2, mock.call(test.IsHttpRequest(), 'standard-attr-description')) @test.create_mocks({api.neutron: ('security_group_list', 'is_extension_supported')}) def test_detail_add_rule_invalid_port(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self.security_group_rules.first() self.mock_is_extension_supported.return_value = True self.mock_security_group_list.return_value = sec_group_list formData = {'method': 'AddRule', 'id': sec_group.id, 'port_or_range': 'port', 'port': -1, 'rule_menu': rule.ip_protocol, 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertNoMessages() self.assertContains(res, "Not a valid port number") self.assert_mock_multiple_calls_with_same_arguments( self.mock_security_group_list, 2, mock.call(test.IsHttpRequest())) self.assert_mock_multiple_calls_with_same_arguments( self.mock_is_extension_supported, 2, mock.call(test.IsHttpRequest(), 'standard-attr-description')) @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_ingress_tcp_without_port(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self.security_group_rules.list()[3] self.mock_is_extension_supported.return_value = True self.mock_security_group_rule_create.return_value = rule self.mock_security_group_list.return_value = sec_group_list formData = {'id': sec_group.id, 'direction': 'ingress', 'port_or_range': 'all', 'rule_menu': 'tcp', 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertRedirectsNoFollow(res, self.detail_url) self.mock_security_group_rule_create.assert_called_once_with( test.IsHttpRequest(), sec_group.id, 'ingress', 'IPv4', 'tcp', None, None, rule.ip_range['cidr'], None, description='') self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description') @test.create_mocks({api.neutron: ('security_group_rule_create', 'is_extension_supported', 'security_group_list')}) def test_detail_add_rule_custom_without_protocol(self): sec_group = self.security_groups.first() sec_group_list = self.security_groups.list() rule = self.security_group_rules.list()[3] self.mock_is_extension_supported.return_value = True self.mock_security_group_rule_create.return_value = rule self.mock_security_group_list.return_value = sec_group_list formData = {'id': sec_group.id, 'direction': 'ingress', 'port_or_range': 'port', 'rule_menu': 'custom', 'ip_protocol': -1, 'cidr': rule.ip_range['cidr'], 'remote': 'cidr'} res = self.client.post(self.edit_url, formData) self.assertRedirectsNoFollow(res, self.detail_url) self.mock_security_group_rule_create.assert_called_once_with( test.IsHttpRequest(), sec_group.id, 'ingress', 'IPv4', -1, None, None, rule.ip_range['cidr'], None, description='') self.mock_security_group_list.assert_called_once_with( test.IsHttpRequest()) self.mock_is_extension_supported.assert_called_once_with( test.IsHttpRequest(), 'standard-attr-description')