Support RBAC security groups in dashboard

Get the RBAC shared security groups in the dashboard by making
an additional Neutron API call to filter by the shared field. Currently,
the dashboard only shows SGs owned by the tenant.

Depends-On: https://review.opendev.org/c/openstack/neutron/+/811242
Closes-Bug: #1907843
Change-Id: Ifa1acb3f0f6a33d0b4dc3761674e561a8d24c5c2
This commit is contained in:
Hang Yang 2021-09-28 20:23:38 -05:00
parent 66091d7b2f
commit c7ea66bc3e
6 changed files with 109 additions and 15 deletions

View File

@ -20,6 +20,7 @@
import collections
from collections.abc import Sequence
import copy
import itertools
import logging
import netaddr
@ -329,6 +330,7 @@ class SecurityGroupManager(object):
* name
* description
* tenant_id
* shared: A boolean indicates whether this security group is shared
* rules: A list of SecurityGroupRule objects
SecurityGroupRule object should have the following attributes
@ -354,6 +356,23 @@ class SecurityGroupManager(object):
self.client = neutronclient(request)
def _list(self, **filters):
if (filters.get("tenant_id") and
is_extension_supported(
self.request, 'security-groups-shared-filtering')):
# NOTE(hangyang): First, we get the SGs owned by but not shared
# to the requester(tenant_id)
filters["shared"] = False
secgroups_owned = self.client.list_security_groups(**filters)
# NOTE(hangyang): Second, we get the SGs shared to the
# requester. For a requester with an admin role, this second
# API call also only returns SGs shared to the requester's tenant
# instead of all the SGs shared to any tenant.
filters.pop("tenant_id")
filters["shared"] = True
secgroups_rbac = self.client.list_security_groups(**filters)
return [SecurityGroup(sg) for sg in
itertools.chain(secgroups_owned.get('security_groups'),
secgroups_rbac.get('security_groups'))]
secgroups = self.client.list_security_groups(**filters)
return [SecurityGroup(sg) for sg in secgroups.get('security_groups')]

View File

@ -115,6 +115,20 @@ class SecurityGroupsTable(tables.DataTable):
security_group_id = tables.Column("id",
verbose_name=_("Security Group ID"))
description = tables.Column("description", verbose_name=_("Description"))
shared = tables.Column("shared", verbose_name=_("Shared"))
def __init__(self, request, *args, **kwargs):
super().__init__(request, *args, **kwargs)
try:
is_shared_supported = api.neutron.is_extension_supported(
self.request, 'security-groups-shared-filtering')
except Exception:
exceptions.handle(
self.request,
_('Failed to check if shared field is supported.'))
is_shared_supported = False
if not is_shared_supported:
del self.columns['shared']
def sanitize_id(self, obj_id):
return filters.get_int_or_uuid(obj_id)

View File

@ -60,7 +60,8 @@ class SecurityGroupsViewTests(test.TestCase):
self.edit_url = reverse(SG_ADD_RULE_VIEW, args=[sec_group.id])
self.update_url = reverse(SG_UPDATE_VIEW, args=[sec_group.id])
@test.create_mocks({api.neutron: ('security_group_list',),
@test.create_mocks({api.neutron: ('security_group_list',
'is_extension_supported'),
quotas: ('tenant_quota_usages',)})
def test_index(self):
sec_groups = self.security_groups.list()
@ -68,6 +69,7 @@ class SecurityGroupsViewTests(test.TestCase):
quota_data['security_group']['available'] = 10
self.mock_security_group_list.return_value = sec_groups
self.mock_is_extension_supported.return_value = True
self.mock_tenant_quota_usages.return_value = quota_data
res = self.client.get(INDEX_URL)
@ -93,7 +95,8 @@ class SecurityGroupsViewTests(test.TestCase):
self.mock_tenant_quota_usages, 2,
mock.call(test.IsHttpRequest(), targets=('security_group', )))
@test.create_mocks({api.neutron: ('security_group_list',),
@test.create_mocks({api.neutron: ('security_group_list',
'is_extension_supported'),
quotas: ('tenant_quota_usages',)})
def test_create_button_attributes(self):
sec_groups = self.security_groups.list()
@ -101,6 +104,7 @@ class SecurityGroupsViewTests(test.TestCase):
quota_data['security_group']['available'] = 10
self.mock_security_group_list.return_value = sec_groups
self.mock_is_extension_supported.return_value = True
self.mock_tenant_quota_usages.return_value = quota_data
res = self.client.get(INDEX_URL)
@ -153,10 +157,14 @@ class SecurityGroupsViewTests(test.TestCase):
self.mock_tenant_quota_usages, 3,
mock.call(test.IsHttpRequest(), targets=('security_group', )))
@test.create_mocks({api.neutron: ('is_extension_supported',)})
def test_create_button_disabled_when_quota_exceeded_neutron_disabled(self):
self.mock_is_extension_supported.return_value = True
self._test_create_button_disabled_when_quota_exceeded(False)
@test.create_mocks({api.neutron: ('is_extension_supported',)})
def test_create_button_disabled_when_quota_exceeded_neutron_enabled(self):
self.mock_is_extension_supported.return_value = True
self._test_create_button_disabled_when_quota_exceeded(True)
def _add_security_group_rule_fixture(self, is_desc_support=True, **kwargs):
@ -806,10 +814,12 @@ class SecurityGroupsViewTests(test.TestCase):
self.mock_is_extension_supported.assert_called_once_with(
test.IsHttpRequest(), 'standard-attr-description')
@test.create_mocks({api.neutron: ('security_group_delete',)})
@test.create_mocks({api.neutron: ('security_group_delete',
'is_extension_supported')})
def test_delete_group(self):
sec_group = self.security_groups.get(name="other_group")
self.mock_security_group_delete.return_value = None
self.mock_is_extension_supported.return_value = True
form_data = {"action": "security_groups__delete__%s" % sec_group.id}
req = self.factory.post(INDEX_URL, form_data)
@ -820,10 +830,12 @@ class SecurityGroupsViewTests(test.TestCase):
self.mock_security_group_delete.assert_called_once_with(
test.IsHttpRequest(), sec_group.id)
@test.create_mocks({api.neutron: ('security_group_delete',)})
@test.create_mocks({api.neutron: ('security_group_delete',
'is_extension_supported')})
def test_delete_group_exception(self):
sec_group = self.security_groups.get(name="other_group")
self.mock_security_group_delete.side_effect = self.exceptions.nova
self.mock_is_extension_supported.return_value = True
form_data = {"action": "security_groups__delete__%s" % sec_group.id}
req = self.factory.post(INDEX_URL, form_data)

View File

@ -487,21 +487,30 @@ def data(TEST):
# Security group.
sec_group_1 = {'tenant_id': '1',
'shared': False,
'description': 'default',
'id': 'faad7c80-3b62-4440-967c-13808c37131d',
'name': 'default'}
sec_group_2 = {'tenant_id': '1',
'shared': False,
'description': 'NotDefault',
'id': '27a5c9a1-bdbb-48ac-833a-2e4b5f54b31d',
'name': 'other_group'}
sec_group_3 = {'tenant_id': '1',
'shared': False,
'description': 'NotDefault',
'id': '443a4d7a-4bd2-4474-9a77-02b35c9f8c95',
'name': 'another_group'}
sec_group_empty = {'tenant_id': '1',
'shared': False,
'description': 'SG without rules',
'id': 'f205f3bc-d402-4e40-b004-c62401e19b4b',
'name': 'empty_group'}
sec_group_shared = {'tenant_id': '1',
'shared': True,
'description': 'SG without rules',
'id': 'cca53e02-114e-4da3-917b-f19efa7cbc47',
'name': 'shared_group'}
def add_rule_to_group(secgroup, default_only=True):
rule_egress_ipv4 = {
@ -585,10 +594,11 @@ def data(TEST):
add_rule_to_group(sec_group_1, default_only=False)
add_rule_to_group(sec_group_2)
add_rule_to_group(sec_group_3)
# NOTE: sec_group_empty is a SG without rules,
# NOTE: sec_group_empty and sec_group_shared are SGs without rules,
# so we don't call add_rule_to_group.
groups = [sec_group_1, sec_group_2, sec_group_3, sec_group_empty]
groups = [sec_group_1, sec_group_2, sec_group_3,
sec_group_empty, sec_group_shared]
sg_name_dict = dict([(sg['id'], sg['name']) for sg in groups])
for sg in groups:
# Neutron API.
@ -684,12 +694,17 @@ def data(TEST):
extension_6 = {"name": "Trunks",
"alias": "trunk",
"description": "Provides support for trunk ports."}
extension_7 = {"name": "Security group filtering on the shared field",
"alias": "security-groups-shared-filtering",
"description": "Support filtering security groups on "
"the shared field"}
TEST.api_extensions.add(extension_1)
TEST.api_extensions.add(extension_2)
TEST.api_extensions.add(extension_3)
TEST.api_extensions.add(extension_4)
TEST.api_extensions.add(extension_5)
TEST.api_extensions.add(extension_6)
TEST.api_extensions.add(extension_7)
# 1st agent.
agent_dict = {"binary": "neutron-openvswitch-agent",

View File

@ -1162,24 +1162,51 @@ class NeutronApiSecurityGroupTests(test.APIMockTestCase):
for (exprule, retrule) in zip(exp_rules, ret_sg.rules):
self._cmp_sg_rule(exprule, retrule)
def _test_security_group_list(self, **params):
@mock.patch.object(api.neutron, 'is_extension_supported')
def _test_security_group_list(self, mock_is_extension_supported,
is_ext_supported=True, **params):
sgs = self.api_security_groups.list()
mock_is_extension_supported.return_value = is_ext_supported
if is_ext_supported:
# First call to get the tenant owned SGs
q_params_1 = {'tenant_id': self.request.user.tenant_id,
'shared': False}
# if tenant_id is specified, the passed tenant_id should be sent.
q_params_1.update(params)
# Second call to get shared SGs
q_params_2 = q_params_1.copy()
q_params_2.pop('tenant_id')
q_params_2['shared'] = True
# use deepcopy to ensure self.api_security_groups is not modified.
self.qclient.list_security_groups.side_effect = [
{'security_groups': copy.deepcopy(sgs[:4])},
{'security_groups': copy.deepcopy(sgs[-1:])},
]
rets = api.neutron.security_group_list(self.request, **params)
self.qclient.list_security_groups.assert_has_calls(
[mock.call(**q_params_1), mock.call(**q_params_2)])
else:
q_params = {'tenant_id': self.request.user.tenant_id}
# if tenant_id is specified, the passed tenant_id should be sent.
q_params.update(params)
# use deepcopy to ensure self.api_security_groups is not modified.
self.qclient.list_security_groups.return_value = {'security_groups':
copy.deepcopy(sgs)}
self.qclient.list_security_groups.return_value = {
'security_groups': copy.deepcopy(sgs)}
rets = api.neutron.security_group_list(self.request, **params)
mock_is_extension_supported.assert_called_once_with(
self.request, 'security-groups-shared-filtering')
self.assertEqual(len(sgs), len(rets))
for (exp, ret) in zip(sgs, rets):
self._cmp_sg(exp, ret)
self.qclient.list_security_groups.assert_called_once_with(**q_params)
def test_security_group_list(self):
self._test_security_group_list()
def test_security_group_list_no_shared(self):
# without the api extension to filter by the shared field
self._test_security_group_list(is_ext_supported=False)
def test_security_group_list_with_params(self):
self._test_security_group_list(name='sg1')

View File

@ -0,0 +1,7 @@
---
features:
- |
[:bug:`1907843`] RBAC shared security groups can now be shown in the
Security Groups page. Previously only the security groups owned by the
login tenant can be displayed and used. Besides, a column for the shared
field is added to the Security Groups table.